国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Hudi(17):Hudi集成Flink之寫入方式

這篇具有很好參考價值的文章主要介紹了Hudi(17):Hudi集成Flink之寫入方式。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

0. 相關文章鏈接

1.?CDC 數(shù)據(jù)同步

1.1.?準備MySQL表

1.2.?flink讀取mysql binlog并寫入kafka

1.3.?flink讀取kafka數(shù)據(jù)并寫入hudi數(shù)據(jù)湖

1.4.?使用datafaker插入數(shù)據(jù)

1.5.?統(tǒng)計數(shù)據(jù)入Hudi情況

1.6.?實時查看數(shù)據(jù)入湖情況

2.?離線批量導入

2.1. 原理

2.2.?WITH 參數(shù)

2.3.?案例

3.?全量接增量

3.1.?WITH 參數(shù)

3.2.?使用流程

3.3.?說明


0. 相關文章鏈接

?Hudi文章匯總?

1.?CDC 數(shù)據(jù)同步

CDC 數(shù)據(jù)保存了完整的數(shù)據(jù)庫變更,當前可通過兩種途徑將數(shù)據(jù)導入 hudi:

Hudi(17):Hudi集成Flink之寫入方式

  • 第一種:通過 cdc-connector 直接對接 DB 的 binlog 將數(shù)據(jù)導入 hudi,優(yōu)點是不依賴消息隊列,缺點是對 db server 造成壓力。
  • 第二種:對接 cdc format 消費 kafka 數(shù)據(jù)導入 hudi,優(yōu)點是可擴展性強,缺點是依賴 kafka。

注意:如果上游數(shù)據(jù)無法保證順序,需要指定 write.precombine.field 字段。

1.1.?準備MySQL表

注意:需要提前開啟MySQL的binlog

測試表建表語句如下所示:

create database test;
use test;
create table stu3 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '學生名字',
  school varchar(20) not null comment '學校名字',
  nickname varchar(20) not null comment '學生小名',
  age int not null comment '學生年齡',
  class_num int not null comment '班級人數(shù)',
  phone bigint not null comment '電話號碼',
  email varchar(64) comment '家庭網(wǎng)絡郵箱',
  ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;

1.2.?flink讀取mysql binlog并寫入kafka

步驟一:創(chuàng)建MySQL表(使用flink-sql創(chuàng)建MySQL源的sink表)

create table stu3_binlog(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop1',
  'port' = '3306',
  'username' = 'root',
  'password' = 'aaaaaa',
  'database-name' = 'test',
  'table-name' = 'stu3'
);

步驟二:創(chuàng)建Kafka表(使用flink-sql創(chuàng)建MySQL源的sink表)

create table stu3_binlog_sink_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'upsert-kafka'
  ,'topic' = 'cdc_mysql_stu3_sink'
  ,'properties.zookeeper.connect' = 'hadoop1:2181'
  ,'properties.bootstrap.servers' = 'hadoop1:9092'
  ,'key.format' = 'json'
  ,'value.format' = 'json'
);

步驟三:將mysql binlog日志寫入kafka(flink-sql內(nèi)部操作)

insert into stu3_binlog_sink_kafka
select * from stu3_binlog;

1.3.?flink讀取kafka數(shù)據(jù)并寫入hudi數(shù)據(jù)湖

步驟一:創(chuàng)建kafka源表(使用flink-sql創(chuàng)建以kafka為源端的表)

create table stu3_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_stu3_sink',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup'
);

步驟二:創(chuàng)建hudi目標表(使用flink-sql創(chuàng)建以hudi為目標端的表)

create table stu3_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
partitioned by (`school`)
with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'insert',
  'write.precombine.field' = 'school'
);

步驟三:將kafka數(shù)據(jù)寫入到hudi中(flink-sql內(nèi)部操作)

insert into stu3_binlog_sink_hudi
select * from  stu3_binlog_source_kafka;

1.4.?使用datafaker插入數(shù)據(jù)

datafaker安裝及說明:datafaker --- 測試數(shù)據(jù)生成工具-阿里云開發(fā)者社區(qū)

步驟一:新建meta.txt文件

id||int||自增id[:inc(id,1)]
name||varchar(20)||學生名字
school||varchar(20)||學校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||學生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||學生年齡[:age]
class_num||int||班級人數(shù)[:int(10, 100)]
phone||bigint||電話號碼[:phone_number]
email||varchar(64)||家庭網(wǎng)絡郵箱[:email]
ip||varchar(32)||IP地址[:ipv4]

