国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【數(shù)據(jù)湖Hudi-10-Hudi集成Flink-讀取方式&限流&寫入方式&寫入模式&Bucket索引】

這篇具有很好參考價(jià)值的文章主要介紹了【數(shù)據(jù)湖Hudi-10-Hudi集成Flink-讀取方式&限流&寫入方式&寫入模式&Bucket索引】。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、讀取方式

1 流讀(Streaming Query)

當(dāng)前表默認(rèn)是快照讀取,即讀取最新的全量快照數(shù)據(jù)并一次性返回。通過參數(shù) read.streaming.enabled 參數(shù)開啟流讀模式,通過 read.start-commit 參數(shù)指定起始消費(fèi)位置,支持指定 earliest 從最早消費(fèi)。

  • 1.with參數(shù)
名稱 Required 默認(rèn)值 說明
read.streaming.enabled false false 設(shè)置 true 開啟流讀模式
read.start-commit false 最新 commit 指定 ‘yyyyMMddHHmmss’ 格式的起始 commit(閉區(qū)間)
read.streaming.skip_compaction false false 流讀時(shí)是否跳過 compaction 的 commits,跳過 compaction 有兩個(gè)用途:1)避免 upsert 語義下重復(fù)消費(fèi) (compaction 的 instant 為重復(fù)數(shù)據(jù),如果不跳過,有小概率會(huì)重復(fù)消費(fèi))2) changelog 模式下保證語義正確性 0.11 開始,以上兩個(gè)問題已經(jīng)通過保留 compaction 的 instant time 修復(fù)
clean.retain_commits false 10 cleaner 最多保留的歷史 commits 數(shù),大于此數(shù)量的歷史 commits 會(huì)被清理掉,changelog 模式下,這個(gè)參數(shù)可以控制 changelog 的保留時(shí)間,例如 checkpoint 周期為 5 分鐘一次,默認(rèn)最少保留 50 分鐘的時(shí)間。

注意:當(dāng)參數(shù) read.streaming.skip_compaction 打開并且 streaming reader 消費(fèi)落后于clean.retain_commits 數(shù)時(shí),流讀可能會(huì)丟失數(shù)據(jù)。從 0.11 開始,compaction 不會(huì)再變更 record 的 instant time,因此理論上數(shù)據(jù)不會(huì)再重復(fù)消費(fèi),但是還是會(huì)重復(fù)讀取并丟棄,因此額外的開銷還是無法避免,對性能有要求的話還是可以開啟此參數(shù)。

案例展示:
CREATE TABLE t5(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop102:8020/tmp/hudi_flink/t5',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'   -- 默認(rèn)60s
);


insert into t5 select * from sourceT;

select * from t5;

二、限流

限流的邏輯是,源頭數(shù)據(jù)量級(jí)很大,百億級(jí)別。
下面是數(shù)據(jù)流向圖:
全量&增量數(shù)據(jù) --> kafka --> flink --> hudi

  • 限流,是限制的flink寫出到hudi的速度。這樣就減少了flink的背壓,消費(fèi)按照給定速率消費(fèi)。
    這樣就可以提高作業(yè)的穩(wěn)定性。

如果將全量數(shù)據(jù)(百億數(shù)量級(jí)) 和增量先同步到 kafka,再通過 flink 流式消費(fèi)的方式將庫表數(shù)據(jù)直接導(dǎo)成 hoodie 表,因?yàn)橹苯酉M(fèi)全量部分?jǐn)?shù)據(jù):量大(吞吐高)、亂序嚴(yán)重(寫入的 partition 隨機(jī)),會(huì)導(dǎo)致寫入性能退化,出現(xiàn)吞吐毛刺,這時(shí)候可以開啟限速參數(shù),保證流量平穩(wěn)寫入。
WITH 參數(shù)

名稱 Required 默認(rèn)值 說明
write.rate.limit false 0 默認(rèn)關(guān)閉限速

三、寫入方式

1.CDC 數(shù)據(jù)同步

