国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫(xiě)外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)

這篇具有很好參考價(jià)值的文章主要介紹了16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫(xiě)外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Flink 系列文章

一、Flink 專(zhuān)欄

Flink 專(zhuān)欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。

  • 2、Flink基礎(chǔ)系列
    本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。

  • 3、Flik Table API和SQL基礎(chǔ)系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫(kù)、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。

  • 4、Flik Table API和SQL提高與應(yīng)用系列
    本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開(kāi)發(fā)難度的內(nèi)容。

  • 5、Flink 監(jiān)控系列
    本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。

二、Flink 示例專(zhuān)欄

Flink 示例專(zhuān)欄是 Flink 專(zhuān)欄的輔助說(shuō)明,一般不會(huì)介紹知識(shí)點(diǎn)的信息,更多的是提供一個(gè)一個(gè)可以具體使用的示例。本專(zhuān)欄不再分目錄,通過(guò)鏈接即可看出介紹的內(nèi)容。

兩專(zhuān)欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引



本文簡(jiǎn)單的介紹了關(guān)于通過(guò)flink sql與kafka的進(jìn)行交互的內(nèi)容及相關(guān)示例。
本文依賴環(huán)境是hadoop、kafka環(huán)境好用,如果是ha環(huán)境則需要zookeeper的環(huán)境。

一、Table & SQL Connectors 示例:Apache Kafka

1、maven依賴(java編碼依賴)

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.17.1</version>
</dependency>

2、創(chuàng)建 Kafka 表

以下示例展示了如何創(chuàng)建 Kafka 表,分別使用了csv和json格式文件:

1)、csv格式文件示例

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)


CREATE TABLE t1 (
    `id` INT,
    name STRING,
    age BIGINT,
    t_insert_time TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR t_insert_time as t_insert_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 't_kafkasource',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
    'format' = 'csv'
);

------kafka關(guān)于主題和發(fā)送數(shù)據(jù)命令
kafka-topics.sh --delete --topic t_kafkasource --bootstrap-server server1:9092
kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_kafkasource --partitions 1 --replication-factor 1
kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource

--測(cè)試數(shù)據(jù)
1,alan,15
2,alanchan,20
3,alanchanchn,25
4,alan_chan,30
5,alan_chan_chn,45
 
[root@server2 bin]# kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource
>1,alan,15
>2,alanchan,20
>3,alanchanchn,25
>4,alan_chan,30
>5,alan_chan_chn,45
>

  • flink sql查詢結(jié)果
    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫(xiě)外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3),# Flink專(zhuān)欄,flink,sql,flink 流批一體化,flink sql,flink kafka,flink 實(shí)時(shí)計(jì)算,flink connector

2)、csv格式文件示例

CREATE TABLE t2 (
    `id` INT,
    name string,
    age BIGINT,
    t_insert_time TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR t_insert_time as t_insert_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 't_kafkasource_t2',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
    'format' = 'json'
);

--------kafka關(guān)于主題和發(fā)送數(shù)據(jù)命令
kafka-topics.sh --delete --topic t_kafkasource_t2 --bootstrap-server server1:9092
kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_kafkasource_t2 --partitions 1 --replication-factor 1
kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource_t2


----- 測(cè)試數(shù)據(jù)
{ "id":"1" ,"name":"alan","age":"12" } 
{ "id":"2" ,"name":"alanchan","age":"22" } 
{ "id":"3" ,"name":"alanchanchan","age":"32" } 
{ "id":"4" ,"name":"alan_chan","age":"42" } 
{ "id":"5" ,"name":"alan_chan_chn","age":"52" } 


[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource_t2
>{"data": [{ "id":"1" ,"name":"alan","age":"12" } ]}
>{ "id":"1" ,"name":"alan","age":"12" }
>{ "id":"2" ,"name":"alanchan","age":"22" }
>{ "id":"3" ,"name":"alanchanchan","age":"32" } 
>{ "id":"4" ,"name":"alan_chan","age":"42" } 
>{ "id":"5" ,"name":"alan_chan_chn","age":"52" } 
>

  • Flink sql 查詢結(jié)果
    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫(xiě)外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3),# Flink專(zhuān)欄,flink,sql,flink 流批一體化,flink sql,flink kafka,flink 實(shí)時(shí)計(jì)算,flink connector

