ChunJun 是?款穩(wěn)定、易?、?效、批流?體的數(shù)據(jù)集成框架,基于計(jì)算引擎 Flink 實(shí)現(xiàn)多種異構(gòu)數(shù)據(jù)源之間的數(shù)據(jù)同步與計(jì)算。ChunJun 可以把不同來(lái)源、格式、特點(diǎn)性質(zhì)的數(shù)據(jù)在邏輯上或物理上有機(jī)地集中,從?為企業(yè)提供全?的數(shù)據(jù)共享,目前已在上千家公司部署且穩(wěn)定運(yùn)?。
在之前,我們?cè)?jīng)為大家介紹過(guò)如何利用 ChunJun 實(shí)現(xiàn)數(shù)據(jù)實(shí)時(shí)同步(點(diǎn)擊看正文),本篇將為大家介紹姊妹篇,如何利? ChunJun 實(shí)現(xiàn)數(shù)據(jù)的離線同步。
ChunJun 離線同步案例
離線同步是 ChunJun 的?個(gè)重要特性,下?以最通?的 mysql -> hive 的同步任務(wù)來(lái)介紹離線同步。
配置環(huán)境
找?個(gè)空?錄,接下來(lái)要配置 Flink 和 ChunJun 的環(huán)境,下?以 /root/chunjun_demo/ 為例?。
● 配置 Flink
下載 Flink
wget "http://archive.apache.org/dist/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz"
tar -zxvf chunjun-dist.tar.gz
● 配置 ChunJun
#下載 chunjun, 內(nèi)部依賴 flink 1.12.7
wget https://github.com/DTStack/chunjun/releases/download/v1.12.8/chunjun-dist-1.12-SNAPSHOT.tar.gz
#新創(chuàng)建?個(gè)?錄
mkdir chunjun && cd chunjun
#解壓到指定?錄
tar -zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz
解壓好的 ChunJun 有如下?錄:
bin
chunjun-dist
chunjun-examples
lib
● 配置環(huán)境變量
#配置 Flink 環(huán)境變量
echo "FLINK_HOME=/root/chunjun_demo/flink-1.12.7" >> /etc/profile.d/sh.local
#配置 Chunjun 的環(huán)境變量
echo "CHUNJUN_DIST=/root/chunjun_demo/chunjun/chunjun-dist" >> /etc/profile.d/sh.local
#刷新?lián)Q新變量
. /etc/profile.d/sh.local
● 在 Yarn 上?啟動(dòng) Flink Session
#啟動(dòng) Flink Session
bash $FLINK_HOME/bin/yarn-session.sh -t $CHUNJUN_DIST -d
輸出如下:
echo "stop" | $FLINK_HOME/bin/yarn-session.sh -id application_1683599622970_0270
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
yarn application -kill application_1683599622970_0270
下?提交任務(wù)會(huì)?到 Flink Session 這個(gè) Yarn Application Id (application_1683599622970_0270)。
● 其他配置
如果? parquet 格式,需要把 flink-parquet_2.12-1.12.7.jar 放?到 flink/lib 下?, 在上?的例?中,需要放到 $FLINK_HOME/lib ??。
提交任務(wù)
● 在 MySQL 準(zhǔn)備數(shù)據(jù)
-- 創(chuàng)建?個(gè)名為ecommerce_db的數(shù)據(jù)庫(kù),?于存儲(chǔ)電商?站的數(shù)據(jù)
CREATE DATABASE IF NOT EXISTS chunjun;
USE chunjun;
-- 創(chuàng)建?個(gè)名為orders的表,?于存儲(chǔ)訂單信息
CREATE TABLE IF NOT EXISTS orders (
id INT AUTO_INCREMENT PRIMARY KEY, -- ?增主鍵
order_id VARCHAR(50) NOT NULL, -- 訂單編號(hào),不能為空
user_id INT NOT NULL, -- ?戶ID,不能為空
product_id INT NOT NULL, -- 產(chǎn)品ID,不能為空
quantity INT NOT NULL, -- 訂購(gòu)數(shù)量,不能為空
order_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
-- 訂單?期,默認(rèn)值為當(dāng)前時(shí)間戳,不能為空
);
-- 插??些測(cè)試數(shù)據(jù)到orders表
INSERT INTO orders (order_id, user_id, product_id, quantity)
VALUES ('ORD123', 1, 101, 2),
('ORD124', 2, 102, 1),
('ORD125', 3, 103, 3),
('ORD126', 1, 104, 1),
('ORD127', 2, 105, 5);
select * from chunjun.orders;
如果沒(méi)有 MySQL 的話,可以? docker 快速創(chuàng)建?個(gè)。
docker pull mysql:8.0.12
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:8.0.12
● 創(chuàng)建 Hive 表
CREATE DATABASE IF NOT EXISTS chunjun;
USE chunjun;
-- 創(chuàng)建?個(gè)名為orders的表,?于存儲(chǔ)訂單信息
CREATE TABLE IF NOT EXISTS chunjun.orders (
id INT,
order_id VARCHAR(50),
user_id INT,
product_id INT,
quantity INT,
order_date TIMESTAMP
)
STORED AS PARQUET;
-- 查看 hive 表,底層的 HDFS ?件位置,下?的 SQL 結(jié)果?? Location 字段,就是 HDFS ?件的位置。
desc formatted chunjun.orders;
-- Location: hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders
-- ?會(huì)配置同步任務(wù)的時(shí)候會(huì)?到 hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders
● 在當(dāng)前?錄( /root/chunjun_demo/ ) 配置?個(gè)任務(wù) mysql_hdfs.json
vim mysql_hdfs.json 輸?如下內(nèi)容:
{
"job": {
"content": [
{
"reader": {
"parameter": {
"connection": [
{
"schema": "chunjun",
"jdbcUrl": [ "jdbc:mysql://172.16.85.200:3306/chunjun" ],
"table": [ "orders" ]
}
],
"username": "root",
"password": "123456",
"column": [
{ "name": "id", "type": "INT" },
{ "name": "order_id", "type": "VARCHAR" },
{ "name": "user_id", "type": "INT" },
{ "name": "product_id", "type": "INT" },
{ "name": "quantity", "type": "INT" },
{ "name": "order_date", "type": "TIMESTAMP" }
]
},
"name": "mysqlreader"
},
"writer": {
"parameter": {
"path": "hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders",
"defaultFS": "hdfs://ns1",
"hadoopConfig": {
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "172.16.85.194:9000",
"dfs.namenode.rpc-address.ns1.nn2": "172.16.85.200:9000",
"dfs.client.failover.proxy.provider.ns1":
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
},
"column": [
{ "name": "id", "type": "INT" },
{ "name": "order_id", "type": "VARCHAR" },
{ "name": "user_id", "type": "INT" },
{ "name": "product_id", "type": "INT" },
{ "name": "quantity", "type": "INT" },
{ "name": "order_date", "type": "TIMESTAMP" }
],
"writeMode": "overwrite",
"encoding": "utf-8",
"fileType": "parquet",
"fullColumnName":
[ "id", "order_id", "user_id", "product_id", "quantity", "order_date"],
"fullColumnType":
[ "INT", "VARCHAR", "INT", "INT", "INT", "TIMESTAMP" ]
},
"name": "hdfswriter"
}
}
],
"setting": {
"errorLimit": {
"record": 0
},
"speed": {
"bytes": 0,
"channel": 1
}
}
}
}
因?yàn)槲覀円獙?MySQL 同步到 Hive ??,但是如果直接同步 Hive 的話,內(nèi)部會(huì)? jdbc,? jdbc 的效率不?,因此我們可以直接把數(shù)據(jù)同步到 Hive 底層的 HDFS 上?,所以 writer ?到了 hdfswriter。腳本解析如下:
{
"job": {
"content": [
{
"reader": {
"parameter": {
"connectionComment": "數(shù)據(jù)庫(kù)鏈接, 數(shù)據(jù)庫(kù), 表, 賬號(hào), 密碼",
"connection": [
{
"schema": "chunjun",
"jdbcUrl": [ "jdbc:mysql://172.16.85.200:3306/chunjun" ],
"table": [ "orders" ]
}
],
"username": "root",
"password": "123456",
"columnComment": "要同步的列選擇, 可以選擇部分列",
"column": [
{ "name": "id", "type": "INT" },
{ "name": "order_id", "type": "VARCHAR" },
{ "name": "user_id", "type": "INT" },
{ "name": "product_id", "type": "INT" },
{ "name": "quantity", "type": "INT" },
{ "name": "order_date", "type": "TIMESTAMP" }
]
},
"nameComment" : "source 是 mysql",
"name": "mysqlreader"
},
"writer": {
"parameter": {
"pathComment": "HDFS 上?的路徑, 通過(guò) hive 語(yǔ)句的 desc formatted 查看",
"path": "hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders",
"defaultFS": "hdfs://ns1",
"hadoopConfigComment": "是 hdfs ?可?最基本的配置, 在 Hadoop 配置?件 hdfs-site.xml 可以找到",
"hadoopConfig": {
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "172.16.85.194:9000",
"dfs.namenode.rpc-address.ns1.nn2": "172.16.85.200:9000",
"dfs.client.failover.proxy.provider.ns1":
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
},
"columnComment": "要同步的列選擇, 可以選擇部分列",
"column": [
{ "name": "id", "type": "INT" },
{ "name": "order_id", "type": "VARCHAR" },
{ "name": "user_id", "type": "INT" },
{ "name": "product_id", "type": "INT" },
{ "name": "quantity", "type": "INT" },
{ "name": "order_date", "type": "TIMESTAMP" }
],
"writeModeComment": "覆蓋寫?到 hdfs 上?的?件, 可選 overwrite, append(默認(rèn)模式)",
"writeMode": "overwrite",
"encoding": "utf-8",
"fileTypeComment": "可選 orc, parquet, text",
"fileType": "parquet",
"fullColumnNameComment": "全部字段,有時(shí)候 column ??同步部分字段,但是?需要有全部字段的格式,例如 fileType : text ",
"fullColumnName": [ "id", "order_id", "user_id", "product_id", "quantity", "order_date"],
"fullColumnTypeComment": "全部字段的類型",
"fullColumnType": [ "INT", "VARCHAR", "INT", "INT", "INT", "TIMESTAMP" ]
},
"nameComment" : "sink 是 hdfs",
"name": "hdfswriter"
}
}
],
"setting": {
"errorLimit": {
"record": 0
},
"speed": {
"bytes": 0,
"channel": 1
}
}
}
}
● 提交任務(wù)
bash chunjun/bin/chunjun-yarn-session.sh -job mysql_hdfs.json -confProp
{\"yarn.application.id\":\"application_1683599622970_0270\"}
● 查看任務(wù)
任務(wù)同步完成, 可以看?下 HDFS 上?的數(shù)據(jù)。
查看?下 Hive 表的數(shù)據(jù)。
注意, 如果是分區(qū)的 Hive 表,需要?動(dòng)刷新?下 Hive 的元數(shù)據(jù), 使? MSCK 命令。(MSCK 是 Hive 中的?個(gè)命令,?于檢查表中的分區(qū),并將其添加到 Hive 元數(shù)據(jù)中)
MSCK REPAIR TABLE my_table;
ChunJun 離線同步原理解析
HDFS 文件同步原理
· 對(duì)于?件系統(tǒng),同步的時(shí)候會(huì)先把?件寫?到 path + [filename] ?錄??的 .data 的?件??,如果任務(wù)失敗,那么 .data ??的?件不會(huì)?效。
· 在 TaskManager 上?所有 task 任務(wù)結(jié)束的時(shí)候,會(huì)在 JobManager 執(zhí)? FinalizeOnMaster 的 finalizeGlobal ?法, 最終會(huì)調(diào)?到 moveAllTmpDataFileToDir , 把 .data ??的?件移除到 .data 的上?層。
public interface FinalizeOnMaster {
/**
The method is invoked on the master (JobManager) after all (parallel) instances of an OutputFormat finished.
Params:parallelism – The parallelism with which the format or functions was run.
Throws:IOException – The finalization may throw exceptions, which may cause the job to abort.
*/
void finalizeGlobal(int parallelism) throws IOException;
}
// 在 JobManager 執(zhí)?
@Override
protected void moveAllTmpDataFileToDir() {
if (fs == null) {
openSource();
}
String currentFilePath = "";
try {
Path dir = new Path(outputFilePath);
Path tmpDir = new Path(tmpPath);
FileStatus[] dataFiles = fs.listStatus(tmpDir);
for (FileStatus dataFile : dataFiles) {
currentFilePath = dataFile.getPath().getName();
fs.rename(dataFile.getPath(), dir);
LOG.info("move temp file:{} to dir:{}", dataFile.getPath(), dir);
}
fs.delete(tmpDir, true);
} catch (IOException e) {
throw new ChunJunRuntimeException(
String.format(
"can't move file:[%s] to dir:[%s]", currentFilePath, outputFilePath),
e);
}
}
增量同步
增量同步主要針對(duì)某些只有 Insert 操作的表,隨著業(yè)務(wù)增?,表內(nèi)數(shù)據(jù)越來(lái)越多。如果每次都同步整表的話,消耗的時(shí)間和資源會(huì)?較多。因此需要?個(gè)增量同步的功能,每次只讀取增加部分的數(shù)據(jù)。
● 實(shí)現(xiàn)原理
其實(shí)現(xiàn)原理實(shí)際上就是配合增量鍵在查詢的 sql 語(yǔ)句中拼接過(guò)濾條件,?如 where id > ? ,將之前已經(jīng)讀取過(guò)的數(shù)據(jù)過(guò)濾出去。
增量同步是針對(duì)于兩個(gè)及以上的同步作業(yè)來(lái)說(shuō)的。對(duì)于初次執(zhí)?增量同步的作業(yè)??,實(shí)際上是整表同步,不同于其他作業(yè)的在于增量同步作業(yè)會(huì)在作業(yè)執(zhí)?完成后記錄?個(gè) endLocation 指標(biāo),并將這個(gè)指標(biāo)上傳到 prometheus 以供后續(xù)使?。
除第?次作業(yè)外,后續(xù)的所有增量同步作業(yè)都會(huì)取上?次作業(yè)的 endLocation 做為本次作業(yè)的過(guò)濾依據(jù)(startLocation)。?如第?次作業(yè)執(zhí)?完后,endLocation 為10,那么下?個(gè)作業(yè)就會(huì)構(gòu)建出例如 SELECT id,name,age from table where id > 10 的 SQL 語(yǔ)句,達(dá)到增量讀取的?的。
● 使用限制
· 只有 RDB 的 Reader 插件可以使?
· 通過(guò)構(gòu)建SQL過(guò)濾語(yǔ)句實(shí)現(xiàn),因此只能?于RDB插件
· 增量同步只關(guān)?讀,不關(guān)?寫,因此只與Reader插件有關(guān)
· 增量字段只能為數(shù)值類型和時(shí)間類型
· 指標(biāo)需要上傳到 prometheus,? prometheus 不?持字符串類型,因此只?持?jǐn)?shù)據(jù)類型和時(shí)間類型,時(shí)間類型會(huì)轉(zhuǎn)換成時(shí)間戳后上傳
· 增量鍵的值可以重復(fù),但必須遞增
· 由于使? '>' 的緣故,要求字段必須遞增
斷點(diǎn)續(xù)傳
斷點(diǎn)續(xù)傳是為了在離線同步的時(shí)候,針對(duì)?時(shí)間同步任務(wù)如超過(guò)1天,如果在同步過(guò)程中由于某些原因?qū)е氯蝿?wù)失敗,從頭再來(lái)的話成本?常?,因此需要?個(gè)斷點(diǎn)續(xù)傳的功能從任務(wù)失敗的地?繼續(xù)。
● 實(shí)現(xiàn)原理
· 基于 Flink 的 checkpoint,在 checkpoint 的時(shí)候 會(huì)存儲(chǔ) source 端最后?條數(shù)據(jù)的某個(gè)字段值,sink 端插件執(zhí)?事務(wù)提交。
· 在任務(wù)失敗,后續(xù)通過(guò) checkpoint 重新運(yùn)?時(shí),source 端在?成 select 語(yǔ)句的時(shí)候?qū)?state ?的值作為條件拼接進(jìn)?數(shù)據(jù)的過(guò)濾,達(dá)到從上次失敗位點(diǎn)進(jìn)?恢復(fù)。
· jdbcInputFormat 在拼接讀取 SQL 時(shí),如果從 checkpoint 恢復(fù)的 state 不為空且 restoreColumn 不為空,則此時(shí)會(huì)將 checkpoint ?的 state 作為起點(diǎn)開(kāi)始讀取數(shù)據(jù)。
● 適用場(chǎng)景
通過(guò)上述原理我們可以知道 source 端必須是 RDB 類型插件,因?yàn)槭峭ㄟ^(guò) select 語(yǔ)句拼接 where 條件進(jìn)?數(shù)據(jù)過(guò)濾達(dá)到斷點(diǎn)續(xù)傳的,同時(shí)斷點(diǎn)續(xù)傳需要指定?個(gè)字段作為過(guò)濾條件,且此字段要求是遞增的。
· 任務(wù)需要開(kāi)啟 checkpoint
· reader 為 RDB 的插件均?持且 writer ?持事務(wù)的插件(如 rdb filesystem 等),如果下游是冪等性則 writer 插件也不需要?持事務(wù)
· 作為斷點(diǎn)續(xù)傳的字段在源表?的數(shù)據(jù)是遞增的,因?yàn)檫^(guò)濾條件是 >
《數(shù)棧產(chǎn)品白皮書》:https://www.dtstack.com/resources/1004?src=szsm
《數(shù)據(jù)治理行業(yè)實(shí)踐白皮書》下載地址:https://www.dtstack.com/resources/1001?src=szsm
想了解或咨詢更多有關(guān)袋鼠云大數(shù)據(jù)產(chǎn)品、行業(yè)解決方案、客戶案例的朋友,瀏覽袋鼠云官網(wǎng):https://www.dtstack.com/?src=szbky文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-450193.html
同時(shí),歡迎對(duì)大數(shù)據(jù)開(kāi)源項(xiàng)目有興趣的同學(xué)加入「袋鼠云開(kāi)源框架釘釘技術(shù)qun」,交流最新開(kāi)源技術(shù)信息,qun號(hào)碼:30537511,項(xiàng)目地址:https://github.com/DTStack文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-450193.html
到了這里,關(guān)于技術(shù)干貨|如何利用 ChunJun 實(shí)現(xiàn)數(shù)據(jù)離線同步?的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!