CDC 數(shù)據(jù)保存了完整的數(shù)據(jù)庫變更,當(dāng)前可通過兩種途徑將數(shù)據(jù)導(dǎo)入 hudi:
【數(shù)據(jù)湖Hudi-10-Hudi集成Flink-讀取方式&限流&寫入方式&寫入模式&Bucket索引】,大數(shù)據(jù),數(shù)據(jù)湖,hudi,大數(shù)據(jù),hadoop
第一種:通過 cdc-connector 直接對接 DB 的 binlog 將數(shù)據(jù)導(dǎo)入 hudi,優(yōu)點(diǎn)是不依賴消息隊(duì)列,缺點(diǎn)是對 db server 造成壓力。
第二種:對接 cdc format 消費(fèi) kafka 數(shù)據(jù)導(dǎo)入 hudi,優(yōu)點(diǎn)是可擴(kuò)展性強(qiáng),缺點(diǎn)是依賴 kafka。

使用mysql進(jìn)行案例分析:

1.使用第二種方式 cdc+kafka進(jìn)行mysql數(shù)據(jù)同步到hudi

  • 1)準(zhǔn)備MySQL表
    (1)MySQL開啟binlog
    (2)建表
create database test;
use test;
create table stu3 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '學(xué)生名字',
  school varchar(20) not null comment '學(xué)校名字',
  nickname varchar(20) not null comment '學(xué)生小名',
  age int not null comment '學(xué)生年齡',
  class_num int not null comment '班級(jí)人數(shù)',
  phone bigint not null comment '電話號(hào)碼',
  email varchar(64) comment '家庭網(wǎng)絡(luò)郵箱',
  ip varchar(32) comment 'IP地址'
  ) engine=InnoDB default charset=utf8;
  • 2)flink讀取mysql binlog并寫入kafka
    (1)創(chuàng)建MySQL表
create table stu3_binlog(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop1',
  'port' = '3306',
  'username' = 'root',
  'password' = 'aaaaaa',
  'database-name' = 'test',
  'table-name' = 'stu3'
);

(2)創(chuàng)建Kafka表

create table stu3_binlog_sink_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'upsert-kafka'
  ,'topic' = 'cdc_mysql_stu3_sink'
  ,'properties.zookeeper.connect' = 'hadoop1:2181'
  ,'properties.bootstrap.servers' = 'hadoop1:9092'
  ,'key.format' = 'json'
  ,'value.format' = 'json'
);

(3)將mysql binlog日志寫入kafka

insert into stu3_binlog_sink_kafka
select * from stu3_binlog;
  • 3)flink讀取kafka數(shù)據(jù)并寫入hudi數(shù)據(jù)湖
    (1)創(chuàng)建kafka源表
create table stu3_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
 ) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_stu3_sink',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup'
  );

(2)創(chuàng)建hudi目標(biāo)表

create table stu3_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'insert',
  'write.precombine.field' = 'school'
  );

(3)將kafka數(shù)據(jù)寫入到hudi中

insert into stu3_binlog_sink_hudi
select * from  stu3_binlog_source_kafka;
  • 5)統(tǒng)計(jì)數(shù)據(jù)入Hudi情況
create table stu3_binlog_hudi_view(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school'
  );

select count(*) from stu3_binlog_hudi_view;  
  • 6)實(shí)時(shí)查看數(shù)據(jù)入湖情況
create table stu3_binlog_hudi_streaming_view(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school',
  'read.streaming.enabled' = 'true'
  );

 
select * from  stu3_binlog_hudi_streaming_view;

2.離線批量導(dǎo)入

如果存量數(shù)據(jù)來源于其他數(shù)據(jù)源,可以使用批量導(dǎo)入功能,快速將存量數(shù)據(jù)導(dǎo)成 Hoodie 表格式。

  • 1)原理
    (1)批量導(dǎo)入省去了 avro 的序列化以及數(shù)據(jù)的 merge 過程,后續(xù)不會(huì)再有去重操作,數(shù)據(jù)的唯一性需要自己來保證。
    (2)bulk_insert 需要在 Batch Execuiton Mode 下執(zhí)行更高效,Batch 模式默認(rèn)會(huì)按照 partition path 排序輸入消息再寫入 Hoodie,避免 file handle 頻繁切換導(dǎo)致性能下降。
SET execution.runtime-mode = batch; 
SET execution.checkpointing.interval = 0;

