This article walks through Flink's integration with Hive and reading/writing Hive data with Flink SQL.
It assumes working Hadoop, Hive, Kafka, and Flink environments.
The article is divided into four parts: reading Hive data, temporal table joins, writing data into Hive, and file formats.
Versions used in the examples: Hive 3.1.2, Flink 1.13.6, Hadoop 3.1.4, Kafka 2.12-3.0.0.
I. Reading and writing Hive: overview
Apache Flink supports unified BATCH and STREAMING processing of Apache Hive tables through the HiveCatalog. This means Flink can be used as a more performant alternative to Hive's batch engine, or to continuously write data into and read data out of Hive tables to power real-time data warehousing applications.
1. Reading Hive data
Flink supports reading data from Hive in both BATCH and STREAMING modes. Run as a batch application, Flink executes the query against the state of the table at the point in time when the query runs. A streaming read continuously monitors the table and incrementally fetches new data as it becomes available. By default, Flink reads the table as a bounded source.
Streaming reads support both partitioned and non-partitioned tables. For partitioned tables, Flink monitors the creation of new partitions and reads them incrementally once they become available. For non-partitioned tables, Flink monitors the creation of new files in the table folder and reads them incrementally.
SQL hints can be used to apply configuration to a Hive table without changing its definition in the Hive metastore, for example:
SELECT *
FROM hive_table
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2023-08-20') */;
Notes:
- The monitoring strategy scans all directories/files under the current location path. A large number of partitions may cause performance degradation.
- Streaming reads of non-partitioned tables require that each file be written atomically into the target directory.
- Streaming reads of partitioned tables require that each partition be added atomically from the Hive metastore's point of view; otherwise, new data added to an existing partition will be consumed.
- Streaming reads do not support watermark syntax in Flink DDL, so these tables cannot be used with window operators.
1) Reading Hive views
Flink can read data from views defined in Hive, with a couple of limitations (a minimal usage sketch follows the list):
- The Hive catalog must be set as the current catalog first, either via the Table API, tenv.useCatalog("alan_hivecatalog"), or in the SQL client, USE CATALOG alan_hivecatalog.
- Hive and Flink SQL have different syntax, e.g. different reserved keywords and literals. Make sure the view's query is compatible with Flink's grammar.
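A minimal sketch in the SQL client (alan_hive_view and the database testdb are hypothetical names used only for illustration):
USE CATALOG alan_hivecatalog;
USE testdb; -- hypothetical Hive database containing the view
SELECT * FROM alan_hive_view; -- works as long as the view's query is also valid Flink SQL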
2) Vectorized Optimization upon Read
Flink automatically uses vectorized reads of Hive tables when the following conditions are met:
- the file format is ORC or Parquet, and
- no column uses a complex data type such as List, Map, Struct or Union.
This feature is enabled by default. It can be disabled with the following configuration:
table.exec.hive.fallback-mapred-reader=true
3) Source Parallelism Inference
By default, Flink infers the optimal parallelism of its Hive readers from the number of files and the number of blocks in each file.
Flink lets you flexibly configure the inference policy. The following options can be configured in TableConfig (note that they affect all sources of the job): table.exec.hive.infer-source-parallelism (whether to infer source parallelism, default true) and table.exec.hive.infer-source-parallelism.max (upper bound of the inferred parallelism, default 1000).
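A sketch of adjusting the inference in the SQL client (the cap value 4 is only an example):
SET table.exec.hive.infer-source-parallelism=true;
SET table.exec.hive.infer-source-parallelism.max=4;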
4) Tuning split size when reading Hive tables
When reading a Hive table, the data files are divided into splits, each covering a portion of the data to be read. Splits are the granularity at which Flink assigns tasks and reads data in parallel. Read performance can be tuned by adjusting the split size through options such as table.exec.hive.split-max-size (target maximum split size) and table.exec.hive.file-open-cost (estimated cost of opening a file).
To compute the split sizes, Flink first calculates the size of all files under all partitions. This can be time-consuming when there are many partitions; you can set the job parameter table.exec.hive.calculate-partition-size.thread-num (default 3) to a larger value to use more threads and speed it up.
Currently these options only take effect for Hive tables stored in the ORC format.
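For illustration, a sketch of tuning these options in the SQL client; the option names follow the newer Flink documentation (this tuning is not available in older versions such as 1.13) and the values are only examples:
SET table.exec.hive.split-max-size=256mb;
SET table.exec.hive.file-open-cost=8mb;
SET table.exec.hive.calculate-partition-size.thread-num=8;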
5) Loading partition splits
Flink uses multiple threads to concurrently split Hive partitions into splits for reading. You can use table.exec.hive.load-partition-splits.thread-num to configure the number of threads; the default is 3, and the configured value must be greater than 0.
6) Reading partitions with subdirectories
In some cases you may create an external table that references another table, where the external table's partition columns are a subset of the other table's partition columns. For example, you created a partitioned table fact_tz with partition columns day and hour:
CREATE TABLE fact_tz(x int) PARTITIONED BY (day STRING, hour STRING);
Then you create an external table fact_daily on top of fact_tz, using a coarser-grained partition column ds:
CREATE EXTERNAL TABLE fact_daily(x int) PARTITIONED BY (ds STRING) LOCATION '/user/hive/warehouse/test.db/fact_tz';
When reading the external table fact_daily, there are subdirectories (hour=1 to hour=24) under each partition directory.
By default, partitions containing subdirectories can be added to the external table, and Flink SQL recursively scans all subdirectories and reads the data in them. For example, add the partition whose directory contains the hour subdirectories:
ALTER TABLE fact_daily ADD PARTITION (ds='2023-08-11') location '/user/hive/warehouse/test.db/fact_tz/ds=2023-08-11';
You can set the job property table.exec.hive.read-partition-with-subdirectory.enabled (default true) to false to forbid Flink from reading the subdirectories. If you set it to false while a partition directory contains no files directly but only subdirectories, Flink throws java.io.IOException: Not a file: /path/to/data/*.
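For example, to disable subdirectory reads for the current session:
SET table.exec.hive.read-partition-with-subdirectory.enabled=false;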
2. Temporal table join
You can use a Hive table as a temporal table and join a stream against it with a temporal join; see the temporal join documentation for more details.
Flink supports processing-time temporal joins against Hive tables: a processing-time temporal join always joins the latest version of the temporal table. Flink supports temporal joins against both partitioned and non-partitioned Hive tables, and for partitioned tables it can automatically track the latest partition.
Note: Flink does not yet support event-time temporal joins against Hive tables.
1) Temporal join against the latest partition
A partitioned table that changes over time can be read as an unbounded stream. If every partition contains the complete data of one version, each partition can act as one version of the temporal table, and that version holds the partition's data.
Flink can automatically track the latest partition (version) in a processing-time temporal join; streaming-source.partition-order defines what "latest" means. The most common use case is using a Hive table as a dimension table in a Flink streaming job.
Note: this feature is only supported in Flink streaming mode.
The example below shows a classic pipeline: the dimension table lives in Hive and is updated by a daily batch job or Flink job (changed to hourly updates here for easier verification). The Kafka stream comes from real-time online business data or logs and needs to be joined against the dimension table to enrich the stream.
1. Code example
-- Assume the data in the Hive table is updated once per day, and each day contains the latest and complete dimension data
SET table.sql-dialect=hive;
CREATE TABLE alan_dim_user_table (
u_id STRING,
u_name STRING,
balance DECIMAL(10, 4),
age INT
) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
-- Option A: use the default partition-name order and load the latest partition every 12 hours (recommended); keep only one of the three option blocks in a real table definition
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '12 h',
'streaming-source.partition-order' = 'partition-name', -- this is the default and can be omitted
-- Option B: use the partition files' create-time and load the latest partition every 12 hours
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.partition-order' = 'create-time',
'streaming-source.monitor-interval' = '12 h'
-- Option C: use partition-time and load the latest partition every 12 hours
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '12 h',
'streaming-source.partition-order' = 'partition-time',
'partition.time-extractor.kind' = 'default',
'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00'
);
SET table.sql-dialect=hive;
CREATE TABLE alan_dim_user_table (
u_id BIGINT,
u_name STRING,
balance DECIMAL(10, 4),
age INT
) PARTITIONED BY (t_year STRING, t_month STRING, t_day STRING)
row format delimited
fields terminated by ","
TBLPROPERTIES (
-- use the default partition-name order and load the latest partition every hour (recommended)
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '1 h',
'streaming-source.partition-order' = 'partition-name' -- default, can be omitted
);
-- streaming SQL: the Kafka stream temporal-joins the Hive dimension table; Flink automatically loads the latest partition's data at the 'streaming-source.monitor-interval' interval
SELECT * FROM orders_table AS o
JOIN alan_dim_user_table FOR SYSTEM_TIME AS OF o.proctime AS u
ON o.u_id = u.u_id;
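For the join above to run, orders_table must be a stream with a processing-time attribute. A hedged sketch of such a Kafka-backed table (the topic and broker addresses are placeholders; the verification below uses an equivalent table named alan_fact_order_table):
SET table.sql-dialect=default;
CREATE TABLE orders_table (
  o_id STRING,
  o_amount DOUBLE,
  u_id BIGINT,
  proctime AS PROCTIME() -- processing-time attribute required by FOR SYSTEM_TIME AS OF
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders_topic',
  'properties.bootstrap.servers' = 'broker:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);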
2. Flink verification steps
-------------------------Flink, Kafka and Hive operations----------------------------------
----This example was verified on Flink 1.13.6----------------------------------
---------1. Create the Flink dimension table, loading new data every hour----------------------------------
Flink SQL> SET table.sql-dialect=hive;
[INFO] Session property has been set.
Flink SQL> show tables;
+--------------+
| table name |
+--------------+
| alan_student |
| student_ext |
| tbl |
| test_change |
| user_dept |
+--------------+
5 rows in set
Flink SQL> CREATE TABLE alan_dim_user_table (
> u_id BIGINT,
> u_name STRING,
> balance DECIMAL(10, 4),
> age INT
> ) PARTITIONED BY (t_year STRING, t_month STRING, t_day STRING)
> row format delimited
> fields terminated by ","
> TBLPROPERTIES (
> -- use the default partition-name order and load the latest partition every hour (recommended)
> 'streaming-source.enable' = 'true',
> 'streaming-source.partition.include' = 'latest',
> 'streaming-source.monitor-interval' = '1 h',
> 'streaming-source.partition-order' = 'partition-name' -- default, can be omitted
> );
[INFO] Execute statement succeed.
-----------2. Load data into Hive manually; the first load adds only one row----------------------------------
0: jdbc:hive2://server4:10000> show tables;
+----------------------+
| tab_name |
+----------------------+
| alan_dim_user_table |
| alan_student |
| student_ext |
| tbl |
| test_change |
| user_dept |
+----------------------+
6 rows selected (0.05 seconds)
0: jdbc:hive2://server4:10000> load data inpath '/flinktest/hivetest' into table alan_dim_user_table partition(t_year='2023',t_month='09',t_day='04');
0: jdbc:hive2://server4:10000> select * from alan_dim_user_table;
+---------------------------+-----------------------------+------------------------------+--------------------------+-----------------------------+------------------------------+----------------------------+
| alan_dim_user_table.u_id | alan_dim_user_table.u_name | alan_dim_user_table.balance | alan_dim_user_table.age | alan_dim_user_table.t_year | alan_dim_user_table.t_month | alan_dim_user_table.t_day |
+---------------------------+-----------------------------+------------------------------+--------------------------+-----------------------------+------------------------------+----------------------------+
| 1 | alan | 12.2300 | 18 | 2023 | 09 | 04 |
+---------------------------+-----------------------------+------------------------------+--------------------------+-----------------------------+------------------------------+----------------------------+
-----3. Create the fact table in Flink----------------------------------
Flink SQL> SET table.sql-dialect=default;
[INFO] Session property has been set.
Flink SQL> CREATE TABLE alan_fact_order_table (
> o_id STRING,
> o_amount DOUBLE,
> u_id BIGINT, -- user id
> item_id BIGINT, -- item id
> action STRING, -- user action
> ts BIGINT, -- timestamp of the user action
> proctime as PROCTIME(), -- processing-time attribute via a computed column
> `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', -- event time
> WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND -- define a watermark on event_time
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'test_hive_topic',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testhivegroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.
---------4. Create the Kafka topic and send messages (messages are sent after the Flink streaming query is started)----------------------------------
[alanchan@server2 bin]$ kafka-topics.sh --delete --topic test_hive_topic --bootstrap-server server1:9092
[alanchan@server2 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic test_hive_topic --partitions 1 --replication-factor 1
Created topic test_hive_topic.
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_hive_topic
>1,123.34,1,8001,'b',1693874219248
----------5. Run the Flink streaming query (check whether the dimension table data is picked up)----------------------------------
Flink SQL> SELECT
> o.o_id,
> o.u_id,
> o.action,
> o.ts,
> o.event_time,
> u.u_name,
> u.t_year,
> u.t_month,
> u.t_day
> FROM alan_fact_order_table AS o
> JOIN alan_dim_user_table FOR SYSTEM_TIME AS OF o.proctime AS u ON o.u_id = u.u_id;
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op | o_id | u_id | action | ts | event_time | u_name | t_year | t_month | t_day |
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 1 | 1 | 'b' | 1693874219248 | 2023-09-05 00:51:28.407 | alan | 2023 | 09 | 04 |
-------6. Load more dimension data in Hive (verify that the dimension table refreshes once per hour)----------------------------------
0: jdbc:hive2://server4:10000> load data inpath '/flinktest/hivetest2' into table alan_dim_user_table partition(t_year='2023',t_month='09',t_day='05');
No rows affected (0.194 seconds)
0: jdbc:hive2://server4:10000> select * from alan_dim_user_table;
+---------------------------+-----------------------------+------------------------------+--------------------------+-----------------------------+------------------------------+----------------------------+
| alan_dim_user_table.u_id | alan_dim_user_table.u_name | alan_dim_user_table.balance | alan_dim_user_table.age | alan_dim_user_table.t_year | alan_dim_user_table.t_month | alan_dim_user_table.t_day |
+---------------------------+-----------------------------+------------------------------+--------------------------+-----------------------------+------------------------------+----------------------------+
| 1 | alan | 12.2300 | 18 | 2023 | 09 | 04 |
| 2 | alanchan | 22.2300 | 10 | 2023 | 09 | 05 |
| 3 | alanchanchn | 32.2300 | 28 | 2023 | 09 | 05 |
| 4 | alan_chan | 12.4300 | 29 | 2023 | 09 | 05 |
| 5 | alan_chan_chn | 52.2300 | 38 | 2023 | 09 | 05 |
+---------------------------+-----------------------------+------------------------------+--------------------------+-----------------------------+------------------------------+----------------------------+
5 rows selected (0.143 seconds)
--------------7. Keep sending Kafka messages and watch how the Flink streaming query results change----------------------------------
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_hive_topic
>1,123.34,1,8001,'b',1693874219248-----------this record was already sent above; it is kept to show the continuity of the data
>20,321.34,3,9001,'a',1693874222274
>30,41.34,5,7001,'c',1693874223285
>50,666.66,2,3001,'d',1693875816640
--------------8. Flink streaming query results after the Kafka messages are sent----------------------------------
Flink SQL> SELECT
> o.o_id,
> o.u_id,
> o.action,
> o.ts,
> o.event_time,
> u.u_name,
> u.t_year,
> u.t_month,
> u.t_day
> FROM alan_fact_order_table AS o
> JOIN alan_dim_user_table FOR SYSTEM_TIME AS OF o.proctime AS u ON o.u_id = u.u_id;
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op | o_id | u_id | action | ts | event_time | u_name | t_year | t_month | t_day |
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 20 | 3 | 'a' | 1693874222274 | 2023-09-05 00:54:49.526 | alanchanchn | 2023 | 09 | 05 |
| +I | 30 | 5 | 'c' | 1693874223285 | 2023-09-05 00:55:55.461 | alan_chan_chn | 2023 | 09 | 05 |
| +I | 50 | 2 | 'd' | 1693875816640 | 2023-09-05 01:07:23.891 | alanchan | 2023 | 09 | 05 |
--------------9. With the Hive dimension data unchanged, send Kafka messages again and watch the Flink streaming query results----------------------------------
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_hive_topic
>1,123.34,1,8001,'b',1693874219248
>20,321.34,3,9001,'a',1693874222274
>30,41.34,5,7001,'c',1693874223285
>50,666.66,2,3001,'d',1693875816640
>60,666.66,4,3001,'e',1693880868579
>
--------------10. With the Hive dimension data unchanged, after sending Kafka messages again, watch the Flink streaming query results (same query session as before)---------------
Flink SQL> SELECT
> o.o_id,
> o.u_id,
> o.action,
> o.ts,
> o.event_time,
> u.u_name,
> u.t_year,
> u.t_month,
> u.t_day
> FROM alan_fact_order_table AS o
> JOIN alan_dim_user_table FOR SYSTEM_TIME AS OF o.proctime AS u ON o.u_id = u.u_id;
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op | o_id | u_id | action | ts | event_time | u_name | t_year | t_month | t_day |
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 20 | 3 | 'a' | 1693874222274 | 2023-09-05 00:54:49.526 | alanchanchn | 2023 | 09 | 05 |
| +I | 30 | 5 | 'c' | 1693874223285 | 2023-09-05 00:55:55.461 | alan_chan_chn | 2023 | 09 | 05 |
| +I | 50 | 2 | 'd' | 1693875816640 | 2023-09-05 01:07:23.891 | alanchan | 2023 | 09 | 05 |
| +I | 60 | 4 | 'e' | 1693880868579 | 2023-09-05 02:30:58.368 | alan_chan | 2023 | 09 | 05 |
---the changes in the data show up in the results promptly-------------------
2) Temporal join against the latest table
A Hive table can also be read as a bounded stream; in this case, the temporal join can only track the latest version of the table at the time the query starts, and the latest version holds all of the Hive table's data.
When temporal-joining the latest Hive table, the Hive table is cached in the task slot's memory, and each record from the stream is joined against the cached table by key to find a match. Using the latest Hive table as a temporal table requires no extra configuration. Optionally, you can set the TTL of the Hive table cache with the lookup.join.cache.ttl option; when the cache expires, the Hive table is scanned again to load the latest data.
The following example loads all of the Hive table's data as a temporal table.
1. Code example
-- Assume the data in the Hive table is overwritten by a batch pipeline.
SET table.sql-dialect=hive;
CREATE TABLE alan_dim_user_table2 (
u_id BIGINT,
u_name STRING,
balance DECIMAL(10, 4),
age INT
)
row format delimited
fields terminated by ","
TBLPROPERTIES (
'streaming-source.enable' = 'false', -- default, can be omitted
'streaming-source.partition.include' = 'all', -- default, can be omitted
'lookup.join.cache.ttl' = '1 h'
);
SET table.sql-dialect=default;
CREATE TABLE alan_fact_order_table2 (
o_id STRING,
o_amount DOUBLE,
u_id BIGINT, -- user id
item_id BIGINT, -- item id
action STRING, -- user action
ts BIGINT, -- timestamp of the user action
proctime as PROCTIME() -- processing-time attribute via a computed column
) WITH (
'connector' = 'kafka',
'topic' = 'test_hive2_topic',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'properties.group.id' = 'testhivegroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
-- streaming SQL: the Kafka stream joins the Hive dimension table; when the cache expires, Flink reloads the whole dimension table
SELECT
o.o_id,
o.u_id,
o.action,
o.ts,
o.proctime,
dim.u_name,
dim.age,
dim.balance
FROM alan_fact_order_table2 AS o
JOIN alan_dim_user_table2 FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.u_id = dim.u_id;
2. Flink verification steps
----This example was verified on Flink 1.13.6----------------------------------
----The TTL is set to 1 hour here for easier verification----------------------------------
----1. Create the dimension table in Flink----------------------------------
Flink SQL> show tables;
+-----------------------+
| table name |
+-----------------------+
| alan_dim_user_table |
| alan_fact_order_table |
| alan_student |
| student_ext |
| tbl |
| test_change |
| user_dept |
+-----------------------+
7 rows in set
Flink SQL> SET table.sql-dialect=hive;
[INFO] Session property has been set.
Flink SQL> CREATE TABLE alan_dim_user_table2 (
> u_id BIGINT,
> u_name STRING,
> balance DECIMAL(10, 4),
> age INT
> )
> row format delimited
> fields terminated by ","
> TBLPROPERTIES (
> 'streaming-source.enable' = 'false', -- default, can be omitted
> 'streaming-source.partition.include' = 'all', -- default, can be omitted
> 'lookup.join.cache.ttl' = '1 h'
> );
[INFO] Execute statement succeed.
----2. Insert data into the dimension table in Hive----------------------------------
0: jdbc:hive2://server4:10000> load data inpath '/flinktest/hivetest' into table alan_dim_user_table2;
No rows affected (0.139 seconds)
0: jdbc:hive2://server4:10000> select * from alan_dim_user_table2;
+----------------------------+------------------------------+-------------------------------+---------------------------+
| alan_dim_user_table2.u_id | alan_dim_user_table2.u_name | alan_dim_user_table2.balance | alan_dim_user_table2.age |
+----------------------------+------------------------------+-------------------------------+---------------------------+
| 1 | alan | 12.2300 | 18 |
| 2 | alanchan | 22.2300 | 10 |
| 3 | alanchanchn | 32.2300 | 28 |
+----------------------------+------------------------------+-------------------------------+---------------------------+
3 rows selected (0.124 seconds)
----3. Create the fact table in Flink----------------------------------
Flink SQL> SET table.sql-dialect=default;
Hive Session ID = 4d502166-65b7-4079-af12-35919101ed8d
[INFO] Session property has been set.
Flink SQL> CREATE TABLE alan_fact_order_table2 (
> o_id STRING,
> o_amount DOUBLE,
> u_id BIGINT, -- user id
> item_id BIGINT, -- item id
> action STRING, -- user action
> ts BIGINT, -- timestamp of the user action
> proctime as PROCTIME() -- processing-time attribute via a computed column
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'test_hive2_topic',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testhivegroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.
----4. Create the Kafka topic and send data----------------------------------
[alanchan@server2 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic test_hive2_topic --partitions 1 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic test_hive2_topic.
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_hive2_topic
>1,123.34,1,8001,'b',1693887925763
>30,41.34,5,7001,'c',1693874222274
>30,41.34,5,7001,'c',1693887926780
>20,321.34,3,9001,'a',1693887928801
>50,666.66,2,3001,'d',1693887927790
----5. Query in Flink and observe the results----------------------------------
Flink SQL> SELECT
> o.o_id,
> o.u_id,
> o.action,
> o.ts,
> o.proctime,
> dim.u_name,
> dim.age,
> dim.balance
> FROM alan_fact_order_table2 AS o
> JOIN alan_dim_user_table2 FOR SYSTEM_TIME AS OF o.proctime AS dim
> ON o.u_id = dim.u_id;
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+-------------+--------------+
| op | o_id | u_id | action | ts | proctime | u_name | age | balance |
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+-------------+--------------+
| +I | 1 | 1 | 'b' | 1693887925763 | 2023-09-05 04:24:47.825 | alan | 18 | 12.2300 |
| +I | 20 | 3 | 'a' | 1693887928801 | 2023-09-05 04:26:06.437 | alanchanchn | 28 | 32.2300 |
| +I | 50 | 2 | 'd' | 1693887927790 | 2023-09-05 04:26:46.404 | alanchan | 10 | 22.2300 |
----6. Load new data in Hive, send new Kafka messages, and observe the Flink query results----------------------------------
0: jdbc:hive2://server4:10000> load data inpath '/flinktest/hivetest' into table alan_dim_user_table2;
No rows affected (0.129 seconds)
0: jdbc:hive2://server4:10000> select * from alan_dim_user_table2;
+----------------------------+------------------------------+-------------------------------+---------------------------+
| alan_dim_user_table2.u_id | alan_dim_user_table2.u_name | alan_dim_user_table2.balance | alan_dim_user_table2.age |
+----------------------------+------------------------------+-------------------------------+---------------------------+
| 1 | alan | 12.2300 | 18 |
| 2 | alanchan | 22.2300 | 10 |
| 3 | alanchanchn | 32.2300 | 28 |
| 4 | alan_chan | 12.4300 | 29 |
| 5 | alan_chan_chn | 52.2300 | 38 |
+----------------------------+------------------------------+-------------------------------+---------------------------+
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_hive2_topic
>1,123.34,1,8001,'b',1693887925763
>30,41.34,5,7001,'c',1693874222274
>30,41.34,5,7001,'c',1693887926780
>20,321.34,3,9001,'a',1693887928801
>50,666.66,2,3001,'d',1693887927790
>30,41.34,5,7001,'c',1693887926780-----this record does not appear in the Flink query results (the cache still holds the old snapshot, so u_id=5 has no match yet)
Flink SQL> SELECT
> o.o_id,
> o.u_id,
> o.action,
> o.ts,
> o.proctime,
> dim.u_name,
> dim.age,
> dim.balance
> FROM alan_fact_order_table2 AS o
> JOIN alan_dim_user_table2 FOR SYSTEM_TIME AS OF o.proctime AS dim
> ON o.u_id = dim.u_id;
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+-------------+--------------+
| op | o_id | u_id | action | ts | proctime | u_name | age | balance |
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+-------------+--------------+
| +I | 1 | 1 | 'b' | 1693887925763 | 2023-09-05 04:24:47.825 | alan | 18 | 12.2300 |
| +I | 20 | 3 | 'a' | 1693887928801 | 2023-09-05 04:26:06.437 | alanchanchn | 28 | 32.2300 |
| +I | 50 | 2 | 'd' | 1693887927790 | 2023-09-05 04:26:46.404 | alanchan | 10 | 22.2300 |
----7. After the TTL expires, send new Kafka messages again and observe the Flink query results----------------------------------
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_hive2_topic
..., the following 2 records were sent after the TTL expired; as expected, they now show up in the results
>30,41.34,5,7001,'c',1693893016308
>1,123.34,1,8001,'b',1693893020334
Flink SQL> SELECT
> o.o_id,
> o.u_id,
> o.action,
> o.ts,
> o.proctime,
> dim.u_name,
> dim.age,
> dim.balance
> FROM alan_fact_order_table2 AS o
> JOIN alan_dim_user_table2 FOR SYSTEM_TIME AS OF o.proctime AS dim
> ON o.u_id = dim.u_id;
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+-------------+--------------+
| op | o_id | u_id | action | ts | proctime | u_name | age | balance |
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+-------------+--------------+
| +I | 1 | 1 | 'b' | 1693887925763 | 2023-09-05 04:24:47.825 | alan | 18 | 12.2300 |
| +I | 20 | 3 | 'a' | 1693887928801 | 2023-09-05 04:26:06.437 | alanchanchn | 28 | 32.2300 |
| +I | 50 | 2 | 'd' | 1693887927790 | 2023-09-05 04:26:46.404 | alanchan | 10 | 22.2300 |
| +I | 30 | 5 | 'c' | 1693893016308 | 2023-09-05 05:49:47.984 | alan_chan_chn | 38 | 52.2300 |
| +I | 1 | 1 | 'b' | 1693893020334 | 2023-09-05 05:50:23.696 | alan | 18 | 12.2300 |
------End of verification-------------------
Every subtask participating in the join needs to keep the Hive table in its cache; make sure the Hive table fits into the memory of a TaskManager task slot.
It is recommended to set relatively large values for streaming-source.monitor-interval (when the latest partition is used as the temporal table) and lookup.join.cache.ttl (when the whole table is used as the temporal table); otherwise the job updates and reloads the table too frequently, which easily causes performance problems.
Currently (as of Flink 1.17), the whole Hive table is reloaded whenever the cache is refreshed, so there is no way to distinguish new data from old data.
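If these options only need to be changed for one query, the SQL hints shown earlier can override them on the dimension table without touching its definition; a sketch, assuming the tables from the example above and that dynamic table options are accepted on the temporal table reference:
SELECT o.o_id, dim.u_name
FROM alan_fact_order_table2 AS o
JOIN alan_dim_user_table2 /*+ OPTIONS('lookup.join.cache.ttl'='12 h') */
  FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.u_id = dim.u_id;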
3. Writing data into Hive
Flink supports writing data into Hive in both batch and streaming modes. When run as a batch program, the data Flink writes to a Hive table only becomes visible once the job finishes. Batch writes can append to or overwrite an existing table.
1) Code example 1
# ------ INSERT INTO appends to the table or partition, keeping existing data ------
Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;
# ------ INSERT OVERWRITE overwrites any existing data in the table or partition ------
Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;
2) Flink verification steps
-------------Operations on Flink 1.13.6---------
Flink SQL> CREATE TABLE alan_w_user_table (
> u_id BIGINT,
> u_name STRING,
> balance DECIMAL(10, 4),
> age INT
> )
> row format delimited
> fields terminated by ","
> ;
Hive Session ID = 30451c4a-5ca9-470c-9274-9ecf5330c76d
[INFO] Execute statement succeed.
Flink SQL> show tables;
Hive Session ID = 8c5f20ac-989e-423c-b936-d8274ceff5b1
+------------------------+
| table name |
+------------------------+
| alan_dim_user_table |
| alan_dim_user_table2 |
| alan_fact_order_table |
| alan_fact_order_table2 |
| alan_student |
| alan_w_user_table |
| student_ext |
| tbl |
| test_change |
| user_dept |
+------------------------+
10 rows in set
Flink SQL> INSERT INTO alan_w_user_table values (1,'alan',12.4,18);
Job ID: ea03b7c37aca92197c608da292cbb8f3
Flink SQL> select * from alan_w_user_table;
+----+----------------------+--------------------------------+--------------+-------------+
| op | u_id | u_name | balance | age |
+----+----------------------+--------------------------------+--------------+-------------+
| +I | 1 | alan | 12.4000 | 18 |
+----+----------------------+--------------------------------+--------------+-------------+
Received a total of 1 row
-----INSERT OVERWRITE is not supported in Flink streaming mode; switch to batch mode
Flink SQL> INSERT OVERWRITE alan_w_user_table values (1,'alanchan',22.4,19);
Hive Session ID = 58ec8fbd-aa1b-40c1-ab09-6da083e6327e
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Streaming mode not support overwrite.
-----The default is streaming mode; switch to batch mode
Flink SQL> SET execution.runtime-mode = batch;
Hive Session ID = 3eb977f9-1036-42e3-8b0f-22c2357706fc
[INFO] Session property has been set.
------Checkpointing is not supported for Flink batch jobs; checkpointing has to be disabled before the batch job can run
Flink SQL> INSERT OVERWRITE alan_w_user_table values (1,'alanchan',22.4,19);
Hive Session ID = 5b2db357-5c12-44a0-8159-f6f18ba5fbea
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Checkpoint is not supported for batch jobs.
----------This only demonstrates the difference between INSERT INTO and INSERT OVERWRITE, which is the same as in Hive and is not repeated here; see the Hive articles for details
Data can also be inserted into specific partitions.
3) Code example 2
# ------ insert into a static partition ------
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;
# ------ insert into dynamic partitions ------
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
# ------ insert with a static (my_type) and a dynamic (my_date) partition ------
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';
4) Flink verification steps
------------------------Operations on Flink 1.13.6---------------------------------------------
----------Static partition: insert data----------
Flink SQL> SET table.sql-dialect=hive;
[INFO] Session property has been set.
Flink SQL> CREATE TABLE alan_wp_user_table (
> u_id BIGINT,
> u_name STRING,
> balance DECIMAL(10, 4),
> age INT
> ) PARTITIONED BY (dt STRING,hr STRING)
> row format delimited
> fields terminated by ","
> TBLPROPERTIES (
> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> 'sink.partition-commit.trigger'='partition-time',
> 'sink.partition-commit.delay'='10 s',
> 'sink.partition-commit.policy.kind'='metastore,success-file'
> );
[INFO] Execute statement succeed.
Flink SQL> INSERT into alan_wp_user_table PARTITION (dt='2023-09-05', hr = '05') values (1,'alan',12.4,18);
Job ID: 8b88ccfb6e6e47a79334e79bbc946389
Flink SQL> select * from alan_wp_user_table;
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| op | u_id | u_name | balance | age | dt | hr |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| +I | 1 | alan | 12.4000 | 18 | 2023-09-05 | 05 |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
Received a total of 1 row
---------Another way to insert----------
Flink SQL> INSERT into alan_wp_user_table PARTITION (dt='2023-09-05', hr = '05') SELECT 2,'alanchan', 25.8,19;
Job ID: 93dbf92c01e41c245a38fb5776eb7d59
Flink SQL> select * from alan_wp_user_table;
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| op | u_id | u_name | balance | age | dt | hr |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| +I | 2 | alanchan | 25.8000 | 19 | 2023-09-05 | 05 |
| +I | 1 | alan | 12.4000 | 18 | 2023-09-05 | 05 |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
------ insert into dynamic partitions ------
INSERT into alan_wp_user_table SELECT 3,'alanchanchn', 35.8,29, '2023-09-05', '05';
Flink SQL> INSERT into alan_wp_user_table PARTITION (dt='2023-09-05', hr = '05') values (1,'alan',12.4,18);
------If the data is visible in Hive but not in Flink SQL, run SET table.sql-dialect=hive; in the Flink SQL CLI and query again
Flink SQL> select * from alan_wp_user_table;
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| op | u_id | u_name | balance | age | dt | hr |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| +I | 1 | alan | 12.4000 | 18 | 2023-09-05 | 05 |
| +I | 2 | alanchan | 25.8000 | 19 | 2023-09-05 | 05 |
| +I | 3 | alanchanchn | 35.8000 | 29 | 2023-09-05 | 05 |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
------ insert with a static (dt) and a dynamic (hr) partition ------
------this kind of insert requires batch mode; with checkpointing enabled batch jobs are not supported here, so it was not verified further
Flink SQL> SET execution.runtime-mode = batch;
[INFO] Session property has been set.
Flink SQL> INSERT OVERWRITE alan_wp_user_table PARTITION (dt='2023-09-05') SELECT 4,'alan_chanchn', 45.8,39, '06';
Hive Session ID = 26829c28-8581-4bf4-b4f7-bea17042e6de
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Checkpoint is not supported for batch jobs.
Streaming writes continuously add new data to Hive and commit records to make them visible. Several properties control how commits are triggered.
Streaming writes do not support INSERT OVERWRITE.
5) Code example 3
The following example streams data from Kafka into a Hive table with partition commits, then runs a batch query to read the data back.
--- create the Hive table
SET table.sql-dialect=hive;
CREATE TABLE alan_hive_user_table (
u_id BIGINT,
u_name STRING,
balance DECIMAL(10, 4),
age INT
) PARTITIONED BY (dt STRING,hr STRING)
row format delimited
fields terminated by ","
TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='10 s',
'sink.partition-commit.policy.kind'='metastore,success-file',
'sink.rolling-policy.rollover-interval'='5s',
'sink.partition-commit.watermark-time-zone'='Asia/Shanghai' -- assuming the session time zone is 'Asia/Shanghai'
);
--- create the Kafka table
SET table.sql-dialect=default;
CREATE TABLE alan_kafka_table (
u_id BIGINT,
u_name STRING,
balance DECIMAL(10, 4),
age INT,
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp', -- event time
WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND -- define a watermark on event_time
) WITH (
'connector' = 'kafka',
'topic' = 'alan_kafka_hive_topic',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
-- streaming SQL: insert into the Hive table
INSERT INTO alan_hive_user_table
SELECT u_id, u_name,balance,age, DATE_FORMAT(`event_time`, 'yyyy-MM-dd'), DATE_FORMAT(`event_time`, 'HH')
FROM alan_kafka_table;
-- batch SQL: query by partition
SELECT * FROM alan_hive_user_table WHERE dt='2023-09-05' and hr='07';
6) Flink verification steps
-----set the runtime mode
Flink SQL> SET execution.runtime-mode = streaming;
[INFO] Session property has been set.
----set the Hive dialect
Flink SQL> SET table.sql-dialect=hive;
Hive Session ID = b64d5e77-1f0e-4480-a680-0f7ebf7e34c4
[INFO] Session property has been set.
-----create the Hive table
Flink SQL> CREATE TABLE alan_hive_user_table (
> u_id BIGINT,
> u_name STRING,
> balance DECIMAL(10, 4),
> age INT
> ) PARTITIONED BY (dt STRING,hr STRING)
> row format delimited
> fields terminated by ","
> TBLPROPERTIES (
> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> 'sink.partition-commit.trigger'='partition-time',
> 'sink.partition-commit.delay'='10 s',
> 'sink.partition-commit.policy.kind'='metastore,success-file',
> 'sink.rolling-policy.rollover-interval'='5s',
> 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai' -- assuming the session time zone is 'Asia/Shanghai'
> );
[INFO] Execute statement succeed.
----switch back to the default Flink dialect
Flink SQL> SET table.sql-dialect=default;
[INFO] Session property has been set.
------create the Kafka table
Flink SQL> CREATE TABLE alan_kafka_table (
> u_id BIGINT,
> u_name STRING,
> balance DECIMAL(10, 4),
> age INT,
> `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', -- event time
> WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND -- define a watermark on event_time
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'alan_kafka_hive_topic',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
------streaming SQL: continuously insert data by partition; this runs as a Flink job
Flink SQL> INSERT INTO alan_hive_user_table
> SELECT u_id, u_name,balance,age, DATE_FORMAT(`event_time`, 'yyyy-MM-dd'), DATE_FORMAT(`event_time`, 'HH')
> FROM alan_kafka_table;
Job ID: 95fceba5540315957ed7d0b873461e43
-----send data from Kafka
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_kafka_hive_topic
>1,'alan',123.34,18
>2,'alanchan',223.34,28
>
---query with Flink SQL; query once after each Kafka message
Flink SQL> select * from alan_hive_user_table where dt='2023-09-05' and hr='07';
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| op | u_id | u_name | balance | age | dt | hr |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| +I | 1 | 'alan' | 123.3400 | 18 | 2023-09-05 | 07 |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
Received a total of 1 row
Flink SQL> select * from alan_hive_user_table where dt='2023-09-05' and hr='07';
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| op | u_id | u_name | balance | age | dt | hr |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| +I | 1 | 'alan' | 123.3400 | 18 | 2023-09-05 | 07 |
| +I | 2 | 'alanchan' | 223.3400 | 28 | 2023-09-05 | 07 |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
Received a total of 2 rows
If a watermark is defined on a TIMESTAMP_LTZ column and partition-time commit is used, sink.partition-commit.watermark-time-zone must be set to the session time zone; otherwise partitions may be committed several hours later than expected.
The example below mirrors the one in 16、Flink 的table api与sql之连接外部系统:读写外部系统的连接器和格式以及FileSystem示例(1); only the connector differs, the settings are the same, so the details are not repeated here.
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
user_id STRING,
order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- assuming the session time zone is 'Asia/Shanghai'
'sink.partition-commit.policy.kind'='metastore,success-file'
);
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
ts BIGINT, -- time in epoch milliseconds
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- declare the watermark on the TIMESTAMP_LTZ column
) WITH (...);
-- streaming sql, insert into hive table
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH')
FROM kafka_table;
-- batch sql, select with partition pruning
SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
By default, for streaming writes Flink only supports renaming committers, which means exactly-once semantics for streaming writes are not supported on the S3 filesystem. Exactly-once writes to S3 can be achieved by setting the option table.exec.hive.fallback-mapred-writer to false; this switches to Flink's native writers, but only works for Parquet and ORC file formats. The option can be set in TableConfig and affects all sinks of the job.
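A sketch of switching to the native writers for the current session (the option name follows the Flink documentation; only effective for Parquet and ORC tables):
SET table.exec.hive.fallback-mapred-writer=false;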
7) Dynamic partition writing
Unlike static partition writing, which always requires the user to specify the values of the partition columns, dynamic partition writing lets the user omit them. For example, given a partitioned table like this:
CREATE TABLE alan_wp_user_table (
u_id BIGINT,
u_name STRING,
balance DECIMAL(10, 4),
age INT
) PARTITIONED BY (dt STRING,hr STRING)
row format delimited
fields terminated by ","
TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='10 s',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
You can write data into this partitioned table with a SQL statement like the following:
INSERT into alan_wp_user_table SELECT 3,'alanchanchn', 35.8,29, '2023-09-05', '05';
In this statement the user does not specify the partition column values; this is a typical example of dynamic partition writing.
By default, for dynamic partition writes Flink additionally sorts the data by the dynamic partition columns before actually writing to the target table. The sink therefore receives the data partition by partition: all data of one partition arrives before the data of the next, and data of different partitions is never interleaved. The Hive sink then only needs to keep one partition writer open at a time; otherwise it would have to keep a writer open for every partition seen in the incoming data, and too many open partition writers can cause an OutOfMemory error.
To avoid the extra sorting, you can set the job configuration table.exec.hive.sink.sort-by-dynamic-partition.enable (default true) to false. With that setting, however, as described above, an OutOfMemory error may occur if a single sink receives too many dynamic partitions.
If the data is not badly skewed, you can add DISTRIBUTED BY <partition_field> to the SQL statement to route data of the same partition to the same sink node, which mitigates the problem of a single sink holding too many partition writers.
Adding DISTRIBUTED BY <partition_field> to the SQL statement also achieves the same effect as setting table.exec.hive.sink.sort-by-dynamic-partition.enable to false.
The configuration table.exec.hive.sink.sort-by-dynamic-partition.enable only takes effect in batch mode.
Currently (as of Flink 1.17), DISTRIBUTED BY and SORTED BY are only available when using the Hive dialect in Flink batch mode.
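A sketch of the related settings in batch mode, reusing the alan_wp_user_table example above (values are illustrative):
SET execution.runtime-mode=batch;
-- disable the extra sort before the Hive sink
SET table.exec.hive.sink.sort-by-dynamic-partition.enable=false;
-- dynamic partition write: the last two SELECT columns feed the dt/hr partition columns
INSERT INTO alan_wp_user_table SELECT 5, 'alan_chn', 55.8, 31, '2023-09-05', '06';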
8) Automatic statistics collection
When writing into a Hive table, Flink by default automatically collects statistics on the written data and commits them to the Hive metastore. In some cases you may not want this, because collecting the statistics can take some time. To prevent Flink from collecting statistics automatically, set the job parameter table.exec.hive.sink.statistic-auto-gather.enable (default true) to false.
If the Hive table is stored in Parquet or ORC format, Flink can collect numFiles/totalSize/numRows/rawDataSize; otherwise only numFiles/totalSize are collected.
For Parquet or ORC tables, Flink only reads the file footers to quickly collect numRows/rawDataSize. With many files this can still be time-consuming; you can set the job parameter table.exec.hive.sink.statistic-auto-gather.thread-num (default 3) to a larger value to speed up the collection.
Only batch mode supports automatic statistics collection; streaming mode does not support it yet.
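The corresponding session settings, as a sketch (pick one depending on whether statistics are wanted; the thread count is only an example):
-- turn automatic statistics collection off entirely
SET table.exec.hive.sink.statistic-auto-gather.enable=false;
-- or keep it on and use more threads to read Parquet/ORC footers
SET table.exec.hive.sink.statistic-auto-gather.thread-num=8;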
9) File compaction
When writing Hive tables, Flink can also automatically merge small files to reduce their number.
- Stream Mode
In streaming mode, small-file compaction behaves the same as for the filesystem connector; see 16、Flink 的table api与sql之连接外部系统:读写外部系统的连接器和格式以及FileSystem示例(1) for details.
- Batch Mode
In batch mode, with automatic compaction enabled, Flink computes the average file size of each partition after the Hive table has been written; if the average is below a user-configured threshold, Flink merges those files into files of the configured target size via the compaction options.
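A hedged sketch of a streaming-write table with small-file compaction enabled; it assumes the Hive streaming sink accepts the same 'auto-compaction' and 'compaction.file-size' options as the filesystem sink, and the table name is illustrative:
SET table.sql-dialect=hive;
CREATE TABLE alan_compacted_user_table (
  u_id BIGINT,
  u_name STRING
) PARTITIONED BY (dt STRING) TBLPROPERTIES (
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'auto-compaction'='true',          -- merge small files before the partition is committed
  'compaction.file-size'='128MB'     -- target size of the compacted files
);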
4. File formats
Flink's Hive integration has been tested with the following file formats:
- Text
- CSV
- SequenceFile
- ORC
- Parquet
The file format of a Hive table is configured exactly as it is in Hive itself, so it is not repeated here. For details, see:
3、hive的使用示例详解-建表、数据类型详解、内部外部表、分区表、分桶表
4、hive的使用示例详解-事务表、视图、物化视图、DDL(数据库、表以及分区)管理详细操作
This concludes the detailed walkthrough of integrating Flink with Hive and reading and writing Hive data with Flink SQL.