步驟二:生成10000條數(shù)據(jù)并寫入到mysql中的test.stu3表

datafaker rdb mysql+mysqldb://root:aaaaaa@hadoop1:3306/test?charset=utf8 stu3 10000 --meta meta.txt

注意:如果要再次生成測試數(shù)據(jù),則需要修改meta.txt將自增id中的1改為比10000大的數(shù),不然會出現(xiàn)主鍵沖突情況。

1.5.?統(tǒng)計數(shù)據(jù)入Hudi情況

create table stu3_binlog_hudi_view(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school'
  );

select count(*) from stu3_binlog_hudi_view;  

1.6.?實時查看數(shù)據(jù)入湖情況

create table stu3_binlog_hudi_streaming_view(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school',
  'read.streaming.enabled' = 'true'
  );

 
select * from  stu3_binlog_hudi_streaming_view;

2.?離線批量導入

如果存量數(shù)據(jù)來源于其他數(shù)據(jù)源,可以使用批量導入功能,快速將存量數(shù)據(jù)導成 Hoodie 表格式。

2.1. 原理

  • 批量導入省去了 avro 的序列化以及數(shù)據(jù)的 merge 過程,后續(xù)不會再有去重操作,數(shù)據(jù)的唯一性需要自己來保證。
  • bulk_insert 需要在 Batch Execuiton Mode 下執(zhí)行更高效,Batch 模式默認會按照 partition path 排序輸入消息再寫入 Hoodie,避免 file handle 頻繁切換導致性能下降。
SET execution.runtime-mode = batch; 
SET execution.checkpointing.interval = 0;
  • bulk_insert write task 的并發(fā)通過參數(shù) write.tasks 指定,并發(fā)的數(shù)量會影響到小文件的數(shù)量,理論上,bulk_insert write task 的并發(fā)數(shù)就是劃分的 bucket 數(shù),當然每個 bucket 在寫到文件大小上限(parquet 120 MB)的時候會 roll over 到新的文件句柄,所以最后:寫文件數(shù)量 >= bulk_insert write task 數(shù)。

2.2.?WITH 參數(shù)

名稱

Required

默認值

說明

write.operation

true

upsert

配置 bulk_insert 開啟該功能

write.tasks

false

4

bulk_insert 寫 task 的并發(fā),最后的文件數(shù) >=write.tasks

write.bulk_insert.shuffle_by_partition

write.bulk_insert.shuffle_input

(從 0.11 開始)

false

true

是否將數(shù)據(jù)按照 partition 字段 shuffle 再通過 write task 寫入,開啟該參數(shù)將減少小文件的數(shù)量 但是可能有數(shù)據(jù)傾斜風險

write.bulk_insert.sort_by_partition

write.bulk_insert.sort_input

(從 0.11 開始)

false

true

是否將數(shù)據(jù)線按照 partition 字段排序再寫入,當一個 write task 寫多個 partition,開啟可以減少小文件數(shù)量

write.sort.memory

128

sort 算子的可用 managed memory(單位 MB)

2.3.?案例

步驟一:MySQL建表

create database test;
use test;
create table stu4 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '學生名字',
  school varchar(20) not null comment '學校名字',
  nickname varchar(20) not null comment '學生小名',
  age int not null comment '學生年齡',
  score decimal(4,2) not null comment '成績',
  class_num int not null comment '班級人數(shù)',
  phone bigint not null comment '電話號碼',
  email varchar(64) comment '家庭網(wǎng)絡郵箱',
  ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;

步驟二:新建meta.txt文件

id||int||自增id[:inc(id,1)]
name||varchar(20)||學生名字
school||varchar(20)||學校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||學生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||學生年齡[:age]
score||decimal(4,2)||成績[:decimal(4,2,1)]
class_num||int||班級人數(shù)[:int(10, 100)]
phone||bigint||電話號碼[:phone_number]
email||varchar(64)||家庭網(wǎng)絡郵箱[:email]
ip||varchar(32)||IP地址[:ipv4]

步驟三:使用datafaker生成10萬條數(shù)據(jù)并寫入到mysql中的test.stu4表

datafaker rdb mysql+mysqldb://root:aaaaaa@hadoop1:3306/test?charset=utf8 stu4 100000 --meta meta.txt

注意:
    如果要再次生成測試數(shù)據(jù),
    則需要將meta.txt中的自增id改為比100000大的數(shù),
    不然會出現(xiàn)主鍵沖突情況。

