我有一个10亿记录的csv文件,需要导入到mysql的一张数据表中。
需要java8及其以上的环境
bash自动换行:关放大阅读展开代码[atguigu@hadoop102 test]$ wget https://www.apache.org/dyn/closer.lua/seatunnel/2.3.11/apache-seatunnel-2.3.11-bin.tar.gz [atguigu@hadoop102 test]$ tar -zxvf ./apache-seatunnel-2.3.11-bin.tar.gz -C ./ [atguigu@hadoop102 test]$ cd ./apache-seatunnel-2.3.11/ && mkdir job && cd ./job
seatunnel是插件式的对接不同的数据源的,按需引入需要的connector,从本地文件读取,写入到mysql,故需要引入connector-file-local-2.3.11.jar,connector-jdbc-2.3.11.jar; 当然mysql 对应的jdbc驱动是必须,此处采用mysql-connector-java-8.0.13.jar
connector 插件包放置于$SEATUNNEL_HOME/connectors下,MySQL 的 jdbc驱动放置于$SEATUNNEL_HOME/lib下
命名为file-mysql.config
json自动换行:关放大阅读展开代码env { job.mode = "BATCH" parallelism = 5 } source { LocalFile { plugin_output = "f1" path = "/home/atguigu/test/files/measurements_corrected.csv" file_format_type = "csv" encoding = "UTF-8" csv_use_header_line = false schema { fields { city = "string" temperature = "string" } } } } sink { jdbc { plugin_input = "f1" url = "jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "aaaaaa" query = "insert into measurements_backup(city, temperature) values(?,?)" batch_size = 50000 } }
对单个文件配置 parallelism > 1 对于 LocalFile 组件来说,是不会生效的。即使您在 env 中设置了 parallelism = 5,SeaTunnel 在读取这个单个文件时,实际上也只会启动一个读取任务(Reader)
bash自动换行:关放大阅读展开代码[atguigu@hadoop102 apache-seatunnel-2.3.11]$ ./bin/seatunnel.sh --config ./job/file-mysql.config -m local
任务部分日志:可以看到runningJobCount为1
bash自动换行:关放大阅读展开代码2025-08-05 22:02:59,728 INFO [o.a.s.e.s.CoordinatorService ] [pool-7-thread-1] - [localhost]:5801 [seatunnel-170142] [5.1] *********************************************** Job info detail *********************************************** createdJobCount : 0 scheduledJobCount : 0 runningJobCount : 1 failingJobCount : 0 failedJobCount : 0 cancellingJobCount : 0 canceledJobCount : 0 finishedJobCount : 0 ***********************************************
考虑采用文件切分的方式分为5等分,实现localfile的并行读取
bash自动换行:关放大阅读展开代码# 1. 创建一个新目录来存放分割后的小文件 mkdir /home/atguigu/test/files/measurements_parts # 2. 使用 split 命令进行分割 # -l 200000000 : 表示每个输出文件包含 2亿行 (200,000,000) # -d : 表示使用数字作为后缀 (e.g., part_00, part_01) # --additional-suffix=.csv : 给分割后的文件加上 .csv 后缀 # 输入文件: /home/atguigu/test/files/measurements_corrected.csv # 输出文件前缀: /home/atguigu/test/files/measurements_parts/part_ split -l 200000000 -d --additional-suffix=.csv /home/atguigu/test/files/measurements_corrected.csv [atguigu@hadoop102 files]$ split -l 200000000 -d --additional-suffix=.csv /home/atguigu/test/files/measurements_corrected.csv /home/atguigu/test/files/measurements_parts/part_ [atguigu@hadoop102 files]$ cd measurements_parts/ [atguigu@hadoop102 measurements_parts]$ ll total 14292220 -rw-rw-r--. 1 atguigu atguigu 2927097348 Aug 5 22:08 part_00.csv -rw-rw-r--. 1 atguigu atguigu 2927022322 Aug 5 22:08 part_01.csv -rw-rw-r--. 1 atguigu atguigu 2927003017 Aug 5 22:08 part_02.csv -rw-rw-r--. 1 atguigu atguigu 2927071497 Aug 5 22:08 part_03.csv -rw-rw-r--. 1 atguigu atguigu 2927022146 Aug 5 22:08 part_04.csv
改动配置文件:
json自动换行:关放大阅读展开代码env { job.mode = "BATCH" parallelism = 5 } source { LocalFile { plugin_output = "f1" path = "/home/atguigu/test/files/measurements_parts/" file_format_type = "csv" encoding = "UTF-8" csv_use_header_line = false schema { fields { city = "string" temperature = "string" } } } } sink { jdbc { plugin_input = "f1" url = "jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "aaaaaa" query = "insert into measurements_backup(city, temperature) values(?,?)" batch_size = 50000 } }
任务日志:确实启动 了5个子任务读取5个不同的数据文件
任务结束日志:
官方说明:每种部署模式都有不同的使用场景和优缺点。在选择部署模式时,您应该根据您的需求和环境来选择。
Local模式:只用于测试,每个任务都会启动一个独立的进程,任务运行完成后进程会退出。
混合集群模式:SeaTunnel Engine 的Master服务和Worker服务混合在同一个进程中,所有节点都可以运行作业并参与选举成为master,即master节点也在同时运行同步任务。在该模式下,Imap(保存任务的状态信息用于为任务的容错提供支持)数据会分布在所有节点中。
分离集群模式:SeaTunnel Engine 的Master服务和Worker服务分离,每个服务单独一个进程。Master节点只负责作业调度,rest api,任务提交等,Imap数据只存储在Master节点中。Worker节点只负责任务的执行,不参与选举成为master,也不存储Imap数据。
使用建议:建议使用分离集群模式。在混合集群模式下,Master节点要同步运行任务,当任务规模较大时,会影响Master节点的稳定性,一但Master节点宕机或心跳超时,会导致Master节点切换,Master节点切换会导致所有正在运行的任务进行容错,会进一步增长集群的负载。因此,我们更建议使用分离模式。
参考官方文档:Deploy SeaTunnel Engine In Separated Cluster Mode
主要涉及的配置文件
seatunnel.yaml: seatunnel引擎相关配置hazelcast-master.yaml: master节点相关配置hazelcast-worker.yamljvm_master_options: master节点jvm进程配置jvm_worker_options: worker节点jvm进程配置hazelcast-client.yaml: SeaTunnel Engine 客户端相关的配置需要下载对应的connector放置于$SEATUNNEL_HOME/connectors目录下或者$SEATUNNEL_HOME/lib目录下
选择一个节点机器上按照上述文档搭建完毕后,使用文件目录同步命令将seatunnel的家目录在各个节点间同步即可
bash自动换行:关放大阅读展开代码# 比如我是使用rsync封装xsync命令 [atguigu@hadoop102 ~]$ xsync $SEATUNNEL_HOME
启动Master节点
bash自动换行:关放大阅读展开代码mkdir -p $SEATUNNEL_HOME/logs ./bin/seatunnel-cluster.sh -d -r master
启动成功会出现名为 SeaTunnelServer的java进程
启动Worker节点
bash自动换行:关放大阅读展开代码mkdir -p $SEATUNNEL_HOME/logs ./bin/seatunnel-cluster.sh -d -r worker
启动成功会出现名为 SeaTunnelServer的java进程
检查配置master 和 worker节点是否都已正常启动
bash自动换行:关放大阅读展开代码[atguigu@hadoop103 apache-seatunnel-2.3.11]$ sh bin/seatunnel.sh -l
部分日志截图:发现有一个master和2个worker
将一张10亿的hive分区表同步到mysql表
hive 表:city_hash为分区字段
sql自动换行:关放大阅读展开代码CREATE TABLE mydb.measurements_partitions ( city STRING, measurement DECIMAL(10,2) ) PARTITIONED BY (city_hash INT) STORED AS ORC TBLPROPERTIES ('orc.compress'='ZLIB');
mysql表:
sql自动换行:关放大阅读展开代码CREATE TABLE measurements ( id INT AUTO_INCREMENT PRIMARY KEY, city VARCHAR(255), temperature decimal(10,2) ) ENGINE=InnoDB;
hive-mysql.config.template
json自动换行:关放大阅读展开代码env { job.mode = "BATCH" parallelism = 5 } source { Hive{ plugin_output = "mp" table_name = "mydb.measurements_partitions" metastore_uri = "thrift://hadoop102:9083" } } transform { Sql { plugin_in = "mp" plugin_output = "mpo" query = "SELECT city, measurement FROM mp" } } sink { Jdbc { plugin_in = "mpo" url = "jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "aaaaaa" query = "insert into measurements(city, temperature) values(?,?)" batch_size = 50000 } }
bash自动换行:关放大阅读展开代码[atguigu@hadoop102 apache-seatunnel-2.3.11]$ bin/seatunnel.sh --config $SEATUNNEL_HOME/job/hive-mysql.config.template
任务日志:
seatunnel 客户端(提交同步任务的节点)直到同步任务结束或者任务异常,终端窗口日志才会停止
可以看到任务同步了10亿条记录,无失败记录,共耗时大概30分钟
bash自动换行:关放大阅读展开代码2025-08-06 22:42:24,874 INFO [s.c.s.s.c.ClientExecuteCommand] [main] - *********************************************** Job Statistic Information *********************************************** Start Time : 2025-08-06 22:13:04 End Time : 2025-08-06 22:42:24 Total Time(s) : 1759 Total Read Count : 1000000000 Total Write Count : 1000000000 Total Failed Count : 0 ***********************************************
mater节点日志:可以看到将要启动5个子任务读取hive的表,可以看出读取任务并行度为5
master节点日志查看:监听对应的日志文件即可
bash自动换行:关放大阅读展开代码tail -f $SEATUNNEL_HOME/logs/seatunnel-engine-master.log
任务分配情况:
其中一个worker节点(hadoop103)日志:
worker节点日志查看:监听对应的日志文件即可
bash自动换行:关放大阅读展开代码tail -f $SEATUNNEL_HOME/logs/seatunnel-engine-worker.log
可以看出hive的不同分区分配给hadoop103节点的子任务读取,之后会读取orc文件,并按照zlib解压
查看mysql写入端是否并行写入:
sql自动换行:关放大阅读展开代码SHOW FULL PROCESSLIST;
可以看出写入数据任务并行度为5
名为 hadoop103 的节点与 test 库建立了 1 个数据库连接,并正在执行 insert 操作。 名为 hadoop104 的节点与 test 库建立了 4 个独立的数据库连接,每个连接都在各自执行自己的 insert 操作。
根据不同的同步任务需要的connector和依赖不同,需要按需下载对应到集群对应目录下,并且要求seatunnel引擎的版本和connector的版本一致,避免兼容性导致的问题
如果新加入不同的依赖,需要重新启动seatunnel集群来加载依赖并且保证集群各个节点的配置一致(即修改了配置等需要在各个节点间分发同步一下)
可以设置同步任务的并行度,但是只支持有 parallelism(并行度)标识的connector,并且按照connector说明配置正确触发可并行的条件。比如 hive connector这个连接器,在表存储为单文件时,即使设置了多并行度,依旧单个task处理
本文作者:hedeoer
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!