(3)bulk_insert write task 的并發(fā)通過參數(shù) write.tasks 指定,并發(fā)的數(shù)量會(huì)影響到小文件的數(shù)量,理論上,bulk_insert write task 的并發(fā)數(shù)就是劃分的 bucket 數(shù),當(dāng)然每個(gè) bucket 在寫到文件大小上限(parquet 120 MB)的時(shí)候會(huì) roll over 到新的文件句柄,所以最后:寫文件數(shù)量 >= bulk_insert write task 數(shù)。

  • 2)WITH 參數(shù)
名稱 Required 默認(rèn)值 說明
write.operation true upsert 配置 bulk_insert 開啟該功能
write.tasks false 4 bulk_insert 寫 task 的并發(fā),最后的文件數(shù) >=write.tasks
write.bulk_insert.shuffle_by_partition write.bulk_insert.shuffle_input(從 0.11 開始) false true 是否將數(shù)據(jù)按照 partition 字段 shuffle 再通過 write task 寫入,開啟該參數(shù)將減少小文件的數(shù)量 但是可能有數(shù)據(jù)傾斜風(fēng)險(xiǎn)
write.bulk_insert.sort_by_partition write.bulk_insert.sort_input(從 0.11 開始) false true 是否將數(shù)據(jù)線按照 partition 字段排序再寫入,當(dāng)一個(gè) write task 寫多個(gè) partition,開啟可以減少小文件數(shù)量
write.sort.memory 128 sort 算子的可用 managed memory(單位 MB)
  • 3)案例
    (1)MySQL建表
create database test;
use test;
create table stu4 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '學(xué)生名字',
  school varchar(20) not null comment '學(xué)校名字',
  nickname varchar(20) not null comment '學(xué)生小名',
  age int not null comment '學(xué)生年齡',
  score decimal(4,2) not null comment '成績',
  class_num int not null comment '班級(jí)人數(shù)',
  phone bigint not null comment '電話號(hào)碼',
  email varchar(64) comment '家庭網(wǎng)絡(luò)郵箱',
  ip varchar(32) comment 'IP地址'
  ) engine=InnoDB default charset=utf8;

(4)Flink SQL client 創(chuàng)建myql數(shù)據(jù)源

create table stu4(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop1:3306/test?serverTimezone=GMT%2B8',
  'username' = 'root',
  'password' = 'aaaaaa',
  'table-name' = 'stu4'
);

(5)Flink SQL client創(chuàng)建hudi表

create table stu4_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
 score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'bulk_insert',
  'write.precombine.field' = 'school'
  );

(3)Flink SQL client執(zhí)行mysql數(shù)據(jù)插入到hudi中

insert into stu4_sink_hudi select * from stu4;

3.全量接增量

如果已經(jīng)有全量的離線 Hoodie 表,需要接上實(shí)時(shí)寫入,并且保證數(shù)據(jù)不重復(fù),可以開啟 index bootstrap 功能。
如果覺得流程冗長,可以在寫入全量數(shù)據(jù)的時(shí)候資源調(diào)大直接走流模式寫,全量走完接新數(shù)據(jù)再將資源調(diào)?。ɑ蛘唛_啟限流功能)。

名稱 Required 默認(rèn)值 說明
index.bootstrap.enabled true false 開啟索引加載,會(huì)將已存表的最新數(shù)據(jù)一次性加載到 state 中
index.partition.regex false * 設(shè)置正則表達(dá)式進(jìn)行分區(qū)篩選,默認(rèn)為加載全部分區(qū)

使用流程
(1) CREATE TABLE 創(chuàng)建和 Hoodie 表對應(yīng)的語句,注意 table type 要正確
(2)設(shè)置 index.bootstrap.enabled = true開啟索引加載功能
(3)重啟任務(wù)將 index.bootstrap.enabled 關(guān)閉,參數(shù)配置到合適的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并發(fā)不同,可以重啟避免 shuffle

說明:
(1)索引加載為并發(fā)加載,根據(jù)數(shù)據(jù)量大小加載時(shí)間不同,可以在log中搜索
finish loading the index under partition 和 Load records from file 日志來觀察索引加載的進(jìn)度

四、寫入模式

1、Changelog模式

