找回密码
 立即注册
首页 业界区 业界 SeaTunnel(2.3.12)的高级用法(四):多个source、多个s ...

SeaTunnel(2.3.12)的高级用法(四):多个source、多个sink

屋稷删 2026-1-15 15:55:00
前置知识:seatunnel配置中有数据流(Data Flow)转的概念

见:https://www.cnblogs.com/kakarotto-chen/p/19487384
demo1:两个source汇聚到一个sink


  • 关键配置:sink的:plugin_input = ["source_data1", "source_data2"]
  • 对应模型
  1. ┌──────────┐│ Source A │──┐└──────────┘  │              ├──▶  Sink┌──────────┐  ││ Source B │──┘└──────────┘
复制代码

  • 执行语句
  1. # ds-st-demo10-2-mysql2pgsql.confsh /data/tools/seatunnel/seatunnel-2.3.12/bin/seatunnel.sh --config /data/tools/seatunnel/myconf/ds-st-demo10-2-mysql2pgsql.conf -i -DJvmOption="-Xms2G -Xmx2G" -m local
复制代码

  • 建表
  1. -- ds-st-demo10-2-mysql2pgsql.confCREATE TABLE "public"."t_8_100w_imp_st_ds_demo10" (  id BIGINT PRIMARY KEY,  user_name VARCHAR(2000),  sex VARCHAR(20),  decimal_f NUMERIC(32, 6),  phone_number VARCHAR(20),  age INT,  create_time TIMESTAMP,  description TEXT,  address VARCHAR(2000) DEFAULT '未知',  my_status INT);COMMENT ON COLUMN "public"."t_8_100w_imp_st_ds_demo10"."id" IS '主键';COMMENT ON COLUMN "public"."t_8_100w_imp_st_ds_demo10"."user_name" IS '名字';COMMENT ON COLUMN "public"."t_8_100w_imp_st_ds_demo10"."sex" IS '性别:男;女';COMMENT ON COLUMN "public"."t_8_100w_imp_st_ds_demo10"."decimal_f" IS '大数字';COMMENT ON COLUMN "public"."t_8_100w_imp_st_ds_demo10"."phone_number" IS '电话';COMMENT ON COLUMN "public"."t_8_100w_imp_st_ds_demo10"."age" IS '字符串年龄转数字';COMMENT ON COLUMN "public"."t_8_100w_imp_st_ds_demo10"."create_time" IS '新增时间';COMMENT ON COLUMN "public"."t_8_100w_imp_st_ds_demo10"."description" IS '大文本';COMMENT ON COLUMN "public"."t_8_100w_imp_st_ds_demo10"."address" IS '空地址转默认值:未知';COMMENT ON COLUMN "public"."t_8_100w_imp_st_ds_demo10"."my_status" IS '状态';