步驟四:Flink SQL client 創(chuàng)建myql數(shù)據(jù)源

create table stu4(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop1:3306/test?serverTimezone=GMT%2B8',
  'username' = 'root',
  'password' = 'aaaaaa',
  'table-name' = 'stu4'
);

步驟五:Flink SQL client創(chuàng)建hudi表

 create table stu4_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
 score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'bulk_insert',
  'write.precombine.field' = 'school'
);

-- 注意:核心點是其中的 write.option 配置為 bulk_insert

步驟六:Flink SQL client執(zhí)行mysql數(shù)據(jù)插入到hudi中

insert into stu4_sink_hudi select * from stu4;

3.?全量接增量

  • 如果已經(jīng)有全量的離線 Hoodie 表,需要接上實時寫入,并且保證數(shù)據(jù)不重復,可以開啟 index bootstrap 功能。
  • 如果覺得流程冗長,可以在寫入全量數(shù)據(jù)的時候資源調(diào)大直接走流模式寫,全量走完接新數(shù)據(jù)再將資源調(diào)?。ɑ蛘唛_啟限流功能)。

3.1.?WITH 參數(shù)

名稱

Required

默認值

說明

index.bootstrap.enabled

true

false

開啟索引加載,會將已存表的最新數(shù)據(jù)一次性加載到 state 中

index.partition.regex

false

*

設置正則表達式進行分區(qū)篩選,默認為加載全部分區(qū)

3.2.?使用流程

  1. CREATE TABLE 創(chuàng)建和 Hoodie 表對應的語句,注意 table type 要正確
  2. 設置 index.bootstrap.enabled = true開啟索引加載功能
  3. flink conf 中設置 checkpoint 失敗容忍 execution.checkpointing.tolerable-failed-checkpoints = n(取決于checkpoint 調(diào)度次數(shù))
  4. 等待第一次 checkpoint 成功,表示索引加載完成
  5. 索引加載完成后可以退出并保存 savepoint (也可以直接用 externalized checkpoint)
  6. 重啟任務將 index.bootstrap.enabled 關閉,參數(shù)配置到合適的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并發(fā)不同,可以重啟避免 shuffle

3.3.?說明

  1. 索引加載是阻塞式,所以在索引加載過程中 checkpoint 無法完成
  2. 索引加載由數(shù)據(jù)流觸發(fā),需要確保每個 partition 都至少有1條數(shù)據(jù),即上游 source 有數(shù)據(jù)進來
  3. 索引加載為并發(fā)加載,根據(jù)數(shù)據(jù)量大小加載時間不同,可以在log中搜索 finish loading the index under partition 和 Load records from file 日志來觀察索引加載的進度
  4. 第一次checkpoint成功就表示索引已經(jīng)加載完成,后續(xù)從 checkpoint 恢復時無需再次加載索引

注意:在當前的0.12版本,以上劃橫線的部分已經(jīng)不再需要了。(0.9 cherry pick 分支之后)


注:其他Hudi相關文章鏈接由此進 ->??Hudi文章匯總?文章來源地址http://www.zghlxwxcb.cn/news/detail-450810.html