如果希望 Hoodie 保留消息的所有變更(I/-U/U/D),之后接上 Flink 引擎的有狀態(tài)計(jì)算實(shí)現(xiàn)全鏈路近實(shí)時(shí)數(shù)倉生產(chǎn)(增量計(jì)算),Hoodie 的 MOR 表通過行存原生支持保留消息的所有變更(format 層面的集成),通過流讀 MOR 表可以消費(fèi)到所有的變更記錄。

  • 1)WITH 參數(shù)
名稱 Required 默認(rèn)值 說明
changelog.enabled false false 默認(rèn)是關(guān)閉狀態(tài),即 UPSERT 語義,所有的消息僅保證最后一條合并消息,中間的變更可能會(huì)被 merge 掉;改成 true 支持消費(fèi)所有變更。

批(快照)讀仍然會(huì)合并所有的中間結(jié)果,不管 format 是否已存儲(chǔ)中間狀態(tài)。
開啟 changelog.enabled 參數(shù)后,中間的變更也只是 Best Effort: 異步的壓縮任務(wù)會(huì)將中間變更合并成 1 條,所以如果流讀消費(fèi)不夠及時(shí),被壓縮后只能讀到最后一條記錄。當(dāng)然,通過調(diào)整壓縮的 buffer 時(shí)間可以預(yù)留一定的時(shí)間 buffer 給 reader,比如調(diào)整壓縮的兩個(gè)參數(shù):

compaction.delta_commits:5 
compaction.delta_seconds: 3600

說明:
Changelog 模式開啟流讀的話,要在 sql-client 里面設(shè)置參數(shù):

set sql-client.execution.result-mode=tableau; 
或者
set sql-client.execution.result-mode=changelog;

2)流讀 changelog
僅在 0.10.0 支持,本 feature 為實(shí)驗(yàn)性。
開啟 changelog 模式后,hudi 會(huì)保留一段時(shí)間的 changelog 供下游 consumer 消費(fèi),我們可以通過流讀 ODS 層 changelog 接上 ETL 邏輯寫入到 DWD 層,如下圖的 pipeline:

流讀的時(shí)候我們要注意 changelog 有可能會(huì)被 compaction 合并掉,中間記錄會(huì)消除,可能會(huì)影響計(jì)算結(jié)果,需要關(guān)注sql-client的屬性(result-mode)同上。
3)案例演示
(1)使用changelog

set sql-client.execution.result-mode=tableau; 
CREATE TABLE t6(
  id int,
  ts int,
  primary key (id) not enforced
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t6',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4',
  'changelog.enabled' = 'true'
);

insert into t6 values (1,1);
insert into t6 values (1,2);

set table.dynamic-table-options.enabled=true;
select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/;
select count(*) from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

(2)不使用changelog

CREATE TABLE t6_v(
  id int,
  ts int,
  primary key (id) not enforced
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t6',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'
);


select * from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;
select count(*) from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;

2 Append 模式

從 0.10 開始支持
對于 INSERT 模式:
MOR 默認(rèn)會(huì) apply 小文件策略: 會(huì)追加寫 avro log 文件
COW 每次直接寫新的 parquet 文件,沒有小文件策略
Hudi 支持豐富的 Clustering 策略,優(yōu)化 INSERT 模式下的小文件問題:

  • 1)Inline Clustering
    只有 Copy On Write 表支持該模式
    【數(shù)據(jù)湖Hudi-10-Hudi集成Flink-讀取方式&限流&寫入方式&寫入模式&Bucket索引】,大數(shù)據(jù),數(shù)據(jù)湖,hudi,大數(shù)據(jù),hadoop
  • 2) Async Clustering
    從 0.12 開始支持

六、Bucket索引

從 0.11 開始支持
默認(rèn)的 flink 流式寫入使用 state 存儲(chǔ)索引信息:primary key 到 fileId 的映射關(guān)系。當(dāng)數(shù)據(jù)量比較大的時(shí)候,state的存儲(chǔ)開銷可能成為瓶頸,bucket 索引通過固定的 hash 策略,將相同 key 的數(shù)據(jù)分配到同一個(gè) fileGroup 中,避免了索引的存儲(chǔ)和查詢開銷。

1)WITH參數(shù)

