目錄
0. 相關文章鏈接
1.?CDC 數(shù)據(jù)同步
1.1.?準備MySQL表
1.2.?flink讀取mysql binlog并寫入kafka
1.3.?flink讀取kafka數(shù)據(jù)并寫入hudi數(shù)據(jù)湖
1.4.?使用datafaker插入數(shù)據(jù)
1.5.?統(tǒng)計數(shù)據(jù)入Hudi情況
1.6.?實時查看數(shù)據(jù)入湖情況
2.?離線批量導入
2.1. 原理
2.2.?WITH 參數(shù)
2.3.?案例
3.?全量接增量
3.1.?WITH 參數(shù)
3.2.?使用流程
3.3.?說明
0. 相關文章鏈接
?Hudi文章匯總?
1.?CDC 數(shù)據(jù)同步
CDC 數(shù)據(jù)保存了完整的數(shù)據(jù)庫變更,當前可通過兩種途徑將數(shù)據(jù)導入 hudi:
- 第一種:通過 cdc-connector 直接對接 DB 的 binlog 將數(shù)據(jù)導入 hudi,優(yōu)點是不依賴消息隊列,缺點是對 db server 造成壓力。
- 第二種:對接 cdc format 消費 kafka 數(shù)據(jù)導入 hudi,優(yōu)點是可擴展性強,缺點是依賴 kafka。
注意:如果上游數(shù)據(jù)無法保證順序,需要指定 write.precombine.field 字段。
1.1.?準備MySQL表
注意:需要提前開啟MySQL的binlog
測試表建表語句如下所示:
create database test;
use test;
create table stu3 (
id int unsigned auto_increment primary key COMMENT '自增id',
name varchar(20) not null comment '學生名字',
school varchar(20) not null comment '學校名字',
nickname varchar(20) not null comment '學生小名',
age int not null comment '學生年齡',
class_num int not null comment '班級人數(shù)',
phone bigint not null comment '電話號碼',
email varchar(64) comment '家庭網(wǎng)絡郵箱',
ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;
1.2.?flink讀取mysql binlog并寫入kafka
步驟一:創(chuàng)建MySQL表(使用flink-sql創(chuàng)建MySQL源的sink表)
create table stu3_binlog(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
) with (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop1',
'port' = '3306',
'username' = 'root',
'password' = 'aaaaaa',
'database-name' = 'test',
'table-name' = 'stu3'
);
步驟二:創(chuàng)建Kafka表(使用flink-sql創(chuàng)建MySQL源的sink表)
create table stu3_binlog_sink_kafka(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
) with (
'connector' = 'upsert-kafka'
,'topic' = 'cdc_mysql_stu3_sink'
,'properties.zookeeper.connect' = 'hadoop1:2181'
,'properties.bootstrap.servers' = 'hadoop1:9092'
,'key.format' = 'json'
,'value.format' = 'json'
);
步驟三:將mysql binlog日志寫入kafka(flink-sql內(nèi)部操作)
insert into stu3_binlog_sink_kafka
select * from stu3_binlog;
1.3.?flink讀取kafka數(shù)據(jù)并寫入hudi數(shù)據(jù)湖
步驟一:創(chuàng)建kafka源表(使用flink-sql創(chuàng)建以kafka為源端的表)
create table stu3_binlog_source_kafka(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string
) with (
'connector' = 'kafka',
'topic' = 'cdc_mysql_stu3_sink',
'properties.bootstrap.servers' = 'hadoop1:9092',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'testGroup'
);
步驟二:創(chuàng)建hudi目標表(使用flink-sql創(chuàng)建以hudi為目標端的表)
create table stu3_binlog_sink_hudi(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.option' = 'insert',
'write.precombine.field' = 'school'
);
步驟三:將kafka數(shù)據(jù)寫入到hudi中(flink-sql內(nèi)部操作)
insert into stu3_binlog_sink_hudi
select * from stu3_binlog_source_kafka;
1.4.?使用datafaker插入數(shù)據(jù)
datafaker安裝及說明:datafaker --- 測試數(shù)據(jù)生成工具-阿里云開發(fā)者社區(qū)
步驟一:新建meta.txt文件
id||int||自增id[:inc(id,1)]
name||varchar(20)||學生名字
school||varchar(20)||學校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||學生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||學生年齡[:age]
class_num||int||班級人數(shù)[:int(10, 100)]
phone||bigint||電話號碼[:phone_number]
email||varchar(64)||家庭網(wǎng)絡郵箱[:email]
ip||varchar(32)||IP地址[:ipv4]
步驟二:生成10000條數(shù)據(jù)并寫入到mysql中的test.stu3表
datafaker rdb mysql+mysqldb://root:aaaaaa@hadoop1:3306/test?charset=utf8 stu3 10000 --meta meta.txt
注意:如果要再次生成測試數(shù)據(jù),則需要修改meta.txt將自增id中的1改為比10000大的數(shù),不然會出現(xiàn)主鍵沖突情況。
1.5.?統(tǒng)計數(shù)據(jù)入Hudi情況
create table stu3_binlog_hudi_view(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.precombine.field' = 'school'
);
select count(*) from stu3_binlog_hudi_view;
1.6.?實時查看數(shù)據(jù)入湖情況
create table stu3_binlog_hudi_streaming_view(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.precombine.field' = 'school',
'read.streaming.enabled' = 'true'
);
select * from stu3_binlog_hudi_streaming_view;
2.?離線批量導入
如果存量數(shù)據(jù)來源于其他數(shù)據(jù)源,可以使用批量導入功能,快速將存量數(shù)據(jù)導成 Hoodie 表格式。
2.1. 原理
- 批量導入省去了 avro 的序列化以及數(shù)據(jù)的 merge 過程,后續(xù)不會再有去重操作,數(shù)據(jù)的唯一性需要自己來保證。
- bulk_insert 需要在 Batch Execuiton Mode 下執(zhí)行更高效,Batch 模式默認會按照 partition path 排序輸入消息再寫入 Hoodie,避免 file handle 頻繁切換導致性能下降。
SET execution.runtime-mode = batch;
SET execution.checkpointing.interval = 0;
- bulk_insert write task 的并發(fā)通過參數(shù) write.tasks 指定,并發(fā)的數(shù)量會影響到小文件的數(shù)量,理論上,bulk_insert write task 的并發(fā)數(shù)就是劃分的 bucket 數(shù),當然每個 bucket 在寫到文件大小上限(parquet 120 MB)的時候會 roll over 到新的文件句柄,所以最后:寫文件數(shù)量 >= bulk_insert write task 數(shù)。
2.2.?WITH 參數(shù)
名稱 |
Required |
默認值 |
說明 |
write.operation |
true |
upsert |
配置 bulk_insert 開啟該功能 |
write.tasks |
false |
4 |
bulk_insert 寫 task 的并發(fā),最后的文件數(shù) >=write.tasks |
write.bulk_insert.shuffle_input (從 0.11 開始) |
false |
true |
是否將數(shù)據(jù)按照 partition 字段 shuffle 再通過 write task 寫入,開啟該參數(shù)將減少小文件的數(shù)量 但是可能有數(shù)據(jù)傾斜風險 |
write.bulk_insert.sort_input (從 0.11 開始) |
false |
true |
是否將數(shù)據(jù)線按照 partition 字段排序再寫入,當一個 write task 寫多個 partition,開啟可以減少小文件數(shù)量 |
write.sort.memory |
128 |
sort 算子的可用 managed memory(單位 MB) |
2.3.?案例
步驟一:MySQL建表
create database test;
use test;
create table stu4 (
id int unsigned auto_increment primary key COMMENT '自增id',
name varchar(20) not null comment '學生名字',
school varchar(20) not null comment '學校名字',
nickname varchar(20) not null comment '學生小名',
age int not null comment '學生年齡',
score decimal(4,2) not null comment '成績',
class_num int not null comment '班級人數(shù)',
phone bigint not null comment '電話號碼',
email varchar(64) comment '家庭網(wǎng)絡郵箱',
ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;
步驟二:新建meta.txt文件
id||int||自增id[:inc(id,1)]
name||varchar(20)||學生名字
school||varchar(20)||學校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||學生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||學生年齡[:age]
score||decimal(4,2)||成績[:decimal(4,2,1)]
class_num||int||班級人數(shù)[:int(10, 100)]
phone||bigint||電話號碼[:phone_number]
email||varchar(64)||家庭網(wǎng)絡郵箱[:email]
ip||varchar(32)||IP地址[:ipv4]
步驟三:使用datafaker生成10萬條數(shù)據(jù)并寫入到mysql中的test.stu4表
datafaker rdb mysql+mysqldb://root:aaaaaa@hadoop1:3306/test?charset=utf8 stu4 100000 --meta meta.txt
注意:
如果要再次生成測試數(shù)據(jù),
則需要將meta.txt中的自增id改為比100000大的數(shù),
不然會出現(xiàn)主鍵沖突情況。
步驟四:Flink SQL client 創(chuàng)建myql數(shù)據(jù)源
create table stu4(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
PRIMARY KEY (id) NOT ENFORCED
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop1:3306/test?serverTimezone=GMT%2B8',
'username' = 'root',
'password' = 'aaaaaa',
'table-name' = 'stu4'
);
步驟五:Flink SQL client創(chuàng)建hudi表
create table stu4_sink_hudi(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.option' = 'bulk_insert',
'write.precombine.field' = 'school'
);
-- 注意:核心點是其中的 write.option 配置為 bulk_insert
步驟六:Flink SQL client執(zhí)行mysql數(shù)據(jù)插入到hudi中
insert into stu4_sink_hudi select * from stu4;
3.?全量接增量
- 如果已經(jīng)有全量的離線 Hoodie 表,需要接上實時寫入,并且保證數(shù)據(jù)不重復,可以開啟 index bootstrap 功能。
- 如果覺得流程冗長,可以在寫入全量數(shù)據(jù)的時候資源調(diào)大直接走流模式寫,全量走完接新數(shù)據(jù)再將資源調(diào)?。ɑ蛘唛_啟限流功能)。
3.1.?WITH 參數(shù)
名稱 |
Required |
默認值 |
說明 |
index.bootstrap.enabled |
true |
false |
開啟索引加載,會將已存表的最新數(shù)據(jù)一次性加載到 state 中 |
index.partition.regex |
false |
* |
設置正則表達式進行分區(qū)篩選,默認為加載全部分區(qū) |
3.2.?使用流程
- CREATE TABLE 創(chuàng)建和 Hoodie 表對應的語句,注意 table type 要正確
- 設置 index.bootstrap.enabled = true開啟索引加載功能
flink conf 中設置 checkpoint 失敗容忍 execution.checkpointing.tolerable-failed-checkpoints = n(取決于checkpoint 調(diào)度次數(shù))等待第一次 checkpoint 成功,表示索引加載完成索引加載完成后可以退出并保存 savepoint (也可以直接用 externalized checkpoint)- 重啟任務將 index.bootstrap.enabled 關閉,參數(shù)配置到合適的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并發(fā)不同,可以重啟避免 shuffle
3.3.?說明
索引加載是阻塞式,所以在索引加載過程中 checkpoint 無法完成索引加載由數(shù)據(jù)流觸發(fā),需要確保每個 partition 都至少有1條數(shù)據(jù),即上游 source 有數(shù)據(jù)進來- 索引加載為并發(fā)加載,根據(jù)數(shù)據(jù)量大小加載時間不同,可以在log中搜索 finish loading the index under partition 和 Load records from file 日志來觀察索引加載的進度
第一次checkpoint成功就表示索引已經(jīng)加載完成,后續(xù)從 checkpoint 恢復時無需再次加載索引
注意:在當前的0.12版本,以上劃橫線的部分已經(jīng)不再需要了。(0.9 cherry pick 分支之后)文章來源:http://www.zghlxwxcb.cn/news/detail-450810.html
注:其他Hudi相關文章鏈接由此進 ->??Hudi文章匯總?文章來源地址http://www.zghlxwxcb.cn/news/detail-450810.html
到了這里,關于Hudi(17):Hudi集成Flink之寫入方式的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!