2.9 進(jìn)階使用
2.9.1 寫入性能
Paimon的寫入性能與檢查點(diǎn)密切相關(guān),因此需要更大的寫入吞吐量:
增加檢查點(diǎn)間隔,或者僅使用批處理模式。
增加寫入緩沖區(qū)大小。
啟用寫緩沖區(qū)溢出。
如果您使用固定存儲(chǔ)桶模式,請(qǐng)重新調(diào)整存儲(chǔ)桶數(shù)量。
2.9.1.1 并行度
建議sink的并行度小于等于bucket的數(shù)量,最好相等。
選項(xiàng) | 必需的 | 默認(rèn) | 類型 | 描述 |
---|---|---|---|---|
sink.parallelism | No | (none) | Integer | 定義sink的并行度。默認(rèn)情況下,并行度由框架使用上游鏈?zhǔn)竭\(yùn)算符的相同并行度來(lái)確定。 |
2.9.1.2 Compaction
當(dāng)Sorted Run數(shù)量較少時(shí),Paimon writer 將在單獨(dú)的線程中異步執(zhí)行壓縮,因此記錄可以連續(xù)寫入表中。然而,為了避免Sorted Runs的無(wú)限增長(zhǎng),當(dāng)Sorted Run的數(shù)量達(dá)到閾值時(shí),writer將不得不暫停寫入。下表屬性確定閾值。
選項(xiàng) | 必需的 | 默認(rèn) | 類型 | 描述 |
---|---|---|---|---|
num-sorted-run.stop-trigger | No | (none) | Integer | 觸發(fā)停止寫入的Sorted Runs次數(shù),默認(rèn)值為 ‘num-sorted-run.compaction-trigger’ + 1。 |
當(dāng) num-sorted-run.stop-trigger 變大時(shí),寫入停頓將變得不那么頻繁,從而提高寫入性能。但是,如果該值變得太大,則查詢表時(shí)將需要更多內(nèi)存和 CPU 時(shí)間。如果您擔(dān)心內(nèi)存 OOM,請(qǐng)配置sort-spill-threshold。它的值取決于你的內(nèi)存大小。
2.9.1.3 優(yōu)先考慮寫入吞吐量
如果希望某種模式具有最大寫入吞吐量,則可以緩慢而不是匆忙地進(jìn)行Compaction??梢詫?duì)表使用以下策略
num-sorted-run.stop-trigger = 2147483647
sort-spill-threshold = 10
此配置將在寫入高峰期生成更多文件,并在寫入低谷期逐漸合并到最佳讀取性能。
2.9.1.4 觸發(fā)Compaction的Sorted Run數(shù)
Paimon使用LSM樹,支持大量更新。 LSM 在多次Sorted Runs中組織文件。從 LSM 樹查詢記錄時(shí),必須組合所有Sorted Runs以生成所有記錄的完整視圖。
過(guò)多的Sorted Run會(huì)導(dǎo)致查詢性能不佳。為了將Sorted Run的數(shù)量保持在合理的范圍內(nèi),Paimon writers 將自動(dòng)執(zhí)行Compaction。下表屬性確定觸發(fā)Compaction的最小Sorted Run數(shù)。
選項(xiàng) | 必需的 | 默認(rèn) | 類型 | 描述 |
---|---|---|---|---|
num-sorted-run.compaction-trigger | No | 5 | Integer | 觸發(fā)Compaction的Sorted Run數(shù)。包括 0 級(jí)文件(一個(gè)文件一級(jí)排序運(yùn)行)和高級(jí)運(yùn)行(一個(gè)一級(jí)排序運(yùn)行)。 |
2.9.1.5 寫入初始化
在write初始化時(shí),bucket的writer需要讀取所有歷史文件。如果這里出現(xiàn)瓶頸(例如同時(shí)寫入大量分區(qū)),可以使用write-manifest-cache緩存讀取的manifest數(shù)據(jù),以加速初始化。
2.9.1.6 內(nèi)存
Paimon writer中主要占用內(nèi)存的地方有3個(gè):
Writer的內(nèi)存緩沖區(qū),由單個(gè)任務(wù)的所有Writer共享和搶占。該內(nèi)存值可以通過(guò) write-buffer-size 表屬性進(jìn)行調(diào)整。
合并多個(gè)Sorted Run以進(jìn)行Compaction時(shí)會(huì)消耗內(nèi)存。可以通過(guò) num-sorted-run.compaction-trigger 選項(xiàng)進(jìn)行調(diào)整,以更改要合并的Sorted Run的數(shù)量。
如果行非常大,在進(jìn)行Compaction時(shí)一次讀取太多行數(shù)據(jù)可能會(huì)消耗大量?jī)?nèi)存。減少 read.batch-size 選項(xiàng)可以減輕這種情況的影響。
寫入列式(ORC、Parquet等)文件所消耗的內(nèi)存,不可調(diào)。
2.9.2 讀取性能
2.9.2.1 Full Compaction
配置“full-compaction.delta-commits”在Flink寫入中定期執(zhí)行full-compaction。并且可以確保在寫入結(jié)束之前分區(qū)被完全Compaction。
注意:Paimon 默認(rèn)處理小文件并提供良好的讀取性能。請(qǐng)不要在沒(méi)有任何要求的情況下配置此Full Compaction選項(xiàng),因?yàn)樗鼤?huì)對(duì)性能產(chǎn)生重大影響。
2.9.2.2 主鍵表
對(duì)于主鍵表來(lái)說(shuō),這是一種“MergeOnRead”技術(shù)。讀取數(shù)據(jù)時(shí),會(huì)合并多層LSM數(shù)據(jù),并行數(shù)會(huì)受到桶數(shù)的限制。雖然Paimon的merge會(huì)高效,但是還是趕不上普通的AppendOnly表。
如果你想在某些場(chǎng)景下查詢得足夠快,但只能找到較舊的數(shù)據(jù),你可以:
配置full-compaction.delta-commits,寫入數(shù)據(jù)時(shí)(目前只有Flink)會(huì)定期進(jìn)行full Compaction。
配置“scan.mode”為“compacted-full”,讀取數(shù)據(jù)時(shí),選擇full-compaction的快照。讀取性能良好。
2.9.2.3 僅追加表
小文件會(huì)降低讀取速度并影響 DFS 穩(wěn)定性。默認(rèn)情況下,當(dāng)單個(gè)存儲(chǔ)桶中的小文件超過(guò)“compaction.max.file-num”(默認(rèn)50個(gè))時(shí),就會(huì)觸發(fā)compaction。但是當(dāng)有多個(gè)桶時(shí),就會(huì)產(chǎn)生很多小文件。
您可以使用full-compaction來(lái)減少小文件。full-compaction將消除大多數(shù)小文件。
2.9.2.4 格式
Paimon 對(duì) parquet 讀取進(jìn)行了一些查詢優(yōu)化,因此 parquet 會(huì)比 orc 稍快一些。
2.9.3 多Writer并發(fā)寫入
Paimon的快照管理支持向多個(gè)writer寫入。
默認(rèn)情況下,Paimon支持對(duì)不同分區(qū)的并發(fā)寫入。推薦的方式是streaming job將記錄寫入Paimon的最新分區(qū);同時(shí)批處理作業(yè)(覆蓋)將記錄寫入歷史分區(qū)。
如果需要多個(gè)Writer寫到同一個(gè)分區(qū),事情就會(huì)變得有點(diǎn)復(fù)雜。例如,不想使用 UNION ALL,那就需要有多個(gè)流作業(yè)來(lái)寫入“partial-update”表。參考如下的“Dedicated Compaction Job”。
2.9.3.1 Dedicated Compaction Job
默認(rèn)情況下,Paimon writer 在寫入記錄時(shí)會(huì)根據(jù)需要執(zhí)行Compaction。這對(duì)于大多數(shù)用例來(lái)說(shuō)已經(jīng)足夠了,但有兩個(gè)缺點(diǎn):
這可能會(huì)導(dǎo)致寫入吞吐量不穩(wěn)定,因?yàn)閳?zhí)行壓縮時(shí)吞吐量可能會(huì)暫時(shí)下降。
Compaction會(huì)將某些數(shù)據(jù)文件標(biāo)記為“已刪除”(并未真正刪除)。如果多個(gè)writer標(biāo)記同一個(gè)文件,則在提交更改時(shí)會(huì)發(fā)生沖突。 Paimon 會(huì)自動(dòng)解決沖突,但這可能會(huì)導(dǎo)致作業(yè)重新啟動(dòng)。
為了避免這些缺點(diǎn),用戶還可以選擇在writer中跳過(guò)Compaction,并僅運(yùn)行專門的作業(yè)來(lái)進(jìn)行Compaction。由于Compaction僅由專用作業(yè)執(zhí)行,因此writer可以連續(xù)寫入記錄而無(wú)需暫停,并且不會(huì)發(fā)生沖突。
選項(xiàng) | 必需的 | 默認(rèn) | 類型 | 描述 |
---|---|---|---|---|
write-only | No | false | Boolean | 如果設(shè)置為 true,將跳過(guò)Compaction和快照過(guò)期。此選項(xiàng)與獨(dú)立Compaction一起使用。 |
Flink SQL目前不支持compaction相關(guān)的語(yǔ)句,所以我們必須通過(guò)flink run來(lái)提交compaction作業(yè)。
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
compact \
–warehouse \
–database \
–table \
[–partition ] \
[–catalog-conf [–catalog-conf …]] \
如果提交一個(gè)批處理作業(yè)(execution.runtime-mode:batch),當(dāng)前所有的表文件都會(huì)被Compaction。如果您提交一個(gè)流作業(yè)(execution.runtime-mode: Streaming),該作業(yè)將持續(xù)監(jiān)視表的新更改并根據(jù)需要執(zhí)行Compaction。
2.9.4 表管理
2.9.4.1 管理快照
1)快照過(guò)期
Paimon Writer每次提交都會(huì)生成一個(gè)或兩個(gè)快照。每個(gè)快照可能會(huì)添加一些新的數(shù)據(jù)文件或?qū)⒁恍┡f的數(shù)據(jù)文件標(biāo)記為已刪除。然而,標(biāo)記的數(shù)據(jù)文件并沒(méi)有真正被刪除,因?yàn)镻aimon還支持時(shí)間旅行到更早的快照。它們僅在快照過(guò)期時(shí)被刪除。
目前,Paimon Writer在提交新更改時(shí)會(huì)自動(dòng)執(zhí)行過(guò)期操作。通過(guò)使舊快照過(guò)期,可以刪除不再使用的舊數(shù)據(jù)文件和元數(shù)據(jù)文件,以釋放磁盤空間。
設(shè)置以下表屬性:
選項(xiàng) | 必需的 | 默認(rèn) | 類型 | 描述 |
---|---|---|---|---|
snapshot.time-retained | No | 1 h | Duration | 已完成快照的最長(zhǎng)時(shí)間保留。 |
snapshot.num-retained.min | No | 10 | Integer | 要保留的已完成快照的最小數(shù)量。 |
snapshot.num-retained.max | No | Integer.MAX_VALUE | Integer | 要保留的已完成快照的最大數(shù)量。 |
注意,保留時(shí)間太短或保留數(shù)量太少可能會(huì)導(dǎo)致如下問(wèn)題:
批量查詢找不到該文件。例如,表比較大,批量查詢需要10分鐘才能讀取,但是10分鐘前的快照過(guò)期了,此時(shí)批量查詢會(huì)讀取到已刪除的快照。
表文件上的流式讀取作業(yè)(沒(méi)有外部日志系統(tǒng))無(wú)法重新啟動(dòng)。當(dāng)作業(yè)重新啟動(dòng)時(shí),它記錄的快照可能已過(guò)期。 (可以使用Consumer Id來(lái)保護(hù)快照過(guò)期的小保留時(shí)間內(nèi)的流式讀取)。
2)回滾快照
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
rollback-to \
–warehouse \
–database \
–table \
–snapshot \
[–catalog-conf [–catalog-conf …]]
2.9.4.2 管理分區(qū)
創(chuàng)建分區(qū)表時(shí)可以設(shè)置partition.expiration-time。 Paimon會(huì)定期檢查分區(qū)的狀態(tài),并根據(jù)時(shí)間刪除過(guò)期的分區(qū)。
判斷分區(qū)是否過(guò)期:將分區(qū)中提取的時(shí)間與當(dāng)前時(shí)間進(jìn)行比較,看生存時(shí)間是否超過(guò)partition.expiration-time。比如:
CREATE TABLE T (…) PARTITIONED BY (dt) WITH (
‘partition.expiration-time’ = ‘7 d’,
‘partition.expiration-check-interval’ = ‘1 d’,
‘partition.timestamp-formatter’ = ‘yyyyMMdd’
);
選項(xiàng) | 默認(rèn) | 類型 | 描述 |
---|---|---|---|
partition.expiration-check-interval | 1 h | Duration | 分區(qū)過(guò)期的檢查間隔。 |
partition.expiration-time | (none) | Duration | 分區(qū)的過(guò)期時(shí)間間隔。如果分區(qū)的生命周期超過(guò)此值,則該分區(qū)將過(guò)期。分區(qū)時(shí)間是從分區(qū)值中提取的。 |
partition.timestamp-formatter | (none) | String | 用于格式化字符串時(shí)間戳的格式化程序。它可以與“partition.timestamp-pattern”一起使用來(lái)創(chuàng)建使用指定值的格式化程序。> 默認(rèn)格式化程序?yàn)椤皔yyy-MM-dd HH:mm:ss”和“yyyy-MM-dd”。> 支持多個(gè)分區(qū)字段,例如“ y e a r ? year- year?month-$day $hour:00:00”。> 時(shí)間戳格式化程序與 Java 的 DateTimeFormatter 兼容。 |
partition.timestamp-pattern | (none) | String | 可以指定一種模式來(lái)從分區(qū)獲取時(shí)間戳。格式化程序模式由“partition.timestamp-formatter”定義。> 默認(rèn)情況下,從第一個(gè)字段讀取。> 如果分區(qū)中的時(shí)間戳是名為“dt”的單個(gè)字段,則可以使用“ d t ”。 > 如果它分布在年、月、日和小時(shí)的多個(gè)字段中,則可以使用“ dt”。> 如果它分布在年、月、日和小時(shí)的多個(gè)字段中,則可以使用“ dt”。>如果它分布在年、月、日和小時(shí)的多個(gè)字段中,則可以使用“year- m o n t h ? month- month?day h o u r : 00 : 00 ”。 > 如果時(shí)間戳位于 d t 和 h o u r 字段中,則可以使用“ hour:00:00”。> 如果時(shí)間戳位于 dt 和 hour 字段中,則可以使用“ hour:00:00”。>如果時(shí)間戳位于dt和hour字段中,則可以使用“dt $hour:00:00”。 |
2.9.4.3 管理小文件
小文件可能會(huì)導(dǎo)致:
穩(wěn)定性問(wèn)題:HDFS中小文件過(guò)多,NameNode會(huì)承受過(guò)大的壓力。
成本問(wèn)題:HDFS中的小文件會(huì)暫時(shí)使用最小1個(gè)Block的大小,例如128MB。
查詢效率:小文件過(guò)多查詢效率會(huì)受到影響。
1)Flink Checkpoint的影響
使用Flink Writer,每個(gè)checkpoint會(huì)生成 1-2 個(gè)快照,并且checkpoint會(huì)強(qiáng)制在 DFS 上生成文件,因此checkpoint間隔越小,會(huì)生成越多的小文件。
默認(rèn)情況下,不僅checkpoint會(huì)導(dǎo)致文件生成,writer的內(nèi)存(write-buffer-size)耗盡也會(huì)將數(shù)據(jù)flush到DFS并生成相應(yīng)的文件??梢詥⒂?write-buffer-spillable 在 writer 中生成溢出文件,從而在 DFS 中生成更大的文件。
所以,可以設(shè)置如下:
增大checkpoint間隔
增加 write-buffer-size 或啟用 write-buffer-spillable
2)快照的影響
Paimon維護(hù)文件的多個(gè)版本,文件的Compaction和刪除是邏輯上的,并沒(méi)有真正刪除文件。文件只有在 Snapshot 過(guò)期后才會(huì)被真正刪除,因此減少文件的第一個(gè)方法就是減少 Snapshot 過(guò)期的時(shí)間。 Flink writer 會(huì)自動(dòng)使快照過(guò)期。
分區(qū)和分桶的影響
表數(shù)據(jù)會(huì)被物理分片到不同的分區(qū),里面有不同的桶,所以如果整體數(shù)據(jù)量太小,單個(gè)桶中至少有一個(gè)文件,建議你配置較少的桶數(shù),否則會(huì)出現(xiàn)也有很多小文件。
3)主鍵表LSM的影響
LSM 樹將文件組織成Sorted Runs的運(yùn)行。Sorted Runs由一個(gè)或多個(gè)數(shù)據(jù)文件組成,并且每個(gè)數(shù)據(jù)文件恰好屬于一個(gè)Sorted Runs。
默認(rèn)情況下,Sorted Runs數(shù)取決于 num-sorted-run.compaction-trigger,這意味著一個(gè)桶中至少有 5 個(gè)文件。如果要減少此數(shù)量,可以保留更少的文件,但寫入性能可能會(huì)受到影響。
4)僅追加表的文件的影響
默認(rèn)情況下,Append-Only 還會(huì)進(jìn)行自動(dòng)Compaction以減少小文件的數(shù)量
對(duì)于分桶的 Append-only 表,為了排序會(huì)對(duì)bucket內(nèi)的文件行Compaction,可能會(huì)保留更多的小文件。
5)Full-Compaction的影響
主鍵表是5個(gè)文件,但是Append-Only表(桶)可能單個(gè)桶里有50個(gè)小文件,這是很難接受的。更糟糕的是,不再活動(dòng)的分區(qū)還保留了如此多的小文件。
建議配置Full-Compaction,在Flink寫入時(shí)配置‘full-compaction.delta-commits’定期進(jìn)行full-compaction。并且可以確保在寫入結(jié)束之前分區(qū)被full-compaction。
2.9.5 縮放Bucket
1)說(shuō)明
由于總桶數(shù)對(duì)性能影響很大,Paimon 允許用戶通過(guò) ALTER TABLE 命令調(diào)整桶數(shù),并通過(guò) INSERT OVERWRITE 重新組織數(shù)據(jù)布局,而無(wú)需重新創(chuàng)建表/分區(qū)。當(dāng)執(zhí)行覆蓋作業(yè)時(shí),框架會(huì)自動(dòng)掃描舊桶號(hào)的數(shù)據(jù),并根據(jù)當(dāng)前桶號(hào)對(duì)記錄進(jìn)行哈希處理。
– rescale number of total buckets
ALTER TABLE table_identifier SET (‘bucket’ = ‘…’)
– reorganize data layout of table/partition
INSERT OVERWRITE table_identifier [PARTITION (part_spec)]
SELECT …
FROM table_identifier
[WHERE part_spec]
注意:
ALTER TABLE 僅修改表的元數(shù)據(jù),不會(huì)重新組織或重新格式化現(xiàn)有數(shù)據(jù)。重新組織現(xiàn)有數(shù)據(jù)必須通過(guò)INSERT OVERWRITE來(lái)實(shí)現(xiàn)。
重新縮放桶數(shù)不會(huì)影響讀取和正在運(yùn)行的寫入作業(yè)。
一旦存儲(chǔ)桶編號(hào)更改,任何新安排的 INSERT INTO 作業(yè)寫入未重新組織的現(xiàn)有表/分區(qū)將拋出 TableException ,并顯示如下類似異常:
Try to write table/partition … with a new bucket num …,
but the previous bucket num is … Please switch to batch mode,
and perform INSERT OVERWRITE to rescale current data layout first.
對(duì)于分區(qū)表,不同的分區(qū)可以有不同的桶號(hào)。例如:
ALTER TABLE my_table SET (‘bucket’ = ‘4’);
INSERT OVERWRITE my_table PARTITION (dt = ‘2022-01-01’)
SELECT * FROM …;
ALTER TABLE my_table SET (‘bucket’ = ‘8’);
INSERT OVERWRITE my_table PARTITION (dt = ‘2022-01-02’)
SELECT * FROM …;
在覆蓋期間,確保沒(méi)有其他作業(yè)寫入同一表/分區(qū)。
注意:對(duì)于啟用日志系統(tǒng)的表(例如Kafka),請(qǐng)重新調(diào)整主題的分區(qū)以保持一致性。
重新縮放存儲(chǔ)桶有助于處理吞吐量的突然峰值。假設(shè)有一個(gè)每日流式ETL任務(wù)來(lái)同步交易數(shù)據(jù)。該表的DDL和管道如下所示。
2)官方示例:
? 如下是正在跑的一個(gè)作業(yè):
– 建表
CREATE TABLE verified_orders (
trade_order_id BIGINT,
item_id BIGINT,
item_price DOUBLE,
dt STRING,
PRIMARY KEY (dt, trade_order_id, item_id) NOT ENFORCED
) PARTITIONED BY (dt)
WITH (
‘bucket’ = ‘16’
);
– kafka表
CREATE temporary TABLE raw_orders(
trade_order_id BIGINT,
item_id BIGINT,
item_price BIGINT,
gmt_create STRING,
order_status STRING
) WITH (
‘connector’ = ‘kafka’,
‘topic’ = ‘…’,
‘properties.bootstrap.servers’ = ‘…’,
‘format’ = ‘csv’
…
);
– 流式插入16個(gè)分桶
INSERT INTO verified_orders
SELECT trade_order_id,
? item_id,
? item_price,
? DATE_FORMAT(gmt_create, ‘yyyy-MM-dd’) AS dt
FROM raw_orders
WHERE order_status = ‘verified’;
過(guò)去幾周運(yùn)行良好。然而,最近數(shù)據(jù)量增長(zhǎng)很快,作業(yè)的延遲不斷增加。為了提高數(shù)據(jù)新鮮度,用戶可以執(zhí)行如下操作縮放分桶:
(1)使用保存點(diǎn)暫停流作業(yè)
$ ./bin/flink stop \
–savepointPath /tmp/flink-savepoints \
$JOB_ID
(2)增加桶數(shù)
ALTER TABLE verified_orders SET (‘bucket’ = ‘32’);
(3)切換到批處理模式并覆蓋流作業(yè)正在寫入的當(dāng)前分區(qū)
SET ‘execution.runtime-mode’ = ‘batch’;
– 假設(shè)今天是2022-06-22
– 情況1:沒(méi)有更新歷史分區(qū)的延遲事件,因此覆蓋今天的分區(qū)就足夠了
INSERT OVERWRITE verified_orders PARTITION (dt = ‘2022-06-22’)
SELECT trade_order_id,
? item_id,
? item_price
FROM verified_orders
WHERE dt = ‘2022-06-22’;
- 情況2:有更新歷史分區(qū)的延遲事件,但范圍不超過(guò)3天
INSERT OVERWRITE verified_orders
SELECT trade_order_id,
? item_id,
? item_price,
? dt
FROM verified_orders
WHERE dt IN (‘2022-06-20’, ‘2022-06-21’, ‘2022-06-22’);
(4)覆蓋作業(yè)完成后,切換回流模式,從保存點(diǎn)恢復(fù)(可以增加并行度=新bucket數(shù)量)。
SET ‘execution.runtime-mode’ = ‘streaming’;
SET ‘execution.savepoint.path’ = ;
INSERT INTO verified_orders
SELECT trade_order_id,
item_id,
item_price,
DATE_FORMAT(gmt_create, ‘yyyy-MM-dd’) AS dt
FROM raw_orders
WHERE order_status = ‘verified’;
2.10 文件操作理解
2.10.1 插入數(shù)據(jù)
當(dāng)我們執(zhí)行INSERT INTO
CREATE CATALOG paimon WITH (
‘type’ = ‘paimon’,
‘warehouse’ = ‘file:///tmp/paimon’
);
USE CATALOG paimon;
CREATE TABLE T (
id BIGINT,
a INT,
b STRING,
dt STRING COMMENT ‘timestamp string in format yyyyMMdd’,
PRIMARY KEY(id, dt) NOT ENFORCED
) PARTITIONED BY (dt);
INSERT INTO T VALUES (1, 10001, ‘varchar00001’, ‘20230501’);
一旦Flink作業(yè)完成,記錄就會(huì)通過(guò)成功提交寫入Paimon表中。用戶可以通過(guò)執(zhí)行查詢 SELECT * FROM T 來(lái)驗(yàn)證這些記錄的可見性,該查詢將返回單行。提交過(guò)程創(chuàng)建位于路徑 /tmp/paimon/default.db/T/snapshot/snapshot-1 的快照。 snapshot-1 處生成的文件布局如下所述:
snapshot-1 的內(nèi)容包含快照的元數(shù)據(jù),例如清單列表(manifest list)和schema ID:
{
“version” : 3,
“id” : 1,
“schemaId” : 0,
“baseManifestList” : “manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0”,
“deltaManifestList” : “manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1”,
“changelogManifestList” : null,
“commitUser” : “7d758485-981d-4b1a-a0c6-d34c3eb254bf”,
“commitIdentifier” : 9223372036854775807,
“commitKind” : “APPEND”,
“timeMillis” : 1684155393354,
“l(fā)ogOffsets” : { },
“totalRecordCount” : 1,
“deltaRecordCount” : 1,
“changelogRecordCount” : 0,
“watermark” : -9223372036854775808
}
清單列表包含快照的所有更改,baseManifestList 是應(yīng)用 deltaManifestList 中的更改的基礎(chǔ)文件。第一次提交將生成 1 個(gè)清單文件(manifest file),并創(chuàng)建 2 個(gè)清單列表(manifest list):
./T/manifest:
–deltaManifestList:包含對(duì)數(shù)據(jù)文件執(zhí)行操作的清單條目列表(上圖中的 manifest-list-1-delta)
manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1
?
–baseManifestList:空的(上圖中的 manifest-list-1-base)
manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0
–清單文件:存儲(chǔ)快照中數(shù)據(jù)文件的信息(上圖中的manifest-1-0)
manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0
跨不同分區(qū)插入一批記錄:
INSERT INTO T VALUES
(2, 10002, ‘varchar00002’, ‘20230502’),
(3, 10003, ‘varchar00003’, ‘20230503’),
(4, 10004, ‘varchar00004’, ‘20230504’),
(5, 10005, ‘varchar00005’, ‘20230505’),
(6, 10006, ‘varchar00006’, ‘20230506’),
(7, 10007, ‘varchar00007’, ‘20230507’),
(8, 10008, ‘varchar00008’, ‘20230508’),
(9, 10009, ‘varchar00009’, ‘20230509’),
(10, 10010, ‘varchar00010’, ‘20230510’);
第二次提交發(fā)生,執(zhí)行 SELECT * FROM T 將返回 10 行。創(chuàng)建一個(gè)新快照,即 snapshot-2,并為我們提供以下物理文件布局:
% ls -atR .
./T:
dt=20230501
dt=20230502
dt=20230503
dt=20230504
dt=20230505
dt=20230506
dt=20230507
dt=20230508
dt=20230509
dt=20230510
snapshot
schema
manifest
./T/snapshot:
LATEST
snapshot-2
EARLIEST
snapshot-1
./T/manifest:
manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-1 # delta manifest list for snapshot-2
manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-0 # base manifest list for snapshot-2
manifest-f1267033-e246-4470-a54c-5c27fdbdd074-0 # manifest file for snapshot-2
manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 # delta manifest list for snapshot-1
manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0 # base manifest list for snapshot-1
manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0 # manifest file for snapshot-1
./T/dt=20230501/bucket-0:
data-b75b7381-7c8b-430f-b7e5-a204cb65843c-0.orc
…
# each partition has the data written to bucket-0
…
./T/schema:
schema-0
截至 snapshot-2 的新文件布局如下所示:
2.10.2 刪除數(shù)據(jù)
執(zhí)行如下刪除:
DELETE FROM T WHERE dt >= ‘20230503’;
第三次提交發(fā)生,它為我們提供了 snapshot-3?,F(xiàn)在,列出表下的文件,您會(huì)發(fā)現(xiàn)沒(méi)有分區(qū)被刪除。相反,會(huì)為分區(qū) 20230503 到 20230510 創(chuàng)建一個(gè)新的數(shù)據(jù)文件:
./T/dt=20230510/bucket-0:
data-b93f468c-b56f-4a93-adc4-b250b3aa3462-0.orc # newer data file created by the delete statement
data-0fcacc70-a0cb-4976-8c88-73e92769a762-0.orc # older data file created by the insert statement
因?yàn)槲覀冊(cè)诘诙翁峤恢胁迦胍粭l記錄(由 +I[10, 10010, ‘varchar00010’, ‘20230510’] 表示),然后在第三次提交中刪除該記錄。執(zhí)行 SELECT * FROM T 將返回 2 行,即:
+I[1, 10001, ‘varchar00001’, ‘20230501’]
+I[2, 10002, ‘varchar00002’, ‘20230502’]
截至 snapshot-3 的新文件布局如下所示
manifest-3-0包含8個(gè)ADD操作類型的manifest條目,對(duì)應(yīng)8個(gè)新寫入的數(shù)據(jù)文件。
2.10.3 Compaction
小文件的數(shù)量會(huì)隨著連續(xù)快照的增加而增加,這可能會(huì)導(dǎo)致讀取性能下降。因此,需要進(jìn)行full compaction以減少小文件的數(shù)量。
現(xiàn)在觸發(fā)full-compaction:
./bin/flink run \
./lib/paimon-flink-action-0.5-SNAPSHOT.jar \
compact \
–path file:///tmp/paimon/default.db/T
所有當(dāng)前表文件將被壓縮,并創(chuàng)建一個(gè)新快照,即 snapshot-4,并包含以下信息:
{
“version” : 3,
“id” : 4,
“schemaId” : 0,
“baseManifestList” : “manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-0”,
“deltaManifestList” : “manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-1”,
“changelogManifestList” : null,
“commitUser” : “a3d951d5-aa0e-4071-a5d4-4c72a4233d48”,
“commitIdentifier” : 9223372036854775807,
“commitKind” : “COMPACT”,
“timeMillis” : 1684163217960,
“l(fā)ogOffsets” : { },
“totalRecordCount” : 38,
“deltaRecordCount” : 20,
“changelogRecordCount” : 0,
“watermark” : -9223372036854775808
}
截至 snapshot-4 的新文件布局如下所示
manifest-4-0 包含 20 個(gè)清單條目(18 個(gè) DELETE 操作和 2 個(gè) ADD 操作):
對(duì)于分區(qū)20230503到20230510,對(duì)兩個(gè)數(shù)據(jù)文件進(jìn)行兩次DELETE操作
對(duì)于分區(qū)20230501到20230502,對(duì)同一個(gè)數(shù)據(jù)文件進(jìn)行1次DELETE操作和1次ADD操作。
2.10.4 修改表
執(zhí)行以下語(yǔ)句來(lái)配置full-compaction:
ALTER TABLE T SET (‘full-compaction.delta-commits’ = ‘1’);
它將為 Paimon 表創(chuàng)建一個(gè)新schema,即 schema-1,但在下一次提交之前還沒(méi)有快照實(shí)際使用該schema。
2.10.5 過(guò)期快照
在快照過(guò)期的過(guò)程中,首先確定快照的范圍,然后將這些快照內(nèi)的數(shù)據(jù)文件標(biāo)記為刪除。僅當(dāng)存在引用特定數(shù)據(jù)文件的類型為 DELETE 的清單條目時(shí),數(shù)據(jù)文件才會(huì)被標(biāo)記為刪除。此標(biāo)記可確保該文件不會(huì)被后續(xù)快照使用并可以安全刪除。
假設(shè)上圖中的所有 4 個(gè)快照都即將過(guò)期。過(guò)期流程如下:
它首先刪除所有標(biāo)記的數(shù)據(jù)文件,并記錄任何更改的存儲(chǔ)桶。
然后它會(huì)刪除所有更改日志文件和關(guān)聯(lián)的清單。
最后,它刪除快照本身并寫入最早的提示文件。
如果刪除過(guò)程后有任何目錄留空,它們也將被刪除。
假設(shè)創(chuàng)建了另一個(gè)快照 snapshot-5 并觸發(fā)了快照過(guò)期。 snapshot-1 到 snapshot-4 被刪除。為簡(jiǎn)單起見,我們將只關(guān)注以前快照中的文件,快照過(guò)期后的最終布局如下所示:
結(jié)果,分區(qū)20230503至20230510被物理刪除。
2.10.6 Flink 流式寫入
用 CDC 攝取的示例來(lái)說(shuō)明 Flink Stream Write。本節(jié)將討論更改數(shù)據(jù)的捕獲和寫入 Paimon,以及異步Compaction和快照提交和過(guò)期背后的機(jī)制。
CDC 攝取工作流程以及所涉及的每個(gè)組件所扮演的獨(dú)特角色:
(1)MySQL CDC Source統(tǒng)一讀取快照和增量數(shù)據(jù),分別由SnapshotReader讀取快照數(shù)據(jù)和BinlogReader讀取增量數(shù)據(jù)。
(2)Paimon Sink將數(shù)據(jù)寫入桶級(jí)別的Paimon表中。其中的CompactManager將異步觸發(fā)Compaction。
(3)Committer Operator 是一個(gè)單例,負(fù)責(zé)提交和過(guò)期快照。
端到端數(shù)據(jù)流:
MySQL Cdc Source讀取快照和增量數(shù)據(jù),并在規(guī)范化后將它們發(fā)送到下游:
Paimon Sink 首先將新記錄緩沖在基于堆的 LSM 樹中,并在內(nèi)存緩沖區(qū)滿時(shí)將它們刷新到磁盤。請(qǐng)注意,寫入的每個(gè)數(shù)據(jù)文件都是Sorted Run。此時(shí),還沒(méi)有創(chuàng)建清單文件和快照。在 Flink 檢查點(diǎn)發(fā)生之前,Paimon Sink 將刷新所有緩沖記錄并向下游發(fā)送可提交消息,該消息在檢查點(diǎn)期間由 Committer Operator 讀取并提交:
在檢查點(diǎn)期間,Committer Operator 將創(chuàng)建一個(gè)新快照并將其與清單列表關(guān)聯(lián)起來(lái),以便該快照包含有關(guān)表中所有數(shù)據(jù)文件的信息:
稍后可能會(huì)發(fā)生異步Compaction,CompactManager 生成的提交表包含有關(guān)先前文件和合并文件的信息,以便 Committer Operator 可以構(gòu)造相應(yīng)的清單條目。在這種情況下,Committer Operator 可能會(huì)在 Flink 檢查點(diǎn)期間生成兩個(gè)快照:
一個(gè)用于寫入數(shù)據(jù)(Append 類型的快照),
另一個(gè)用于compact(Compact 類型的快照)。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-708905.html
如果在檢查點(diǎn)間隔期間沒(méi)有寫入數(shù)據(jù)文件,則只會(huì)創(chuàng)建 Compact 類型的快照。 Committer Operator 將檢查快照是否過(guò)期并執(zhí)行標(biāo)記數(shù)據(jù)文件的物理刪除。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-708905.html
到了這里,關(guān)于流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!