名稱 Required 默認(rèn)值 說明
index.type false FLINK_STATE 設(shè)置 BUCKET 開啟 Bucket 索引功能
hoodie.bucket.index.hash.field false 主鍵 可以設(shè)置成主鍵的子集
hoodie.bucket.index.num.buckets false 4 默認(rèn)每個(gè) partition 的 bucket 數(shù),當(dāng)前設(shè)置后則不可再變更。
2)和 state 索引的對比:
(1)bucket index 沒有 state 的存儲(chǔ)計(jì)算開銷,性能較好
(2)bucket index 無法擴(kuò) buckets,state index 則可以依據(jù)文件的大小動(dòng)態(tài)擴(kuò)容
(3)bucket index 不支持跨 partition 的變更(如果輸入是 cdc 流則沒有這個(gè)限制),state index 沒有限制

七、Hudi CataLog

從 0.12.0 開始支持,通過 catalog 可以管理 flink 創(chuàng)建的表,避免重復(fù)建表操作,另外 hms 模式的 catalog 支持自動(dòng)補(bǔ)全 hive 同步參數(shù)。
DFS 模式 Catalog SQL樣例:

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '${catalog 的默認(rèn)路徑}',
    'mode'='dfs' 
  );
Hms 模式 Catalog SQL 樣例:
CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '${catalog 的默認(rèn)路徑}',
    'hive.conf.dir' = '${hive-site.xml 所在的目錄}',
    'mode'='hms' -- 支持 'dfs' 模式通過文件系統(tǒng)管理表屬性
  );
  • 1)WITH 參數(shù)
名稱 Required 默認(rèn)值 說明
catalog.path true 默認(rèn)的 catalog 根路徑,用作表路徑的自動(dòng)推導(dǎo),默認(rèn)的表路徑: c a t a l o g . p a t h / {catalog.path}/ catalog.path/{db_name}/${table_name}
default-database false default 默認(rèn)的 database 名
hive.conf.dir false hive-site.xml 所在的目錄,只在 hms 模式下生效
mode false dfs 支持 hms模式通過 hive 管理元數(shù)據(jù)
table.external false false 是否創(chuàng)建外部表,只在 hms 模式下生效
  • 2)使用dfs方式
    (1)創(chuàng)建sql-client初始化sql文件
vim /opt/module/flink-1.13.6/conf/sql-client-init.sql

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '/tmp/hudi_catalog',
    'mode'='dfs' 
  );

USE CATALOG hoodie_catalog;

(2)指定sql-client啟動(dòng)時(shí)加載sql文件

hadoop fs -mkdir /tmp/hudi_catalog

bin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session

(3)建庫建表插入

create database test;
use test;

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20),
primary key (uuid) not enforced
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_catalog/default/t2',
  'table.type' = 'MERGE_ON_READ'
);

insert into t2 values('1','zs',18,TIMESTAMP '1970-01-01 00:00:01','a');

(4)退出sql-client,重新進(jìn)入,表信息還在

use test;
show tables;
select * from t2;

七、離線 Compaction

MOR 表的 compaction 默認(rèn)是自動(dòng)打開的,策略是 5 個(gè) commits 執(zhí)行一次壓縮。 因?yàn)閴嚎s操作比較耗費(fèi)內(nèi)存,和寫流程放在同一個(gè) pipeline,在數(shù)據(jù)量比較大的時(shí)候(10w+/s qps),容易干擾寫流程,此時(shí)采用離線定時(shí)任務(wù)的方式執(zhí)行 compaction 任務(wù)更穩(wěn)定。

  • 1 設(shè)置參數(shù)
compaction.async.enabled 為 false,關(guān)閉在線 compaction。
compaction.schedule.enabled 仍然保持開啟,由寫任務(wù)階段性觸發(fā)壓縮 plan。
  • 2 原理
    一個(gè) compaction 的任務(wù)的執(zhí)行包括兩部分:
    schedule 壓縮 plan
    該過程推薦由寫任務(wù)定時(shí)觸發(fā),寫參數(shù) compaction.schedule.enabled 默認(rèn)開啟
    執(zhí)行對應(yīng)的壓縮 plan

  • 3 使用方式
    1)執(zhí)行命令
    離線 compaction 需要手動(dòng)執(zhí)行 Java 程序,程序入口:

// 命令行的方式
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table

2)案例演示
(1)創(chuàng)建表,關(guān)閉在線壓縮

