国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用

這篇具有很好參考價(jià)值的文章主要介紹了流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

2.9 進(jìn)階使用

2.9.1 寫入性能

Paimon的寫入性能與檢查點(diǎn)密切相關(guān),因此需要更大的寫入吞吐量:

增加檢查點(diǎn)間隔,或者僅使用批處理模式。

增加寫入緩沖區(qū)大小。

啟用寫緩沖區(qū)溢出。

如果您使用固定存儲(chǔ)桶模式,請(qǐng)重新調(diào)整存儲(chǔ)桶數(shù)量。

2.9.1.1 并行度

建議sink的并行度小于等于bucket的數(shù)量,最好相等。

選項(xiàng) 必需的 默認(rèn) 類型 描述
sink.parallelism No (none) Integer 定義sink的并行度。默認(rèn)情況下,并行度由框架使用上游鏈?zhǔn)竭\(yùn)算符的相同并行度來(lái)確定。

2.9.1.2 Compaction

當(dāng)Sorted Run數(shù)量較少時(shí),Paimon writer 將在單獨(dú)的線程中異步執(zhí)行壓縮,因此記錄可以連續(xù)寫入表中。然而,為了避免Sorted Runs的無(wú)限增長(zhǎng),當(dāng)Sorted Run的數(shù)量達(dá)到閾值時(shí),writer將不得不暫停寫入。下表屬性確定閾值。

選項(xiàng) 必需的 默認(rèn) 類型 描述
num-sorted-run.stop-trigger No (none) Integer 觸發(fā)停止寫入的Sorted Runs次數(shù),默認(rèn)值為 ‘num-sorted-run.compaction-trigger’ + 1。

當(dāng) num-sorted-run.stop-trigger 變大時(shí),寫入停頓將變得不那么頻繁,從而提高寫入性能。但是,如果該值變得太大,則查詢表時(shí)將需要更多內(nèi)存和 CPU 時(shí)間。如果您擔(dān)心內(nèi)存 OOM,請(qǐng)配置sort-spill-threshold。它的值取決于你的內(nèi)存大小。

2.9.1.3 優(yōu)先考慮寫入吞吐量

如果希望某種模式具有最大寫入吞吐量,則可以緩慢而不是匆忙地進(jìn)行Compaction??梢詫?duì)表使用以下策略

num-sorted-run.stop-trigger = 2147483647

sort-spill-threshold = 10

此配置將在寫入高峰期生成更多文件,并在寫入低谷期逐漸合并到最佳讀取性能。

2.9.1.4 觸發(fā)Compaction的Sorted Run數(shù)

Paimon使用LSM樹,支持大量更新。 LSM 在多次Sorted Runs中組織文件。從 LSM 樹查詢記錄時(shí),必須組合所有Sorted Runs以生成所有記錄的完整視圖。

過(guò)多的Sorted Run會(huì)導(dǎo)致查詢性能不佳。為了將Sorted Run的數(shù)量保持在合理的范圍內(nèi),Paimon writers 將自動(dòng)執(zhí)行Compaction。下表屬性確定觸發(fā)Compaction的最小Sorted Run數(shù)。

選項(xiàng) 必需的 默認(rèn) 類型 描述
num-sorted-run.compaction-trigger No 5 Integer 觸發(fā)Compaction的Sorted Run數(shù)。包括 0 級(jí)文件(一個(gè)文件一級(jí)排序運(yùn)行)和高級(jí)運(yùn)行(一個(gè)一級(jí)排序運(yùn)行)。

2.9.1.5 寫入初始化

在write初始化時(shí),bucket的writer需要讀取所有歷史文件。如果這里出現(xiàn)瓶頸(例如同時(shí)寫入大量分區(qū)),可以使用write-manifest-cache緩存讀取的manifest數(shù)據(jù),以加速初始化。

2.9.1.6 內(nèi)存

Paimon writer中主要占用內(nèi)存的地方有3個(gè):

Writer的內(nèi)存緩沖區(qū),由單個(gè)任務(wù)的所有Writer共享和搶占。該內(nèi)存值可以通過(guò) write-buffer-size 表屬性進(jìn)行調(diào)整。

合并多個(gè)Sorted Run以進(jìn)行Compaction時(shí)會(huì)消耗內(nèi)存。可以通過(guò) num-sorted-run.compaction-trigger 選項(xiàng)進(jìn)行調(diào)整,以更改要合并的Sorted Run的數(shù)量。

如果行非常大,在進(jìn)行Compaction時(shí)一次讀取太多行數(shù)據(jù)可能會(huì)消耗大量?jī)?nèi)存。減少 read.batch-size 選項(xiàng)可以減輕這種情況的影響。

寫入列式(ORC、Parquet等)文件所消耗的內(nèi)存,不可調(diào)。

2.9.2 讀取性能

2.9.2.1 Full Compaction

配置“full-compaction.delta-commits”在Flink寫入中定期執(zhí)行full-compaction。并且可以確保在寫入結(jié)束之前分區(qū)被完全Compaction。

注意:Paimon 默認(rèn)處理小文件并提供良好的讀取性能。請(qǐng)不要在沒(méi)有任何要求的情況下配置此Full Compaction選項(xiàng),因?yàn)樗鼤?huì)對(duì)性能產(chǎn)生重大影響。

2.9.2.2 主鍵表