3、可用的元數(shù)據(jù)

以下的連接器元數(shù)據(jù)可以在表定義中通過(guò)元數(shù)據(jù)列的形式獲取。

R/W 列定義了一個(gè)元數(shù)據(jù)是可讀的(R)還是可寫(xiě)的(W)。 只讀列必須聲明為 VIRTUAL 以在 INSERT INTO 操作中排除它們。
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫(xiě)外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3),# Flink專(zhuān)欄,flink,sql,flink 流批一體化,flink sql,flink kafka,flink 實(shí)時(shí)計(jì)算,flink connector
以下擴(kuò)展的 CREATE TABLE 示例展示了使用這些元數(shù)據(jù)字段的語(yǔ)法:

CREATE TABLE Alan_KafkaTable_1 (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);


---------------------------------驗(yàn)證--------------------------------
Flink SQL> CREATE TABLE Alan_KafkaTable_1 (
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.

Flink SQL> desc Alan_KafkaTable_1;
+------------+--------------+------+-----+---------------------------+-----------+
|       name |         type | null | key |                    extras | watermark |
+------------+--------------+------+-----+---------------------------+-----------+
| event_time | TIMESTAMP(3) | true |     | METADATA FROM 'timestamp' |           |
|  partition |       BIGINT | true |     |          METADATA VIRTUAL |           |
|     offset |       BIGINT | true |     |          METADATA VIRTUAL |           |
|    user_id |       BIGINT | true |     |                           |           |
|    item_id |       BIGINT | true |     |                           |           |
|   behavior |       STRING | true |     |                           |           |
+------------+--------------+------+-----+---------------------------+-----------+
6 rows in set
---kafka發(fā)送數(shù)據(jù)
[alanchan@server3 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic user_behavior
>1,1001,login
>1,2001,p_read

----flink sql 查詢結(jié)果
Flink SQL> select * from Alan_KafkaTable_1;
2023-08-21 05:40:38,916 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-21 05:40:38,916 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-21 05:40:38,919 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface server4:40896 of application 'application_1688448920799_0009'.
+----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
| op |              event_time |            partition |               offset |              user_id |              item_id |                       behavior |
+----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
| +I | 2023-08-21 05:38:47.009 |                    0 |                    0 |                    1 |                 1001 |                          login |
| +I | 2023-08-21 05:39:03.994 |                    0 |                    1 |                    1 |                 2001 |                         p_read |


連接器可以讀出消息格式的元數(shù)據(jù)。格式元數(shù)據(jù)的配置鍵以 ‘value.’ 作為前綴。

以下示例展示了如何獲取 Kafka 和 Debezium 的元數(shù)據(jù)字段:

CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
  `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
  `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,  -- from Kafka connector
  `offset` BIGINT METADATA VIRTUAL,  -- from Kafka connector
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

--------------------驗(yàn)證--------------------------------------
Flink SQL> CREATE TABLE Alan_KafkaTable_2 (
>   `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
>   `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
>   `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,  -- from Kafka connector
>   `offset` BIGINT METADATA VIRTUAL,  -- from Kafka connector
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior2',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'value.format' = 'debezium-json'
> );
[INFO] Execute statement succeed.

Flink SQL> desc Alan_KafkaTable_2;
+--------------+--------------+------+-----+------------------------------------------------+-----------+
|         name |         type | null | key |                                         extras | watermark |
+--------------+--------------+------+-----+------------------------------------------------+-----------+
|   event_time | TIMESTAMP(3) | true |     | METADATA FROM 'value.source.timestamp' VIRTUAL |           |
| origin_table |       STRING | true |     |     METADATA FROM 'value.source.table' VIRTUAL |           |
| partition_id |       BIGINT | true |     |              METADATA FROM 'partition' VIRTUAL |           |
|       offset |       BIGINT | true |     |                               METADATA VIRTUAL |           |
|      user_id |       BIGINT | true |     |                                                |           |
|      item_id |       BIGINT | true |     |                                                |           |
|     behavior |       STRING | true |     |                                                |           |
+--------------+--------------+------+-----+------------------------------------------------+-----------+
7 rows in set

-----測(cè)試數(shù)據(jù)
{
  "before": null,
  "after": {
    "user_id": 1,
    "item_id": "1001",
    "behavior": "login"
  },
  "source": {"version": "1.13.5"},
  "op": "c",
  "ts_ms": 1692593500222,
  "transaction": null
}

{
  "before": {
    "user_id": 1,
    "item_id": "1001",
    "behavior": "login"
  },
  "after": {
    "user_id": 1,
    "item_id": "2001",
    "behavior": "p_read"
  },
  "source": {"version": "1.13.5"},
  "op": "u",
  "ts_ms": 1692593502242,
  "transaction": null
}

---kafka發(fā)送數(shù)據(jù)
[alanchan@server3 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic user_behavior2
>{  "before": null,  "after": {    "user_id": 1,    "item_id": "1001",    "behavior": "login"  },  "source": {"version": "1.13.5"},  "op": "c",  "ts_ms": 1692593500222,  "transaction": null}
>{  "before": {    "user_id": 1,    "item_id": "1001",    "behavior": "login"  },  "after": {    "user_id": 1,    "item_id": "2001",    "behavior": "p_read"  },  "source": {"version": "1.13.5"},  "op": "u",  "ts_ms": 1692593502242,  "transaction": null}
>


----flink sql 查詢結(jié)果,沒(méi)有獲取到debezium的元數(shù)據(jù),是因?yàn)闇y(cè)試環(huán)境沒(méi)有debezium運(yùn)行環(huán)境,由于該種方式基本上被Flink CDC取代,故不做深入的驗(yàn)證
Flink SQL> select * from Alan_KafkaTable_2;
2023-08-21 06:43:59,445 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-21 06:43:59,445 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-21 06:43:59,449 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface server4:40896 of application 'application_1688448920799_0009'.
+----+-------------------------+--------------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
| op |              event_time |                   origin_table |         partition_id |               offset |              user_id |              item_id |                       behavior |
+----+-------------------------+--------------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
| +I |                  (NULL) |                         (NULL) |                    0 |                    0 |                    1 |                 1001 |                          login |
| -U |                  (NULL) |                         (NULL) |                    0 |                    1 |                    1 |                 1001 |                          login |
| +U |                  (NULL) |                         (NULL) |                    0 |                    1 |                    1 |                 2001 |                         p_read |

4、連接器參數(shù)

16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫(xiě)外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3),# Flink專(zhuān)欄,flink,sql,flink 流批一體化,flink sql,flink kafka,flink 實(shí)時(shí)計(jì)算,flink connector
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫(xiě)外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3),# Flink專(zhuān)欄,flink,sql,flink 流批一體化,flink sql,flink kafka,flink 實(shí)時(shí)計(jì)算,flink connector
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫(xiě)外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3),# Flink專(zhuān)欄,flink,sql,flink 流批一體化,flink sql,flink kafka,flink 實(shí)時(shí)計(jì)算,flink connector
上圖中的格式請(qǐng)參考鏈接:35、Flink 的JSON Format

5、特性

1)、消息鍵(Key)與消息體(Value)的格式

Kafka 消息的消息鍵和消息體部分都可以使用某種 35、Flink 的JSON Format 來(lái)序列化或反序列化成二進(jìn)制數(shù)據(jù)。

1、消息體格式

由于 Kafka 消息中消息鍵是可選的,以下語(yǔ)句將使用消息體格式讀取和寫(xiě)入消息,但不使用消息鍵格式。 ‘format’ 選項(xiàng)與 ‘value.format’ 意義相同。 所有的格式配置使用格式識(shí)別符作為前綴。

CREATE TABLE KafkaTable (
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  ...

  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
)

消息體格式將配置為以下的數(shù)據(jù)類(lèi)型:

ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
2、消息鍵和消息體格式

以下示例展示了如何配置和使用消息鍵和消息體格式。 格式配置使用 ‘key’ 或 ‘value’ 加上格式識(shí)別符作為前綴。

CREATE TABLE KafkaTable (
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  ...

  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',
  'key.fields' = 'user_id;item_id',

  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  'value.fields-include' = 'ALL'
)

消息鍵格式包含了在 ‘key.fields’ 中列出的字段(使用 ‘;’ 分隔)和字段順序。 因此將配置為以下的數(shù)據(jù)類(lèi)型:

ROW<`user_id` BIGINT, `item_id` BIGINT>

由于消息體格式配置為 ‘value.fields-include’ = ‘ALL’,所以消息鍵字段也會(huì)出現(xiàn)在消息體格式的數(shù)據(jù)類(lèi)型中:

ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
3、重名的格式字段

如果消息鍵字段和消息體字段重名,連接器無(wú)法根據(jù)表結(jié)構(gòu)信息將這些列區(qū)分開(kāi)。 ‘key.fields-prefix’ 配置項(xiàng)可以在表結(jié)構(gòu)中為消息鍵字段指定一個(gè)唯一名稱(chēng),并在配置消息鍵格式的時(shí)候保留原名。

以下示例展示了在消息鍵和消息體中同時(shí)包含 version 字段的情況:

CREATE TABLE KafkaTable (
  `k_version` INT,
  `k_user_id` BIGINT,
  `k_item_id` BIGINT,
  `version` INT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  ...

  'key.format' = 'json',
  'key.fields-prefix' = 'k_',
  'key.fields' = 'k_version;k_user_id;k_item_id',

  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY'
)

消息體格式必須配置為 ‘EXCEPT_KEY’ 模式。格式將被配置為以下的數(shù)據(jù)類(lèi)型:

# 消息鍵格式:
ROW<`version` INT, `user_id` BIGINT, `item_id` BIGINT>

# 消息體格式:
ROW<`version` INT, `behavior` STRING>

2)、Topic 和 Partition 的探測(cè)

topic 和 topic-pattern 配置項(xiàng)決定了 source 消費(fèi)的 topic 或 topic 的匹配規(guī)則。topic 配置項(xiàng)可接受使用分號(hào)間隔的 topic 列表,例如 topic-1;topic-2。 topic-pattern 配置項(xiàng)使用正則表達(dá)式來(lái)探測(cè)匹配的 topic。例如 topic-pattern 設(shè)置為 test-topic-[0-9],則在作業(yè)啟動(dòng)時(shí),所有匹配該正則表達(dá)式的 topic(以 test-topic- 開(kāi)頭,以一位數(shù)字結(jié)尾)都將被 consumer 訂閱。

為允許 consumer 在作業(yè)啟動(dòng)之后探測(cè)到動(dòng)態(tài)創(chuàng)建的 topic,請(qǐng)將 scan.topic-partition-discovery.interval 配置為一個(gè)非負(fù)值。這將使 consumer 能夠探測(cè)匹配名稱(chēng)規(guī)則的 topic 中新的 partition。

請(qǐng)參閱 37、Flink 的Apache Kafka connector 以獲取更多關(guān)于 topic 和 partition 探測(cè)的信息。

topic 列表和 topic 匹配規(guī)則只適用于 source。對(duì)于 sink 端,F(xiàn)link 目前只支持單一 topic。

3)、起始消費(fèi)位點(diǎn)

scan.startup.mode 配置項(xiàng)決定了 Kafka consumer 的啟動(dòng)模式。有效值為:

  • group-offsets:從 Zookeeper/Kafka 中某個(gè)指定的消費(fèi)組已提交的偏移量開(kāi)始。
  • earliest-offset:從可能的最早偏移量開(kāi)始。
  • latest-offset:從最末尾偏移量開(kāi)始。
  • timestamp:從用戶為每個(gè) partition 指定的時(shí)間戳開(kāi)始。
  • specific-offsets:從用戶為每個(gè) partition 指定的偏移量開(kāi)始。

默認(rèn)值 group-offsets 表示從 Zookeeper/Kafka 中最近一次已提交的偏移量開(kāi)始消費(fèi)。

如果使用了 timestamp,必須使用另外一個(gè)配置項(xiàng) scan.startup.timestamp-millis 來(lái)指定一個(gè)從格林尼治標(biāo)準(zhǔn)時(shí)間 1970 年 1 月 1 日 00:00:00.000 開(kāi)始計(jì)算的毫秒單位時(shí)間戳作為起始時(shí)間。

如果使用了 specific-offsets,必須使用另外一個(gè)配置項(xiàng) scan.startup.specific-offsets 來(lái)為每個(gè) partition 指定起始偏移量, 例如,選項(xiàng)值 partition:0,offset:42;partition:1,offset:300 表示 partition 0 從偏移量 42 開(kāi)始,partition 1 從偏移量 300 開(kāi)始。

4)、Bounded Ending Position

配置選項(xiàng) scan.bounded.mode 指定 Kafka 使用者的有界模式。有效的枚舉為:

  • “group-offsets”:以特定消費(fèi)者組的 ZooKeeper/Kafka 代理中的已提交偏移量為界。這是在給定分區(qū)的使用開(kāi)始時(shí)評(píng)估的。
  • “l(fā)atest-offset”:以最新偏移量為界。這是在給定分區(qū)的使用開(kāi)始時(shí)評(píng)估的。
  • “timestamp”:以用戶提供的時(shí)間戳為界。
  • “specific-offsets”:以用戶為每個(gè)分區(qū)提供的特定偏移量為界。

如果未設(shè)置配置選項(xiàng)值 scan.bounded.mode,則默認(rèn)值為無(wú)界表。

如果指定了時(shí)間戳,則需要另一個(gè)配置選項(xiàng) scan.bounded.timestamp-millis 來(lái)指定自 1970 年 1 月 1 日 00:00:00.000 GMT 以來(lái)的特定有界時(shí)間戳(以毫秒為單位)。

如果指定了特定偏移量,則需要另一個(gè)配置選項(xiàng) scan.bounded.specific-offset 來(lái)為每個(gè)分區(qū)指定特定的有界偏移量,例如,選項(xiàng)值 partition:0,offset:42;partition:1,offset:300 表示分區(qū) 0 的偏移量為 42,分區(qū) 1 的偏移量為 300。如果未提供分區(qū)的偏移量,則不會(huì)從該分區(qū)消耗。

5)、CDC 變更日志(Changelog) Source

Flink 原生支持使用 Kafka 作為 CDC 變更日志(changelog) source。如果 Kafka topic 中的消息是通過(guò)變更數(shù)據(jù)捕獲(CDC)工具從其他數(shù)據(jù)庫(kù)捕獲的變更事件,則你可以使用 CDC 格式將消息解析為 Flink SQL 系統(tǒng)中的插入(INSERT)、更新(UPDATE)、刪除(DELETE)消息。

在許多情況下,變更日志(changelog) source 都是非常有用的功能,例如將數(shù)據(jù)庫(kù)中的增量數(shù)據(jù)同步到其他系統(tǒng),審核日志,數(shù)據(jù)庫(kù)的物化視圖,時(shí)態(tài)表關(guān)聯(lián)數(shù)據(jù)庫(kù)表的更改歷史等。

Flink 提供了幾種 CDC 格式:

  • 38、Flink 的CDC 格式:debezium
  • 39、Flink 的CDC 格式:canal
  • 40、Flink 的CDC 格式:maxwell

6)、Sink 分區(qū)

配置項(xiàng) sink.partitioner 指定了從 Flink 分區(qū)到 Kafka 分區(qū)的映射關(guān)系。 默認(rèn)情況下,F(xiàn)link 使用 Kafka 默認(rèn)分區(qū)器 來(lái)對(duì)消息分區(qū)。默認(rèn)分區(qū)器對(duì)沒(méi)有消息鍵的消息使用 粘性分區(qū)策略(sticky partition strategy) 進(jìn)行分區(qū),對(duì)含有消息鍵的消息使用 murmur2 哈希算法計(jì)算分區(qū)。

為了控制數(shù)據(jù)行到分區(qū)的路由,也可以提供一個(gè)自定義的 sink 分區(qū)器。‘fixed’ 分區(qū)器會(huì)將同一個(gè) Flink 分區(qū)中的消息寫(xiě)入同一個(gè) Kafka 分區(qū),從而減少網(wǎng)絡(luò)連接的開(kāi)銷(xiāo)。

7)、一致性保證

默認(rèn)情況下,如果查詢?cè)?啟用 checkpoint 模式下執(zhí)行時(shí),Kafka sink 按照至少一次(at-lease-once)語(yǔ)義保證將數(shù)據(jù)寫(xiě)入到 Kafka topic 中。

當(dāng) Flink checkpoint 啟用時(shí),kafka 連接器可以提供精確一次(exactly-once)的語(yǔ)義保證。

除了啟用 Flink checkpoint,還可以通過(guò)傳入對(duì)應(yīng)的 sink.semantic 選項(xiàng)來(lái)選擇三種不同的運(yùn)行模式:

  • none:Flink 不保證任何語(yǔ)義。已經(jīng)寫(xiě)出的記錄可能會(huì)丟失或重復(fù)。
  • at-least-once (默認(rèn)設(shè)置):保證沒(méi)有記錄會(huì)丟失(但可能會(huì)重復(fù))。
  • exactly-once:使用 Kafka 事務(wù)提供精確一次(exactly-once)語(yǔ)義。當(dāng)使用事務(wù)向 Kafka 寫(xiě)入數(shù)據(jù)時(shí),請(qǐng)將所有從 Kafka 中消費(fèi)記錄的應(yīng)用中的 isolation.level 配置項(xiàng)設(shè)置成實(shí)際所需的值(read_committed 或 read_uncommitted,后者為默認(rèn)值)。

請(qǐng)參閱 37、Flink 的Apache Kafka connector 以獲取更多關(guān)于語(yǔ)義保證的信息。

8)、Source 按分區(qū) Watermark

Flink 對(duì)于 Kafka 支持發(fā)送按分區(qū)的 watermark。Watermark 在 Kafka consumer 中生成。 按分區(qū) watermark 的合并方式和在流 shuffle 時(shí)合并 Watermark 的方式一致。 Source 輸出的 watermark 由讀取的分區(qū)中最小的 watermark 決定。 如果 topic 中的某些分區(qū)閑置,watermark 生成器將不會(huì)向前推進(jìn)。 你可以在表配置中設(shè)置 ‘table.exec.source.idle-timeout’ 選項(xiàng)來(lái)避免上述問(wèn)題。

請(qǐng)參閱 7、Flink四大基石之Time和WaterMaker詳解與詳細(xì)示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實(shí)現(xiàn)) 策略 以獲取更多細(xì)節(jié)。

9)、安全

要啟用加密和認(rèn)證相關(guān)的安全配置,只需將安全配置加上 “properties.” 前綴配置在 Kafka 表上即可。下面的代碼片段展示了如何配置 Kafka 表以使用 PLAIN 作為 SASL 機(jī)制并提供 JAAS 配置:

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)

另一個(gè)更復(fù)雜的例子,使用 SASL_SSL 作為安全協(xié)議并使用 SCRAM-SHA-256 作為 SASL 機(jī)制:

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /* SSL 配置 */
  /* 配置服務(wù)端提供的 truststore (CA 證書(shū)) 的路徑 */
  'properties.ssl.truststore.location' = '/path/to/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /* 如果要求客戶端認(rèn)證,則需要配置 keystore (私鑰) 的路徑 */
  'properties.ssl.keystore.location' = '/path/to/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /* SASL 配置 */
  /* 將 SASL 機(jī)制配置為 as SCRAM-SHA-256 */
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /* 配置 JAAS */
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
)

如果在作業(yè) JAR 中 Kafka 客戶端依賴的類(lèi)路徑被重置了(relocate class),登錄模塊(login module)的類(lèi)路徑可能會(huì)不同,因此請(qǐng)根據(jù)登錄模塊在 JAR 中實(shí)際的類(lèi)路徑來(lái)改寫(xiě)以上配置。例如在 SQL client JAR 中,Kafka client 依賴被重置在了 org.apache.flink.kafka.shaded.org.apache.kafka 路徑下, 因此 plain 登錄模塊的類(lèi)路徑應(yīng)寫(xiě)為 org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule。

關(guān)于安全配置的詳細(xì)描述,請(qǐng)參閱 Apache Kafka 文檔中的"安全"一節(jié)。

6、數(shù)據(jù)類(lèi)型映射

Kafka 將消息鍵值以二進(jìn)制進(jìn)行存儲(chǔ),因此 Kafka 并不存在 schema 或數(shù)據(jù)類(lèi)型。Kafka 消息使用格式配置進(jìn)行序列化和反序列化,例如 csv,json,avro。 因此,數(shù)據(jù)類(lèi)型映射取決于使用的格式。請(qǐng)參閱 格式 頁(yè)面以獲取更多細(xì)節(jié)。

以上,簡(jiǎn)單的介紹了關(guān)于通過(guò)flink sql與kafka的進(jìn)行交互的內(nèi)容。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-692231.html

到了這里,關(guān)于16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫(xiě)外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【flink sql】kafka連接器

    Kafka 連接器提供從 Kafka topic 中消費(fèi)和寫(xiě)入數(shù)據(jù)的能力。 前面已經(jīng)介紹了flink sql創(chuàng)建表的語(yǔ)法及說(shuō)明:【flink sql】創(chuàng)建表 這篇博客聊聊怎么通過(guò)flink sql連接kafka 以下的連接器元數(shù)據(jù)可以在表定義中通過(guò)元數(shù)據(jù)列的形式獲取。 R/W 列定義了一個(gè)元數(shù)據(jù)是可讀的(R)還是可寫(xiě)的(

    2024年02月08日
    瀏覽(22)
  • Flink系列之:Elasticsearch SQL 連接器

    Sink: Batch Sink: Streaming Append Upsert Mode Elasticsearch 連接器允許將數(shù)據(jù)寫(xiě)入到 Elasticsearch 引擎的索引中。本文檔描述運(yùn)行 SQL 查詢時(shí)如何設(shè)置 Elasticsearch 連接器。 連接器可以工作在 upsert 模式,使用 DDL 中定義的主鍵與外部系統(tǒng)交換 UPDATE/DELETE 消息。 如果 DDL 中沒(méi)有定義主鍵,那么

    2024年02月04日
    瀏覽(22)
  • Flink系列之:JDBC SQL 連接器

    Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append Upsert Mode JDBC 連接器允許使用 JDBC 驅(qū)動(dòng)向任意類(lèi)型的關(guān)系型數(shù)據(jù)庫(kù)讀取或者寫(xiě)入數(shù)據(jù)。本文檔描述了針對(duì)關(guān)系型數(shù)據(jù)庫(kù)如何通過(guò)建立 JDBC 連接器來(lái)執(zhí)行 SQL 查詢。 如果在 DDL 中定義了主鍵,JDBC sink 將以 upsert 模式與外

    2024年02月02日
    瀏覽(24)
  • Flink系列之:Apache Kafka SQL 連接器

    Scan Source: Unbounded Sink: Streaming Append Mode Kafka 連接器提供從 Kafka topic 中消費(fèi)和寫(xiě)入數(shù)據(jù)的能力。 以下示例展示了如何創(chuàng)建 Kafka 表: 以下的連接器元數(shù)據(jù)可以在表定義中通過(guò)元數(shù)據(jù)列的形式獲取。 R/W 列定義了一個(gè)元數(shù)據(jù)是可讀的(R)還是可寫(xiě)的(W)。 只讀列必須聲明為 VI

    2024年02月01日
    瀏覽(29)
  • Flink系列之:Upsert Kafka SQL 連接器

    Scan Source: Unbounded 、 Sink: Streaming Upsert Mode Upsert Kafka 連接器支持以 upsert 方式從 Kafka topic 中讀取數(shù)據(jù)并將數(shù)據(jù)寫(xiě)入 Kafka topic。 作為 source,upsert-kafka 連接器生產(chǎn) changelog 流,其中每條數(shù)據(jù)記錄代表一個(gè)更新或刪除事件。更準(zhǔn)確地說(shuō),數(shù)據(jù)記錄中的 value 被解釋為同一 key 的最后一

    2024年01月16日
    瀏覽(26)
  • 【Flink實(shí)戰(zhàn)】Flink hint更靈活、更細(xì)粒度的設(shè)置Flink sql行為與簡(jiǎn)化hive連接器參數(shù)設(shè)置

    SQL 提示(SQL Hints)是和 SQL 語(yǔ)句一起使用來(lái)改變執(zhí)行計(jì)劃的。本章介紹如何使用 SQL 提示來(lái)實(shí)現(xiàn)各種干預(yù)。 SQL 提示一般可以用于以下: 增強(qiáng) planner:沒(méi)有完美的 planner, SQL 提示讓用戶更好地控制執(zhí)行; 增加元數(shù)據(jù)(或者統(tǒng)計(jì)信息):如\\\"已掃描的表索引\\\"和\\\"一些混洗鍵(shu

    2024年04月25日
    瀏覽(25)
  • Flink 讀寫(xiě)MySQL數(shù)據(jù)(DataStream和Table API)

    Flink 讀寫(xiě)MySQL數(shù)據(jù)(DataStream和Table API)

    Flink提供了基于JDBC的方式,可以將讀取到的數(shù)據(jù)寫(xiě)入到MySQL中;本文通過(guò)兩種方式將數(shù)據(jù)下入到MySQL數(shù)據(jù)庫(kù),其他的基于JDBC的數(shù)據(jù)庫(kù)類(lèi)似,另外,Table API方式的Catalog指定為Hive Catalog方式,持久化DDL操作。 另外,JDBC 連接器允許使用 JDBC 驅(qū)動(dòng)程序從任何關(guān)系數(shù)據(jù)庫(kù)讀取數(shù)據(jù)并將

    2023年04月09日
    瀏覽(32)
  • flink-sql讀寫(xiě)hive-1.16

    本文檔內(nèi)容基于 flink-1.16.x ,其他版本的整理,請(qǐng)查看本人博客的 flink 專(zhuān)欄其他文章。 Apache Hive 已經(jīng)成為了數(shù)據(jù)倉(cāng)庫(kù)生態(tài)系統(tǒng)中的核心。它不僅僅是一個(gè)用于大數(shù)據(jù)分析和ETL場(chǎng)景的SQL引擎,同樣也是一個(gè)數(shù)據(jù)管理平臺(tái),可用于發(fā)現(xiàn),定義,和演化數(shù)據(jù)。 Flink 與 Hive 的集成包

    2024年02月16日
    瀏覽(33)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通過(guò)Table API和SQL創(chuàng)建表

    一、Flink 專(zhuān)欄 Flink 專(zhuān)欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月17日
    瀏覽(24)
  • Flink Table API 與 SQL 編程整理

    Flink Table API 與 SQL 編程整理

    Flink API 總共分為 4 層這里主要整理 Table API 的使用 Table API 是流處理和批處理通用的關(guān)系型 API , Table API 可以基于流輸入或者批輸入來(lái)運(yùn)行而不需要進(jìn)行任何修改。 Table API 是 SQL 語(yǔ)言的超集并專(zhuān)門(mén)為 Apache Flink 設(shè)計(jì)的, Table API 是 Scala 和 Java 語(yǔ)言集成式的 API 。與常規(guī) SQL 語(yǔ)言

    2024年02月04日
    瀏覽(25)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包