create table t7(
  id int,
  ts int,
  primary key (id) not enforced
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_catalog/default/t7',
  'compaction.async.enabled' = 'false',
  'compaction.schedule.enabled' = 'true',
  'table.type' = 'MERGE_ON_READ'
);


insert into t7 values(1,1);
insert into t7 values(2,2);
insert into t7 values(3,3);
insert into t7 values(4,4);
insert into t7 values(5,5);

// 命令行的方式

./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://hadoop1:8020/tmp/hudi_catalog/default/t7

八、離線 Clustering

異步的 clustering 相對于 online 的 async clustering 資源隔離,從而更加穩(wěn)定。

  • 1 設(shè)置參數(shù)
clustering.async.enabled 為 false,關(guān)閉在線 clustering。
clustering.schedule.enabled 仍然保持開啟,由寫任務(wù)階段性觸發(fā) clustering plan。
  • 2 原理
    一個(gè) clustering 的任務(wù)的執(zhí)行包括兩部分:

    • schedule plan
      推薦由寫任務(wù)定時(shí)觸發(fā),寫參數(shù) clustering.schedule.enabled 默認(rèn)開啟。
    • 執(zhí)行對應(yīng)的 plan
  • 3 使用方式
    1)執(zhí)行命令
    離線 clustering 需要手動(dòng)執(zhí)行 Java 程序,程序入口:

// 命令行的方式
./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table

注意:必須是分區(qū)表,否則報(bào)錯(cuò)空指針異常。文章來源地址http://www.zghlxwxcb.cn/news/detail-629384.html

2)案例演示
(1)創(chuàng)建表,關(guān)閉在線壓縮
create table t8(
  id int,
  age int,
  ts int,
  primary key (id) not enforced
) partitioned by (age)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_catalog/default/t8',
  'clustering.async.enabled' = 'false',
  'clustering.schedule.enabled' = 'true',
  'table.type' = 'COPY_ON_WRITE'
);


insert into t8 values(1,18,1);
insert into t8 values(2,18,2);
insert into t8 values(3,18,3);
insert into t8 values(4,18,4);
insert into t8 values(5,18,5);
// 命令行的方式
./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://hadoop1:8020/tmp/hudi_catalog/default/t8