對(duì)于主鍵表來(lái)說(shuō),這是一種“MergeOnRead”技術(shù)。讀取數(shù)據(jù)時(shí),會(huì)合并多層LSM數(shù)據(jù),并行數(shù)會(huì)受到桶數(shù)的限制。雖然Paimon的merge會(huì)高效,但是還是趕不上普通的AppendOnly表。

如果你想在某些場(chǎng)景下查詢得足夠快,但只能找到較舊的數(shù)據(jù),你可以:

配置full-compaction.delta-commits,寫入數(shù)據(jù)時(shí)(目前只有Flink)會(huì)定期進(jìn)行full Compaction。

配置“scan.mode”為“compacted-full”,讀取數(shù)據(jù)時(shí),選擇full-compaction的快照。讀取性能良好。

2.9.2.3 僅追加表

小文件會(huì)降低讀取速度并影響 DFS 穩(wěn)定性。默認(rèn)情況下,當(dāng)單個(gè)存儲(chǔ)桶中的小文件超過(guò)“compaction.max.file-num”(默認(rèn)50個(gè))時(shí),就會(huì)觸發(fā)compaction。但是當(dāng)有多個(gè)桶時(shí),就會(huì)產(chǎn)生很多小文件。

您可以使用full-compaction來(lái)減少小文件。full-compaction將消除大多數(shù)小文件。

2.9.2.4 格式

Paimon 對(duì) parquet 讀取進(jìn)行了一些查詢優(yōu)化,因此 parquet 會(huì)比 orc 稍快一些。

2.9.3 多Writer并發(fā)寫入

Paimon的快照管理支持向多個(gè)writer寫入。

默認(rèn)情況下,Paimon支持對(duì)不同分區(qū)的并發(fā)寫入。推薦的方式是streaming job將記錄寫入Paimon的最新分區(qū);同時(shí)批處理作業(yè)(覆蓋)將記錄寫入歷史分區(qū)。

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

如果需要多個(gè)Writer寫到同一個(gè)分區(qū),事情就會(huì)變得有點(diǎn)復(fù)雜。例如,不想使用 UNION ALL,那就需要有多個(gè)流作業(yè)來(lái)寫入“partial-update”表。參考如下的“Dedicated Compaction Job”。

2.9.3.1 Dedicated Compaction Job

默認(rèn)情況下,Paimon writer 在寫入記錄時(shí)會(huì)根據(jù)需要執(zhí)行Compaction。這對(duì)于大多數(shù)用例來(lái)說(shuō)已經(jīng)足夠了,但有兩個(gè)缺點(diǎn):

這可能會(huì)導(dǎo)致寫入吞吐量不穩(wěn)定,因?yàn)閳?zhí)行壓縮時(shí)吞吐量可能會(huì)暫時(shí)下降。

Compaction會(huì)將某些數(shù)據(jù)文件標(biāo)記為“已刪除”(并未真正刪除)。如果多個(gè)writer標(biāo)記同一個(gè)文件,則在提交更改時(shí)會(huì)發(fā)生沖突。 Paimon 會(huì)自動(dòng)解決沖突,但這可能會(huì)導(dǎo)致作業(yè)重新啟動(dòng)。

為了避免這些缺點(diǎn),用戶還可以選擇在writer中跳過(guò)Compaction,并僅運(yùn)行專門的作業(yè)來(lái)進(jìn)行Compaction。由于Compaction僅由專用作業(yè)執(zhí)行,因此writer可以連續(xù)寫入記錄而無(wú)需暫停,并且不會(huì)發(fā)生沖突。

選項(xiàng) 必需的 默認(rèn) 類型 描述
write-only No false Boolean 如果設(shè)置為 true,將跳過(guò)Compaction和快照過(guò)期。此選項(xiàng)與獨(dú)立Compaction一起使用。

Flink SQL目前不支持compaction相關(guān)的語(yǔ)句,所以我們必須通過(guò)flink run來(lái)提交compaction作業(yè)。

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

compact \

–warehouse \

–database \

–table \

[–partition ] \

[–catalog-conf [–catalog-conf …]] \

如果提交一個(gè)批處理作業(yè)(execution.runtime-mode:batch),當(dāng)前所有的表文件都會(huì)被Compaction。如果您提交一個(gè)流作業(yè)(execution.runtime-mode: Streaming),該作業(yè)將持續(xù)監(jiān)視表的新更改并根據(jù)需要執(zhí)行Compaction。

2.9.4 表管理

2.9.4.1 管理快照

1)快照過(guò)期

Paimon Writer每次提交都會(huì)生成一個(gè)或兩個(gè)快照。每個(gè)快照可能會(huì)添加一些新的數(shù)據(jù)文件或?qū)⒁恍┡f的數(shù)據(jù)文件標(biāo)記為已刪除。然而,標(biāo)記的數(shù)據(jù)文件并沒(méi)有真正被刪除,因?yàn)镻aimon還支持時(shí)間旅行到更早的快照。它們僅在快照過(guò)期時(shí)被刪除。

目前,Paimon Writer在提交新更改時(shí)會(huì)自動(dòng)執(zhí)行過(guò)期操作。通過(guò)使舊快照過(guò)期,可以刪除不再使用的舊數(shù)據(jù)文件和元數(shù)據(jù)文件,以釋放磁盤空間。

設(shè)置以下表屬性:

選項(xiàng) 必需的 默認(rèn) 類型 描述
snapshot.time-retained No 1 h Duration 已完成快照的最長(zhǎng)時(shí)間保留。
snapshot.num-retained.min No 10 Integer 要保留的已完成快照的最小數(shù)量。
snapshot.num-retained.max No Integer.MAX_VALUE Integer 要保留的已完成快照的最大數(shù)量。

