国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例

這篇具有很好參考價(jià)值的文章主要介紹了43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

Flink 系列文章

一、Flink 專欄

Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。

  • 2、Flink基礎(chǔ)系列
    本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。

  • 3、Flik Table API和SQL基礎(chǔ)系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。

  • 4、Flik Table API和SQL提高與應(yīng)用系列
    本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開發(fā)難度的內(nèi)容。

  • 5、Flink 監(jiān)控系列
    本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。

二、Flink 示例專欄

Flink 示例專欄是 Flink 專欄的輔助說明,一般不會(huì)介紹知識(shí)點(diǎn)的信息,更多的是提供一個(gè)一個(gè)可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內(nèi)容。

兩專欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引



本文詳細(xì)的介紹了Flink 與hive的集成、通過flink sql讀寫hive數(shù)據(jù)。
本文依賴有hadoop、hive、kafka、flink等所有環(huán)境可用。
本分分為4個(gè)部分,即讀取hive數(shù)據(jù)、時(shí)態(tài)表的應(yīng)用、寫入hive中數(shù)據(jù)和文件格式。
本示例中hive的版本是3.1.2
flink的驗(yàn)證版本是1.13.6
hadoop的版本是3.1.4
kafka的版本是2.12-3.0.0

一、Hive的讀寫介紹

Apache Flink 可通過 HiveCatalog對(duì) Apache Hive 表的統(tǒng)一 BATCH 和 STREAM 處理。這意味著 Flink 可以用作 Hive 批處理引擎的高性能替代方案,或者連續(xù)地將數(shù)據(jù)寫入和讀出 Hive 表,以支持實(shí)時(shí)數(shù)據(jù)倉庫應(yīng)用程序。

1、讀hive數(shù)據(jù)

Flink 支持在 BATCH 和 STREAMING 模式下從 Hive 讀取數(shù)據(jù)。當(dāng)作為 BATCH 應(yīng)用程序運(yùn)行時(shí),F(xiàn)link 會(huì)在執(zhí)行查詢的時(shí)間點(diǎn)對(duì)表的狀態(tài)執(zhí)行查詢。流式讀取將持續(xù)監(jiān)視表,并在新數(shù)據(jù)可用時(shí)以增量方式獲取新數(shù)據(jù)。Flink 將默認(rèn)讀取有界的表。

流式處理讀取支持同時(shí)使用分區(qū)表和非分區(qū)表。對(duì)于分區(qū)表,F(xiàn)link 會(huì)監(jiān)控新分區(qū)的生成,并在可用時(shí)增量讀取它們。對(duì)于非分區(qū)表,F(xiàn)link 會(huì)監(jiān)控文件夾中新文件的生成,并增量讀取新文件。
43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例,# Flink專欄,flink,hive,大數(shù)據(jù),flink hive,flink kafka,flink sql,flink 流批一體化
SQL Hints可用于將配置應(yīng)用于 Hive 表,而無需更改其在 Hive 元存儲(chǔ)中的定義。
示例如下:

SELECT * 
FROM hive_table 
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2023-08-20') */;

注意:

  • 監(jiān)控策略是掃描當(dāng)前位置路徑中的所有目錄/文件。許多分區(qū)可能會(huì)導(dǎo)致性能下降。
  • 非分區(qū)表的流式讀取要求以原子方式將每個(gè)文件寫入目標(biāo)目錄。
  • 分區(qū)表的流式讀取要求在 Hive 元存儲(chǔ)視圖中以原子方式添加每個(gè)分區(qū)。否則,將使用添加到現(xiàn)有分區(qū)的新數(shù)據(jù)。
  • 流式讀取不支持 Flink DDL 中的水印語法。這些表不能用于窗口運(yùn)算符。

1)、讀取hive的視圖

Flink 能夠從 Hive 定義的視圖中讀取數(shù)據(jù),但存在一些限制:

  • 當(dāng)前的catalog必須被設(shè)置為hivecatalog,設(shè)置方式有兩種,即 TableAPI:tenv.useCatalog(“alan_hivecatalog”) 和SQLcli:use catalog alan_hivecatalog
  • Hive 和 Flink SQL 有不同的語法,例如不同的保留關(guān)鍵字和文字。確保視圖的查詢與 Flink 語法兼容。

2)、讀取時(shí)矢量化優(yōu)化(Vectorized Optimization upon Read)

當(dāng)滿足以下條件時(shí),F(xiàn)link 將自動(dòng)使用 Hive 表的矢量化讀?。?/p>

  • 數(shù)據(jù)格式是ORC或parquet時(shí)
  • 字段沒有復(fù)雜數(shù)據(jù)類型時(shí),如List、Map、Struct和Union

默認(rèn)情況下啟用此功能??梢允褂靡韵屡渲媒盟?。

table.exec.hive.fallback-mapred-reader=true

3)、Source 并發(fā)推理(Source Parallelism Inference)

默認(rèn)情況下,F(xiàn)link 會(huì)根據(jù)文件數(shù)量和每個(gè)文件中的塊數(shù)來推斷其 Hive 讀取器的最佳并行性。

Flink 允許您靈活配置并行推理的策略。您可以在 TableConfig 中配置以下參數(shù)(請(qǐng)注意,這些參數(shù)會(huì)影響作業(yè)的所有源):
43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例,# Flink專欄,flink,hive,大數(shù)據(jù),flink hive,flink kafka,flink sql,flink 流批一體化

4)、讀 Hive 表時(shí)調(diào)整數(shù)據(jù)分片(Split) 大小

讀 Hive 表時(shí), 數(shù)據(jù)文件將會(huì)被切分為若干個(gè)分片(split), 每一個(gè)分片是要讀取的數(shù)據(jù)的一部分。 分片是 Flink 進(jìn)行任務(wù)分配和數(shù)據(jù)并行讀取的基本粒度。 用戶可以通過下面的參數(shù)來調(diào)整每個(gè)分片的大小來做一定的讀性能調(diào)優(yōu)。
43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例,# Flink專欄,flink,hive,大數(shù)據(jù),flink hive,flink kafka,flink sql,flink 流批一體化

為了調(diào)整數(shù)據(jù)分片的大小, Flink 首先將計(jì)算得到所有分區(qū)下的所有文件的大小。 但是這在分區(qū)數(shù)量很多的情況下會(huì)比較耗時(shí),你可以配置作業(yè)參數(shù) table.exec.hive.calculate-partition-size.thread-num(默認(rèn)為3)為一個(gè)更大的值使用更多的線程來進(jìn)行加速。
目前上述參數(shù)僅適用于 ORC 格式的 Hive 表。

5)、加載分區(qū)切片

Flink 使用多個(gè)線程并發(fā)將 Hive 分區(qū)切分成多個(gè) split 進(jìn)行讀取。你可以使用 table.exec.hive.load-partition-splits.thread-num 去配置線程數(shù)。默認(rèn)值是3,你配置的值應(yīng)該大于0。

6)、讀取帶有子目錄的分區(qū)

在某些情況下,或許會(huì)創(chuàng)建一個(gè)引用其他表的外部表,但是該表的分區(qū)列是另一張表分區(qū)字段的子集。 比如,你創(chuàng)建了一個(gè)分區(qū)表 fact_tz,分區(qū)字段是 day 和 hour:

CREATE TABLE fact_tz(x int) PARTITIONED BY (day STRING, hour STRING);

然后你基于 fact_tz 表創(chuàng)建了一個(gè)外部表 fact_daily,并使用了一個(gè)粗粒度的分區(qū)字段 day:

CREATE EXTERNAL TABLE fact_daily(x int) PARTITIONED BY (ds STRING) LOCATION '/user/hive/warehouse/test.db/fact_tz';

當(dāng)讀取外部表 fact_daily 時(shí),該表的分區(qū)目錄下存在子目錄(hour=1 到 hour=24)。

