数据迁移工具 DataX 入门
在现代数据处理和分析中,数据迁移是一个不可或缺的环节。随着数据量的不断增加,如何高效、可靠地迁移数据成为了许多企业面临的挑战。DataX作为一款开源的数据迁移工具,因其灵活性和高效性而受到广泛关注。本文将介绍DataX的基本概念、安装步骤、使用方法以及一些最佳实践。
什么是DataX?
DataX是阿里巴巴开源的一款数据同步工具,旨在实现不同数据源之间的高效数据迁移。它支持多种数据源,包括关系型数据库(如MySQL、Oracle)、非关系型数据库(如MongoDB)、大数据存储(如HDFS、Hive)等。DataX的设计理念是“简单、灵活、高效”,使得用户能够轻松地进行数据迁移和同步。
为什么要使用DataX
为了解决异构数据源的同步问题,DataX将复杂的网状同步链路优化成了星型数据链路,由DataX作为中间传输载体来负责连接各种数据源,以此来降低整个异构数据源同步链路的复杂度。当需要新接入一个数据源的时候,只需要考虑将该新的数据源对接到DataX即可,就能跟已有的所有数据源无缝同步。
DataX由FrameWork+Plugin的形式构建,数据源的读取和写入分别Reader和Writer实现:
- Reader,数据采集模块,负责采集数据源中的数据,并将数据发送给FrameWork;
- Writer,数据写入模块,负责从Framework中取数据,并将数据写入到数据源中;
- Framework,用于连接Reader和Writer,作为以上两者的数据传输通道,处理缓冲、流量控制、并发、数据转换等核心问题。
DataX的工作模式是单机多线程形式,不支持分布式的方式,这是它和其它数据同步工具的重要区别之一。
和其它大数据ETL工具相比:
如果需要支持的数据源比较多,建议使用DataX,如果数据来源比较单一,且只是要导入到HDFS,流程很简单,可以考虑使用Sqoop。
DataX的主要特点
- 多种数据源支持 :DataX支持多种数据源的读写,用户可以根据需求选择合适的源和目标。
- 高性能 :通过并行处理和分片技术,DataX能够在大规模数据迁移时保持高性能。
- 易于扩展 :用户可以根据自己的需求自定义插件,扩展DataX的功能。
- 灵活的配置 :DataX使用JSON格式的配置文件,用户可以方便地进行配置和调整。
安装DataX
1. 环境准备
- 准备好Linux服务器;
- 安装Java运行环境(JDK 1.8及以上版本);
- 安装Python运行环境;
- 下载Datax工具包,解压缩到合适的目录;
进入到datax的bin目录下,运行自带的示例:
运行后控制台显示运行成功即表示DataX安装完成。
2. 下载DataX
可以从DataX的GitHub页面下载最新版本的DataX。下载完成后,解压缩到指定目录。
3. 配置环境变量
将DataX的bin目录添加到系统的环境变量中,以便在命令行中直接使用DataX命令。
export DATAX_HOME=/path/to/datax
export PATH=$DATAX_HOME/bin:$PATH
使用DataX进行数据迁移
1. 创建任务配置文件
DataX的任务配置文件是一个JSON格式的文件,定义了数据源、目标和迁移的具体参数。以下是一个简单的示例:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "username",
"password": "password",
"column": ["id", "name"],
"splitPk": "id",
"connection": [
{
"table": ["table"],
"jdbcUrl": ["jdbc:mysql://localhost:3306/db"]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"path": "/path/to/hdfs",
"fileName": "data.txt",
"column": ["id", "name"],
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
2. 执行数据迁移任务
使用以下命令执行数据迁移任务:
datax.py /path/to/your/config.json
3. 监控和日志
DataX在执行过程中会生成日志文件,用户可以通过查看日志来监控任务的执行情况和排查问题。
mysql-hdfs案例
前提条件:
- 准备一个MySQL数据库;
- 准备一个健康的HDFS集群;
首先,我们创建一张数据表,并创建一些数据:
CREATE TABLE `zx_user` (
`user_id` bigint(20) NOT NULL COMMENT '用户ID',
`user_name` varchar(30) DEFAULT NULL COMMENT '用户姓名',
`age` int(11) DEFAULT NULL COMMENT '用户年龄',
`user_email` varchar(50) DEFAULT NULL COMMENT '用户邮箱',
`create_by` varchar(100) DEFAULT NULL,
`create_date` datetime DEFAULT NULL,
`update_by` varchar(100) DEFAULT NULL,
`update_date` datetime DEFAULT NULL,
`deleted` int(11) DEFAULT '0' COMMENT '0-未删除;1-已删除',
PRIMARY KEY (`user_id`),
UNIQUE KEY `zx_user_un` (`user_id`),
UNIQUE KEY `zx_user_un2` (`user_email`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO zx_user (user_id,user_name,age,user_email,create_by,create_date,update_by,update_date,deleted) VALUES
(1,'Jone',18,'test1@baomidou.com',NULL,NULL,NULL,NULL,0),
(2,'Jack',20,'test2@baomidou.com',NULL,NULL,NULL,NULL,0),
(5,'Anna',27,'test5@baomidou.com',NULL,NULL,NULL,NULL,0),
(7,'Anna',27,'test7@baomidou.com','System','2024-10-31 12:36:03','System','2024-10-31 12:54:51',0);
然后我们需要获取一个mysql-hdfs案例的json示例:python datax.py -r mysqlreader -w hdfswriter
,并根据实际情况进行修改:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"user_id",
"user_name",
"age",
"user_email"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://x.x.x.x:3306/zhangsan"
],
"table": [
"zx_user"
]
}
],
"password": "******",
"username": "root",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name":"user_id",
"type":"BIGINT"
},
{
"name":"user_name",
"type":"STRING"
},
{
"name":"age",
"type":"INT"
},
{
"name":"user_email",
"type":"STRING"
}
],
"compress": "NONE",
"defaultFS": "hdfs://x.x.x.x:9000",
"fieldDelimiter": ",",
"fileName": "zx_user",
"fileType": "text",
"path": "/zx/datax",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
关于mysqlreader和hdfswriter的详细参数配置项可以参考官方对应插件的文档,上面写的都很详细。
然后我们执行命令开始Job:
python datax.py /root/zx-test/mysql2hdfs.json
...
2024-10-31 13:03:15.319 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
...
2024-10-31 13:03:15.422 [job-0] INFO JobContainer -
任务启动时刻 : 2024-10-31 13:03:04
任务结束时刻 : 2024-10-31 13:03:15
任务总计耗时 : 11s
任务平均流量 : 10B/s
记录写入速度 : 0rec/s
读出记录总数 : 4
读写失败总数 : 0
显示执行成功了,我们打开HDFS文件浏览器查看文件确实已经存在了:
查看其中的内容为:
[root@bigdata01 hadoop]# hdfs dfs -cat /zx/datax/zx_user__d81bd99e_0d6f_45a1_9a80_04ca475fc83d
1,Jone,18,test1@baomidou.com
2,Jack,20,test2@baomidou.com
5,Anna,27,test5@baomidou.com
7,Anna,27,test7@baomidou.com
然后,我们准备一个zx_user.txt,编辑内容如下:
11,slide,21,slide@baomidou.com
21,mify,20,mify@baomidou.com
51,kitty,27,kitty@baomidou.com
[root@bigdata01 hadoop]# hdfs dfs -put /root/zx-test/zx_user.txt /zx/datax
[root@bigdata01 hadoop]# hdfs dfs -cat /zx/datax/zx_user.txt
11,slide,21,slide@baomidou.com
21,mify,20,mify@baomidou.com
51,kitty,27,kitty@baomidou.com
如此将需要同步到MySQL数据库的数据文件准备好了,然后,我们执行同步命令:
python datax.py /root/zx-test/hdfs2mysql.json
...
2024-10-31 13:22:04.815 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] do post work.
2024-10-31 13:22:04.815 [job-0] INFO JobContainer - DataX Reader.Job [hdfsreader] do post work.
2024-10-31 13:22:04.816 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
...
任务启动时刻 : 2024-10-31 13:21:53
任务结束时刻 : 2024-10-31 13:22:04
任务总计耗时 : 11s
任务平均流量 : 7B/s
记录写入速度 : 0rec/s
读出记录总数 : 3
读写失败总数 : 0
-
技术
佬
阅
这就叫专业.png
佬
6
佬
不是 怎么鱼排都开始分享技术了