注意,保留時(shí)間太短或保留數(shù)量太少可能會(huì)導(dǎo)致如下問(wèn)題:

批量查詢找不到該文件。例如,表比較大,批量查詢需要10分鐘才能讀取,但是10分鐘前的快照過(guò)期了,此時(shí)批量查詢會(huì)讀取到已刪除的快照。

表文件上的流式讀取作業(yè)(沒(méi)有外部日志系統(tǒng))無(wú)法重新啟動(dòng)。當(dāng)作業(yè)重新啟動(dòng)時(shí),它記錄的快照可能已過(guò)期。 (可以使用Consumer Id來(lái)保護(hù)快照過(guò)期的小保留時(shí)間內(nèi)的流式讀取)。

2)回滾快照

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

rollback-to \

–warehouse \

–database \

–table \

–snapshot \

[–catalog-conf [–catalog-conf …]]

2.9.4.2 管理分區(qū)

創(chuàng)建分區(qū)表時(shí)可以設(shè)置partition.expiration-time。 Paimon會(huì)定期檢查分區(qū)的狀態(tài),并根據(jù)時(shí)間刪除過(guò)期的分區(qū)。

判斷分區(qū)是否過(guò)期:將分區(qū)中提取的時(shí)間與當(dāng)前時(shí)間進(jìn)行比較,看生存時(shí)間是否超過(guò)partition.expiration-time。比如:

CREATE TABLE T (…) PARTITIONED BY (dt) WITH (

‘partition.expiration-time’ = ‘7 d’,

‘partition.expiration-check-interval’ = ‘1 d’,

‘partition.timestamp-formatter’ = ‘yyyyMMdd’

);

選項(xiàng) 默認(rèn) 類型 描述
partition.expiration-check-interval 1 h Duration 分區(qū)過(guò)期的檢查間隔。
partition.expiration-time (none) Duration 分區(qū)的過(guò)期時(shí)間間隔。如果分區(qū)的生命周期超過(guò)此值,則該分區(qū)將過(guò)期。分區(qū)時(shí)間是從分區(qū)值中提取的。
partition.timestamp-formatter (none) String 用于格式化字符串時(shí)間戳的格式化程序。它可以與“partition.timestamp-pattern”一起使用來(lái)創(chuàng)建使用指定值的格式化程序。> 默認(rèn)格式化程序?yàn)椤皔yyy-MM-dd HH:mm:ss”和“yyyy-MM-dd”。> 支持多個(gè)分區(qū)字段,例如“ y e a r ? year- year?month-$day $hour:00:00”。> 時(shí)間戳格式化程序與 Java 的 DateTimeFormatter 兼容。
partition.timestamp-pattern (none) String 可以指定一種模式來(lái)從分區(qū)獲取時(shí)間戳。格式化程序模式由“partition.timestamp-formatter”定義。> 默認(rèn)情況下,從第一個(gè)字段讀取。> 如果分區(qū)中的時(shí)間戳是名為“dt”的單個(gè)字段,則可以使用“ d t ”。 > 如果它分布在年、月、日和小時(shí)的多個(gè)字段中,則可以使用“ dt”。> 如果它分布在年、月、日和小時(shí)的多個(gè)字段中,則可以使用“ dt。>如果它分布在年、月、日和小時(shí)的多個(gè)字段中,則可以使用year- m o n t h ? month- month?day h o u r : 00 : 00 ”。 > 如果時(shí)間戳位于 d t 和 h o u r 字段中,則可以使用“ hour:00:00”。> 如果時(shí)間戳位于 dt 和 hour 字段中,則可以使用“ hour:00:00”。>如果時(shí)間戳位于dthour字段中,則可以使用dt $hour:00:00”。

2.9.4.3 管理小文件

小文件可能會(huì)導(dǎo)致:

穩(wěn)定性問(wèn)題:HDFS中小文件過(guò)多,NameNode會(huì)承受過(guò)大的壓力。

成本問(wèn)題:HDFS中的小文件會(huì)暫時(shí)使用最小1個(gè)Block的大小,例如128MB。

查詢效率:小文件過(guò)多查詢效率會(huì)受到影響。

1)Flink Checkpoint的影響

使用Flink Writer,每個(gè)checkpoint會(huì)生成 1-2 個(gè)快照,并且checkpoint會(huì)強(qiáng)制在 DFS 上生成文件,因此checkpoint間隔越小,會(huì)生成越多的小文件。

默認(rèn)情況下,不僅checkpoint會(huì)導(dǎo)致文件生成,writer的內(nèi)存(write-buffer-size)耗盡也會(huì)將數(shù)據(jù)flush到DFS并生成相應(yīng)的文件??梢詥⒂?write-buffer-spillable 在 writer 中生成溢出文件,從而在 DFS 中生成更大的文件。

所以,可以設(shè)置如下:

增大checkpoint間隔

增加 write-buffer-size 或啟用 write-buffer-spillable

2)快照的影響

Paimon維護(hù)文件的多個(gè)版本,文件的Compaction和刪除是邏輯上的,并沒(méi)有真正刪除文件。文件只有在 Snapshot 過(guò)期后才會(huì)被真正刪除,因此減少文件的第一個(gè)方法就是減少 Snapshot 過(guò)期的時(shí)間。 Flink writer 會(huì)自動(dòng)使快照過(guò)期。

分區(qū)和分桶的影響

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

