数据迁移工具 DataX 入门

在现代数据处理和分析中,数据迁移是一个不可或缺的环节。随着数据量的不断增加,如何高效、可靠地迁移数据成为了许多企业面临的挑战。DataX作为一款开源的数据迁移工具,因其灵活性和高效性而受到广泛关注。本文将介绍DataX的基本概念、安装步骤、使用方法以及一些最佳实践。

什么是DataX?

DataX是阿里巴巴开源的一款数据同步工具,旨在实现不同数据源之间的高效数据迁移。它支持多种数据源,包括关系型数据库(如MySQL、Oracle)、非关系型数据库(如MongoDB)、大数据存储(如HDFS、Hive)等。DataX的设计理念是“简单、灵活、高效”,使得用户能够轻松地进行数据迁移和同步。

为什么要使用DataX

5673257-1ff997525cd64e99.png

为了解决异构数据源的同步问题,DataX将复杂的网状同步链路优化成了星型数据链路,由DataX作为中间传输载体来负责连接各种数据源,以此来降低整个异构数据源同步链路的复杂度。当需要新接入一个数据源的时候,只需要考虑将该新的数据源对接到DataX即可,就能跟已有的所有数据源无缝同步。

5673257-95cbdec757096ec12.png

DataX由FrameWork+Plugin的形式构建,数据源的读取和写入分别Reader和Writer实现:

  • Reader,数据采集模块,负责采集数据源中的数据,并将数据发送给FrameWork;
  • Writer,数据写入模块,负责从Framework中取数据,并将数据写入到数据源中;
  • Framework,用于连接Reader和Writer,作为以上两者的数据传输通道,处理缓冲、流量控制、并发、数据转换等核心问题。

5673257-0901ba41903fd468.png

DataX的工作模式是单机多线程形式,不支持分布式的方式,这是它和其它数据同步工具的重要区别之一。

和其它大数据ETL工具相比:

企业微信截图_17303546878959.png

如果需要支持的数据源比较多,建议使用DataX,如果数据来源比较单一,且只是要导入到HDFS,流程很简单,可以考虑使用Sqoop。

DataX的主要特点

  • 多种数据源支持 :DataX支持多种数据源的读写,用户可以根据需求选择合适的源和目标。
  • 高性能 :通过并行处理和分片技术,DataX能够在大规模数据迁移时保持高性能。
  • 易于扩展 :用户可以根据自己的需求自定义插件,扩展DataX的功能。
  • 灵活的配置 :DataX使用JSON格式的配置文件,用户可以方便地进行配置和调整。

安装DataX

1. 环境准备

  • 准备好Linux服务器;
  • 安装Java运行环境(JDK 1.8及以上版本);
  • 安装Python运行环境;
  • 下载Datax工具包,解压缩到合适的目录;

进入到datax的bin目录下,运行自带的示例:

运行后控制台显示运行成功即表示DataX安装完成。

2. 下载DataX

可以从DataX的

https://github.com/alibaba/DataX

下载最新版本的DataX。下载完成后,解压缩到指定目录。

3. 配置环境变量

将DataX的bin目录添加到系统的环境变量中,以便在命令行中直接使用DataX命令。

export DATAX_HOME=/path/to/datax export PATH=$DATAX_HOME/bin:$PATH

使用DataX进行数据迁移

1. 创建任务配置文件

DataX的任务配置文件是一个JSON格式的文件,定义了数据源、目标和迁移的具体参数。以下是一个简单的示例:

{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "username", "password": "password", "column": ["id", "name"], "splitPk": "id", "connection": [ { "table": ["table"], "jdbcUrl": ["jdbc:mysql://localhost:3306/db"] } ] } }, "writer": { "name": "hdfswriter", "parameter": { "path": "/path/to/hdfs", "fileName": "data.txt", "column": ["id", "name"], "writeMode": "append" } } } ], "setting": { "speed": { "channel": 5 } } } }

2. 执行数据迁移任务

使用以下命令执行数据迁移任务:

datax.py /path/to/your/config.json

3. 监控和日志

DataX在执行过程中会生成日志文件,用户可以通过查看日志来监控任务的执行情况和排查问题。

mysql-hdfs案例

前提条件:

  1. 准备一个MySQL数据库;
  2. 准备一个健康的HDFS集群;

首先,我们创建一张数据表,并创建一些数据:

