1、DataX簡(jiǎn)介
1.1 DataX概述
DataX 是阿里巴巴開(kāi)源的一個(gè)異構(gòu)數(shù)據(jù)源離線(xiàn)同步工具,致力于實(shí)現(xiàn)包括關(guān)系型數(shù)據(jù)庫(kù)(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構(gòu)數(shù)據(jù)源之間穩(wěn)定高效的數(shù)據(jù)同步功能。
源碼地址:https://github.com/alibaba/DataX
1.2 DataX支持的數(shù)據(jù)源
DataX目前已經(jīng)有了比較全面的插件體系,主流的RDBMS數(shù)據(jù)庫(kù)、NOSQL、大數(shù)據(jù)計(jì)算系統(tǒng)都已經(jīng)接入,目前支持?jǐn)?shù)據(jù)如下圖。
2、DataX架構(gòu)原理
2.1 DataX設(shè)計(jì)理念
為了解決異構(gòu)數(shù)據(jù)源同步問(wèn)題,DataX將復(fù)雜的網(wǎng)狀的同步鏈路變成了星型數(shù)據(jù)鏈路,DataX作為中間傳輸載體負(fù)責(zé)連接各種數(shù)據(jù)源。當(dāng)需要接入一個(gè)新的數(shù)據(jù)源的時(shí)候,只需要將此數(shù)據(jù)源對(duì)接到DataX,便能跟已有的數(shù)據(jù)源做到無(wú)縫數(shù)據(jù)同步。
2.2 DataX框架設(shè)計(jì)
DataX本身作為離線(xiàn)數(shù)據(jù)同步框架,采用Framework + plugin架構(gòu)構(gòu)建。將數(shù)據(jù)源讀取和寫(xiě)入抽象成為Reader/Writer插件,納入到整個(gè)同步框架中。
Reader:數(shù)據(jù)采集模塊,負(fù)責(zé)采集數(shù)據(jù)源的數(shù)據(jù),將數(shù)據(jù)發(fā)送給Framework。
Writer:數(shù)據(jù)寫(xiě)入模塊,負(fù)責(zé)不斷向Framework取數(shù)據(jù),并將數(shù)據(jù)寫(xiě)入到目的端。
Framework:用于連接Reader和Writer,作為兩者的數(shù)據(jù)傳輸通道,并處理緩存,流控,并發(fā),數(shù)據(jù)轉(zhuǎn)換等核心技術(shù)問(wèn)題。
2.3 DataX運(yùn)行流程
下面用一個(gè)DataX作業(yè)生命周期的時(shí)序圖說(shuō)明DataX的運(yùn)行流程、核心概念以及每個(gè)概念之間的關(guān)系。
2.4 DataX調(diào)度決策思路
舉例來(lái)說(shuō),用戶(hù)提交了一個(gè)DataX作業(yè),并且配置了總的并發(fā)度為20,目的是對(duì)一個(gè)有100張分表的mysql數(shù)據(jù)源進(jìn)行同步。DataX的調(diào)度決策思路是:
1)DataX Job根據(jù)分庫(kù)分表切分策略,將同步工作分成100個(gè)Task。
2)根據(jù)配置的總的并發(fā)度20,以及每個(gè)Task Group的并發(fā)度5,DataX計(jì)算共需要分配4個(gè)TaskGroup。
3)4個(gè)TaskGroup平分100個(gè)Task,每一個(gè)TaskGroup負(fù)責(zé)運(yùn)行25個(gè)Task。
2.5 DataX和Sqoop對(duì)比
3、DataX部署
1、下載DataX安裝包并上傳到hadoop102的/opt/software
下載地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
2、解壓datax.tar.gz到/opt/module
tar -zxvf datax.tar.gz -C /opt/module/
3、自檢,執(zhí)行如下命令
python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
4、出現(xiàn)如下內(nèi)容,則表明安裝成功
4、DataX使用
4.1 DataX使用概述
4.1.1 DataX任務(wù)提交命令
Datax的使用十分簡(jiǎn)單,用戶(hù)只需要根據(jù)自己同步數(shù)據(jù)的數(shù)據(jù)源和目的地選擇相應(yīng)的Reader和Writer,并將Reader和Writer的信息配置在一個(gè)json文件中,然后執(zhí)行如下命令提交數(shù)據(jù)同步任務(wù)即可。
python bin/datax.py path/to/your/job.json
4.1.2 DataX配置文件格式
可以使用如下命名查看DataX配置文件模板。
python bin/datax.py -r mysqlreader -w hdfswriter
配置文件模板如下,json最外層是一個(gè)job,job包含setting和content兩部分,其中setting用于對(duì)整個(gè)job進(jìn)行配置,content用戶(hù)配置數(shù)據(jù)源和目的地。
4.2 同步MySQL數(shù)據(jù)到HDFS案例
案例要求:同步gmall數(shù)據(jù)庫(kù)中base_province表數(shù)據(jù)到HDFS的/base_province目錄
需求分析:要實(shí)現(xiàn)該功能,需選用MySQLReader和HDFSWriter,MySQLReader具有兩種模式分別是TableMode和QuerySQLMode,前者使用table,column,where等屬性聲明需要同步的數(shù)據(jù);后者使用一條SQL查詢(xún)語(yǔ)句聲明需要同步的數(shù)據(jù)。
下面分別使用兩種模式進(jìn)行演示。
4.2.1 MySQLReader之TableMode
1、編寫(xiě)配置文件
(1)創(chuàng)建配置文件base_province.json
vim /opt/module/datax/job/base_province.json
(2)配置文件內(nèi)容如下
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2"
],
"where": "id>=3",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/gmall"
],
"table": [
"base_province"
]
}
],
"password": "000000",
"splitPk": "",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "base_province",
"fileType": "text",
"path": "/base_province",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2、配置文件說(shuō)明
(1)Reader參數(shù)說(shuō)明
(2)Writer參數(shù)說(shuō)明
注意事項(xiàng):
HFDS Writer并未提供nullFormat參數(shù):也就是用戶(hù)并不能自定義null值寫(xiě)到HFDS文件中的存儲(chǔ)格式。默認(rèn)情況下,HFDS Writer會(huì)將null值存儲(chǔ)為空字符串(‘’),而Hive默認(rèn)的null值存儲(chǔ)格式為\N。所以后期將DataX同步的文件導(dǎo)入Hive表就會(huì)出現(xiàn)問(wèn)題。
(3)Setting參數(shù)說(shuō)明
3、提交任務(wù)
(1)在HDFS創(chuàng)建/base_province目錄
使用DataX向HDFS同步數(shù)據(jù)時(shí),需確保目標(biāo)路徑已存在
hadoop fs -mkdir /base_province
(2)進(jìn)入DataX根目錄
(3)執(zhí)行如下命令
python bin/datax.py job/base_province.json
4、查看結(jié)果
(1)DataX打印日志
(2)查看HDFS文件
hadoop fs -cat /base_province/* | zcat
4.2.2 MySQLReader之QuerySQLMode
1、編寫(xiě)配置文件
(1)修改配置文件base_province.json
(2)配置文件內(nèi)容如下
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/gmall"
],
"querySql": [
"select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
]
}
],
"password": "000000",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "base_province",
"fileType": "text",
"path": "/base_province",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2、配置文件說(shuō)明
(1)Reader參數(shù)說(shuō)明
3、提交任務(wù)
(1)清空歷史數(shù)據(jù)
hadoop fs -rm -r -f /base_province/*
(2)進(jìn)入DataX根目錄
(3)執(zhí)行如下命令
python bin/datax.py job/base_province.json
4、查看結(jié)果
(1)DataX打印日志
(2)查看HDFS文件
hadoop fs -cat /base_province/* | zcat
4.2.3 DataX傳參
通常情況下,離線(xiàn)數(shù)據(jù)同步任務(wù)需要每日定時(shí)重復(fù)執(zhí)行,故HDFS上的目標(biāo)路徑通常會(huì)包含一層日期,以對(duì)每日同步的數(shù)據(jù)加以區(qū)分,也就是說(shuō)每日同步數(shù)據(jù)的目標(biāo)路徑不是固定不變的,因此DataX配置文件中HDFS Writer的path參數(shù)的值應(yīng)該是動(dòng)態(tài)的。為實(shí)現(xiàn)這一效果,就需要使用DataX傳參的功能。
DataX傳參的用法如下,在JSON配置文件中使用${param}引用參數(shù),在提交任務(wù)時(shí)使用-p"-Dparam=value"傳入?yún)?shù)值,具體示例如下。
1、編寫(xiě)配置文件
(1)修改配置文件base_province.json
(2)配置文件內(nèi)容如下
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/gmall"
],
"querySql": [
"select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
]
}
],
"password": "000000",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "base_province",
"fileType": "text",
"path": "/base_province/${dt}",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2、提交任務(wù)
(1)創(chuàng)建目標(biāo)路徑
hadoop fs -mkdir /base_province/2020-06-14
(2)進(jìn)入DataX根目錄
(3)執(zhí)行如下命令
python bin/datax.py -p"-Ddt=2020-06-14" job/base_province.json
3、查看結(jié)果
hadoop fs -ls /base_province
4.3 同步HDFS數(shù)據(jù)到MySQL案例
案例要求:同步HDFS上的/base_province目錄下的數(shù)據(jù)到MySQL gmall 數(shù)據(jù)庫(kù)下的test_province表。
需求分析:要實(shí)現(xiàn)該功能,需選用HDFSReader和MySQLWriter。
1、編寫(xiě)配置文件
(1)創(chuàng)建配置文件test_province.json
(2)配置文件內(nèi)容如下
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"defaultFS": "hdfs://hadoop102:8020",
"path": "/base_province",
"column": [
"*"
],
"fileType": "text",
"compress": "gzip",
"encoding": "UTF-8",
"nullFormat": "\\N",
"fieldDelimiter": "\t",
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "000000",
"connection": [
{
"table": [
"test_province"
],
"jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8"
}
],
"column": [
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2"
],
"writeMode": "replace"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2、配置文件說(shuō)明
(1)Reader參數(shù)說(shuō)明
(2)Writer參數(shù)說(shuō)明
3、提交任務(wù)
(1)在MySQL中創(chuàng)建gmall.test_province表
DROP TABLE IF EXISTS `test_province`;
CREATE TABLE `test_province` (
`id` bigint(20) NOT NULL,
`name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
(2)進(jìn)入DataX根目錄
(3)執(zhí)行如下命令
python bin/datax.py job/test_province.json
4、查看結(jié)果
(1)DataX打印日志
(2)查看MySQL目標(biāo)表數(shù)據(jù)
5、DataX優(yōu)化
5.1 速度控制
DataX3.0提供了包括通道(并發(fā))、記錄流、字節(jié)流三種流控模式,可以隨意控制你的作業(yè)速度,讓你的作業(yè)在數(shù)據(jù)庫(kù)可以承受的范圍內(nèi)達(dá)到最佳的同步速度。
注意事項(xiàng):
1.若配置了總record限速,則必須配置單個(gè)channel的record限速
2.若配置了總byte限速,則必須配置單個(gè)channe的byte限速
3.若配置了總record限速和總byte限速,channel并發(fā)數(shù)參數(shù)就會(huì)失效。因?yàn)榕渲昧丝俽ecord限速和總byte限速之后,實(shí)際channel并發(fā)數(shù)是通過(guò)計(jì)算得到的:
計(jì)算公式為:
min(總byte限速/單個(gè)channel的byte限速,總record限速/單個(gè)channel的record限速)文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-510480.html
5.2 內(nèi)存調(diào)整
當(dāng)提升DataX Job內(nèi)Channel并發(fā)數(shù)時(shí),內(nèi)存的占用會(huì)顯著增加,因?yàn)镈ataX作為數(shù)據(jù)交換通道,在內(nèi)存中會(huì)緩存較多的數(shù)據(jù)。例如Channel中會(huì)有一個(gè)Buffer,作為臨時(shí)的數(shù)據(jù)交換的緩沖區(qū),而在部分Reader和Writer的中,也會(huì)存在一些Buffer,為了防止OOM等錯(cuò)誤,需調(diào)大JVM的堆內(nèi)存。
建議將內(nèi)存設(shè)置為4G或者8G,這個(gè)也可以根據(jù)實(shí)際情況來(lái)調(diào)整。
調(diào)整JVM xms xmx參數(shù)的兩種方式:一種是直接更改datax.py腳本;另一種是在啟動(dòng)的時(shí)候,加上對(duì)應(yīng)的參數(shù),如下:
python datax/bin/datax.py --jvm=“-Xms8G -Xmx8G” /path/to/your/job.json文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-510480.html
到了這里,關(guān)于5、DataX(DataX簡(jiǎn)介、DataX架構(gòu)原理、DataX部署、使用、同步MySQL數(shù)據(jù)到HDFS、同步HDFS數(shù)據(jù)到MySQL)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!