表數(shù)據(jù)會(huì)被物理分片到不同的分區(qū),里面有不同的桶,所以如果整體數(shù)據(jù)量太小,單個(gè)桶中至少有一個(gè)文件,建議你配置較少的桶數(shù),否則會(huì)出現(xiàn)也有很多小文件。

3)主鍵表LSM的影響

LSM 樹將文件組織成Sorted Runs的運(yùn)行。Sorted Runs由一個(gè)或多個(gè)數(shù)據(jù)文件組成,并且每個(gè)數(shù)據(jù)文件恰好屬于一個(gè)Sorted Runs。

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

默認(rèn)情況下,Sorted Runs數(shù)取決于 num-sorted-run.compaction-trigger,這意味著一個(gè)桶中至少有 5 個(gè)文件。如果要減少此數(shù)量,可以保留更少的文件,但寫入性能可能會(huì)受到影響。

4)僅追加表的文件的影響

默認(rèn)情況下,Append-Only 還會(huì)進(jìn)行自動(dòng)Compaction以減少小文件的數(shù)量

對(duì)于分桶的 Append-only 表,為了排序會(huì)對(duì)bucket內(nèi)的文件行Compaction,可能會(huì)保留更多的小文件。

5)Full-Compaction的影響

主鍵表是5個(gè)文件,但是Append-Only表(桶)可能單個(gè)桶里有50個(gè)小文件,這是很難接受的。更糟糕的是,不再活動(dòng)的分區(qū)還保留了如此多的小文件。

建議配置Full-Compaction,在Flink寫入時(shí)配置‘full-compaction.delta-commits’定期進(jìn)行full-compaction。并且可以確保在寫入結(jié)束之前分區(qū)被full-compaction。

2.9.5 縮放Bucket

1)說(shuō)明

由于總桶數(shù)對(duì)性能影響很大,Paimon 允許用戶通過(guò) ALTER TABLE 命令調(diào)整桶數(shù),并通過(guò) INSERT OVERWRITE 重新組織數(shù)據(jù)布局,而無(wú)需重新創(chuàng)建表/分區(qū)。當(dāng)執(zhí)行覆蓋作業(yè)時(shí),框架會(huì)自動(dòng)掃描舊桶號(hào)的數(shù)據(jù),并根據(jù)當(dāng)前桶號(hào)對(duì)記錄進(jìn)行哈希處理。

– rescale number of total buckets

ALTER TABLE table_identifier SET (‘bucket’ = ‘…’)

– reorganize data layout of table/partition

INSERT OVERWRITE table_identifier [PARTITION (part_spec)]

SELECT …

FROM table_identifier

[WHERE part_spec]

注意:

ALTER TABLE 僅修改表的元數(shù)據(jù),不會(huì)重新組織或重新格式化現(xiàn)有數(shù)據(jù)。重新組織現(xiàn)有數(shù)據(jù)必須通過(guò)INSERT OVERWRITE來(lái)實(shí)現(xiàn)。

重新縮放桶數(shù)不會(huì)影響讀取和正在運(yùn)行的寫入作業(yè)。

一旦存儲(chǔ)桶編號(hào)更改,任何新安排的 INSERT INTO 作業(yè)寫入未重新組織的現(xiàn)有表/分區(qū)將拋出 TableException ,并顯示如下類似異常:

Try to write table/partition … with a new bucket num …,

but the previous bucket num is … Please switch to batch mode,

and perform INSERT OVERWRITE to rescale current data layout first.

對(duì)于分區(qū)表,不同的分區(qū)可以有不同的桶號(hào)。例如:

ALTER TABLE my_table SET (‘bucket’ = ‘4’);

INSERT OVERWRITE my_table PARTITION (dt = ‘2022-01-01’)

SELECT * FROM …;

ALTER TABLE my_table SET (‘bucket’ = ‘8’);

INSERT OVERWRITE my_table PARTITION (dt = ‘2022-01-02’)

SELECT * FROM …;

在覆蓋期間,確保沒(méi)有其他作業(yè)寫入同一表/分區(qū)。

注意:對(duì)于啟用日志系統(tǒng)的表(例如Kafka),請(qǐng)重新調(diào)整主題的分區(qū)以保持一致性。

重新縮放存儲(chǔ)桶有助于處理吞吐量的突然峰值。假設(shè)有一個(gè)每日流式ETL任務(wù)來(lái)同步交易數(shù)據(jù)。該表的DDL和管道如下所示。

2)官方示例:

? 如下是正在跑的一個(gè)作業(yè):

– 建表

CREATE TABLE verified_orders (

trade_order_id BIGINT,

item_id BIGINT,

item_price DOUBLE,

dt STRING,

PRIMARY KEY (dt, trade_order_id, item_id) NOT ENFORCED

) PARTITIONED BY (dt)

WITH (

‘bucket’ = ‘16’

);

– kafka表

CREATE temporary TABLE raw_orders(

trade_order_id BIGINT,

item_id BIGINT,

item_price BIGINT,

gmt_create STRING,

order_status STRING

) WITH (

‘connector’ = ‘kafka’,

‘topic’ = ‘…’,

‘properties.bootstrap.servers’ = ‘…’,

‘format’ = ‘csv’

);

– 流式插入16個(gè)分桶

INSERT INTO verified_orders

SELECT trade_order_id,

? item_id,

? item_price,

? DATE_FORMAT(gmt_create, ‘yyyy-MM-dd’) AS dt

FROM raw_orders

WHERE order_status = ‘verified’;

