Flink 系列文章
一、Flink 專(zhuān)欄
Flink 專(zhuān)欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。
-
1、Flink 部署系列
本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 -
2、Flink基礎(chǔ)系列
本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 -
3、Flik Table API和SQL基礎(chǔ)系列
本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫(kù)、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。 -
4、Flik Table API和SQL提高與應(yīng)用系列
本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開(kāi)發(fā)難度的內(nèi)容。 -
5、Flink 監(jiān)控系列
本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。
二、Flink 示例專(zhuān)欄
Flink 示例專(zhuān)欄是 Flink 專(zhuān)欄的輔助說(shuō)明,一般不會(huì)介紹知識(shí)點(diǎn)的信息,更多的是提供一個(gè)一個(gè)可以具體使用的示例。本專(zhuān)欄不再分目錄,通過(guò)鏈接即可看出介紹的內(nèi)容。
兩專(zhuān)欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引
本文簡(jiǎn)單的介紹了關(guān)于通過(guò)flink sql與kafka的進(jìn)行交互的內(nèi)容及相關(guān)示例。
本文依賴環(huán)境是hadoop、kafka環(huán)境好用,如果是ha環(huán)境則需要zookeeper的環(huán)境。
一、Table & SQL Connectors 示例:Apache Kafka
1、maven依賴(java編碼依賴)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.1</version>
</dependency>
2、創(chuàng)建 Kafka 表
以下示例展示了如何創(chuàng)建 Kafka 表,分別使用了csv和json格式文件:
1)、csv格式文件示例
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
CREATE TABLE t1 (
`id` INT,
name STRING,
age BIGINT,
t_insert_time TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR t_insert_time as t_insert_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 't_kafkasource',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'format' = 'csv'
);
------kafka關(guān)于主題和發(fā)送數(shù)據(jù)命令
kafka-topics.sh --delete --topic t_kafkasource --bootstrap-server server1:9092
kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_kafkasource --partitions 1 --replication-factor 1
kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource
--測(cè)試數(shù)據(jù)
1,alan,15
2,alanchan,20
3,alanchanchn,25
4,alan_chan,30
5,alan_chan_chn,45
[root@server2 bin]# kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource
>1,alan,15
>2,alanchan,20
>3,alanchanchn,25
>4,alan_chan,30
>5,alan_chan_chn,45
>
- flink sql查詢結(jié)果
2)、csv格式文件示例
CREATE TABLE t2 (
`id` INT,
name string,
age BIGINT,
t_insert_time TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR t_insert_time as t_insert_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 't_kafkasource_t2',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'format' = 'json'
);
--------kafka關(guān)于主題和發(fā)送數(shù)據(jù)命令
kafka-topics.sh --delete --topic t_kafkasource_t2 --bootstrap-server server1:9092
kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_kafkasource_t2 --partitions 1 --replication-factor 1
kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource_t2
----- 測(cè)試數(shù)據(jù)
{ "id":"1" ,"name":"alan","age":"12" }
{ "id":"2" ,"name":"alanchan","age":"22" }
{ "id":"3" ,"name":"alanchanchan","age":"32" }
{ "id":"4" ,"name":"alan_chan","age":"42" }
{ "id":"5" ,"name":"alan_chan_chn","age":"52" }
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource_t2
>{"data": [{ "id":"1" ,"name":"alan","age":"12" } ]}
>{ "id":"1" ,"name":"alan","age":"12" }
>{ "id":"2" ,"name":"alanchan","age":"22" }
>{ "id":"3" ,"name":"alanchanchan","age":"32" }
>{ "id":"4" ,"name":"alan_chan","age":"42" }
>{ "id":"5" ,"name":"alan_chan_chn","age":"52" }
>
- Flink sql 查詢結(jié)果
3、可用的元數(shù)據(jù)
以下的連接器元數(shù)據(jù)可以在表定義中通過(guò)元數(shù)據(jù)列的形式獲取。
R/W 列定義了一個(gè)元數(shù)據(jù)是可讀的(R)還是可寫(xiě)的(W)。 只讀列必須聲明為 VIRTUAL 以在 INSERT INTO 操作中排除它們。
以下擴(kuò)展的 CREATE TABLE 示例展示了使用這些元數(shù)據(jù)字段的語(yǔ)法:
CREATE TABLE Alan_KafkaTable_1 (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
---------------------------------驗(yàn)證--------------------------------
Flink SQL> CREATE TABLE Alan_KafkaTable_1 (
> `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
> `partition` BIGINT METADATA VIRTUAL,
> `offset` BIGINT METADATA VIRTUAL,
> `user_id` BIGINT,
> `item_id` BIGINT,
> `behavior` STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'user_behavior',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.
Flink SQL> desc Alan_KafkaTable_1;
+------------+--------------+------+-----+---------------------------+-----------+
| name | type | null | key | extras | watermark |
+------------+--------------+------+-----+---------------------------+-----------+
| event_time | TIMESTAMP(3) | true | | METADATA FROM 'timestamp' | |
| partition | BIGINT | true | | METADATA VIRTUAL | |
| offset | BIGINT | true | | METADATA VIRTUAL | |
| user_id | BIGINT | true | | | |
| item_id | BIGINT | true | | | |
| behavior | STRING | true | | | |
+------------+--------------+------+-----+---------------------------+-----------+
6 rows in set
---kafka發(fā)送數(shù)據(jù)
[alanchan@server3 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic user_behavior
>1,1001,login
>1,2001,p_read
----flink sql 查詢結(jié)果
Flink SQL> select * from Alan_KafkaTable_1;
2023-08-21 05:40:38,916 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-21 05:40:38,916 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-21 05:40:38,919 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface server4:40896 of application 'application_1688448920799_0009'.
+----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
| op | event_time | partition | offset | user_id | item_id | behavior |
+----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
| +I | 2023-08-21 05:38:47.009 | 0 | 0 | 1 | 1001 | login |
| +I | 2023-08-21 05:39:03.994 | 0 | 1 | 1 | 2001 | p_read |
連接器可以讀出消息格式的元數(shù)據(jù)。格式元數(shù)據(jù)的配置鍵以 ‘value.’ 作為前綴。
以下示例展示了如何獲取 Kafka 和 Debezium 的元數(shù)據(jù)字段:
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
`origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- from Kafka connector
`offset` BIGINT METADATA VIRTUAL, -- from Kafka connector
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json'
);
--------------------驗(yàn)證--------------------------------------
Flink SQL> CREATE TABLE Alan_KafkaTable_2 (
> `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
> `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
> `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- from Kafka connector
> `offset` BIGINT METADATA VIRTUAL, -- from Kafka connector
> `user_id` BIGINT,
> `item_id` BIGINT,
> `behavior` STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'user_behavior2',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'value.format' = 'debezium-json'
> );
[INFO] Execute statement succeed.
Flink SQL> desc Alan_KafkaTable_2;
+--------------+--------------+------+-----+------------------------------------------------+-----------+
| name | type | null | key | extras | watermark |
+--------------+--------------+------+-----+------------------------------------------------+-----------+
| event_time | TIMESTAMP(3) | true | | METADATA FROM 'value.source.timestamp' VIRTUAL | |
| origin_table | STRING | true | | METADATA FROM 'value.source.table' VIRTUAL | |
| partition_id | BIGINT | true | | METADATA FROM 'partition' VIRTUAL | |
| offset | BIGINT | true | | METADATA VIRTUAL | |
| user_id | BIGINT | true | | | |
| item_id | BIGINT | true | | | |
| behavior | STRING | true | | | |
+--------------+--------------+------+-----+------------------------------------------------+-----------+
7 rows in set
-----測(cè)試數(shù)據(jù)
{
"before": null,
"after": {
"user_id": 1,
"item_id": "1001",
"behavior": "login"
},
"source": {"version": "1.13.5"},
"op": "c",
"ts_ms": 1692593500222,
"transaction": null
}
{
"before": {
"user_id": 1,
"item_id": "1001",
"behavior": "login"
},
"after": {
"user_id": 1,
"item_id": "2001",
"behavior": "p_read"
},
"source": {"version": "1.13.5"},
"op": "u",
"ts_ms": 1692593502242,
"transaction": null
}
---kafka發(fā)送數(shù)據(jù)
[alanchan@server3 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic user_behavior2
>{ "before": null, "after": { "user_id": 1, "item_id": "1001", "behavior": "login" }, "source": {"version": "1.13.5"}, "op": "c", "ts_ms": 1692593500222, "transaction": null}
>{ "before": { "user_id": 1, "item_id": "1001", "behavior": "login" }, "after": { "user_id": 1, "item_id": "2001", "behavior": "p_read" }, "source": {"version": "1.13.5"}, "op": "u", "ts_ms": 1692593502242, "transaction": null}
>
----flink sql 查詢結(jié)果,沒(méi)有獲取到debezium的元數(shù)據(jù),是因?yàn)闇y(cè)試環(huán)境沒(méi)有debezium運(yùn)行環(huán)境,由于該種方式基本上被Flink CDC取代,故不做深入的驗(yàn)證
Flink SQL> select * from Alan_KafkaTable_2;
2023-08-21 06:43:59,445 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-21 06:43:59,445 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-21 06:43:59,449 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface server4:40896 of application 'application_1688448920799_0009'.
+----+-------------------------+--------------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
| op | event_time | origin_table | partition_id | offset | user_id | item_id | behavior |
+----+-------------------------+--------------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
| +I | (NULL) | (NULL) | 0 | 0 | 1 | 1001 | login |
| -U | (NULL) | (NULL) | 0 | 1 | 1 | 1001 | login |
| +U | (NULL) | (NULL) | 0 | 1 | 1 | 2001 | p_read |
4、連接器參數(shù)
上圖中的格式請(qǐng)參考鏈接:35、Flink 的JSON Format
5、特性
1)、消息鍵(Key)與消息體(Value)的格式
Kafka 消息的消息鍵和消息體部分都可以使用某種 35、Flink 的JSON Format 來(lái)序列化或反序列化成二進(jìn)制數(shù)據(jù)。
1、消息體格式
由于 Kafka 消息中消息鍵是可選的,以下語(yǔ)句將使用消息體格式讀取和寫(xiě)入消息,但不使用消息鍵格式。 ‘format’ 選項(xiàng)與 ‘value.format’ 意義相同。 所有的格式配置使用格式識(shí)別符作為前綴。
CREATE TABLE KafkaTable (
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'format' = 'json',
'json.ignore-parse-errors' = 'true'
)
消息體格式將配置為以下的數(shù)據(jù)類(lèi)型:
ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
2、消息鍵和消息體格式
以下示例展示了如何配置和使用消息鍵和消息體格式。 格式配置使用 ‘key’ 或 ‘value’ 加上格式識(shí)別符作為前綴。
CREATE TABLE KafkaTable (
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'key.fields' = 'user_id;item_id',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'ALL'
)
消息鍵格式包含了在 ‘key.fields’ 中列出的字段(使用 ‘;’ 分隔)和字段順序。 因此將配置為以下的數(shù)據(jù)類(lèi)型:
ROW<`user_id` BIGINT, `item_id` BIGINT>
由于消息體格式配置為 ‘value.fields-include’ = ‘ALL’,所以消息鍵字段也會(huì)出現(xiàn)在消息體格式的數(shù)據(jù)類(lèi)型中:
ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
3、重名的格式字段
如果消息鍵字段和消息體字段重名,連接器無(wú)法根據(jù)表結(jié)構(gòu)信息將這些列區(qū)分開(kāi)。 ‘key.fields-prefix’ 配置項(xiàng)可以在表結(jié)構(gòu)中為消息鍵字段指定一個(gè)唯一名稱(chēng),并在配置消息鍵格式的時(shí)候保留原名。
以下示例展示了在消息鍵和消息體中同時(shí)包含 version 字段的情況:
CREATE TABLE KafkaTable (
`k_version` INT,
`k_user_id` BIGINT,
`k_item_id` BIGINT,
`version` INT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'key.format' = 'json',
'key.fields-prefix' = 'k_',
'key.fields' = 'k_version;k_user_id;k_item_id',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
)
消息體格式必須配置為 ‘EXCEPT_KEY’ 模式。格式將被配置為以下的數(shù)據(jù)類(lèi)型:
# 消息鍵格式:
ROW<`version` INT, `user_id` BIGINT, `item_id` BIGINT>
# 消息體格式:
ROW<`version` INT, `behavior` STRING>
2)、Topic 和 Partition 的探測(cè)
topic 和 topic-pattern 配置項(xiàng)決定了 source 消費(fèi)的 topic 或 topic 的匹配規(guī)則。topic 配置項(xiàng)可接受使用分號(hào)間隔的 topic 列表,例如 topic-1;topic-2。 topic-pattern 配置項(xiàng)使用正則表達(dá)式來(lái)探測(cè)匹配的 topic。例如 topic-pattern 設(shè)置為 test-topic-[0-9],則在作業(yè)啟動(dòng)時(shí),所有匹配該正則表達(dá)式的 topic(以 test-topic- 開(kāi)頭,以一位數(shù)字結(jié)尾)都將被 consumer 訂閱。
為允許 consumer 在作業(yè)啟動(dòng)之后探測(cè)到動(dòng)態(tài)創(chuàng)建的 topic,請(qǐng)將 scan.topic-partition-discovery.interval 配置為一個(gè)非負(fù)值。這將使 consumer 能夠探測(cè)匹配名稱(chēng)規(guī)則的 topic 中新的 partition。
請(qǐng)參閱 37、Flink 的Apache Kafka connector 以獲取更多關(guān)于 topic 和 partition 探測(cè)的信息。
topic 列表和 topic 匹配規(guī)則只適用于 source。對(duì)于 sink 端,F(xiàn)link 目前只支持單一 topic。
3)、起始消費(fèi)位點(diǎn)
scan.startup.mode 配置項(xiàng)決定了 Kafka consumer 的啟動(dòng)模式。有效值為:
- group-offsets:從 Zookeeper/Kafka 中某個(gè)指定的消費(fèi)組已提交的偏移量開(kāi)始。
- earliest-offset:從可能的最早偏移量開(kāi)始。
- latest-offset:從最末尾偏移量開(kāi)始。
- timestamp:從用戶為每個(gè) partition 指定的時(shí)間戳開(kāi)始。
- specific-offsets:從用戶為每個(gè) partition 指定的偏移量開(kāi)始。
默認(rèn)值 group-offsets 表示從 Zookeeper/Kafka 中最近一次已提交的偏移量開(kāi)始消費(fèi)。
如果使用了 timestamp,必須使用另外一個(gè)配置項(xiàng) scan.startup.timestamp-millis 來(lái)指定一個(gè)從格林尼治標(biāo)準(zhǔn)時(shí)間 1970 年 1 月 1 日 00:00:00.000 開(kāi)始計(jì)算的毫秒單位時(shí)間戳作為起始時(shí)間。
如果使用了 specific-offsets,必須使用另外一個(gè)配置項(xiàng) scan.startup.specific-offsets 來(lái)為每個(gè) partition 指定起始偏移量, 例如,選項(xiàng)值 partition:0,offset:42;partition:1,offset:300 表示 partition 0 從偏移量 42 開(kāi)始,partition 1 從偏移量 300 開(kāi)始。
4)、Bounded Ending Position
配置選項(xiàng) scan.bounded.mode 指定 Kafka 使用者的有界模式。有效的枚舉為:
- “group-offsets”:以特定消費(fèi)者組的 ZooKeeper/Kafka 代理中的已提交偏移量為界。這是在給定分區(qū)的使用開(kāi)始時(shí)評(píng)估的。
- “l(fā)atest-offset”:以最新偏移量為界。這是在給定分區(qū)的使用開(kāi)始時(shí)評(píng)估的。
- “timestamp”:以用戶提供的時(shí)間戳為界。
- “specific-offsets”:以用戶為每個(gè)分區(qū)提供的特定偏移量為界。
如果未設(shè)置配置選項(xiàng)值 scan.bounded.mode,則默認(rèn)值為無(wú)界表。
如果指定了時(shí)間戳,則需要另一個(gè)配置選項(xiàng) scan.bounded.timestamp-millis 來(lái)指定自 1970 年 1 月 1 日 00:00:00.000 GMT 以來(lái)的特定有界時(shí)間戳(以毫秒為單位)。
如果指定了特定偏移量,則需要另一個(gè)配置選項(xiàng) scan.bounded.specific-offset 來(lái)為每個(gè)分區(qū)指定特定的有界偏移量,例如,選項(xiàng)值 partition:0,offset:42;partition:1,offset:300 表示分區(qū) 0 的偏移量為 42,分區(qū) 1 的偏移量為 300。如果未提供分區(qū)的偏移量,則不會(huì)從該分區(qū)消耗。
5)、CDC 變更日志(Changelog) Source
Flink 原生支持使用 Kafka 作為 CDC 變更日志(changelog) source。如果 Kafka topic 中的消息是通過(guò)變更數(shù)據(jù)捕獲(CDC)工具從其他數(shù)據(jù)庫(kù)捕獲的變更事件,則你可以使用 CDC 格式將消息解析為 Flink SQL 系統(tǒng)中的插入(INSERT)、更新(UPDATE)、刪除(DELETE)消息。
在許多情況下,變更日志(changelog) source 都是非常有用的功能,例如將數(shù)據(jù)庫(kù)中的增量數(shù)據(jù)同步到其他系統(tǒng),審核日志,數(shù)據(jù)庫(kù)的物化視圖,時(shí)態(tài)表關(guān)聯(lián)數(shù)據(jù)庫(kù)表的更改歷史等。
Flink 提供了幾種 CDC 格式:
- 38、Flink 的CDC 格式:debezium
- 39、Flink 的CDC 格式:canal
- 40、Flink 的CDC 格式:maxwell
6)、Sink 分區(qū)
配置項(xiàng) sink.partitioner 指定了從 Flink 分區(qū)到 Kafka 分區(qū)的映射關(guān)系。 默認(rèn)情況下,F(xiàn)link 使用 Kafka 默認(rèn)分區(qū)器 來(lái)對(duì)消息分區(qū)。默認(rèn)分區(qū)器對(duì)沒(méi)有消息鍵的消息使用 粘性分區(qū)策略(sticky partition strategy) 進(jìn)行分區(qū),對(duì)含有消息鍵的消息使用 murmur2 哈希算法計(jì)算分區(qū)。
為了控制數(shù)據(jù)行到分區(qū)的路由,也可以提供一個(gè)自定義的 sink 分區(qū)器。‘fixed’ 分區(qū)器會(huì)將同一個(gè) Flink 分區(qū)中的消息寫(xiě)入同一個(gè) Kafka 分區(qū),從而減少網(wǎng)絡(luò)連接的開(kāi)銷(xiāo)。
7)、一致性保證
默認(rèn)情況下,如果查詢?cè)?啟用 checkpoint 模式下執(zhí)行時(shí),Kafka sink 按照至少一次(at-lease-once)語(yǔ)義保證將數(shù)據(jù)寫(xiě)入到 Kafka topic 中。
當(dāng) Flink checkpoint 啟用時(shí),kafka 連接器可以提供精確一次(exactly-once)的語(yǔ)義保證。
除了啟用 Flink checkpoint,還可以通過(guò)傳入對(duì)應(yīng)的 sink.semantic 選項(xiàng)來(lái)選擇三種不同的運(yùn)行模式:
- none:Flink 不保證任何語(yǔ)義。已經(jīng)寫(xiě)出的記錄可能會(huì)丟失或重復(fù)。
- at-least-once (默認(rèn)設(shè)置):保證沒(méi)有記錄會(huì)丟失(但可能會(huì)重復(fù))。
- exactly-once:使用 Kafka 事務(wù)提供精確一次(exactly-once)語(yǔ)義。當(dāng)使用事務(wù)向 Kafka 寫(xiě)入數(shù)據(jù)時(shí),請(qǐng)將所有從 Kafka 中消費(fèi)記錄的應(yīng)用中的 isolation.level 配置項(xiàng)設(shè)置成實(shí)際所需的值(read_committed 或 read_uncommitted,后者為默認(rèn)值)。
請(qǐng)參閱 37、Flink 的Apache Kafka connector 以獲取更多關(guān)于語(yǔ)義保證的信息。
8)、Source 按分區(qū) Watermark
Flink 對(duì)于 Kafka 支持發(fā)送按分區(qū)的 watermark。Watermark 在 Kafka consumer 中生成。 按分區(qū) watermark 的合并方式和在流 shuffle 時(shí)合并 Watermark 的方式一致。 Source 輸出的 watermark 由讀取的分區(qū)中最小的 watermark 決定。 如果 topic 中的某些分區(qū)閑置,watermark 生成器將不會(huì)向前推進(jìn)。 你可以在表配置中設(shè)置 ‘table.exec.source.idle-timeout’ 選項(xiàng)來(lái)避免上述問(wèn)題。
請(qǐng)參閱 7、Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)) 策略 以獲取更多細(xì)節(jié)。
9)、安全
要啟用加密和認(rèn)證相關(guān)的安全配置,只需將安全配置加上 “properties.” 前綴配置在 Kafka 表上即可。下面的代碼片段展示了如何配置 Kafka 表以使用 PLAIN 作為 SASL 機(jī)制并提供 JAAS 配置:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)
另一個(gè)更復(fù)雜的例子,使用 SASL_SSL 作為安全協(xié)議并使用 SCRAM-SHA-256 作為 SASL 機(jī)制:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/* SSL 配置 */
/* 配置服務(wù)端提供的 truststore (CA 證書(shū)) 的路徑 */
'properties.ssl.truststore.location' = '/path/to/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/* 如果要求客戶端認(rèn)證,則需要配置 keystore (私鑰) 的路徑 */
'properties.ssl.keystore.location' = '/path/to/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/* SASL 配置 */
/* 將 SASL 機(jī)制配置為 as SCRAM-SHA-256 */
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/* 配置 JAAS */
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
)
如果在作業(yè) JAR 中 Kafka 客戶端依賴的類(lèi)路徑被重置了(relocate class),登錄模塊(login module)的類(lèi)路徑可能會(huì)不同,因此請(qǐng)根據(jù)登錄模塊在 JAR 中實(shí)際的類(lèi)路徑來(lái)改寫(xiě)以上配置。例如在 SQL client JAR 中,Kafka client 依賴被重置在了 org.apache.flink.kafka.shaded.org.apache.kafka 路徑下, 因此 plain 登錄模塊的類(lèi)路徑應(yīng)寫(xiě)為 org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule。
關(guān)于安全配置的詳細(xì)描述,請(qǐng)參閱 Apache Kafka 文檔中的"安全"一節(jié)。
6、數(shù)據(jù)類(lèi)型映射
Kafka 將消息鍵值以二進(jìn)制進(jìn)行存儲(chǔ),因此 Kafka 并不存在 schema 或數(shù)據(jù)類(lèi)型。Kafka 消息使用格式配置進(jìn)行序列化和反序列化,例如 csv,json,avro。 因此,數(shù)據(jù)類(lèi)型映射取決于使用的格式。請(qǐng)參閱 格式 頁(yè)面以獲取更多細(xì)節(jié)。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-692231.html
以上,簡(jiǎn)單的介紹了關(guān)于通過(guò)flink sql與kafka的進(jìn)行交互的內(nèi)容。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-692231.html
到了這里,關(guān)于16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫(xiě)外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!