到了這里,關于Hudi(17):Hudi集成Flink之寫入方式的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Hudi(19):Hudi集成Flink之索引和Catalog

    目錄 0. 相關文章鏈接 1. Bucket索引(從 0.11 開始支持) 1.1.?WITH參數(shù) 1.2.?和 state 索引的對比 2.?Hudi Catalog(從 0.12.0 開始支持) 2.1. 概述 2.2.?WITH 參數(shù) 2.3.?使用dfs方式 ?Hudi文章匯總? ????????默認的 flink 流式寫入使用 state 存儲索引信息:primary key 到 fileId 的映射關系。當

    2024年02月05日
    瀏覽(24)
  • 問題:Spark SQL 讀不到 Flink 寫入 Hudi 表的新數(shù)據(jù),打開新 Session 才可見

    問題:Spark SQL 讀不到 Flink 寫入 Hudi 表的新數(shù)據(jù),打開新 Session 才可見

    博主歷時三年精心創(chuàng)作的《大數(shù)據(jù)平臺架構(gòu)與原型實現(xiàn):數(shù)據(jù)中臺建設實戰(zhàn)》一書現(xiàn)已由知名IT圖書品牌電子工業(yè)出版社博文視點出版發(fā)行,點擊《重磅推薦:建大數(shù)據(jù)平臺太難了!給我發(fā)個工程原型吧!》了解圖書詳情,京東購書鏈接:https://item.jd.com/12677623.html,掃描左側(cè)

    2024年02月22日
    瀏覽(23)
  • Hudi(7):Hudi集成Spark之spark-sql方式

    目錄 0. 相關文章鏈接 1.?創(chuàng)建表 1.1.?啟動spark-sql 1.2.?建表參數(shù) 1.3.?創(chuàng)建非分區(qū)表 1.4.?創(chuàng)建分區(qū)表 1.5.?在已有的hudi表上創(chuàng)建新表 1.6.?通過CTAS (Create Table As Select)建表 2.?插入數(shù)據(jù) 2.1.?向非分區(qū)表插入數(shù)據(jù) 2.2.?向分區(qū)表動態(tài)分區(qū)插入數(shù)據(jù) 2.3.?向分區(qū)表靜態(tài)分區(qū)插入數(shù)據(jù) 2.4

    2024年02月06日
    瀏覽(20)
  • Hudi-集成Spark之spark-sql方式

    啟動spark-sql 創(chuàng)建表 建表參數(shù): 參數(shù)名 默認值 說明 primaryKey uuid 表的主鍵名,多個字段用逗號分隔。同 hoodie.datasource.write.recordkey.field preCombineField 表的預合并字段。同 hoodie.datasource.write.precombine.field type cow 創(chuàng)建的表類型: type = ‘cow’ type = \\\'mor’同 hoodie.datasource.write.table.ty

    2024年02月05日
    瀏覽(23)
  • Hudi0.14.0 集成 Spark3.2.3(IDEA編碼方式)

    本次在IDEA下使用Scala語言進行開發(fā),具體環(huán)境搭建查看文章 IDEA 下 Scala Maven 開發(fā)環(huán)境搭建。 1.1 添加maven依賴 創(chuàng)建Maven工程,pom文件:

    2024年01月24日
    瀏覽(22)
  • Hudi0.14.0集成Spark3.2.3(Spark Shell方式)

    1.1 啟動Spark Shell

    2024年01月24日
    瀏覽(16)
  • flink1.17.0 集成kafka,并且計算

    flink1.17.0 集成kafka,并且計算

    flink是實時計算的重要集成組件,這里演示如何集成,并且使用一個小例子。例子是kafka輸入消息,用逗號隔開,統(tǒng)計每個相同單詞出現(xiàn)的次數(shù),這么一個功能。 這里我使用的kafka版本是3.2.0,部署的方法可以參考, kafka部署 啟動后查看java進程是否存在,存在后執(zhí)行下一步。

    2024年02月09日
    瀏覽(18)
  • hadoop3.2.4集成flink 1.17.0

    hadoop3.2.4集成flink 1.17.0

    flink安裝部署有三種方式 local:單機模式,盡量不使用 standalone: flink自帶集群,資源管理由flink集群管理,開發(fā)環(huán)境測試使用,不需要hadoop集群 flink on yarn: 把資源管理交給yarn實現(xiàn),計算機資源統(tǒng)一由Haoop YARN管理,生產(chǎn)環(huán)境測試,需要先啟動hadoop集群。(這里分為可以繼續(xù)細分

    2024年02月17日
    瀏覽(21)
  • 基于數(shù)據(jù)湖的流批一體:flink1.15.3與Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog

    基于數(shù)據(jù)湖的流批一體:flink1.15.3與Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog

    前言:為實現(xiàn)基于數(shù)據(jù)湖的流批一體,采用業(yè)內(nèi)主流技術棧hudi、flink、CDH(hive、spark)。flink使用sql client與hive的catalog打通,可以與hive共享元數(shù)據(jù),使用sql client可操作hive中的表,實現(xiàn)批流一體;flink與hudi集成可以實現(xiàn)數(shù)據(jù)實時入湖;hudi與hive集成可以實現(xiàn)湖倉一體,用flink實

    2024年02月12日
    瀏覽(26)
  • CDH6.3.2 集成 Flink 1.17.0 失敗過程

    CDH6.3.2 集成 Flink 1.17.0 失敗過程

    目錄 一:下載Flink,并制作parcel包 1.相關資源下載 2. 修改配置 準備工作一: 準備工作二: 3. 開始build 二:開始在CDH頁面分發(fā)激活 ?三:CDH添加Flink-yarn 服務 ?四:啟動不起來的問題解決 五:CDH6.3.2集群集成zookeeper3.6.3 六:重新適配Flink服務 環(huán)境說明: cdh版本:cdh6.3.2 組件版本信

    2024年01月17日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包