一、Apache Kafka SQL 連接器
- Scan Source: Unbounded Sink: Streaming Append Mode
- Kafka 連接器提供從 Kafka topic 中消費(fèi)和寫入數(shù)據(jù)的能力。
二、依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.0.2-1.18</version>
</dependency>
三、創(chuàng)建Kafka 表
以下示例展示了如何創(chuàng)建 Kafka 表:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
四、可用的元數(shù)據(jù)
以下的連接器元數(shù)據(jù)可以在表定義中通過元數(shù)據(jù)列的形式獲取。
R/W 列定義了一個(gè)元數(shù)據(jù)是可讀的(R)還是可寫的(W)。 只讀列必須聲明為 VIRTUAL 以在 INSERT INTO 操作中排除它們。
鍵 | 數(shù)據(jù)類型 | 描述 | R/W |
---|---|---|---|
topic | STRING NOT NULL | Kafka 記錄的 Topic 名。 | R |
partition | INT NOT NULL | Kafka 記錄的 partition ID。 | R |
headers | MAP NOT NULL | 二進(jìn)制 Map 類型的 Kafka 記錄頭(Header)。 | R/W |
leader-epoch | INT NULL | Kafka 記錄的 Leader epoch(如果可用)。 | R |
offset | BIGINT NOT NULL | Kafka 記錄在 partition 中的 offset。 | R |
timestamp | TIMESTAMP_LTZ(3) NOT NULL | Kafka 記錄的時(shí)間戳。 | R/W |
timestamp-type | STRING NOT NULL | Kafka 記錄的時(shí)間戳類型??赡艿念愋陀?“NoTimestampType”, “CreateTime”(會在寫入元數(shù)據(jù)時(shí)設(shè)置),或 “LogAppendTime”。 | R |
以下擴(kuò)展的 CREATE TABLE 示例展示了使用這些元數(shù)據(jù)字段的語法:
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
格式元信息
連接器可以讀出消息格式的元數(shù)據(jù)。格式元數(shù)據(jù)的配置鍵以 ‘value.’ 作為前綴。
以下示例展示了如何獲取 Kafka 和 Debezium 的元數(shù)據(jù)字段:
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
`origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- from Kafka connector
`offset` BIGINT METADATA VIRTUAL, -- from Kafka connector
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json'
);
五、連接器參數(shù)
參數(shù) | 是否必選 | 默認(rèn)值 | 數(shù)據(jù)類型 | 描述 |
---|---|---|---|---|
connector | 必選 | (無) | String | 指定使用的連接器,Kafka 連接器使用 ‘kafka’。 |
topic | required for sink | (無) | String | 當(dāng)表用作 source 時(shí)讀取數(shù)據(jù)的 topic 名。亦支持用分號間隔的 topic 列表,如 ‘topic-1;topic-2’。注意,對 source 表而言,‘topic’ 和 ‘topic-pattern’ 兩個(gè)選項(xiàng)只能使用其中一個(gè)。當(dāng)表被用作 sink 時(shí),該配置表示寫入的 topic 名。注意 sink 表不支持 topic 列表。 |
topic-pattern | 可選 | (無) | String | 匹配讀取 topic 名稱的正則表達(dá)式。在作業(yè)開始運(yùn)行時(shí),所有匹配該正則表達(dá)式的 topic 都將被 Kafka consumer 訂閱。注意,對 source 表而言,‘topic’ 和 ‘topic-pattern’ 兩個(gè)選項(xiàng)只能使用其中一個(gè)。 |
properties.bootstrap.servers | 必選 | (無) | String | 逗號分隔的 Kafka broker 列表。 |
properties.group.id | 對 source 可選,不適用于 sink | (無) | String | Kafka source 的消費(fèi)組 id。如果未指定消費(fèi)組 ID,則會使用自動生成的 “KafkaSource-{tableIdentifier}” 作為消費(fèi)組 ID。 |
properties.* | 可選 | (無) | String | 可以設(shè)置和傳遞任意 Kafka 的配置項(xiàng)。后綴名必須匹配在 Kafka 配置文檔 中定義的配置鍵。Flink 將移除 “properties.” 配置鍵前綴并將變換后的配置鍵和值傳入底層的 Kafka 客戶端。例如,你可以通過 ‘properties.allow.auto.create.topics’ = ‘false’ 來禁用 topic 的自動創(chuàng)建。但是某些配置項(xiàng)不支持進(jìn)行配置,因?yàn)?Flink 會覆蓋這些配置,例如 ‘key.deserializer’ 和 ‘value.deserializer’。 |
format | 必選 | (無) | String | 用來序列化或反序列化 Kafka 消息的格式。 請參閱 格式 頁面以獲取更多關(guān)于格式的細(xì)節(jié)和相關(guān)配置項(xiàng)。 注意:該配置項(xiàng)和 ‘value.format’ 二者必需其一。 |
key.format | 可選 | (無) | String | 用來序列化和反序列化 Kafka 消息鍵(Key)的格式。 請參閱 格式 頁面以獲取更多關(guān)于格式的細(xì)節(jié)和相關(guān)配置項(xiàng)。 注意:如果定義了鍵格式,則配置項(xiàng) ‘key.fields’ 也是必需的。 否則 Kafka 記錄將使用空值作為鍵。 |
key.fields | 可選 | [] | List | 表結(jié)構(gòu)中用來配置消息鍵(Key)格式數(shù)據(jù)類型的字段列表。默認(rèn)情況下該列表為空,因此消息鍵沒有定義。 列表格式為 ‘field1;field2’。 |
key.fields-prefix | 可選 | (無) | String | 為所有消息鍵(Key)格式字段指定自定義前綴,以避免與消息體(Value)格式字段重名。默認(rèn)情況下前綴為空。 如果定義了前綴,表結(jié)構(gòu)和配置項(xiàng) ‘key.fields’ 都需要使用帶前綴的名稱。 當(dāng)構(gòu)建消息鍵格式字段時(shí),前綴會被移除,消息鍵格式將會使用無前綴的名稱。 請注意該配置項(xiàng)要求必須將 ‘value.fields-include’ 配置為 ‘EXCEPT_KEY’。 |
value.format | 必選 | (無) | String | 序列化和反序列化 Kafka 消息體時(shí)使用的格式。 請參閱 格式 頁面以獲取更多關(guān)于格式的細(xì)節(jié)和相關(guān)配置項(xiàng)。 注意:該配置項(xiàng)和 ‘format’ 二者必需其一。 |
value.fields-include | 可選 | ALL | 枚舉類型,可選值:[ALL, EXCEPT_KEY] | 定義消息體(Value)格式如何處理消息鍵(Key)字段的策略。 默認(rèn)情況下,表結(jié)構(gòu)中 ‘ALL’ 即所有的字段都會包含在消息體格式中,即消息鍵字段在消息鍵和消息體格式中都會出現(xiàn)。 |
scan.startup.mode | 可選 | group-offsets | Enum | Kafka consumer 的啟動模式。有效值為:‘earliest-offset’,‘latest-offset’,‘group-offsets’,‘timestamp’ 和 ‘specific-offsets’。 |
scan.startup.specific-offsets | 可選 | (無) | String | 在使用 ‘specific-offsets’ 啟動模式時(shí)為每個(gè) partition 指定 offset,例如 ‘partition:0,offset:42;partition:1,offset:300’。 |
scan.startup.timestamp-millis | 可選 | (無) | Long | 在使用 ‘timestamp’ 啟動模式時(shí)指定啟動的時(shí)間戳(單位毫秒)。 |
scan.bounded.mode | optional | optional | unbounded | Enum |
scan.bounded.specific-offsets | optional | yes | (none) | String |
scan.bounded.timestamp-millis | optional | yes | (none) | Long |
scan.topic-partition-discovery.interval | 可選 | (無) | Duration | Consumer 定期探測動態(tài)創(chuàng)建的 Kafka topic 和 partition 的時(shí)間間隔。 |
sink.partitioner | 可選 | ‘default’ | String | Flink partition 到 Kafka partition 的分區(qū)映射關(guān)系,可選值有:default:使用 Kafka 默認(rèn)的分區(qū)器對消息進(jìn)行分區(qū)。fixed:每個(gè) Flink partition 最終對應(yīng)最多一個(gè) Kafka partition。round-robin:Flink partition 按輪循(round-robin)的模式對應(yīng)到 Kafka partition。只有當(dāng)未指定消息的消息鍵時(shí)生效。自定義 FlinkKafkaPartitioner 的子類:例如 ‘org.mycompany.MyPartitioner’。 |
sink.semantic | 可選 | at-least-once | String | 定義 Kafka sink 的語義。有效值為 ‘a(chǎn)t-least-once’,‘exactly-once’ 和 ‘none’。 |
sink.parallelism | 可選 | (無) | Integer | 定義 Kafka sink 算子的并行度。默認(rèn)情況下,并行度由框架定義為與上游串聯(lián)的算子相同。 |
六、特性
消息鍵(Key)與消息體(Value)的格式
Kafka 消息的消息鍵和消息體部分都可以使用某種 格式 來序列化或反序列化成二進(jìn)制數(shù)據(jù)。
消息體格式
由于 Kafka 消息中消息鍵是可選的,以下語句將使用消息體格式讀取和寫入消息,但不使用消息鍵格式。 ‘format’ 選項(xiàng)與 ‘value.format’ 意義相同。 所有的格式配置使用格式識別符作為前綴。
CREATE TABLE KafkaTable (
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'format' = 'json',
'json.ignore-parse-errors' = 'true'
)
消息體格式將配置為以下的數(shù)據(jù)類型:
ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
消息鍵和消息體格式
以下示例展示了如何配置和使用消息鍵和消息體格式。 格式配置使用 ‘key’ 或 ‘value’ 加上格式識別符作為前綴。
CREATE TABLE KafkaTable (
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'key.fields' = 'user_id;item_id',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'ALL'
)
消息鍵格式包含了在 ‘key.fields’ 中列出的字段(使用 ‘;’ 分隔)和字段順序。 因此將配置為以下的數(shù)據(jù)類型:
ROW<`user_id` BIGINT, `item_id` BIGINT>
由于消息體格式配置為 ‘value.fields-include’ = ‘ALL’,所以消息鍵字段也會出現(xiàn)在消息體格式的數(shù)據(jù)類型中:
ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
重名的格式字段
如果消息鍵字段和消息體字段重名,連接器無法根據(jù)表結(jié)構(gòu)信息將這些列區(qū)分開。 ‘key.fields-prefix’ 配置項(xiàng)可以在表結(jié)構(gòu)中為消息鍵字段指定一個(gè)唯一名稱,并在配置消息鍵格式的時(shí)候保留原名。
以下示例展示了在消息鍵和消息體中同時(shí)包含 version 字段的情況:
CREATE TABLE KafkaTable (
`k_version` INT,
`k_user_id` BIGINT,
`k_item_id` BIGINT,
`version` INT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'key.format' = 'json',
'key.fields-prefix' = 'k_',
'key.fields' = 'k_version;k_user_id;k_item_id',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
)
消息體格式必須配置為 ‘EXCEPT_KEY’ 模式。格式將被配置為以下的數(shù)據(jù)類型:
消息鍵格式:
ROW<`version` INT, `user_id` BIGINT, `item_id` BIGINT>
消息體格式:
ROW<`version` INT, `behavior` STRING>
七、Topic 和 Partition 的探測
topic 和 topic-pattern 配置項(xiàng)決定了 source 消費(fèi)的 topic 或 topic 的匹配規(guī)則。topic 配置項(xiàng)可接受使用分號間隔的 topic 列表,例如 topic-1;topic-2。 topic-pattern 配置項(xiàng)使用正則表達(dá)式來探測匹配的 topic。例如 topic-pattern 設(shè)置為 test-topic-[0-9],則在作業(yè)啟動時(shí),所有匹配該正則表達(dá)式的 topic(以 test-topic- 開頭,以一位數(shù)字結(jié)尾)都將被 consumer 訂閱。
為允許 consumer 在作業(yè)啟動之后探測到動態(tài)創(chuàng)建的 topic,請將 scan.topic-partition-discovery.interval 配置為一個(gè)非負(fù)值。這將使 consumer 能夠探測匹配名稱規(guī)則的 topic 中新的 partition。
注意 topic 列表和 topic 匹配規(guī)則只適用于 source。對于 sink 端,F(xiàn)link 目前只支持單一 topic。
八、起始消費(fèi)位點(diǎn)
scan.startup.mode 配置項(xiàng)決定了 Kafka consumer 的啟動模式。有效值為:
- group-offsets:從 Zookeeper/Kafka 中某個(gè)指定的消費(fèi)組已提交的偏移量開始。
- earliest-offset:從可能的最早偏移量開始。
- latest-offset:從最末尾偏移量開始。
- timestamp:從用戶為每個(gè) partition 指定的時(shí)間戳開始。
- specific-offsets:從用戶為每個(gè) partition 指定的偏移量開始。
默認(rèn)值 group-offsets 表示從 Zookeeper/Kafka 中最近一次已提交的偏移量開始消費(fèi)。
如果使用了 timestamp,必須使用另外一個(gè)配置項(xiàng) scan.startup.timestamp-millis 來指定一個(gè)從格林尼治標(biāo)準(zhǔn)時(shí)間 1970 年 1 月 1 日 00:00:00.000 開始計(jì)算的毫秒單位時(shí)間戳作為起始時(shí)間。
如果使用了 specific-offsets,必須使用另外一個(gè)配置項(xiàng) scan.startup.specific-offsets 來為每個(gè) partition 指定起始偏移量, 例如,選項(xiàng)值 partition:0,offset:42;partition:1,offset:300 表示 partition 0 從偏移量 42 開始,partition 1 從偏移量 300 開始。
九、有界結(jié)束位置
配置選項(xiàng) scan.bounded.mode 指定 Kafka 消費(fèi)者的有界模式。有效的枚舉是:
-
group-offsets
:以特定消費(fèi)者組的 ZooKeeper / Kafka 代理中提交的偏移量為界。這是在給定分區(qū)的消費(fèi)開始時(shí)進(jìn)行評估的。 -
latest-offset
:以最新偏移量為界。這是在給定分區(qū)的消費(fèi)開始時(shí)進(jìn)行評估的。 -
timestamp
:以用戶提供的時(shí)間戳為界。 -
specific-offsets
:以用戶為每個(gè)分區(qū)提供的特定偏移量為界。
如果未設(shè)置配置選項(xiàng)值 scan.bounded.mode ,則默認(rèn)為無界表。
如果指定了時(shí)間戳,則需要另一個(gè)配置選項(xiàng) scan.bounded.timestamp-millis 來指定自 1970 年 1 月 1 日 00:00:00.000 GMT 以來的特定有界時(shí)間戳(以毫秒為單位)。
如果指定了 Specific-offsets,則需要另一個(gè)配置選項(xiàng) scan.bounded.specific-offsets 來為每個(gè)分區(qū)指定特定的有界偏移量,例如選項(xiàng)值partition:0,offset:42;partition:1,offset:300表示分區(qū)0的偏移量42和分區(qū)1的偏移量300。如果未提供分區(qū)的偏移量,則不會從該分區(qū)消耗數(shù)據(jù)。
十、CDC 變更日志(Changelog) Source
Flink 原生支持使用 Kafka 作為 CDC 變更日志(changelog) source。如果 Kafka topic 中的消息是通過變更數(shù)據(jù)捕獲(CDC)工具從其他數(shù)據(jù)庫捕獲的變更事件,則你可以使用 CDC 格式將消息解析為 Flink SQL 系統(tǒng)中的插入(INSERT)、更新(UPDATE)、刪除(DELETE)消息。
在許多情況下,變更日志(changelog) source 都是非常有用的功能,例如將數(shù)據(jù)庫中的增量數(shù)據(jù)同步到其他系統(tǒng),審核日志,數(shù)據(jù)庫的物化視圖,時(shí)態(tài)表關(guān)聯(lián)數(shù)據(jù)庫表的更改歷史等。
Flink 提供了幾種 CDC 格式:
- debezium
- canal
- maxwell
十一、Sink 分區(qū)
配置項(xiàng) sink.partitioner 指定了從 Flink 分區(qū)到 Kafka 分區(qū)的映射關(guān)系。 默認(rèn)情況下,F(xiàn)link 使用 Kafka 默認(rèn)分區(qū)器 來對消息分區(qū)。默認(rèn)分區(qū)器對沒有消息鍵的消息使用 粘性分區(qū)策略(sticky partition strategy) 進(jìn)行分區(qū),對含有消息鍵的消息使用 murmur2 哈希算法計(jì)算分區(qū)。
為了控制數(shù)據(jù)行到分區(qū)的路由,也可以提供一個(gè)自定義的 sink 分區(qū)器?!甪ixed’ 分區(qū)器會將同一個(gè) Flink 分區(qū)中的消息寫入同一個(gè) Kafka 分區(qū),從而減少網(wǎng)絡(luò)連接的開銷。
十二、一致性保證
默認(rèn)情況下,如果查詢在 啟用 checkpoint 模式下執(zhí)行時(shí),Kafka sink 按照至少一次(at-lease-once)語義保證將數(shù)據(jù)寫入到 Kafka topic 中。
當(dāng) Flink checkpoint 啟用時(shí),kafka 連接器可以提供精確一次(exactly-once)的語義保證。
除了啟用 Flink checkpoint,還可以通過傳入對應(yīng)的 sink.semantic 選項(xiàng)來選擇三種不同的運(yùn)行模式:
- none:Flink 不保證任何語義。已經(jīng)寫出的記錄可能會丟失或重復(fù)。
- at-least-once (默認(rèn)設(shè)置):保證沒有記錄會丟失(但可能會重復(fù))。
- exactly-once:使用 Kafka 事務(wù)提供精確一次(exactly-once)語義。當(dāng)使用事務(wù)向 Kafka 寫入數(shù)據(jù)時(shí),請將所有從 Kafka 中消費(fèi)記錄的應(yīng)用中的 isolation.level 配置項(xiàng)設(shè)置成實(shí)際所需的值(read_committed 或 read_uncommitted,后者為默認(rèn)值)。
十三、Source 按分區(qū) Watermark
- Flink 對于 Kafka 支持發(fā)送按分區(qū)的 watermark。Watermark 在 Kafka consumer 中生成。
- 按分區(qū) watermark 的合并方式和在流 shuffle 時(shí)合并 Watermark 的方式一致。 Source 輸出的 watermark 由讀取的分區(qū)中最小的 watermark 決定。
- 如果 topic 中的某些分區(qū)閑置,watermark 生成器將不會向前推進(jìn)。 你可以在表配置中設(shè)置 ‘table.exec.source.idle-timeout’ 選項(xiàng)來避免上述問題。
十四、安全
要啟用加密和認(rèn)證相關(guān)的安全配置,只需將安全配置加上 “properties.” 前綴配置在 Kafka 表上即可。下面的代碼片段展示了如何配置 Kafka 表以使用 PLAIN 作為 SASL 機(jī)制并提供 JAAS 配置:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)
另一個(gè)更復(fù)雜的例子,使用 SASL_SSL 作為安全協(xié)議并使用 SCRAM-SHA-256 作為 SASL 機(jī)制:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/* SSL 配置 */
/* 配置服務(wù)端提供的 truststore (CA 證書) 的路徑 */
'properties.ssl.truststore.location' = '/path/to/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/* 如果要求客戶端認(rèn)證,則需要配置 keystore (私鑰) 的路徑 */
'properties.ssl.keystore.location' = '/path/to/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/* SASL 配置 */
/* 將 SASL 機(jī)制配置為 as SCRAM-SHA-256 */
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/* 配置 JAAS */
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
)
如果在作業(yè) JAR 中 Kafka 客戶端依賴的類路徑被重置了(relocate class),登錄模塊(login module)的類路徑可能會不同,因此請根據(jù)登錄模塊在 JAR 中實(shí)際的類路徑來改寫以上配置。例如在 SQL client JAR 中,Kafka client 依賴被重置在了 org.apache.flink.kafka.shaded.org.apache.kafka 路徑下, 因此 plain 登錄模塊的類路徑應(yīng)寫為 org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule。文章來源:http://www.zghlxwxcb.cn/news/detail-791150.html
十五、數(shù)據(jù)類型映射
Kafka 將消息鍵值以二進(jìn)制進(jìn)行存儲,因此 Kafka 并不存在 schema 或數(shù)據(jù)類型。Kafka 消息使用格式配置進(jìn)行序列化和反序列化,例如 csv,json,avro。 因此,數(shù)據(jù)類型映射取決于使用的格式。文章來源地址http://www.zghlxwxcb.cn/news/detail-791150.html
到了這里,關(guān)于Flink系列之:Apache Kafka SQL 連接器的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!