到了這里,關(guān)于【數(shù)據(jù)湖Hudi-10-Hudi集成Flink-讀取方式&限流&寫入方式&寫入模式&Bucket索引】的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Hudi集成Flink

    Hudi集成Flink

    安裝Maven 1)上傳apache-maven-3.6.3-bin.tar.gz到/opt/software目錄,并解壓更名 tar -zxvf apache-maven-3.6. 3 -bin.tar.gz -C /opt/module/ mv ? apache -maven-3.6. 3 ?maven 2)添加環(huán)境變量到/etc/profile中 sudo ?vim /etc/profile #MAVEN_HOME export MAVEN_HOME=/opt/module/maven export PATH=$PATH:$MAVEN_HOME/bin 3)測試安裝結(jié)果 sourc

    2023年04月13日
    瀏覽(23)
  • Hudi(四)集成Flink(2)

    Hudi(四)集成Flink(2)

    ????????當(dāng)前表 默認(rèn)是快照讀取 ,即讀取最新的全量快照數(shù)據(jù)并一次性返回。通過參數(shù) read.streaming.enabled 參數(shù)開啟流讀模式,通過 read.start-commit 參數(shù)指定起始消費(fèi)位置,支持指定 earliest 從最早消費(fèi)。 1、WITH參數(shù) 名稱 Required 默認(rèn)值 說明 read.streaming.enabled false false 設(shè)置

    2024年02月07日
    瀏覽(27)
  • Hudi(19):Hudi集成Flink之索引和Catalog

    目錄 0. 相關(guān)文章鏈接 1. Bucket索引(從 0.11 開始支持) 1.1.?WITH參數(shù) 1.2.?和 state 索引的對比 2.?Hudi Catalog(從 0.12.0 開始支持) 2.1. 概述 2.2.?WITH 參數(shù) 2.3.?使用dfs方式 ?Hudi文章匯總? ????????默認(rèn)的 flink 流式寫入使用 state 存儲(chǔ)索引信息:primary key 到 fileId 的映射關(guān)系。當(dāng)

    2024年02月05日
    瀏覽(24)
  • 基于數(shù)據(jù)湖的流批一體:flink1.15.3與Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog

    基于數(shù)據(jù)湖的流批一體:flink1.15.3與Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog

    前言:為實(shí)現(xiàn)基于數(shù)據(jù)湖的流批一體,采用業(yè)內(nèi)主流技術(shù)棧hudi、flink、CDH(hive、spark)。flink使用sql client與hive的catalog打通,可以與hive共享元數(shù)據(jù),使用sql client可操作hive中的表,實(shí)現(xiàn)批流一體;flink與hudi集成可以實(shí)現(xiàn)數(shù)據(jù)實(shí)時(shí)入湖;hudi與hive集成可以實(shí)現(xiàn)湖倉一體,用flink實(shí)

    2024年02月12日
    瀏覽(26)
  • Hudi(7):Hudi集成Spark之spark-sql方式

    目錄 0. 相關(guān)文章鏈接 1.?創(chuàng)建表 1.1.?啟動(dòng)spark-sql 1.2.?建表參數(shù) 1.3.?創(chuàng)建非分區(qū)表 1.4.?創(chuàng)建分區(qū)表 1.5.?在已有的hudi表上創(chuàng)建新表 1.6.?通過CTAS (Create Table As Select)建表 2.?插入數(shù)據(jù) 2.1.?向非分區(qū)表插入數(shù)據(jù) 2.2.?向分區(qū)表動(dòng)態(tài)分區(qū)插入數(shù)據(jù) 2.3.?向分區(qū)表靜態(tài)分區(qū)插入數(shù)據(jù) 2.4

    2024年02月06日
    瀏覽(20)
  • Hudi-集成Spark之spark-sql方式

    啟動(dòng)spark-sql 創(chuàng)建表 建表參數(shù): 參數(shù)名 默認(rèn)值 說明 primaryKey uuid 表的主鍵名,多個(gè)字段用逗號(hào)分隔。同 hoodie.datasource.write.recordkey.field preCombineField 表的預(yù)合并字段。同 hoodie.datasource.write.precombine.field type cow 創(chuàng)建的表類型: type = ‘cow’ type = \\\'mor’同 hoodie.datasource.write.table.ty

    2024年02月05日
    瀏覽(23)
  • Hudi0.14.0 集成 Spark3.2.3(IDEA編碼方式)

    本次在IDEA下使用Scala語言進(jìn)行開發(fā),具體環(huán)境搭建查看文章 IDEA 下 Scala Maven 開發(fā)環(huán)境搭建。 1.1 添加maven依賴 創(chuàng)建Maven工程,pom文件:

    2024年01月24日
    瀏覽(22)
  • Flink讀取數(shù)據(jù)的5種方式(文件,Socket,Kafka,MySQL,自定義數(shù)據(jù)源)

    這是最簡單的數(shù)據(jù)讀取方式。當(dāng)需要進(jìn)行功能測試時(shí),可以將數(shù)據(jù)保存在文件中,讀取后驗(yàn)證流處理的邏輯是否符合預(yù)期。 程序代碼: 輸出結(jié)果 用于驗(yàn)證一些通過Socket傳輸數(shù)據(jù)的場景非常方便。 程序代碼: 測試時(shí),需要先在 172.16.3.6 的服務(wù)器上啟動(dòng) nc ,然后再啟動(dòng)Flink讀

    2024年02月16日
    瀏覽(21)
  • Hudi0.14.0集成Spark3.2.3(Spark Shell方式)

    1.1 啟動(dòng)Spark Shell

    2024年01月24日
    瀏覽(16)
  • Apache Hudi初探(三)(與flink的結(jié)合)--flink寫hudi的操作(真正的寫數(shù)據(jù))

    在之前的文章中Apache Hudi初探(二)(與flink的結(jié)合)–flink寫hudi的操作(JobManager端的提交操作) 有說到寫hudi數(shù)據(jù)會(huì)涉及到 寫hudi真實(shí)數(shù)據(jù) 以及 寫hudi元數(shù)據(jù) ,這篇文章來說一下具體的實(shí)現(xiàn) 這里的操作就是在 HoodieFlinkWriteClient.upsert 方法: initTable 初始化HoodieFlinkTable preWrite 在這里幾乎沒

    2024年02月10日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包