過(guò)去幾周運(yùn)行良好。然而,最近數(shù)據(jù)量增長(zhǎng)很快,作業(yè)的延遲不斷增加。為了提高數(shù)據(jù)新鮮度,用戶可以執(zhí)行如下操作縮放分桶:

(1)使用保存點(diǎn)暫停流作業(yè)

$ ./bin/flink stop \

–savepointPath /tmp/flink-savepoints \

$JOB_ID

(2)增加桶數(shù)

ALTER TABLE verified_orders SET (‘bucket’ = ‘32’);

(3)切換到批處理模式并覆蓋流作業(yè)正在寫入的當(dāng)前分區(qū)

SET ‘execution.runtime-mode’ = ‘batch’;

– 假設(shè)今天是2022-06-22

– 情況1:沒(méi)有更新歷史分區(qū)的延遲事件,因此覆蓋今天的分區(qū)就足夠了

INSERT OVERWRITE verified_orders PARTITION (dt = ‘2022-06-22’)

SELECT trade_order_id,

? item_id,

? item_price

FROM verified_orders

WHERE dt = ‘2022-06-22’;

- 情況2:有更新歷史分區(qū)的延遲事件,但范圍不超過(guò)3天

INSERT OVERWRITE verified_orders

SELECT trade_order_id,

? item_id,

? item_price,

? dt

FROM verified_orders

WHERE dt IN (‘2022-06-20’, ‘2022-06-21’, ‘2022-06-22’);

(4)覆蓋作業(yè)完成后,切換回流模式,從保存點(diǎn)恢復(fù)(可以增加并行度=新bucket數(shù)量)。

SET ‘execution.runtime-mode’ = ‘streaming’;

SET ‘execution.savepoint.path’ = ;

INSERT INTO verified_orders

SELECT trade_order_id,

item_id,

item_price,

DATE_FORMAT(gmt_create, ‘yyyy-MM-dd’) AS dt

FROM raw_orders

WHERE order_status = ‘verified’;

2.10 文件操作理解

2.10.1 插入數(shù)據(jù)

當(dāng)我們執(zhí)行INSERT INTO

CREATE CATALOG paimon WITH (

‘type’ = ‘paimon’,

‘warehouse’ = ‘file:///tmp/paimon’

);

USE CATALOG paimon;

CREATE TABLE T (

id BIGINT,

a INT,

b STRING,

dt STRING COMMENT ‘timestamp string in format yyyyMMdd’,

PRIMARY KEY(id, dt) NOT ENFORCED

) PARTITIONED BY (dt);

INSERT INTO T VALUES (1, 10001, ‘varchar00001’, ‘20230501’);

一旦Flink作業(yè)完成,記錄就會(huì)通過(guò)成功提交寫入Paimon表中。用戶可以通過(guò)執(zhí)行查詢 SELECT * FROM T 來(lái)驗(yàn)證這些記錄的可見性,該查詢將返回單行。提交過(guò)程創(chuàng)建位于路徑 /tmp/paimon/default.db/T/snapshot/snapshot-1 的快照。 snapshot-1 處生成的文件布局如下所述:

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

snapshot-1 的內(nèi)容包含快照的元數(shù)據(jù),例如清單列表(manifest list)和schema ID:

{

“version” : 3,

“id” : 1,

“schemaId” : 0,

“baseManifestList” : “manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0”,

“deltaManifestList” : “manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1”,

“changelogManifestList” : null,

“commitUser” : “7d758485-981d-4b1a-a0c6-d34c3eb254bf”,

“commitIdentifier” : 9223372036854775807,

“commitKind” : “APPEND”,

“timeMillis” : 1684155393354,

“l(fā)ogOffsets” : { },

“totalRecordCount” : 1,

“deltaRecordCount” : 1,

“changelogRecordCount” : 0,

“watermark” : -9223372036854775808

}

清單列表包含快照的所有更改,baseManifestList 是應(yīng)用 deltaManifestList 中的更改的基礎(chǔ)文件。第一次提交將生成 1 個(gè)清單文件(manifest file),并創(chuàng)建 2 個(gè)清單列表(manifest list):

./T/manifest:

–deltaManifestList:包含對(duì)數(shù)據(jù)文件執(zhí)行操作的清單條目列表(上圖中的 manifest-list-1-delta)

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1

?

–baseManifestList:空的(上圖中的 manifest-list-1-base)

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0

–清單文件:存儲(chǔ)快照中數(shù)據(jù)文件的信息(上圖中的manifest-1-0)

manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0

跨不同分區(qū)插入一批記錄:

INSERT INTO T VALUES

(2, 10002, ‘varchar00002’, ‘20230502’),

(3, 10003, ‘varchar00003’, ‘20230503’),

(4, 10004, ‘varchar00004’, ‘20230504’),

(5, 10005, ‘varchar00005’, ‘20230505’),

(6, 10006, ‘varchar00006’, ‘20230506’),

(7, 10007, ‘varchar00007’, ‘20230507’),

(8, 10008, ‘varchar00008’, ‘20230508’),

(9, 10009, ‘varchar00009’, ‘20230509’),

(10, 10010, ‘varchar00010’, ‘20230510’);

第二次提交發(fā)生,執(zhí)行 SELECT * FROM T 將返回 10 行。創(chuàng)建一個(gè)新快照,即 snapshot-2,并為我們提供以下物理文件布局:

% ls -atR .

./T:

dt=20230501

dt=20230502

dt=20230503

dt=20230504

dt=20230505

dt=20230506

dt=20230507