默認(rèn)情況下,可以將帶有子目錄的分區(qū)添加到外部表中。Flink SQL 會(huì)遞歸掃描所有的子目錄,并獲取所有子目錄中數(shù)據(jù)。

ALTER TABLE fact_daily ADD PARTITION (ds='2023-08-11') location '/user/hive/warehouse/test.db/fact_tz/ds=2023-08-11';

你可以設(shè)置作業(yè)屬性 table.exec.hive.read-partition-with-subdirectory.enabled (默認(rèn)為 true) 為 false 以禁止 Flink 讀取子目錄。 如果你設(shè)置成 false 并且分區(qū)目錄下不包含任何子目錄,F(xiàn)link 會(huì)拋出 java.io.IOException: Not a file: /path/to/data/* 異常。

2、時(shí)態(tài)表 Join

你可以使用 Hive 表作為時(shí)態(tài)表,然后一個(gè)數(shù)據(jù)流就可以使用 temporal join 關(guān)聯(lián) Hive 表。 請(qǐng)參照 temporal join 獲取更多關(guān)于 temporal join 的信息。

Flink 支持 processing-time temporal join Hive 表,processing-time temporal join 總是關(guān)聯(lián)最新版本的時(shí)態(tài)表。 Flink 支持 temporal join Hive 的分區(qū)表和非分區(qū)表,對(duì)于分區(qū)表,F(xiàn)link 支持自動(dòng)跟蹤 Hive 表的最新分區(qū)。

注意: Flink 還不支持 event-time temporal join Hive 表。

1)、Temporal Join 最新的分區(qū)

對(duì)于隨時(shí)變化的分區(qū)表,我們可以把它看作是一個(gè)無界流進(jìn)行讀取,如果每個(gè)分區(qū)包含完整數(shù)據(jù),則分區(qū)可以作為時(shí)態(tài)表的一個(gè)版本,時(shí)態(tài)表的版本保存分區(qū)的數(shù)據(jù)。

Flink 支持在使用 processing time temporal join 時(shí)自動(dòng)追蹤最新的分區(qū)(版本),通過 streaming-source.partition-order 定義最新的分區(qū)(版本)。 用戶最常使用的案例就是在 Flink 流作業(yè)中使用 Hive 表作為維度表。

注意: 該特性僅支持 Flink 流模式。

下面的案例演示了經(jīng)典的業(yè)務(wù) pipeline,使用 Hive 中的表作為維度表,它們由每天一次的批任務(wù)或者 Flink 任務(wù)來更新(為方便驗(yàn)證改為每小時(shí)更新一次)。 Kafka 數(shù)據(jù)流來自實(shí)時(shí)在線業(yè)務(wù)數(shù)據(jù)或者日志,該流需要關(guān)聯(lián)維度表以豐富數(shù)據(jù)流。

1、代碼示例
-- 假設(shè) Hive 表中的數(shù)據(jù)每天更新, 每天包含最新和完整的維度數(shù)據(jù)
SET table.sql-dialect=hive;
CREATE TABLE alan_dim_user_table (
  u_id STRING,
  u_name STRING,
  balance DECIMAL(10, 4),
  age INT
) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
  -- 使用默認(rèn)的 partition-name 每12小時(shí)加載最新分區(qū)數(shù)據(jù)(推薦)
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-name',  -- 有默認(rèn)的配置項(xiàng),可以不填。

  -- 使用分區(qū)文件create-time 每12小時(shí)加載最新分區(qū)數(shù)據(jù)
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.partition-order' = 'create-time',
  'streaming-source.monitor-interval' = '12 h'

  -- 使用 partition-time 每12小時(shí)加載最新分區(qū)數(shù)據(jù)
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-time',
  'partition.time-extractor.kind' = 'default',
  'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00' 
);

SET table.sql-dialect=hive;
CREATE TABLE alan_dim_user_table (
  u_id BIGINT,
  u_name STRING,
  balance DECIMAL(10, 4),
  age INT
) PARTITIONED BY (t_year STRING, t_month STRING, t_day STRING) 
  row format delimited 
  fields terminated by "," 
  TBLPROPERTIES (
  -- 使用默認(rèn)的 partition-name 每1小時(shí)加載最新分區(qū)數(shù)據(jù)(推薦)
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '1 h',
  'streaming-source.partition-order' = 'partition-name'--默認(rèn)的,可以不設(shè)置
);

-- streaming sql, kafka temporal join Hive 維度表. Flink 將在 'streaming-source.monitor-interval' 的間隔內(nèi)自動(dòng)加載最新分區(qū)的數(shù)據(jù)。
SELECT * FROM orders_table AS o 
JOIN alan_dim_user_table FOR SYSTEM_TIME AS OF o.proctime AS u
ON o.u_id = u.u_id;
2、flink 驗(yàn)證步驟

-------------------------flink、kafka、hive操作示例----------------------------------
----本示例是在flink版本為1.13.6的環(huán)境驗(yàn)證的----------------------------------
---------1、創(chuàng)建flink 的維表,每小時(shí)更新一次數(shù)據(jù)----------------------------------
Flink SQL> SET table.sql-dialect=hive;
[INFO] Session property has been set.

Flink SQL> show tables;
+--------------+
|   table name |
+--------------+
| alan_student |
|  student_ext |
|          tbl |
|  test_change |
|    user_dept |
+--------------+
5 rows in set
Flink SQL> CREATE TABLE alan_dim_user_table (
>   u_id BIGINT,
>   u_name STRING,
>   balance DECIMAL(10, 4),
>   age INT
> ) PARTITIONED BY (t_year STRING, t_month STRING, t_day STRING) 
>   row format delimited 
>   fields terminated by "," 
>   TBLPROPERTIES (
>   -- 使用默認(rèn)的 partition-name 每1小時(shí)加載最新分區(qū)數(shù)據(jù)(推薦)
>   'streaming-source.enable' = 'true',
>   'streaming-source.partition.include' = 'latest',
>   'streaming-source.monitor-interval' = '1 h',
>   'streaming-source.partition-order' = 'partition-name'--默認(rèn)的,可以不設(shè)置
> );
[INFO] Execute statement succeed.

-----------2、hive中手動(dòng)加載數(shù)據(jù),第一次只增加一條數(shù)據(jù)----------------------------------
0: jdbc:hive2://server4:10000> show tables;
+----------------------+
|       tab_name       |
+----------------------+
| alan_dim_user_table  |
| alan_student         |
| student_ext          |
| tbl                  |
| test_change          |
| user_dept            |
+----------------------+
6 rows selected (0.05 seconds)

0: jdbc:hive2://server4:10000> load data  inpath '/flinktest/hivetest' into table alan_dim_user_table partition(t_year='2023',t_month='09',t_day='04');
0: jdbc:hive2://server4:10000> select * from alan_dim_user_table;
+---------------------------+-----------------------------+------------------------------+--------------------------+-----------------------------+------------------------------+----------------------------+
| alan_dim_user_table.u_id  | alan_dim_user_table.u_name  | alan_dim_user_table.balance  | alan_dim_user_table.age  | alan_dim_user_table.t_year  | alan_dim_user_table.t_month  | alan_dim_user_table.t_day  |
+---------------------------+-----------------------------+------------------------------+--------------------------+-----------------------------+------------------------------+----------------------------+
| 1                         | alan                        | 12.2300                      | 18                       | 2023                        | 09                           | 04                         |
+---------------------------+-----------------------------+------------------------------+--------------------------+-----------------------------+------------------------------+----------------------------+

-----3、flink 創(chuàng)建事實(shí)表----------------------------------
Flink SQL> SET table.sql-dialect=default;
[INFO] Session property has been set.

Flink SQL> CREATE TABLE alan_fact_order_table (
>     o_id STRING,
>     o_amount DOUBLE,
>     u_id BIGINT, -- 用戶id
>     item_id BIGINT, -- 商品id
>     action STRING,  -- 用戶行為
>     ts     BIGINT,  -- 用戶行為發(fā)生的時(shí)間戳
>     proctime as PROCTIME(),   -- 通過計(jì)算列產(chǎn)生一個(gè)處理時(shí)間列
>     `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',-- 事件時(shí)間
>     WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND  -- 在eventTime上定義watermark
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test_hive_topic',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'properties.group.id' = 'testhivegroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.

---------4、創(chuàng)建kafka 主題、發(fā)送消息(發(fā)送消息是在flink流式查詢語句后)----------------------------------
[alanchan@server2 bin]$ kafka-topics.sh --delete --topic test_hive_topic --bootstrap-server server1:9092
[alanchan@server2 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic test_hive_topic --partitions 1 --replication-factor 1
Created topic test_hive_topic.
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_hive_topic
>1,123.34,1,8001,'b',1693874219248
----------5、flink 流式查詢(觀察維表是否加載出來數(shù)據(jù))----------------------------------
Flink SQL> SELECT
>   o.o_id,
>   o.u_id,
>   o.action,
>   o.ts,
>   o.event_time,
>   u.u_name,
>   u.t_year,
>   u.t_month,
>   u.t_day 
> FROM alan_fact_order_table AS o 
> JOIN alan_dim_user_table FOR SYSTEM_TIME AS OF o.proctime AS u ON o.u_id = u.u_id;

+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                           o_id |                 u_id |                         action |                   ts |              event_time |                         u_name |                         t_year |                        t_month |                          t_day |
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                              1 |                    1 |                            'b' |        1693874219248 | 2023-09-05 00:51:28.407 |                           alan |                           2023 |                             09 |                             04 |
-------6、hive中加載更多的維度表數(shù)據(jù)(驗(yàn)證維度表是否1小時(shí)更新一次)----------------------------------
0: jdbc:hive2://server4:10000> load data  inpath '/flinktest/hivetest2' into table alan_dim_user_table partition(t_year='2023',t_month='09',t_day='05');
No rows affected (0.194 seconds)
0: jdbc:hive2://server4:10000> select * from alan_dim_user_table;
+---------------------------+-----------------------------+------------------------------+--------------------------+-----------------------------+------------------------------+----------------------------+
| alan_dim_user_table.u_id  | alan_dim_user_table.u_name  | alan_dim_user_table.balance  | alan_dim_user_table.age  | alan_dim_user_table.t_year  | alan_dim_user_table.t_month  | alan_dim_user_table.t_day  |
+---------------------------+-----------------------------+------------------------------+--------------------------+-----------------------------+------------------------------+----------------------------+
| 1                         | alan                        | 12.2300                      | 18                       | 2023                        | 09                           | 04                         |
| 2                         | alanchan                    | 22.2300                      | 10                       | 2023                        | 09                           | 05                         |
| 3                         | alanchanchn                 | 32.2300                      | 28                       | 2023                        | 09                           | 05                         |
| 4                         | alan_chan                   | 12.4300                      | 29                       | 2023                        | 09                           | 05                         |
| 5                         | alan_chan_chn               | 52.2300                      | 38                       | 2023                        | 09                           | 05                         |
+---------------------------+-----------------------------+------------------------------+--------------------------+-----------------------------+------------------------------+----------------------------+
5 rows selected (0.143 seconds)
--------------7、kafka中繼續(xù)發(fā)送消息,然后觀察flink流式查詢結(jié)果的變化----------------------------------
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_hive_topic
>1,123.34,1,8001,'b',1693874219248-----------該數(shù)據(jù)上文中已經(jīng)發(fā)送過,為了表示數(shù)據(jù)的連續(xù)性,沒有刪除
>20,321.34,3,9001,'a',1693874222274
>30,41.34,5,7001,'c',1693874223285    
>50,666.66,2,3001,'d',1693875816640

--------------8、kafka發(fā)送消息后,flink流式查詢結(jié)果----------------------------------
Flink SQL> SELECT
>   o.o_id,
>   o.u_id,
>   o.action,
>   o.ts,
>   o.event_time,
>   u.u_name,
>   u.t_year,
>   u.t_month,
>   u.t_day 
> FROM alan_fact_order_table AS o 
> JOIN alan_dim_user_table FOR SYSTEM_TIME AS OF o.proctime AS u ON o.u_id = u.u_id;
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                           o_id |                 u_id |                         action |                   ts |              event_time |                         u_name |                         t_year |                        t_month |                          t_day |
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                             20 |                    3 |                            'a' |        1693874222274 | 2023-09-05 00:54:49.526 |                    alanchanchn |                           2023 |                             09 |                             05 |
| +I |                             30 |                    5 |                            'c' |        1693874223285 | 2023-09-05 00:55:55.461 |                  alan_chan_chn |                           2023 |                             09 |                             05 |
| +I |                             50 |                    2 |                            'd' |        1693875816640 | 2023-09-05 01:07:23.891 |                       alanchan |                           2023 |                             09 |                             05 |

--------------9、hive維表數(shù)據(jù)不變化,kafka再次發(fā)送消息,觀察flink流式查詢結(jié)果----------------------------------
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_hive_topic
>1,123.34,1,8001,'b',1693874219248
>20,321.34,3,9001,'a',1693874222274
>30,41.34,5,7001,'c',1693874223285    
>50,666.66,2,3001,'d',1693875816640
>60,666.66,4,3001,'e',1693880868579
>
--------------10、hive維表數(shù)據(jù)不變化,kafka再次發(fā)送消息后,觀察flink流式查詢結(jié)果(還是原來的查詢界面)---------------
Flink SQL> SELECT
>   o.o_id,
>   o.u_id,
>   o.action,
>   o.ts,
>   o.event_time,
>   u.u_name,
>   u.t_year,
>   u.t_month,
>   u.t_day 
> FROM alan_fact_order_table AS o 
> JOIN alan_dim_user_table FOR SYSTEM_TIME AS OF o.proctime AS u ON o.u_id = u.u_id;
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                           o_id |                 u_id |                         action |                   ts |              event_time |                         u_name |                         t_year |                        t_month |                          t_day |
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                             20 |                    3 |                            'a' |        1693874222274 | 2023-09-05 00:54:49.526 |                    alanchanchn |                           2023 |                             09 |                             05 |
| +I |                             30 |                    5 |                            'c' |        1693874223285 | 2023-09-05 00:55:55.461 |                  alan_chan_chn |                           2023 |                             09 |                             05 |
| +I |                             50 |                    2 |                            'd' |        1693875816640 | 2023-09-05 01:07:23.891 |                       alanchan |                           2023 |                             09 |                             05 |
| +I |                             60 |                    4 |                            'e' |        1693880868579 | 2023-09-05 02:30:58.368 |                      alan_chan |                           2023 |                             09 |                             05 |

---及時(shí)查出了數(shù)據(jù)的變化-------------------

2)、Temporal Join 最新的表

對(duì)于 Hive 表,我們可以把它看作是一個(gè)無界流進(jìn)行讀取,在這個(gè)案例中,當(dāng)我們查詢時(shí)只能去追蹤最新的版本。 最新版本的表保留了 Hive 表的所有數(shù)據(jù)。

當(dāng) temporal join 最新的 Hive 表,Hive 表會(huì)緩存到 Slot 內(nèi)存中,并且數(shù)據(jù)流中的每條記錄通過 key 去關(guān)聯(lián)表找到對(duì)應(yīng)的匹配項(xiàng)。 使用最新的 Hive 表作為時(shí)態(tài)表不需要額外的配置。作為可選項(xiàng),您可以使用以下配置項(xiàng)配置 Hive 表緩存的 TTL。當(dāng)緩存失效,Hive 表會(huì)重新掃描并加載最新的數(shù)據(jù)。
43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例,# Flink專欄,flink,hive,大數(shù)據(jù),flink hive,flink kafka,flink sql,flink 流批一體化
下面的案例演示加載 Hive 表的所有數(shù)據(jù)作為時(shí)態(tài)表。

1、代碼示例
-- 假設(shè) Hive 表中的數(shù)據(jù)被批處理 pipeline 覆蓋。
SET table.sql-dialect=hive;
CREATE TABLE alan_dim_user_table2 (
  u_id BIGINT,
  u_name STRING,
  balance DECIMAL(10, 4),
  age INT
)
  row format delimited 
  fields terminated by "," 
  TBLPROPERTIES (
  'streaming-source.enable' = 'false',           -- 有默認(rèn)的配置項(xiàng),可以不填。
  'streaming-source.partition.include' = 'all',  -- 有默認(rèn)的配置項(xiàng),可以不填。
  'lookup.join.cache.ttl' = '1 h'
);

SET table.sql-dialect=default;
CREATE TABLE alan_fact_order_table2 (
    o_id STRING,
    o_amount DOUBLE,
    u_id BIGINT, -- 用戶id
    item_id BIGINT, -- 商品id
    action STRING,  -- 用戶行為
    ts     BIGINT,  -- 用戶行為發(fā)生的時(shí)間戳
    proctime as PROCTIME()   -- 通過計(jì)算列產(chǎn)生一個(gè)處理時(shí)間列
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_hive2_topic',
  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
  'properties.group.id' = 'testhivegroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

-- streaming sql, kafka join Hive 維度表. 當(dāng)緩存失效時(shí) Flink 會(huì)加載維度表的所有數(shù)據(jù)。
SELECT
  o.o_id,
  o.u_id,
  o.action,
  o.ts,
  o.proctime,
  dim.u_name,
  dim.age,
  dim.balance 
FROM alan_fact_order_table2 AS o 
JOIN alan_dim_user_table2 FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.u_id = dim.u_id;
2、flink驗(yàn)證步驟
----本示例是在flink版本為1.13.6的環(huán)境驗(yàn)證的----------------------------------
----本示例ttl設(shè)置為1小時(shí),方便驗(yàn)證----------------------------------
----1、flink創(chuàng)建維表----------------------------------
Flink SQL> show tables;
+-----------------------+
|            table name |
+-----------------------+
|   alan_dim_user_table |
| alan_fact_order_table |
|          alan_student |
|           student_ext |
|                   tbl |
|           test_change |
|             user_dept |
+-----------------------+
7 rows in set

Flink SQL> SET table.sql-dialect=hive;
[INFO] Session property has been set.

Flink SQL> CREATE TABLE alan_dim_user_table2 (
>   u_id BIGINT,
>   u_name STRING,
>   balance DECIMAL(10, 4),
>   age INT
> )
>   row format delimited 
>   fields terminated by "," 
>   TBLPROPERTIES (
>   'streaming-source.enable' = 'false',           -- 有默認(rèn)的配置項(xiàng),可以不填。
>   'streaming-source.partition.include' = 'all',  -- 有默認(rèn)的配置項(xiàng),可以不填。
>   'lookup.join.cache.ttl' = '1 h'
> );
[INFO] Execute statement succeed.

----2、hive中對(duì)維表插入數(shù)據(jù)----------------------------------
0: jdbc:hive2://server4:10000> load data  inpath '/flinktest/hivetest' into table alan_dim_user_table2;
No rows affected (0.139 seconds)
0: jdbc:hive2://server4:10000> select * from alan_dim_user_table2;
+----------------------------+------------------------------+-------------------------------+---------------------------+
| alan_dim_user_table2.u_id  | alan_dim_user_table2.u_name  | alan_dim_user_table2.balance  | alan_dim_user_table2.age  |
+----------------------------+------------------------------+-------------------------------+---------------------------+
| 1                          | alan                         | 12.2300                       | 18                        |
| 2                          | alanchan                     | 22.2300                       | 10                        |
| 3                          | alanchanchn                  | 32.2300                       | 28                        |
+----------------------------+------------------------------+-------------------------------+---------------------------+
3 rows selected (0.124 seconds)

----3、flink中創(chuàng)建事實(shí)表----------------------------------

Flink SQL> SET table.sql-dialect=default;
Hive Session ID = 4d502166-65b7-4079-af12-35919101ed8d
[INFO] Session property has been set.

Flink SQL> CREATE TABLE alan_fact_order_table2 (
>     o_id STRING,
>     o_amount DOUBLE,
>     u_id BIGINT, -- 用戶id
>     item_id BIGINT, -- 商品id
>     action STRING,  -- 用戶行為
>     ts     BIGINT,  -- 用戶行為發(fā)生的時(shí)間戳
>     proctime as PROCTIME()   -- 通過計(jì)算列產(chǎn)生一個(gè)處理時(shí)間列
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test_hive2_topic',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'properties.group.id' = 'testhivegroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.

----4、創(chuàng)建kafka topic,并發(fā)送數(shù)據(jù)----------------------------------
[alanchan@server2 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic test_hive2_topic --partitions 1 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic test_hive2_topic.
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_hive2_topic
>1,123.34,1,8001,'b',1693887925763
>30,41.34,5,7001,'c',1693874222274
>30,41.34,5,7001,'c',1693887926780
>20,321.34,3,9001,'a',1693887928801
>50,666.66,2,3001,'d',1693887927790

----5、flink中查詢,觀察查詢結(jié)果----------------------------------
Flink SQL> SELECT
>   o.o_id,
>   o.u_id,
>   o.action,
>   o.ts,
>   o.proctime,
>   dim.u_name,
>   dim.age,
>   dim.balance 
> FROM alan_fact_order_table2 AS o 
> JOIN alan_dim_user_table2 FOR SYSTEM_TIME AS OF o.proctime AS dim
> ON o.u_id = dim.u_id;
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+-------------+--------------+
| op |                           o_id |                 u_id |                         action |                   ts |                proctime |                         u_name |         age |      balance |
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+-------------+--------------+
| +I |                              1 |                    1 |                            'b' |        1693887925763 | 2023-09-05 04:24:47.825 |                           alan |          18 |      12.2300 |
| +I |                             20 |                    3 |                            'a' |        1693887928801 | 2023-09-05 04:26:06.437 |                    alanchanchn |          28 |      32.2300 |
| +I |                             50 |                    2 |                            'd' |        1693887927790 | 2023-09-05 04:26:46.404 |                       alanchan |          10 |      22.2300 |

----6、在hive中加載新的數(shù)據(jù),kafka中發(fā)送新的消息,觀察flink的查詢結(jié)果----------------------------------
0: jdbc:hive2://server4:10000> load data  inpath '/flinktest/hivetest' into table alan_dim_user_table2;
No rows affected (0.129 seconds)
0: jdbc:hive2://server4:10000> select * from alan_dim_user_table2;
+----------------------------+------------------------------+-------------------------------+---------------------------+
| alan_dim_user_table2.u_id  | alan_dim_user_table2.u_name  | alan_dim_user_table2.balance  | alan_dim_user_table2.age  |
+----------------------------+------------------------------+-------------------------------+---------------------------+
| 1                          | alan                         | 12.2300                       | 18                        |
| 2                          | alanchan                     | 22.2300                       | 10                        |
| 3                          | alanchanchn                  | 32.2300                       | 28                        |
| 4                          | alan_chan                    | 12.4300                       | 29                        |
| 5                          | alan_chan_chn                | 52.2300                       | 38                        |
+----------------------------+------------------------------+-------------------------------+---------------------------+

[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_hive2_topic
>1,123.34,1,8001,'b',1693887925763
>30,41.34,5,7001,'c',1693874222274
>30,41.34,5,7001,'c',1693887926780
>20,321.34,3,9001,'a',1693887928801
>50,666.66,2,3001,'d',1693887927790
>30,41.34,5,7001,'c',1693887926780-----該條數(shù)據(jù)在flink的查詢結(jié)果中沒有顯示

Flink SQL> SELECT
>   o.o_id,
>   o.u_id,
>   o.action,
>   o.ts,
>   o.proctime,
>   dim.u_name,
>   dim.age,
>   dim.balance 
> FROM alan_fact_order_table2 AS o 
> JOIN alan_dim_user_table2 FOR SYSTEM_TIME AS OF o.proctime AS dim
> ON o.u_id = dim.u_id;
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+-------------+--------------+
| op |                           o_id |                 u_id |                         action |                   ts |                proctime |                         u_name |         age |      balance |
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+-------------+--------------+
| +I |                              1 |                    1 |                            'b' |        1693887925763 | 2023-09-05 04:24:47.825 |                           alan |          18 |      12.2300 |
| +I |                             20 |                    3 |                            'a' |        1693887928801 | 2023-09-05 04:26:06.437 |                    alanchanchn |          28 |      32.2300 |
| +I |                             50 |                    2 |                            'd' |        1693887927790 | 2023-09-05 04:26:46.404 |                       alanchan |          10 |      22.2300 |

----7、ttl過期后,再在kafka中發(fā)送新的消息,觀察flink的查詢結(jié)果----------------------------------
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_hive2_topic
...,下面2條數(shù)據(jù)是TTL過期后發(fā)送的,如預(yù)期一樣查出來了結(jié)果
>30,41.34,5,7001,'c',1693893016308
>1,123.34,1,8001,'b',1693893020334

Flink SQL> SELECT
>   o.o_id,
>   o.u_id,
>   o.action,
>   o.ts,
>   o.proctime,
>   dim.u_name,
>   dim.age,
>   dim.balance 
> FROM alan_fact_order_table2 AS o 
> JOIN alan_dim_user_table2 FOR SYSTEM_TIME AS OF o.proctime AS dim
> ON o.u_id = dim.u_id;
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+-------------+--------------+
| op |                           o_id |                 u_id |                         action |                   ts |                proctime |                         u_name |         age |      balance |
+----+--------------------------------+----------------------+--------------------------------+----------------------+-------------------------+--------------------------------+-------------+--------------+
| +I |                              1 |                    1 |                            'b' |        1693887925763 | 2023-09-05 04:24:47.825 |                           alan |          18 |      12.2300 |
| +I |                             20 |                    3 |                            'a' |        1693887928801 | 2023-09-05 04:26:06.437 |                    alanchanchn |          28 |      32.2300 |
| +I |                             50 |                    2 |                            'd' |        1693887927790 | 2023-09-05 04:26:46.404 |                       alanchan |          10 |      22.2300 |
| +I |                             30 |                    5 |                            'c' |        1693893016308 | 2023-09-05 05:49:47.984 |                  alan_chan_chn |          38 |      52.2300 |
| +I |                              1 |                    1 |                            'b' |        1693893020334 | 2023-09-05 05:50:23.696 |                           alan |          18 |      12.2300 |

------以上,完成了驗(yàn)證

每個(gè)參與 join 的 subtask 需要在他們的緩存中保留 Hive 表。請(qǐng)確保 Hive 表可以放到 TM task slot 中。
建議把這兩個(gè)選項(xiàng)配置成較大的值 streaming-source.monitor-interval(最新的分區(qū)作為時(shí)態(tài)表) 和 lookup.join.cache.ttl(所有的分區(qū)作為時(shí)態(tài)表)。否則,任務(wù)會(huì)頻繁更新和加載表,容易出現(xiàn)性能問題。
目前(截至flink 1.17版本),緩存刷新的時(shí)候會(huì)重新加載整個(gè) Hive 表,所以沒有辦法區(qū)分?jǐn)?shù)據(jù)是新數(shù)據(jù)還是舊數(shù)據(jù)。

3、寫入hive數(shù)據(jù)

Flink 支持批和流兩種模式往 Hive 中寫入數(shù)據(jù),當(dāng)作為批程序,只有當(dāng)作業(yè)完成時(shí),F(xiàn)link 寫入 Hive 表的數(shù)據(jù)才能被看見。批模式寫入支持追加到現(xiàn)有的表或者覆蓋現(xiàn)有的表。

1)、代碼示例1

# ------ INSERT INTO 將追加到表或者分區(qū),保證數(shù)據(jù)的完整性 ------ 
Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;

# ------ INSERT OVERWRITE 將覆蓋表或者分區(qū)中所有已經(jīng)存在的數(shù)據(jù) ------ 
Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;

2)、flink驗(yàn)證步驟

-------------flink 1.13.6環(huán)境中操作示例---------
Flink SQL> CREATE TABLE alan_w_user_table (
>   u_id BIGINT,
>   u_name STRING,
>   balance DECIMAL(10, 4),
>   age INT
> )
>   row format delimited 
>   fields terminated by "," 
>  ;
Hive Session ID = 30451c4a-5ca9-470c-9274-9ecf5330c76d
[INFO] Execute statement succeed.

Flink SQL> show tables;
Hive Session ID = 8c5f20ac-989e-423c-b936-d8274ceff5b1
+------------------------+
|             table name |
+------------------------+
|    alan_dim_user_table |
|   alan_dim_user_table2 |
|  alan_fact_order_table |
| alan_fact_order_table2 |
|           alan_student |
|      alan_w_user_table |
|            student_ext |
|                    tbl |
|            test_change |
|              user_dept |
+------------------------+
10 rows in set

Flink SQL> INSERT INTO alan_w_user_table values (1,'alan',12.4,18);
Job ID: ea03b7c37aca92197c608da292cbb8f3

Flink SQL> select * from alan_w_user_table;
+----+----------------------+--------------------------------+--------------+-------------+
| op |                 u_id |                         u_name |      balance |         age |
+----+----------------------+--------------------------------+--------------+-------------+
| +I |                    1 |                           alan |      12.4000 |          18 |
+----+----------------------+--------------------------------+--------------+-------------+
Received a total of 1 row
-----flink streaming模式下是不支持insert overwrite的,需要設(shè)置為batch模式
Flink SQL> INSERT OVERWRITE  alan_w_user_table values (1,'alanchan',22.4,19);
Hive Session ID = 58ec8fbd-aa1b-40c1-ab09-6da083e6327e
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Streaming mode not support overwrite.
-----默認(rèn)為streaming模式,設(shè)置為batch模式
Flink SQL> SET execution.runtime-mode = batch;
Hive Session ID = 3eb977f9-1036-42e3-8b0f-22c2357706fc
[INFO] Session property has been set.
------flink batch模式下,不能開啟checkpoint,需要關(guān)閉checkpoint才能支持batch job,
Flink SQL> INSERT OVERWRITE  alan_w_user_table values (1,'alanchan',22.4,19);
Hive Session ID = 5b2db357-5c12-44a0-8159-f6f18ba5fbea
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Checkpoint is not supported for batch jobs.
----------此處只是為了演示insert into與insert overwrite的區(qū)別,此區(qū)別與hive中的一致,此處不再贅述,詳見hive專欄的部分

還可以將數(shù)據(jù)插入到特定的分區(qū)中。

3)、代碼示例2

# ------ 插入靜態(tài)分區(qū) ------ 
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;

# ------ 插入動(dòng)態(tài)分區(qū) ------ 
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';

# ------ 插入靜態(tài)(my_type)和動(dòng)態(tài)(my_date)分區(qū) ------ 
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';

4)、flink驗(yàn)證步驟

------------------------flink 1.13.6環(huán)境中操作示例---------------------------------------------
----------靜態(tài)分區(qū),插入數(shù)據(jù)----------
Flink SQL> SET table.sql-dialect=hive;
[INFO] Session property has been set.

Flink SQL> CREATE TABLE alan_wp_user_table (
>   u_id BIGINT,
>   u_name STRING,
>   balance DECIMAL(10, 4),
>   age INT
> ) PARTITIONED BY (dt STRING,hr STRING) 
>   row format delimited 
>   fields terminated by "," 
>   TBLPROPERTIES (
>   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.partition-commit.delay'='10 s',
>   'sink.partition-commit.policy.kind'='metastore,success-file'
> );
[INFO] Execute statement succeed.

Flink SQL> INSERT into alan_wp_user_table PARTITION (dt='2023-09-05', hr = '05') values (1,'alan',12.4,18);
Job ID: 8b88ccfb6e6e47a79334e79bbc946389

Flink SQL> select * from alan_wp_user_table;
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| op |                 u_id |                         u_name |      balance |         age |                             dt |                             hr |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| +I |                    1 |                           alan |      12.4000 |          18 |                     2023-09-05 |                             05 |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
Received a total of 1 row

---------另外一種插入方式----------
Flink SQL> INSERT into alan_wp_user_table PARTITION (dt='2023-09-05', hr = '05') SELECT 2,'alanchan', 25.8,19;
Job ID: 93dbf92c01e41c245a38fb5776eb7d59


Flink SQL>  select * from alan_wp_user_table;
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| op |                 u_id |                         u_name |      balance |         age |                             dt |                             hr |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| +I |                    2 |                       alanchan |      25.8000 |          19 |                     2023-09-05 |                             05 |
| +I |                    1 |                           alan |      12.4000 |          18 |                     2023-09-05 |                             05 |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+

------ 插入動(dòng)態(tài)分區(qū) ------ 
INSERT into alan_wp_user_table SELECT 3,'alanchanchn', 35.8,29, '2023-09-05', '05';

Flink SQL> INSERT into alan_wp_user_table PARTITION (dt='2023-09-05', hr = '05') values (1,'alan',12.4,18);
------如果hive中查得到數(shù)據(jù),flink sql中查不到數(shù)據(jù),flink sql cli 中執(zhí)行  SET table.sql-dialect=hive;命令再查即可

Flink SQL> select * from alan_wp_user_table;

+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| op |                 u_id |                         u_name |      balance |         age |                             dt |                             hr |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| +I |                    1 |                           alan |      12.4000 |          18 |                     2023-09-05 |                             05 |
| +I |                    2 |                       alanchan |      25.8000 |          19 |                     2023-09-05 |                             05 |
| +I |                    3 |                    alanchanchn |      35.8000 |          29 |                     2023-09-05 |                             05 |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+

------ 插入靜態(tài)(my_type)和動(dòng)態(tài)(my_date)分區(qū) ------ 
------該種插入方式需要是batch模式,batch模式不支持checkpoint ,該種情況沒有進(jìn)一步驗(yàn)證
Flink SQL> SET execution.runtime-mode = batch;
[INFO] Session property has been set.

Flink SQL> INSERT OVERWRITE alan_wp_user_table PARTITION (dt='2023-09-05') SELECT 4,'alan_chanchn', 45.8,39, '06';
Hive Session ID = 26829c28-8581-4bf4-b4f7-bea17042e6de
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Checkpoint is not supported for batch jobs.

流寫會(huì)不斷的往 Hive 中添加新數(shù)據(jù),提交記錄使它們可見。用戶可以通過幾個(gè)屬性控制如何觸發(fā)提交。

流寫不支持 Insert overwrite

5)、代碼示例3

下面的案例演示如何流式地從 Kafka 寫入 Hive 表并執(zhí)行分區(qū)提交,然后運(yùn)行一個(gè)批處理查詢將數(shù)據(jù)讀出來。

---創(chuàng)建hive表
SET table.sql-dialect=hive;
CREATE TABLE alan_hive_user_table (
  u_id BIGINT,
  u_name STRING,
  balance DECIMAL(10, 4),
  age INT
) PARTITIONED BY (dt STRING,hr STRING) 
  row format delimited 
  fields terminated by "," 
  TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='10 s',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'sink.rolling-policy.rollover-interval'='5s',
  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai' -- 假設(shè)用戶配置的時(shí)區(qū)為 'Asia/Shanghai', 
);

---創(chuàng)建kafka表
SET table.sql-dialect=default;
CREATE TABLE alan_kafka_table (
  u_id BIGINT,
  u_name STRING,
  balance DECIMAL(10, 4),
  age INT,
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',-- 事件時(shí)間
  WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND  -- 在eventTime上定義watermark
) WITH (
  'connector' = 'kafka',
  'topic' = 'alan_kafka_hive_topic',
  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

-- 流式sql 插入hive數(shù)據(jù)庫
INSERT INTO alan_hive_user_table 
SELECT u_id, u_name,balance,age, DATE_FORMAT(`event_time`, 'yyyy-MM-dd'), DATE_FORMAT(`event_time`, 'HH')
FROM alan_kafka_table;

-- 批處理sql ,按照分區(qū)查詢
SELECT * FROM alan_hive_user_table WHERE dt='2023-09-05' and hr='07';

6)、flink驗(yàn)證步驟

-----設(shè)置運(yùn)行環(huán)境
Flink SQL> SET execution.runtime-mode = streaming;
[INFO] Session property has been set.

----設(shè)置hive方言
Flink SQL> SET table.sql-dialect=hive;
Hive Session ID = b64d5e77-1f0e-4480-a680-0f7ebf7e34c4
[INFO] Session property has been set.
-----創(chuàng)建hive表
Flink SQL> CREATE TABLE alan_hive_user_table (
>   u_id BIGINT,
>   u_name STRING,
>   balance DECIMAL(10, 4),
>   age INT
> ) PARTITIONED BY (dt STRING,hr STRING) 
>   row format delimited 
>   fields terminated by "," 
>   TBLPROPERTIES (
>   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.partition-commit.delay'='10 s',
>   'sink.partition-commit.policy.kind'='metastore,success-file',
>   'sink.rolling-policy.rollover-interval'='5s',
>   'sink.partition-commit.watermark-time-zone'='Asia/Shanghai' -- 假設(shè)用戶配置的時(shí)區(qū)為 'Asia/Shanghai', 
> );
[INFO] Execute statement succeed.
----設(shè)置flink 默認(rèn)方言
Flink SQL> SET table.sql-dialect=default;
[INFO] Session property has been set.
------創(chuàng)建kafka表
Flink SQL> CREATE TABLE alan_kafka_table (
>   u_id BIGINT,
>   u_name STRING,
>   balance DECIMAL(10, 4),
>   age INT,
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',-- 事件時(shí)間
>   WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND  -- 在eventTime上定義watermark
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'alan_kafka_hive_topic',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
------流式sql ,按照分區(qū)流式插入數(shù)據(jù),也即flink的一個(gè)任務(wù)
Flink SQL> INSERT INTO alan_hive_user_table 
> SELECT u_id, u_name,balance,age, DATE_FORMAT(`event_time`, 'yyyy-MM-dd'), DATE_FORMAT(`event_time`, 'HH')
> FROM alan_kafka_table;

Job ID: 95fceba5540315957ed7d0b873461e43
-----kafka 發(fā)送數(shù)據(jù)
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_kafka_hive_topic
>1,'alan',123.34,18
>2,'alanchan',223.34,28
>

---flink sql 查詢數(shù)據(jù),kafka發(fā)送一次查詢一次
Flink SQL> select * from alan_hive_user_table where dt='2023-09-05' and hr='07';
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| op |                 u_id |                         u_name |      balance |         age |                             dt |                             hr |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| +I |                    1 |                         'alan' |     123.3400 |          18 |                     2023-09-05 |                             07 |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
Received a total of 1 row

Flink SQL> select * from alan_hive_user_table where dt='2023-09-05' and hr='07';


+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| op |                 u_id |                         u_name |      balance |         age |                             dt |                             hr |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
| +I |                    1 |                         'alan' |     123.3400 |          18 |                     2023-09-05 |                             07 |
| +I |                    2 |                     'alanchan' |     223.3400 |          28 |                     2023-09-05 |                             07 |
+----+----------------------+--------------------------------+--------------+-------------+--------------------------------+--------------------------------+
Received a total of 2 rows


如果在 TIMESTAMP_LTZ 列定義了 watermark 并且使用 partition-time 提交,需要對(duì) sink.partition-commit.watermark-time-zone 設(shè)置會(huì)話時(shí)區(qū),否則分區(qū)提交會(huì)發(fā)生在幾個(gè)小時(shí)后。
下面的示例可以參考16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)中的示例,區(qū)別在于connector不同,實(shí)際設(shè)置一樣,不再贅述。

SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 假設(shè)用戶配置的時(shí)區(qū)是 'Asia/Shanghai'。
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  ts BIGINT, -- time in epoch milliseconds
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列聲明 watermark。
) WITH (...);

-- streaming sql, insert into hive table
INSERT INTO TABLE hive_table 
SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH')
FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

默認(rèn)情況下,對(duì)于流,F(xiàn)link 僅支持重命名 committers,對(duì)于 S3 文件系統(tǒng)不支持流寫的 exactly-once 語義。 通過將以下參數(shù)設(shè)置為 false,可以實(shí)現(xiàn) exactly-once 寫入 S3。 這會(huì)調(diào)用 Flink 原生的 writer ,但是僅針對(duì) parquet 和 orc 文件類型有效。 這個(gè)配置項(xiàng)可以在 TableConfig 中配置,該配置項(xiàng)對(duì)作業(yè)的所有 sink 都生效。
43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例,# Flink專欄,flink,hive,大數(shù)據(jù),flink hive,flink kafka,flink sql,flink 流批一體化

7)、動(dòng)態(tài)分區(qū)的寫入

不同于靜態(tài)分區(qū)的寫入總是需要用戶指定分區(qū)列的值,動(dòng)態(tài)分區(qū)允許用戶在寫入數(shù)據(jù)的時(shí)候不指定分區(qū)列的值。 比如,有這樣一個(gè)分區(qū)表:

CREATE TABLE alan_wp_user_table (
  u_id BIGINT,
  u_name STRING,
  balance DECIMAL(10, 4),
  age INT
) PARTITIONED BY (dt STRING,hr STRING) 
  row format delimited 
  fields terminated by "," 
  TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='10 s',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

用戶可以使用如下的 SQL 語句向該分區(qū)表寫入數(shù)據(jù):

INSERT into alan_wp_user_table SELECT 3,'alanchanchn', 35.8,29, '2023-09-05', '05';

在該 SQL 語句中,用戶沒有指定分區(qū)列的值,這就是一個(gè)典型的動(dòng)態(tài)分區(qū)寫入的例子。

默認(rèn)情況下, 如果是動(dòng)態(tài)分區(qū)的寫入, 在實(shí)際寫入目標(biāo)表之前,F(xiàn)link 將額外對(duì)數(shù)據(jù)按照動(dòng)態(tài)分區(qū)列進(jìn)行排序。 這就意味著 sink 節(jié)點(diǎn)收到的數(shù)據(jù)都是按分區(qū)排序的,即首先收到一個(gè)分區(qū)的數(shù)據(jù),然后收到另一個(gè)分區(qū)的數(shù)據(jù),不同分區(qū)的數(shù)據(jù)不會(huì)混在一起。 這樣 Hive sink 節(jié)點(diǎn)就可以一次只維護(hù)一個(gè)分區(qū)的 writer,否則,Hive sink 需要維護(hù)收到的數(shù)據(jù)對(duì)應(yīng)的所有分區(qū)的 writer,如果分區(qū)的 writer 過多的話,則可能會(huì)導(dǎo)致內(nèi)存溢出(OutOfMemory)異常。

為了避免額外的排序,你可以將作業(yè)的配置項(xiàng) table.exec.hive.sink.sort-by-dynamic-partition.enable(默認(rèn)是 true)設(shè)置為 false。 但是這種配置下,如之前所述,如果單個(gè) sink 節(jié)點(diǎn)收到的動(dòng)態(tài)分區(qū)數(shù)過多的話,則有可能會(huì)出現(xiàn)內(nèi)存溢出的異常。

如果數(shù)據(jù)傾斜不嚴(yán)重的話,你可以在 SQL 語句中添加 DISTRIBUTED BY <partition_field> 將相同分區(qū)的數(shù)據(jù)分布到到相同的 sink 節(jié)點(diǎn)上來緩解單個(gè) sink 節(jié)點(diǎn)的分區(qū) writer 過多的問題。

此外,你也可以在 SQL 語句中添加 DISTRIBUTED BY <partition_field> 來達(dá)到將 table.exec.hive.sink.sort-by-dynamic-partition.enable 設(shè)置為 false 的效果。

該配置項(xiàng) table.exec.hive.sink.sort-by-dynamic-partition.enable 只在批模式下生效。
目前(截至1.17版本),只有在 Flink 批模式下使用了 Hive 方言,才可以使用 DISTRIBUTED BY 和 SORTED BY。

8)、自動(dòng)收集統(tǒng)計(jì)信息

在使用 Flink 寫入 Hive 表的時(shí)候,F(xiàn)link 將默認(rèn)自動(dòng)收集寫入數(shù)據(jù)的統(tǒng)計(jì)信息然后將其提交至 Hive metastore 中。 但在某些情況下,你可能不想自動(dòng)收集統(tǒng)計(jì)信息,因?yàn)槭占@些統(tǒng)計(jì)信息可能會(huì)花費(fèi)一定的時(shí)間。 為了避免 Flink 自動(dòng)收集統(tǒng)計(jì)信息,你可以設(shè)置作業(yè)參數(shù) table.exec.hive.sink.statistic-auto-gather.enable (默認(rèn)是 true) 為 false。

如果寫入的 Hive 表是以 Parquet 或者 ORC 格式存儲(chǔ)的時(shí)候,numFiles/totalSize/numRows/rawDataSize 這些統(tǒng)計(jì)信息可以被 Flink 收集到。 否則, 只有 numFiles/totalSize 可以被收集到。

對(duì)于 Parquet 或者 ORC 格式的表,為了快速收集到統(tǒng)計(jì)信息 numRows/rawDataSize, Flink 只會(huì)讀取文件的 footer。但是在文件數(shù)量很多的情況下,這可能也會(huì)比較耗時(shí),你可以通過 設(shè)置作業(yè)參數(shù) table.exec.hive.sink.statistic-auto-gather.thread-num(默認(rèn)是 3)為一個(gè)更大的值來加快統(tǒng)計(jì)信息的收集。

只有批模式才支持自動(dòng)收集統(tǒng)計(jì)信息,流模式目前還不支持自動(dòng)收集統(tǒng)計(jì)信息。

9)、文件合并

在使用 Flink 寫 Hive 表的時(shí)候,F(xiàn)link 也支持自動(dòng)對(duì)小文件進(jìn)行合并以減少小文件的數(shù)量。

  • Stream Mode
    流模式下,合并小文件的行為與寫 文件系統(tǒng) 一樣,更多細(xì)節(jié)請(qǐng)參考 16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)

  • Batch Mode
    在批模式,并且自動(dòng)合并小文件已經(jīng)開啟的情況下,在結(jié)束寫 Hive 表后,F(xiàn)link 會(huì)計(jì)算每個(gè)分區(qū)下文件的平均大小,如果文件的平均大小小于用戶指定的一個(gè)閾值,F(xiàn)link 則會(huì)將這些文件合并成指定大小的文件。下面是文件合并涉及到的參數(shù):
    43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例,# Flink專欄,flink,hive,大數(shù)據(jù),flink hive,flink kafka,flink sql,flink 流批一體化

4、格式

Flink 對(duì) Hive 的集成已經(jīng)在如下的文件格式進(jìn)行了測試:

  • Text
  • CSV
  • SequenceFile
  • ORC
  • Parquet
    hive的文件格式的設(shè)置方式與直接在hive中設(shè)置方式一樣,不再贅述。具體可以參考
    3、hive的使用示例詳解-建表、數(shù)據(jù)類型詳解、內(nèi)部外部表、分區(qū)表、分桶表
    4、hive的使用示例詳解-事務(wù)表、視圖、物化視圖、DDL(數(shù)據(jù)庫、表以及分區(qū))管理詳細(xì)操作

以上,詳細(xì)的介紹了Flink 與hive的集成、通過flink sql讀寫hive數(shù)據(jù)。文章來源地址http://www.zghlxwxcb.cn/news/detail-695368.html

到了這里,關(guān)于43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 33、Flink之hive介紹與簡單示例

    33、Flink之hive介紹與簡單示例

    1、Flink 部署、概念介紹、source、transformation、sink使用示例、四大基石介紹和示例等系列綜合文章鏈接 13、Flink 的table api與sql的基本概念、通用api介紹及入門示例 14、Flink 的table api與sql之?dāng)?shù)據(jù)類型: 內(nèi)置數(shù)據(jù)類型以及它們的屬性 15、Flink 的table api與sql之流式概念-詳解的介紹了動(dòng)

    2024年02月10日
    瀏覽(16)
  • Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建(保姆級(jí)超詳細(xì)含圖文)

    Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建(保姆級(jí)超詳細(xì)含圖文)

    說明: 本篇將詳細(xì)介紹用二進(jìn)制安裝包部署hadoop等組件,注意事項(xiàng),各組件的使用,常用的一些命令,以及在部署中遇到的問題解決思路等等,都將詳細(xì)介紹。 ip hostname 192.168.1.11 node1 192.168.1.12 node2 192.168.1.13 node3 1.2.1系統(tǒng)版本 1.2.2內(nèi)存建議最少4g、2cpu、50G以上的磁盤容量 本次

    2024年02月12日
    瀏覽(38)
  • 【Flink-Kafka-To-Hive】使用 Flink 實(shí)現(xiàn) Kafka 數(shù)據(jù)寫入 Hive

    需求描述: 1、數(shù)據(jù)從 Kafka 寫入 Hive。 2、相關(guān)配置存放于 Mysql 中,通過 Mysql 進(jìn)行動(dòng)態(tài)讀取。 3、此案例中的 Kafka 是進(jìn)行了 Kerberos 安全認(rèn)證的,如果不需要自行修改。 4、Flink 集成 Kafka 寫入 Hive 需要進(jìn)行 checkpoint 才能落盤至 HDFS。 5、先在 Hive 中創(chuàng)建表然后動(dòng)態(tài)獲取 Hive 的表

    2024年02月03日
    瀏覽(23)
  • Flink集成Hive之Hive Catalog

    流程流程: Flink消費(fèi)Kafka,邏輯處理后將實(shí)時(shí)流轉(zhuǎn)換為表視圖,利用HiveCataLog創(chuàng)建Hive表,將實(shí)時(shí)流?表insert進(jìn)Hive,注意分區(qū)時(shí)間字段需要為 yyyy-MM-dd形式,否則拋出異常:java.time.format.DateTimeParseException: Text \\\'20240111\\\' could not be parsed 寫入到hive分區(qū)表 streamEnv需要開啟checkpoint,保證flink寫入

    2024年01月16日
    瀏覽(20)
  • Hive & Spark & Flink 數(shù)據(jù)傾斜

    絕大部分任務(wù)都很快完成,只有一個(gè)或者少數(shù)幾個(gè)任務(wù)執(zhí)行的很慢甚至最終執(zhí)行失敗, 這樣的現(xiàn)象為數(shù)據(jù)傾斜現(xiàn)象。 任務(wù)進(jìn)度長時(shí)間維持在 99%或者 100%的附近,查看任務(wù)監(jiān)控頁面,發(fā)現(xiàn)只有少量 reduce 子任務(wù)未完成,因?yàn)槠涮幚淼臄?shù)據(jù)量和其他的 reduce 差異過大。 單一 redu

    2024年02月07日
    瀏覽(31)
  • Flink Hive Catalog操作案例

    Flink Hive Catalog操作案例

    在此對(duì)Flink讀寫Hive表操作進(jìn)行逐步記錄,需要指出的是,其中操作Hive分區(qū)表和非分區(qū)表的DDL有所不同,以下分別記錄。 Hive-3.1.3 Flink-1.17.1 1、上傳依賴jar包到flink/lib目錄下 2、更換planner依賴(Hive集成的推薦設(shè)置) 3、啟動(dòng)Hive MetaStore 4、啟動(dòng)flink集群和sql-client 5、在flink sql-cl

    2024年02月08日
    瀏覽(18)
  • Flink SQL Hive Connector使用場景

    目錄 1.介紹 2.使用 2.1注冊(cè)HiveCatalog 2.2Hive Read 2.2.1流讀關(guān)鍵配置 2.2.2示例

    2024年02月06日
    瀏覽(24)
  • 萬字解決Flink|Spark|Hive 數(shù)據(jù)傾斜

    萬字解決Flink|Spark|Hive 數(shù)據(jù)傾斜

    此篇主要總結(jié)到Hive,Flink,Spark出現(xiàn)數(shù)據(jù)傾斜的表現(xiàn),原因和解決辦法。首先會(huì)讓大家認(rèn)識(shí)到不同框架或者計(jì)算引擎處理傾斜的方案。最后你會(huì)發(fā)現(xiàn)計(jì)算框架只是“異曲”,文末總結(jié)才是“同工之妙”。點(diǎn)擊收藏與分享,工作和漲薪用得到?。。?數(shù)據(jù)傾斜最籠統(tǒng)概念就是數(shù)據(jù)的

    2024年02月03日
    瀏覽(29)
  • Hive SQL 遷移 Flink SQL 在快手的實(shí)踐

    Hive SQL 遷移 Flink SQL 在快手的實(shí)踐

    摘要:本文整理自快手?jǐn)?shù)據(jù)架構(gòu)工程師張芒,阿里云工程師劉大龍,在 Flink Forward Asia 2022 生產(chǎn)實(shí)踐專場的分享。本篇內(nèi)容主要分為四個(gè)部分: Flink 流批一體引擎 Flink Batch 生產(chǎn)實(shí)踐 核心優(yōu)化解讀 未來規(guī)劃 點(diǎn)擊查看原文視頻 演講PPT 首先,介紹一下我們選擇 Flink 作為流批一體

    2024年02月16日
    瀏覽(16)
  • 【Flink實(shí)戰(zhàn)】Flink hint更靈活、更細(xì)粒度的設(shè)置Flink sql行為與簡化hive連接器參數(shù)設(shè)置

    SQL 提示(SQL Hints)是和 SQL 語句一起使用來改變執(zhí)行計(jì)劃的。本章介紹如何使用 SQL 提示來實(shí)現(xiàn)各種干預(yù)。 SQL 提示一般可以用于以下: 增強(qiáng) planner:沒有完美的 planner, SQL 提示讓用戶更好地控制執(zhí)行; 增加元數(shù)據(jù)(或者統(tǒng)計(jì)信息):如\\\"已掃描的表索引\\\"和\\\"一些混洗鍵(shu

    2024年04月25日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包