CREATE TABLE `zx_user` ( `user_id` bigint(20) NOT NULL COMMENT '用户ID', `user_name` varchar(30) DEFAULT NULL COMMENT '用户姓名', `age` int(11) DEFAULT NULL COMMENT '用户年龄', `user_email` varchar(50) DEFAULT NULL COMMENT '用户邮箱', `create_by` varchar(100) DEFAULT NULL, `create_date` datetime DEFAULT NULL, `update_by` varchar(100) DEFAULT NULL, `update_date` datetime DEFAULT NULL, `deleted` int(11) DEFAULT '0' COMMENT '0-未删除;1-已删除', PRIMARY KEY (`user_id`), UNIQUE KEY `zx_user_un` (`user_id`), UNIQUE KEY `zx_user_un2` (`user_email`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; INSERT INTO zx_user (user_id,user_name,age,user_email,create_by,create_date,update_by,update_date,deleted) VALUES (1,'Jone',18,'test1@baomidou.com',NULL,NULL,NULL,NULL,0), (2,'Jack',20,'test2@baomidou.com',NULL,NULL,NULL,NULL,0), (5,'Anna',27,'test5@baomidou.com',NULL,NULL,NULL,NULL,0), (7,'Anna',27,'test7@baomidou.com','System','2024-10-31 12:36:03','System','2024-10-31 12:54:51',0);

然后我们需要获取一个mysql-hdfs案例的json示例:python datax.py -r mysqlreader -w hdfswriter,并根据实际情况进行修改:

{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [ "user_id", "user_name", "age", "user_email" ], "connection": [ { "jdbcUrl": [ "jdbc:mysql://x.x.x.x:3306/zhangsan" ], "table": [ "zx_user" ] } ], "password": "******", "username": "root", "where": "" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name":"user_id", "type":"BIGINT" }, { "name":"user_name", "type":"STRING" }, { "name":"age", "type":"INT" }, { "name":"user_email", "type":"STRING" } ], "compress": "NONE", "defaultFS": "hdfs://x.x.x.x:9000", "fieldDelimiter": ",", "fileName": "zx_user", "fileType": "text", "path": "/zx/datax", "writeMode": "append" } } } ], "setting": { "speed": { "channel": "1" } } } }

关于mysqlreader和hdfswriter的详细参数配置项可以参考官方对应插件的文档,上面写的都很详细。

然后我们执行命令开始Job:

python datax.py /root/zx-test/mysql2hdfs.json ... 2024-10-31 13:03:15.319 [job-0] INFO JobContainer - DataX jobId [0] completed successfully. ... 2024-10-31 13:03:15.422 [job-0] INFO JobContainer - 任务启动时刻 : 2024-10-31 13:03:04 任务结束时刻 : 2024-10-31 13:03:15 任务总计耗时 : 11s 任务平均流量 : 10B/s 记录写入速度 : 0rec/s 读出记录总数 : 4 读写失败总数 : 0

显示执行成功了,我们打开HDFS文件浏览器查看文件确实已经存在了:

查看其中的内容为:

[root@bigdata01 hadoop]# hdfs dfs -cat /zx/datax/zx_user__d81bd99e_0d6f_45a1_9a80_04ca475fc83d 1,Jone,18,test1@baomidou.com 2,Jack,20,test2@baomidou.com 5,Anna,27,test5@baomidou.com 7,Anna,27,test7@baomidou.com

然后,我们准备一个zx_user.txt,编辑内容如下:

11,slide,21,slide@baomidou.com 21,mify,20,mify@baomidou.com 51,kitty,27,kitty@baomidou.com
[root@bigdata01 hadoop]# hdfs dfs -put /root/zx-test/zx_user.txt /zx/datax [root@bigdata01 hadoop]# hdfs dfs -cat /zx/datax/zx_user.txt 11,slide,21,slide@baomidou.com 21,mify,20,mify@baomidou.com 51,kitty,27,kitty@baomidou.com

如此将需要同步到MySQL数据库的数据文件准备好了,然后,我们执行同步命令:

python datax.py /root/zx-test/hdfs2mysql.json ... 2024-10-31 13:22:04.815 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] do post work. 2024-10-31 13:22:04.815 [job-0] INFO JobContainer - DataX Reader.Job [hdfsreader] do post work. 2024-10-31 13:22:04.816 [job-0] INFO JobContainer - DataX jobId [0] completed successfully. ... 任务启动时刻 : 2024-10-31 13:21:53 任务结束时刻 : 2024-10-31 13:22:04 任务总计耗时 : 11s 任务平均流量 : 7B/s 记录写入速度 : 0rec/s 读出记录总数 : 3 读写失败总数 : 0