dt=20230508

dt=20230509

dt=20230510

snapshot

schema

manifest

./T/snapshot:

LATEST

snapshot-2

EARLIEST

snapshot-1

./T/manifest:

manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-1 # delta manifest list for snapshot-2

manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-0 # base manifest list for snapshot-2

manifest-f1267033-e246-4470-a54c-5c27fdbdd074-0 # manifest file for snapshot-2

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 # delta manifest list for snapshot-1

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0 # base manifest list for snapshot-1

manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0 # manifest file for snapshot-1

./T/dt=20230501/bucket-0:

data-b75b7381-7c8b-430f-b7e5-a204cb65843c-0.orc

# each partition has the data written to bucket-0

./T/schema:

schema-0

截至 snapshot-2 的新文件布局如下所示:

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

2.10.2 刪除數(shù)據(jù)

執(zhí)行如下刪除:

DELETE FROM T WHERE dt >= ‘20230503’;

第三次提交發(fā)生,它為我們提供了 snapshot-3?,F(xiàn)在,列出表下的文件,您會(huì)發(fā)現(xiàn)沒(méi)有分區(qū)被刪除。相反,會(huì)為分區(qū) 20230503 到 20230510 創(chuàng)建一個(gè)新的數(shù)據(jù)文件:

./T/dt=20230510/bucket-0:

data-b93f468c-b56f-4a93-adc4-b250b3aa3462-0.orc # newer data file created by the delete statement

data-0fcacc70-a0cb-4976-8c88-73e92769a762-0.orc # older data file created by the insert statement

因?yàn)槲覀冊(cè)诘诙翁峤恢胁迦胍粭l記錄(由 +I[10, 10010, ‘varchar00010’, ‘20230510’] 表示),然后在第三次提交中刪除該記錄。執(zhí)行 SELECT * FROM T 將返回 2 行,即:

+I[1, 10001, ‘varchar00001’, ‘20230501’]

+I[2, 10002, ‘varchar00002’, ‘20230502’]

截至 snapshot-3 的新文件布局如下所示

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

manifest-3-0包含8個(gè)ADD操作類型的manifest條目,對(duì)應(yīng)8個(gè)新寫入的數(shù)據(jù)文件。

2.10.3 Compaction

小文件的數(shù)量會(huì)隨著連續(xù)快照的增加而增加,這可能會(huì)導(dǎo)致讀取性能下降。因此,需要進(jìn)行full compaction以減少小文件的數(shù)量。

現(xiàn)在觸發(fā)full-compaction:

./bin/flink run \

./lib/paimon-flink-action-0.5-SNAPSHOT.jar \

compact \

–path file:///tmp/paimon/default.db/T

所有當(dāng)前表文件將被壓縮,并創(chuàng)建一個(gè)新快照,即 snapshot-4,并包含以下信息:

{

“version” : 3,

“id” : 4,

“schemaId” : 0,

“baseManifestList” : “manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-0”,

“deltaManifestList” : “manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-1”,

“changelogManifestList” : null,

“commitUser” : “a3d951d5-aa0e-4071-a5d4-4c72a4233d48”,

“commitIdentifier” : 9223372036854775807,

“commitKind” : “COMPACT”,

“timeMillis” : 1684163217960,

“l(fā)ogOffsets” : { },

“totalRecordCount” : 38,

“deltaRecordCount” : 20,

“changelogRecordCount” : 0,

“watermark” : -9223372036854775808

}

截至 snapshot-4 的新文件布局如下所示

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

manifest-4-0 包含 20 個(gè)清單條目(18 個(gè) DELETE 操作和 2 個(gè) ADD 操作):

對(duì)于分區(qū)20230503到20230510,對(duì)兩個(gè)數(shù)據(jù)文件進(jìn)行兩次DELETE操作

對(duì)于分區(qū)20230501到20230502,對(duì)同一個(gè)數(shù)據(jù)文件進(jìn)行1次DELETE操作和1次ADD操作。

2.10.4 修改表

執(zhí)行以下語(yǔ)句來(lái)配置full-compaction:

ALTER TABLE T SET (‘full-compaction.delta-commits’ = ‘1’);

它將為 Paimon 表創(chuàng)建一個(gè)新schema,即 schema-1,但在下一次提交之前還沒(méi)有快照實(shí)際使用該schema。

2.10.5 過(guò)期快照

在快照過(guò)期的過(guò)程中,首先確定快照的范圍,然后將這些快照內(nèi)的數(shù)據(jù)文件標(biāo)記為刪除。僅當(dāng)存在引用特定數(shù)據(jù)文件的類型為 DELETE 的清單條目時(shí),數(shù)據(jù)文件才會(huì)被標(biāo)記為刪除。此標(biāo)記可確保該文件不會(huì)被后續(xù)快照使用并可以安全刪除。

假設(shè)上圖中的所有 4 個(gè)快照都即將過(guò)期。過(guò)期流程如下:

它首先刪除所有標(biāo)記的數(shù)據(jù)文件,并記錄任何更改的存儲(chǔ)桶。

然后它會(huì)刪除所有更改日志文件和關(guān)聯(lián)的清單。

最后,它刪除快照本身并寫入最早的提示文件。

如果刪除過(guò)程后有任何目錄留空,它們也將被刪除。

假設(shè)創(chuàng)建了另一個(gè)快照 snapshot-5 并觸發(fā)了快照過(guò)期。 snapshot-1 到 snapshot-4 被刪除。為簡(jiǎn)單起見,我們將只關(guān)注以前快照中的文件,快照過(guò)期后的最終布局如下所示:

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

