1 Preface
In some scenarios, such as the result of a GROUP BY aggregation, previously emitted result values need to be updated. In that case the key of the Kafka record has to be treated as a primary key, used to decide whether a row should be handled as an insert, a delete, or an update. In Flink 1.11, this could be achieved with the changelog-json format provided by the flink-cdc-connectors project.
Flink 1.12 added a new upsert connector (upsert-kafka), which extends the existing Kafka connector and works in upsert mode (FLIP-149). The new upsert-kafka connector can be used both as a source and as a sink, and it provides the same basic functionality and persistence guarantees as the existing kafka connector, since the two reuse most of the code.
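For context, the Flink 1.11 approach looked roughly like the following (a minimal sketch; the table and topic names here are hypothetical, and it assumes the flink-format-changelog-json dependency from flink-cdc-connectors is on the classpath):
CREATE TABLE pv_changelog (
name STRING,
pv BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'pv-changelog', -- hypothetical topic
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'changelog-json' -- encodes each record together with its change type
);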
2 The upsert-kafka connector
The upsert-kafka connector allows users to read data from, and write data to, Kafka topics in an upsert fashion.
As a source, the upsert-kafka connector produces a changelog stream, where each record represents an update or delete event. More precisely, the value in a record is interpreted as an UPDATE of the last value for the same key, if one exists (if the key has not been seen yet, the update is treated as an INSERT). Using a table analogy, a record in the changelog stream is interpreted as an UPSERT, also known as INSERT/UPDATE, because any existing row with the same key is overwritten. Additionally, a message with a null value is treated as a DELETE.
As a sink, the upsert-kafka connector consumes a changelog stream. It writes INSERT/UPDATE_AFTER records as regular Kafka messages, and writes DELETE records as Kafka messages with a null value (a tombstone marking the key as deleted). Flink partitions the data by the values of the primary key columns, which guarantees message ordering per key, so update/delete messages for the same key land in the same partition.
In short, each record represents an update or delete event, according to the following rules (see the message sketch after this list):
- If the key already exists in the Kafka topic, the record is an UPDATE: the key's value is updated to the record's value.
- If the key does not exist in the Kafka topic, the record is an INSERT: the key and value are written to the topic.
- If the value for a key is null, the record is treated as a DELETE.
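As an illustration, consider a topic whose records are keyed by id (hypothetical messages, shown as key => value); the connector interprets the sequence as follows:
{"id":1} => {"id":1,"name":"a"} -- key 1 not seen before: INSERT
{"id":1} => {"id":1,"name":"b"} -- key 1 already exists: UPDATE to the new value
{"id":1} => null -- null value (tombstone): DELETE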
3 Examples
3.1 Processing Kafka data and writing it back to Kafka
3.1.1 Create the Kafka topics
# create the source and result topics
$ kafka-topics --create --topic user-behavior --partitions 3 --replication-factor 2 --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
$ kafka-topics --create --topic after-user-behavior --partitions 3 --replication-factor 2 --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
# console producer for feeding test records into the source topic
$ kafka-console-producer --topic user-behavior --broker-list cdh68:9092,cdh69:9092,cdh70:9092
# console consumers for watching the source and result topics
$ kafka-console-consumer --topic user-behavior --from-beginning --group test-user --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
$ kafka-console-consumer --topic after-user-behavior --from-beginning --group test --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
3.2 Flink SQL
3.2.1 source
%flink.ssql
drop table if exists user_behavior;
CREATE TABLE user_behavior (
id BIGINT,
name STRING,
flag STRING
) WITH (
'connector' = 'kafka', -- use the kafka connector
'topic' = 'user-behavior', -- kafka topic
'properties.group.id'='cdc', -- consumer group
'scan.startup.mode' = 'latest-offset', -- read from the latest offset
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.bootstrap.servers' = 'cdh68:9092,cdh69:9092,cdh70:9092', -- kafka broker addresses
'format' = 'json' -- source data is json
);
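The %flink.ssql prefix indicates these paragraphs run in a Zeppelin notebook. As a sanity check, the source table can be queried from another paragraph (a sketch; type=update tells Zeppelin to render a continuously updating result):
%flink.ssql(type=update)
SELECT * FROM user_behavior;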
3.2.2 sink
%flink.ssql
drop table if exists after_user_behavior;
CREATE TABLE after_user_behavior (
name STRING,
pv BIGINT,
PRIMARY KEY (name) NOT ENFORCED -- required by upsert-kafka
) WITH (
'connector' = 'upsert-kafka', -- work in upsert mode
'topic' = 'after-user-behavior',
'properties.bootstrap.servers' = 'cdh68:9092,cdh69:9092,cdh70:9092',
'value.json.fail-on-missing-field' = 'false',
'key.json.ignore-parse-errors' = 'true',
'key.format' = 'json', -- serialization format of the key
'value.format' = 'json' -- serialization format of the value
);
To use the upsert-kafka connector, the DDL must define a PRIMARY KEY, and serialization/deserialization formats must be specified for the key (key.format) and the value (value.format).
Whenever rows are inserted, updated, or deleted on the source side, the corresponding pv result is updated in sync; that is the appeal of upsert-kafka.
This is an aggregation computed on top of Kafka, under the precondition that the data in topic user-behavior is in changelog form.
3.2.3 transform
%flink.ssql
INSERT INTO after_user_behavior
SELECT
name,
COUNT(*)
FROM user_behavior
GROUP BY name;
Note: after_user_behavior must use the upsert-kafka connector.
If after_user_behavior were defined with the kafka connector, executing this statement would fail with the following error:
java.io.IOException: org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.after_user_behavior' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS EXPR$1])
This is because the result of SELECT name, COUNT(*) FROM user_behavior GROUP BY name is continuously updated as new rows arrive, so only the upsert-kafka connector can consume it.
3.3 Output
3.3.1 kafka user-behavior producer
[song@cdh68 ~]$ kafka-console-producer --topic user-behavior --broker-list cdh68:9092,cdh69:9092,cdh70:9092
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Lucy","id":"67","type":"INSERT","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"info","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Lucy","id":"67","type":"INSERT","table":"info","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"UPDATE","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"DELETE","table":"user","ts":6852139698555588608}
The data in topic user-behavior is in changelog form.
3.3.2 kafka user-behavior consumer
[song@cdh70 ~]$ kafka-console-consumer --topic user-behavior --group test-user --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Lucy","id":"67","type":"INSERT","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"info","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Lucy","id":"67","type":"INSERT","table":"info","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"UPDATE","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"DELETE","table":"user","ts":6852139698555588608}
3.3.3 kafka after-user-behavior consumer
[song@cdh69 ~]$ kafka-console-consumer --topic after-user-behavior --group test --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
{"name":"Mars","pv":1}
{"name":"Lucy","pv":1}
{"name":"Mars","pv":2}
{"name":"Lucy","pv":2}
{"name":"Mars","pv":3}
{"name":"Mars","pv":4}
{"name":"Mars","pv":5}
{"name":"Mars","pv":6}
3.3.4 FlinkSQL user_behavior
This result shows the difference between kafka and upsert-kafka:
the kafka table shows every record, while the upsert-kafka table shows only the latest value after updates.
3.3.5 FlinkSQL after_user_behavior
This result changes dynamically, matching what the after-user-behavior console consumer shows.
So the upsert-kafka topic stores every change, but when it is read as a table, only the latest value per key is materialized.
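This can be checked with a streaming query on the upsert-kafka table (a sketch): Flink reads the whole topic but materializes the changelog, so the result converges to one latest row per name.
%flink.ssql(type=update)
SELECT * FROM after_user_behavior;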
3.4 flink-pageviews-demo
https://github.com/fsk119/flink-pageviews-demo
3.4.1 Preparing test data
Run the following statements in MySQL:
CREATE DATABASE flink;
USE flink;
CREATE TABLE users (
user_id BIGINT,
user_name VARCHAR(1000),
region VARCHAR(1000)
);
INSERT INTO users VALUES
(1, 'Timo', 'Berlin'),
(2, 'Tom', 'Beijing'),
(3, 'Apple', 'Beijing');
Now we create the corresponding tables in Flink using the SQL client. The users table uses the mysql-cdc connector from the flink-cdc-connectors project, so that connector's jar must be on the classpath.
CREATE TABLE users (
user_id BIGINT,
user_name STRING,
region STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'database-name' = 'flink',
'table-name' = 'users',
'username' = 'root',
'password' = '123456'
);
CREATE TABLE pageviews (
user_id BIGINT,
page_id BIGINT,
view_time TIMESTAMP(3),
proctime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'pageviews',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
并利用Flink 往 Kafka中灌入相應(yīng)的數(shù)據(jù)
INSERT INTO pageviews VALUES
(1, 101, TO_TIMESTAMP('2020-11-23 15:00:00')),
(2, 104, TO_TIMESTAMP('2020-11-23 15:00:01.00'));
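If automatic topic creation is disabled on the broker, the demo's topics have to be created up front; a sketch, assuming a single local broker as in the DDLs above:
kafka-topics.sh --create --topic pageviews --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
kafka-topics.sh --create --topic enriched_pageviews --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
kafka-topics.sh --create --topic pageviews_per_region --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092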
3.4.2 Writing a LEFT JOIN result to Kafka
We first test whether the result of a left join can be written into Kafka.
First, we create the corresponding table in the SQL client:
CREATE TABLE enriched_pageviews (
user_id BIGINT,
user_region STRING,
page_id BIGINT,
view_time TIMESTAMP(3),
WATERMARK FOR view_time as view_time - INTERVAL '5' SECOND,
PRIMARY KEY (user_id, page_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'enriched_pageviews',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);
并利用以下語句將left join的結(jié)果插入到kafka對應(yīng)的topic之中。
INSERT INTO enriched_pageviews
SELECT pageviews.user_id, region, pageviews.page_id, pageviews.view_time
FROM pageviews
LEFT JOIN users ON pageviews.user_id = users.user_id;
The data in the topic can then be printed with the following command:
kafka-console-consumer.sh --bootstrap-server kafka:9094 --topic "enriched_pageviews" --from-beginning --property print.key=true
# Expected result
{"user_id":1,"page_id":101} {"user_id":1,"user_region":null,"page_id":101,"view_time":"2020-11-23 15:00:00"}
{"user_id":2,"page_id":104} {"user_id":2,"user_region":null,"page_id":104,"view_time":"2020-11-23 15:00:01"}
{"user_id":1,"page_id":101} null
{"user_id":1,"page_id":101} {"user_id":1,"user_region":"Berlin","page_id":101,"view_time":"2020-11-23 15:00:00"}
{"user_id":2,"page_id":104} null
{"user_id":2,"page_id":104} {"user_id":2,"user_region":"Beijing","page_id":104,"view_time":"2020-11-23 15:00:01"}
In a left join, when the right side arrives and finds that the corresponding left row has already been emitted without a match, a DELETE message is sent, rather than an UPDATE_BEFORE message, to retract the previously emitted record; see org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator#processElement for details. This is why, in the output above, each key first appears with a null region, then gets a null-value (DELETE) message, and finally reappears with the enriched region.
We can go further and update or delete some rows in MySQL to observe the changes; roughly, the UPDATE below should produce a new value for user 1's key with region 'Beijing', and the DELETE should bring that row back to a null region:
UPDATE users SET region = 'Beijing' WHERE user_id = 1;
DELETE FROM users WHERE user_id = 1;
3.4.3 Writing an aggregation result to Kafka
Next, we test writing an aggregation result into Kafka.
Create the following table in the SQL client:
CREATE TABLE pageviews_per_region (
user_region STRING,
cnt BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);
Then we insert data into the upsert-kafka table with the following statement:
INSERT INTO pageviews_per_region
SELECT
user_region,
COUNT(*)
FROM enriched_pageviews
WHERE user_region is not null
GROUP BY user_region;
The corresponding data in Kafka can be inspected with the following command:
./kafka-console-consumer.sh --bootstrap-server kafka:9094 --topic "pageviews_per_region" --from-beginning --property print.key=true
# Expected result
{"user_region":"Berlin"} {"user_region":"Berlin","cnt":1}
{"user_region":"Beijing"} {"user_region":"Beijing","cnt":1}
{"user_region":"Berlin"} null
{"user_region":"Beijing"} {"user_region":"Beijing","cnt":2}
{"user_region":"Beijing"} {"user_region":"Beijing","cnt":1}