数据迁移工具 DataX 入门

在现代数据处理和分析中,数据迁移是一个不可或缺的环节。随着数据量的不断增加,如何高效、可靠地迁移数据成为了许多企业面临的挑战。DataX作为一款开源的数据迁移工具,因其灵活性和高效性而受到广泛关注。本文将介绍DataX的基本概念、安装步骤、使用方法以及一些最佳实践。

什么是DataX?

DataX是阿里巴巴开源的一款数据同步工具,旨在实现不同数据源之间的高效数据迁移。它支持多种数据源,包括关系型数据库(如MySQL、Oracle)、非关系型数据库(如MongoDB)、大数据存储(如HDFS、Hive)等。DataX的设计理念是“简单、灵活、高效”,使得用户能够轻松地进行数据迁移和同步。

为什么要使用DataX

5673257-1ff997525cd64e99.png

为了解决异构数据源的同步问题,DataX将复杂的网状同步链路优化成了星型数据链路,由DataX作为中间传输载体来负责连接各种数据源,以此来降低整个异构数据源同步链路的复杂度。当需要新接入一个数据源的时候,只需要考虑将该新的数据源对接到DataX即可,就能跟已有的所有数据源无缝同步。

5673257-95cbdec757096ec12.png

DataX由FrameWork+Plugin的形式构建,数据源的读取和写入分别Reader和Writer实现:

  • Reader,数据采集模块,负责采集数据源中的数据,并将数据发送给FrameWork;
  • Writer,数据写入模块,负责从Framework中取数据,并将数据写入到数据源中;
  • Framework,用于连接Reader和Writer,作为以上两者的数据传输通道,处理缓冲、流量控制、并发、数据转换等核心问题。

5673257-0901ba41903fd468.png

DataX的工作模式是单机多线程形式,不支持分布式的方式,这是它和其它数据同步工具的重要区别之一。

和其它大数据ETL工具相比:

企业微信截图_17303546878959.png

如果需要支持的数据源比较多,建议使用DataX,如果数据来源比较单一,且只是要导入到HDFS,流程很简单,可以考虑使用Sqoop。

DataX的主要特点

  • 多种数据源支持 :DataX支持多种数据源的读写,用户可以根据需求选择合适的源和目标。
  • 高性能 :通过并行处理和分片技术,DataX能够在大规模数据迁移时保持高性能。
  • 易于扩展 :用户可以根据自己的需求自定义插件,扩展DataX的功能。
  • 灵活的配置 :DataX使用JSON格式的配置文件,用户可以方便地进行配置和调整。

安装DataX

1. 环境准备

  • 准备好Linux服务器;
  • 安装Java运行环境(JDK 1.8及以上版本);
  • 安装Python运行环境;
  • 下载Datax工具包,解压缩到合适的目录;

进入到datax的bin目录下,运行自带的示例:

运行后控制台显示运行成功即表示DataX安装完成。

2. 下载DataX

可以从DataX的GitHub页面下载最新版本的DataX。下载完成后,解压缩到指定目录。

3. 配置环境变量

将DataX的bin目录添加到系统的环境变量中,以便在命令行中直接使用DataX命令。

export DATAX_HOME=/path/to/datax
export PATH=$DATAX_HOME/bin:$PATH

使用DataX进行数据迁移

1. 创建任务配置文件

DataX的任务配置文件是一个JSON格式的文件,定义了数据源、目标和迁移的具体参数。以下是一个简单的示例:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "username",
            "password": "password",
            "column": ["id", "name"],
            "splitPk": "id",
            "connection": [
              {
                "table": ["table"],
                "jdbcUrl": ["jdbc:mysql://localhost:3306/db"]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "path": "/path/to/hdfs",
            "fileName": "data.txt",
            "column": ["id", "name"],
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5
      }
    }
  }
}

2. 执行数据迁移任务

使用以下命令执行数据迁移任务:

datax.py /path/to/your/config.json

3. 监控和日志

DataX在执行过程中会生成日志文件,用户可以通过查看日志来监控任务的执行情况和排查问题。

mysql-hdfs案例

前提条件:

  1. 准备一个MySQL数据库;
  2. 准备一个健康的HDFS集群;

首先,我们创建一张数据表,并创建一些数据:

CREATE TABLE `zx_user` (
  `user_id` bigint(20) NOT NULL COMMENT '用户ID',
  `user_name` varchar(30) DEFAULT NULL COMMENT '用户姓名',
  `age` int(11) DEFAULT NULL COMMENT '用户年龄',
  `user_email` varchar(50) DEFAULT NULL COMMENT '用户邮箱',
  `create_by` varchar(100) DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  `update_by` varchar(100) DEFAULT NULL,
  `update_date` datetime DEFAULT NULL,
  `deleted` int(11) DEFAULT '0' COMMENT '0-未删除;1-已删除',
  PRIMARY KEY (`user_id`),
  UNIQUE KEY `zx_user_un` (`user_id`),
  UNIQUE KEY `zx_user_un2` (`user_email`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO zx_user (user_id,user_name,age,user_email,create_by,create_date,update_by,update_date,deleted) VALUES
     (1,'Jone',18,'test1@baomidou.com',NULL,NULL,NULL,NULL,0),
     (2,'Jack',20,'test2@baomidou.com',NULL,NULL,NULL,NULL,0),
     (5,'Anna',27,'test5@baomidou.com',NULL,NULL,NULL,NULL,0),
     (7,'Anna',27,'test7@baomidou.com','System','2024-10-31 12:36:03','System','2024-10-31 12:54:51',0);

然后我们需要获取一个mysql-hdfs案例的json示例:python datax.py -r mysqlreader -w hdfswriter,并根据实际情况进行修改:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "user_id",
                            "user_name",
                            "age",
                            "user_email"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://x.x.x.x:3306/zhangsan"
                                ],
                                "table": [
                                    "zx_user"
                                ]
                            }
                        ],
                        "password": "******",
                        "username": "root",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name":"user_id",
                                "type":"BIGINT"
                            },
                            {
                                "name":"user_name",
                                "type":"STRING"
                            },
                            {
                                "name":"age",
                                "type":"INT"
                            },
                            {
                                "name":"user_email",
                                "type":"STRING"
                            }
                        ],
                        "compress": "NONE",
                        "defaultFS": "hdfs://x.x.x.x:9000",
                        "fieldDelimiter": ",",
                        "fileName": "zx_user",
                        "fileType": "text",
                        "path": "/zx/datax",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

关于mysqlreader和hdfswriter的详细参数配置项可以参考官方对应插件的文档,上面写的都很详细。

然后我们执行命令开始Job:

python datax.py /root/zx-test/mysql2hdfs.json
...
2024-10-31 13:03:15.319 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
...
2024-10-31 13:03:15.422 [job-0] INFO  JobContainer -
任务启动时刻                    : 2024-10-31 13:03:04
任务结束时刻                    : 2024-10-31 13:03:15
任务总计耗时                    :                 11s
任务平均流量                    :               10B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   4
读写失败总数                    :                   0

显示执行成功了,我们打开HDFS文件浏览器查看文件确实已经存在了:

查看其中的内容为:

[root@bigdata01 hadoop]# hdfs dfs -cat /zx/datax/zx_user__d81bd99e_0d6f_45a1_9a80_04ca475fc83d
1,Jone,18,test1@baomidou.com
2,Jack,20,test2@baomidou.com
5,Anna,27,test5@baomidou.com
7,Anna,27,test7@baomidou.com

然后,我们准备一个zx_user.txt,编辑内容如下:

11,slide,21,slide@baomidou.com
21,mify,20,mify@baomidou.com
51,kitty,27,kitty@baomidou.com
[root@bigdata01 hadoop]# hdfs dfs -put /root/zx-test/zx_user.txt /zx/datax
[root@bigdata01 hadoop]# hdfs dfs -cat /zx/datax/zx_user.txt
11,slide,21,slide@baomidou.com
21,mify,20,mify@baomidou.com
51,kitty,27,kitty@baomidou.com

如此将需要同步到MySQL数据库的数据文件准备好了,然后,我们执行同步命令:

python datax.py /root/zx-test/hdfs2mysql.json
...
2024-10-31 13:22:04.815 [job-0] INFO  JobContainer - DataX Writer.Job [mysqlwriter] do post work.
2024-10-31 13:22:04.815 [job-0] INFO  JobContainer - DataX Reader.Job [hdfsreader] do post work.
2024-10-31 13:22:04.816 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
...
任务启动时刻                    : 2024-10-31 13:21:53
任务结束时刻                    : 2024-10-31 13:22:04
任务总计耗时                    :                 11s
任务平均流量                    :                7B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   3
读写失败总数                    :                   0