結(jié)果,分區(qū)20230503至20230510被物理刪除。

2.10.6 Flink 流式寫入

用 CDC 攝取的示例來(lái)說(shuō)明 Flink Stream Write。本節(jié)將討論更改數(shù)據(jù)的捕獲和寫入 Paimon,以及異步Compaction和快照提交和過(guò)期背后的機(jī)制。

CDC 攝取工作流程以及所涉及的每個(gè)組件所扮演的獨(dú)特角色:

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

(1)MySQL CDC Source統(tǒng)一讀取快照和增量數(shù)據(jù),分別由SnapshotReader讀取快照數(shù)據(jù)和BinlogReader讀取增量數(shù)據(jù)。

(2)Paimon Sink將數(shù)據(jù)寫入桶級(jí)別的Paimon表中。其中的CompactManager將異步觸發(fā)Compaction。

(3)Committer Operator 是一個(gè)單例,負(fù)責(zé)提交和過(guò)期快照。

端到端數(shù)據(jù)流:

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

MySQL Cdc Source讀取快照和增量數(shù)據(jù),并在規(guī)范化后將它們發(fā)送到下游:

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

Paimon Sink 首先將新記錄緩沖在基于堆的 LSM 樹中,并在內(nèi)存緩沖區(qū)滿時(shí)將它們刷新到磁盤。請(qǐng)注意,寫入的每個(gè)數(shù)據(jù)文件都是Sorted Run。此時(shí),還沒(méi)有創(chuàng)建清單文件和快照。在 Flink 檢查點(diǎn)發(fā)生之前,Paimon Sink 將刷新所有緩沖記錄并向下游發(fā)送可提交消息,該消息在檢查點(diǎn)期間由 Committer Operator 讀取并提交:

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

在檢查點(diǎn)期間,Committer Operator 將創(chuàng)建一個(gè)新快照并將其與清單列表關(guān)聯(lián)起來(lái),以便該快照包含有關(guān)表中所有數(shù)據(jù)文件的信息:

流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用,# Paimon,apache,原力計(jì)劃

稍后可能會(huì)發(fā)生異步Compaction,CompactManager 生成的提交表包含有關(guān)先前文件和合并文件的信息,以便 Committer Operator 可以構(gòu)造相應(yīng)的清單條目。在這種情況下,Committer Operator 可能會(huì)在 Flink 檢查點(diǎn)期間生成兩個(gè)快照:

一個(gè)用于寫入數(shù)據(jù)(Append 類型的快照),

另一個(gè)用于compact(Compact 類型的快照)。

如果在檢查點(diǎn)間隔期間沒(méi)有寫入數(shù)據(jù)文件,則只會(huì)創(chuàng)建 Compact 類型的快照。 Committer Operator 將檢查快照是否過(guò)期并執(zhí)行標(biāo)記數(shù)據(jù)文件的物理刪除。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-708905.html