复制代码

  • conf配置
  1. env {  # 任务名字:业务中可以弄表id  job.name = "ds-st-demo10.conf"  # 最大批线程数:并行度(线程数)  parallelism = 5  # 任务模式:BATCH:批处理模式;STREAMING:流处理模式  job.mode = "BATCH"}source {  # 第一个数据集  jdbc {    # 给这个数据集起个名字    plugin_output = "source_data1"      url = "jdbc:mysql://ip:port/cs1"    driver = "com.mysql.cj.jdbc.Driver"    user = "root"    password = "***"    # sql    query = "select id,name as user_name,sex,decimal_f,phone_number,CAST(age AS SIGNED) as age,create_time,description,address from t_8_100w where id < 10"        # 并行读取配置    # 分片的字段:支持:String、Number(int, bigint, decimal, ...)、Date    partition_column = "id"    # 表的分割大小(行数):每个分片的数据行(默认8096行)。最后分片数=表的总行数 / split.size    split.size = 50000    # 分片数,匹配并行度parallelism(2.3.12已不推荐配置了,用split.size来代替)    # partition_num = 5    # 最大批处理数:查询的行提取大小(指定当前任务每次执行时读取数据条数,该值(默认1000)受运行内存影响,若该值较大或单条数据量较大,需适当调整运行内存大小。)    fetch_size = 10000        # 连接参数    # 连接超时时间300ms    connection_check_timeout_sec = 300    # 其他jdbc的参数    properties = {      useUnicode = true      characterEncoding = "utf8"      # 时区,不同数据库参数不一样      serverTimezone = "Asia/Shanghai"      # 使用游标提高大结果集性能      useCursorFetch = "true"      # 每次获取行数      defaultFetchSize = "10000"    }  }    # 第二个数据集  jdbc {    # 给这个数据集起个名字    plugin_output = "source_data2"      url = "jdbc:mysql://ip:port/cs1"    driver = "com.mysql.cj.jdbc.Driver"    user = "root"    password = "***"    #     query = "select id,name as user_name,sex,decimal_f,phone_number,CAST(age AS SIGNED) as age,create_time,description,address from t_8_100w where id > 10 and id < 20"        # 并行读取配置    # 分片的字段:支持:String、Number(int, bigint, decimal, ...)、Date    partition_column = "id"    # 表的分割大小(行数):每个分片的数据行(默认8096行)。最后分片数=表的总行数 / split.size    split.size = 50000    # 分片数,匹配并行度parallelism(2.3.12已不推荐配置了,用split.size来代替)    # partition_num = 5    # 最大批处理数:查询的行提取大小(指定当前任务每次执行时读取数据条数,该值(默认1000)受运行内存影响,若该值较大或单条数据量较大,需适当调整运行内存大小。)    fetch_size = 10000        # 连接参数    # 连接超时时间300ms    connection_check_timeout_sec = 300    # 其他jdbc的参数    properties = {      useUnicode = true      characterEncoding = "utf8"      # 时区,不同数据库参数不一样      serverTimezone = "Asia/Shanghai"      # 使用游标提高大结果集性能      useCursorFetch = "true"      # 每次获取行数      defaultFetchSize = "10000"    }  }}# 清洗转换(简单的清洗转换,直接在source的query的sql中处理了就行)transform {  # 1. 字段映射:sql中做了,实际生成中不在这里处理。直接在source的query的sql中处理了就行  # 还可以用:FieldMapper 插件,来映射字段    # 转换age为数字类型(pgsql必须转)    # 2. 手机号脱敏:13812341234 -> 138****1234    # 3. 年龄转换:字符串转整数(实际生产中,不用转换,也没有内置的转换插件,可以直接保存成功)  # 4. 性别转换:1->男,2->女    # 5. 数据过滤:只保留 age > 25 的记录。    # 6. 地址默认值:空地址设为'未知'}sink {  jdbc {    # 接收的最终数据集(汇聚到一个结果中)    plugin_input = ["source_data1", "source_data2"]        url = "jdbc:postgresql://ip:5432/source_db"    driver = "org.postgresql.Driver"    user = "postgres"    password = "123456"    #     # query = ""        # 自动生成sql的配置,和query参数互斥    # 生成自动插入sql。如果目标库没有表,也会自动建表    generate_sink_sql = true    # database必须要,因为generate_sink_sql=true。    database = source_db    # 自动生成sql时,table必须要。    table = "public.t_8_100w_imp_st_ds_demo10"    # 生成类似:INSERT INTO …… ON CONFLICT ("主键") DO UPDATE SET …… 的sql    # enable_upsert = true    # 判断值唯一的健:此选项用于支持在自动生成 SQL 时进行 insert,delete 和 update 操作。    # primary_keys = ["id"]    # 表结构处理策略:表不存在时报错(任务失败),一般用:CREATE_SCHEMA_WHEN_NOT_EXIST(表不存在时创建表;表存在时跳过操作(保留数据))    schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"    # 插入数据的处理策略    # APPEND_DATA:保留表结构和数据,追加新数据(不删除现有数据)(一般用这个)    # DROP_DATA:保留表结构,删除表中所有数据(清空表)——实现清空重灌    # CUSTOM_PROCESSING :用户定义处理。需要配合:custom_sql使用    data_save_mode = "DROP_DATA"    # 当 data_save_mode 选择 CUSTOM_PROCESSING 时,您应该填写 CUSTOM_SQL 参数。此参数通常填入可执行的 SQL。SQL 将在同步任务之前执行。    #可以实现:同步删除(执行前置update、truncate的sql等)    #这个sql未执行,不知道为啥。    #这个sql已经执行。原因:因为generate_sink_sql=true的原因。才会执行custom_sql。(只有自动生成sql的时候,这个才会执行)    custom_sql = """update "source_db"."public"."t_8_100w_imp_st_ds_demo10" set "my_status" = 23"""        # 批量写入条数    batch_size = 10000    # 批次提交间隔    batch_interval_ms = 500    # 重试次数    max_retries = 3        # 连接参数    # 连接超时时间300ms    connection_check_timeout_sec = 300    # 其他jdbc的参数    properties = {      # PostgreSQL专用参数      # PostgreSQL的批量优化(注意大小写)      reWriteBatchedInserts = "true"        # 如果需要时区设置      options = "-c timezone=Asia/Shanghai"    }  }}
复制代码

  • 结果(汇聚了19条数据)
  1. 2026-01-15 14:28:15,952 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - ***********************************************           Job Statistic Information***********************************************Start Time                : 2026-01-15 14:28:11End Time                  : 2026-01-15 14:28:15Total Time(s)             :                   4Total Read Count          :                  19Total Write Count         :                  19Total Failed Count        :                   0***********************************************
复制代码
demo2:一个source分发到两个sink

……………………未完待续

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册