到了這里,關(guān)于流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 新一代數(shù)據(jù)湖存儲(chǔ)技術(shù)Apache Paimon入門Demo

    新一代數(shù)據(jù)湖存儲(chǔ)技術(shù)Apache Paimon入門Demo

    目錄 前言 1. 什么是 Apache Paimon 一、本地環(huán)境快速上手 1、本地Flink偽集群 2、IDEA中跑Paimon Demo 2.1 代碼 2.2 IDEA中成功運(yùn)行 3、IDEA中Stream讀寫 3.1 流寫 3.2 流讀(toChangeLogStream) 二、進(jìn)階:本地(IDEA)多流拼接測(cè)試 要解決的問(wèn)題: note: 1、\\\'changelog-producer\\\' = \\\'full-compaction\\\' (1)m

    2024年02月08日
    瀏覽(21)
  • 掌握實(shí)時(shí)數(shù)據(jù)流:使用Apache Flink消費(fèi)Kafka數(shù)據(jù)

    掌握實(shí)時(shí)數(shù)據(jù)流:使用Apache Flink消費(fèi)Kafka數(shù)據(jù)

    ? ? ? ? 導(dǎo)讀:使用Flink實(shí)時(shí)消費(fèi)Kafka數(shù)據(jù)的案例是探索實(shí)時(shí)數(shù)據(jù)處理領(lǐng)域的絕佳方式。不僅非常實(shí)用,而且對(duì)于理解現(xiàn)代數(shù)據(jù)架構(gòu)和流處理技術(shù)具有重要意義。 ????????Apache Flink ?是一個(gè)在 有界 數(shù)據(jù)流和 無(wú)界 數(shù)據(jù)流上進(jìn)行有狀態(tài)計(jì)算分布式處理引擎和框架。Flink 設(shè)計(jì)旨

    2024年02月03日
    瀏覽(31)
  • Apache Paimon 文件管理

    Apache Paimon 文件管理

    管理小文件 許多用戶關(guān)注小文件問(wèn)題,可能導(dǎo)致以下情況: 穩(wěn)定性問(wèn)題:HDFS 中如果存在太多小文件的話會(huì)導(dǎo)致 NameNode 壓力過(guò)大 成本問(wèn)題:在 HDFS 中,每個(gè)小文件都會(huì)占用至少一個(gè)數(shù)據(jù)塊的大小,例如 128 MB 查詢效率:查詢過(guò)多小文件會(huì)影響查詢效率 理解 Checkpoint 假設(shè)你正

    2024年02月21日
    瀏覽(19)
  • 怎么使用 Flink 向 Apache Doris 表中寫 Bitmap 類型的數(shù)據(jù)

    Bitmap是一種經(jīng)典的數(shù)據(jù)結(jié)構(gòu),用于高效地對(duì)大量的二進(jìn)制數(shù)據(jù)進(jìn)行壓縮存儲(chǔ)和快速查詢。Doris支持bitmap數(shù)據(jù)類型,在Flink計(jì)算場(chǎng)景中,可以結(jié)合Flink doris Connector對(duì)bitmap數(shù)據(jù)做計(jì)算。 社區(qū)里很多小伙伴在是Doris Flink Connector的時(shí)候,不知道怎么寫B(tài)itmap類型的數(shù)據(jù),本文將介紹如何

    2024年02月07日
    瀏覽(18)
  • 使用 Flink CDC 實(shí)現(xiàn) MySQL 數(shù)據(jù),表結(jié)構(gòu)實(shí)時(shí)入 Apache Doris

    現(xiàn)有數(shù)據(jù)庫(kù):mysql 數(shù)據(jù):庫(kù)表較多,每個(gè)企業(yè)用戶一個(gè)分庫(kù),每個(gè)企業(yè)下的表均不同,無(wú)法做到聚合,且表可以被用戶隨意改動(dòng),增刪改列等,增加表 分析:用戶自定義分析,通過(guò)拖拽定義圖卡,要求實(shí)時(shí),點(diǎn)擊確認(rèn)即出現(xiàn)相應(yīng)結(jié)果,其中有無(wú)法預(yù)判的過(guò)濾 問(wèn)題:隨業(yè)務(wù)增長(zhǎng)

    2023年04月08日
    瀏覽(21)
  • 如何使用Docker部署Apache+Superset數(shù)據(jù)平臺(tái)并遠(yuǎn)程訪問(wèn)?

    Superset是一款由中國(guó)知名科技公司開源的“現(xiàn)代化的企業(yè)級(jí)BI(商業(yè)智能)Web應(yīng)用程序”,其通過(guò)創(chuàng)建和分享dashboard,為數(shù)據(jù)分析提供了輕量級(jí)的數(shù)據(jù)查詢和可視化方案。Superset在數(shù)據(jù)處理和可視化方面具有強(qiáng)大的功能,能夠滿足企業(yè)級(jí)的數(shù)據(jù)分析需求,并為用戶提供直觀、靈

    2024年02月04日
    瀏覽(17)
  • 數(shù)據(jù)架構(gòu)的實(shí)時(shí)分析:Apache Flink 和 Apache Storm 的比較

    實(shí)時(shí)數(shù)據(jù)處理在大數(shù)據(jù)領(lǐng)域具有重要意義,它可以幫助企業(yè)更快地獲取和分析數(shù)據(jù),從而更快地做出決策。隨著數(shù)據(jù)量的增加,傳統(tǒng)的批處理方法已經(jīng)不能滿足企業(yè)的需求,因此需要使用實(shí)時(shí)數(shù)據(jù)處理技術(shù)。 Apache Flink 和 Apache Storm 是兩個(gè)流行的實(shí)時(shí)數(shù)據(jù)處理框架,它們都可以

    2024年01月23日
    瀏覽(29)
  • 使用 Apache Flink 開發(fā)實(shí)時(shí) ETL

    使用 Apache Flink 開發(fā)實(shí)時(shí) ETL

    Apache Flink 是大數(shù)據(jù)領(lǐng)域又一新興框架。它與 Spark 的不同之處在于,它是使用流式處理來(lái)模擬批量處理的,因此能夠提供亞秒級(jí)的、符合 Exactly-once 語(yǔ)義的實(shí)時(shí)處理能力。Flink 的使用場(chǎng)景之一是構(gòu)建實(shí)時(shí)的數(shù)據(jù)通道,在不同的存儲(chǔ)之間搬運(yùn)和轉(zhuǎn)換數(shù)據(jù)。本文將介紹如何使用 F

    2024年02月05日
    瀏覽(25)
  • 【Apache Flink】Flink DataStream API的基本使用

    【Apache Flink】Flink DataStream API的基本使用

    Flink DataStream API的基本使用 Flink DataStream API主要用于處理無(wú)界和有界數(shù)據(jù)流 。 無(wú)界數(shù)據(jù)流 是一個(gè)持續(xù)生成數(shù)據(jù)的數(shù)據(jù)源,它沒(méi)有明確的結(jié)束點(diǎn),例如實(shí)時(shí)的交易數(shù)據(jù)或傳感器數(shù)據(jù)。這種類型的數(shù)據(jù)流需要使用Apache Flink的實(shí)時(shí)處理功能來(lái)連續(xù)地處理和分析。 有界數(shù)據(jù)流 是一個(gè)

    2024年02月06日
    瀏覽(25)
  • Apache Ranger入門與進(jìn)階使用

    Apache Ranger入門與進(jìn)階使用

    ranger是hadoop生態(tài)中的權(quán)限管理和用戶審計(jì)插件,ranger豐富的插件數(shù)量讓它的使用非常廣泛,但是苦于官方文檔非常少,學(xué)習(xí)起來(lái)就非常麻煩。本篇博客是取各文檔之精華所做的,相信你完成的看完后對(duì)于ranger會(huì)有更多的理解 ranger沒(méi)有二進(jìn)制包提供,需要自己手動(dòng)編譯下,請(qǐng)先

    2024年02月14日
    瀏覽(17)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包