国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink Upsert Kafka SQL Connector 介紹

這篇具有很好參考價(jià)值的文章主要介紹了Flink Upsert Kafka SQL Connector 介紹。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一 前言

在某些場景中,比方GROUP BY聚合之后的后果,須要去更新之前的結(jié)果值。這個(gè)時(shí)候,須要將 Kafka 記錄的 key 當(dāng)成主鍵解決,用來確定一條數(shù)據(jù)是應(yīng)該作為插入、刪除還是更新記錄來解決。在 Flink1.11 中,能夠通過 flink-cdc-connectors 項(xiàng)目提供的 changelog-json format 來實(shí)現(xiàn)該性能。

在 Flink1.12 版本中, 新增了一個(gè) upsert connector(upsert-kafka),該 connector 擴(kuò)大自現(xiàn)有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既能夠作為 source 應(yīng)用,也能夠作為 sink 應(yīng)用,并且提供了與現(xiàn)有的 kafka connector 雷同的基本功能和持久性保障,因?yàn)閮烧咧g復(fù)用了大部分代碼。

二 upsert kafka connector

Upsert Kafka Connector容許用戶以upsert的形式從Kafka主題讀取數(shù)據(jù)或?qū)?shù)據(jù)寫入Kafka主題。

作為 source,upsert-kafka 連接器生產(chǎn) changelog 流,其中每條數(shù)據(jù)記錄代表一個(gè)更新或刪除事件。更準(zhǔn)確地說,數(shù)據(jù)記錄中的 value 被解釋為同一 key 的最后一個(gè) value 的 UPDATE,如果有這個(gè) key(如果不存在相應(yīng)的 key,則該更新被視為 INSERT)。用表來類比,changelog 流中的數(shù)據(jù)記錄被解釋為 UPSERT,也稱為 INSERT/UPDATE,因?yàn)槿魏尉哂邢嗤?key 的現(xiàn)有行都被覆蓋。另外,value 為空的消息將會被視作為 DELETE 消息。

作為 sink,upsert-kafka 連接器可以消費(fèi) changelog 流。它會將 INSERT/UPDATE_AFTER 數(shù)據(jù)作為正常的 Kafka 消息寫入,并將 DELETE 數(shù)據(jù)以 value 為空的 Kafka 消息寫入(表示對應(yīng) key 的消息被刪除)。Flink 將根據(jù)主鍵列的值對數(shù)據(jù)進(jìn)行分區(qū),從而保證主鍵上的消息有序,因此同一主鍵上的更新/刪除消息將落在同一分區(qū)中。

其中每條數(shù)據(jù)記錄代表一個(gè)更新或刪除事件,原理如下:

  • Kafka Topic中存在相應(yīng)的Key,則以UPDATE操作將Key的值更新為數(shù)據(jù)記錄中的Value。
  • Kafka Topic中不存在相應(yīng)的Key,則以INSERT操作將Key的值寫入Kafka Topic。
  • Key對應(yīng)的Value為空,會被視作DELETE操作。

三 案例

3.1 kafka 處理后寫入kafka

3.1.1 創(chuàng)建kafka topic
$ kafka-topics --create --topic user-behavior --partitions 3 --replication-factor 2 --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
$ kafka-topics --create --topic after-user-behavior --partitions 3 --replication-factor 2 --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
$ kafka-console-producer --topic user-behavior --broker-list cdh68:9092,cdh69:9092,cdh70:9092
$ kafka-console-consumer --topic user-behavior --from-beginning --group test-user --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092      
$ kafka-console-consumer --topic after-user-behavior --from-beginning --group test --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
3.2 Flink SQL
3.2.1 source
%flink.ssql
drop table if exists user_behavior;
CREATE TABLE user_behavior (
    id BIGINT,
    name STRING,
    flag STRING
) WITH (
    'connector' = 'kafka',  -- 使用 kafka connector
    'topic' = 'user-behavior',  -- kafka topic
    'properties.group.id'='cdc', -- 消費(fèi)者組
    'scan.startup.mode' = 'latest-offset',  -- 從起始 offset 開始讀取
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'properties.bootstrap.servers' = 'cdh68:9092,cdh69:9092,cdh70:9092',  -- kafka broker 地址
    'format' = 'json'  -- 數(shù)據(jù)源格式為 json
);
3.2.2 sink
%flink.ssql
drop table if exists after_user_behavior;
CREATE TABLE after_user_behavior (
  name STRING,
  pv BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'after-user-behavior',
  'properties.bootstrap.servers' = 'cdh68:9092,cdh69:9092,cdh70:9092',
  'value.json.fail-on-missing-field' = 'false',
  'key.json.ignore-parse-errors' = 'true',
  'key.format' = 'json',
  'value.format' = 'json'
);

一定要設(shè)置主鍵 Primar要使用 upsert-kafka connector,DDL語句中,一定要設(shè)置 PRIMARY KEY 主鍵,并為鍵(key.format)和值(value.format)指定序列化反序列化格式。

當(dāng)數(shù)據(jù)源端進(jìn)行了增刪改,對應(yīng)的 pv 結(jié)果就會同步更新,這就是 upsert kafka 的魅力。

這是基于kafka的統(tǒng)計(jì)計(jì)算,前提條件是 topic user-behavior中的數(shù)據(jù)是 changelog 格式的。

3.2.3 transform
%flink.ssql
INSERT INTO after_user_behavior
SELECT
  name,
  COUNT(*)
FROM user_behavior 
GROUP BY name;

注意:after_user_behavior 必須為 upsert-kafka connector

如果after_user_behavior為 kafka connector,執(zhí)行此語句則會報(bào)如下錯(cuò)誤:

java.io.IOException: org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.after_user_behavior' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS EXPR$1])

因?yàn)檎Z句SELECT name, COUNT(*) FROM user_behavior GROUP BY name; 通過group by后數(shù)據(jù)是不斷更新變化的,因此只能用 upsert-kafka connector。

3.3 輸出結(jié)果
3.3.1 kafka user-behavior producer
[song@cdh68 ~]$ kafka-console-producer --topic user-behavior --broker-list cdh68:9092,cdh69:9092,cdh70:9092
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Lucy","id":"67","type":"INSERT","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"info","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Lucy","id":"67","type":"INSERT","table":"info","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"UPDATE","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"DELETE","table":"user","ts":6852139698555588608}

topic user-behavior中的數(shù)據(jù)是 changelog 格式的。

3.3.2 kafka user-behavior consumer
[song@cdh70 ~]$ kafka-console-consumer --topic user-behavior --group test-user --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Lucy","id":"67","type":"INSERT","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"info","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Lucy","id":"67","type":"INSERT","table":"info","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"UPDATE","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"DELETE","table":"user","ts":6852139698555588608}
3.3.3 kafka after-user-behavior consumer
[song@cdh69 ~]$ kafka-console-consumer --topic after-user-behavior --group test --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
{"name":"Mars","pv":1}
{"name":"Lucy","pv":1}
{"name":"Mars","pv":2}
{"name":"Lucy","pv":2}
{"name":"Mars","pv":3}
{"name":"Mars","pv":4}
{"name":"Mars","pv":5}
{"name":"Mars","pv":6}
3.3.4 FlinkSQL user_behavior

Flink Upsert Kafka SQL Connector 介紹,大數(shù)據(jù)從入門到精通,flink,kafka

從此結(jié)果可以看出 kafka 和 upsert-kafka 的區(qū)別:

kafka 的結(jié)果則顯示所有數(shù)據(jù),upsert-kafka則顯示更新后的最新數(shù)據(jù)。

3.3.5 FlinkSQL alfter_user_behavior

Flink Upsert Kafka SQL Connector 介紹,大數(shù)據(jù)從入門到精通,flink,kafka

此結(jié)果是動態(tài)變化的,變化與kafka after-user-behavior consumer相同。

可見,upsert-kafka 表存儲了所有變化的數(shù)據(jù),但是讀取時(shí),只讀取最新的數(shù)據(jù)。

3.2 flink-pageviews-demo

https://github.com/fsk119/flink-pageviews-demo

3.2.1 測試數(shù)據(jù)準(zhǔn)備

在 Mysql 中執(zhí)行以下命令:

CREATE DATABASE flink;
USE flink;

CREATE TABLE users (
  user_id BIGINT,
  user_name VARCHAR(1000),
  region VARCHAR(1000)
);

INSERT INTO users VALUES 
(1, 'Timo', 'Berlin'),
(2, 'Tom', 'Beijing'),
(3, 'Apple', 'Beijing');

現(xiàn)在,我們利用Sql client在Flink中創(chuàng)建相應(yīng)的表。

CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  region STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'database-name' = 'flink',
  'table-name' = 'users',
  'username' = 'root',
  'password' = '123456'
);

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  view_time TIMESTAMP(3),
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

并利用Flink 往 Kafka中灌入相應(yīng)的數(shù)據(jù)

INSERT INTO pageviews VALUES
  (1, 101, TO_TIMESTAMP('2020-11-23 15:00:00')),
  (2, 104, TO_TIMESTAMP('2020-11-23 15:00:01.00'));
3.2.2 將 left join 結(jié)果寫入 kafka

我們首先測試是否能將Left join的結(jié)果灌入到 Kafka 之中。

首先,我們在 Sql client 中創(chuàng)建相應(yīng)的表

CREATE TABLE enriched_pageviews (
  user_id BIGINT,
  user_region STRING,
  page_id BIGINT,
  view_time TIMESTAMP(3),
  WATERMARK FOR view_time as view_time - INTERVAL '5' SECOND,
  PRIMARY KEY (user_id, page_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'enriched_pageviews',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

并利用以下語句將left join的結(jié)果插入到kafka對應(yīng)的topic之中。

INSERT INTO enriched_pageviews
SELECT pageviews.user_id, region, pageviews.page_id, pageviews.view_time
FROM pageviews
LEFT JOIN users ON pageviews.user_id = users.user_id;

利用以下命令,我們可以打印topic內(nèi)的數(shù)據(jù)kafka-console-consumer.sh --bootstrap-server kafka:9094 --topic "enriched_pageviews" --from-beginning --property print.key=true

#預(yù)期結(jié)果
{"user_id":1,"page_id":101}	{"user_id":1,"user_region":null,"page_id":101,"view_time":"2020-11-23 15:00:00"}
{"user_id":2,"page_id":104}	{"user_id":2,"user_region":null,"page_id":104,"view_time":"2020-11-23 15:00:01"}
{"user_id":1,"page_id":101}	null
{"user_id":1,"page_id":101}	{"user_id":1,"user_region":"Berlin","page_id":101,"view_time":"2020-11-23 15:00:00"}
{"user_id":2,"page_id":104}	null
{"user_id":2,"page_id":104}	{"user_id":2,"user_region":"Beijing","page_id":104,"view_time":"2020-11-23 15:00:01"}

Left join中,右流發(fā)現(xiàn)左流沒有join上但已經(jīng)發(fā)射了,此時(shí)會發(fā)送DELETE消息,而非UPDATE-BEFORE消息清理之前發(fā)送的消息。詳見org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator#processElement

我們可以進(jìn)一步在mysql中刪除或者修改一些數(shù)據(jù),來觀察進(jìn)一步的變化。

UPDATE users SET region = 'Beijing' WHERE user_id = 1;

DELETE FROM users WHERE user_id = 1;
3.2.3 將聚合結(jié)果寫入kafka

我們進(jìn)一步測試將聚合的結(jié)果寫入到 Kafka 之中。

在Sql client 中構(gòu)建以下表

CREATE TABLE pageviews_per_region (
  user_region STRING,
  cnt BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
)

我們再用以下命令將數(shù)據(jù)插入到upsert-kafka之中。

INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*)
FROM enriched_pageviews
WHERE user_region is not null
GROUP BY user_region;

我們可以通過以下命令查看 Kafka 中對應(yīng)的數(shù)據(jù)

./kafka-console-consumer.sh --bootstrap-server kafka:9094 --topic "pageviews_per_region" --from-beginning --property print.key=true

# 預(yù)期結(jié)果
{"user_region":"Berlin"}	{"user_region":"Berlin","cnt":1}
{"user_region":"Beijing"}	{"user_region":"Beijing","cnt":1}
{"user_region":"Berlin"}	null
{"user_region":"Beijing"}	{"user_region":"Beijing","cnt":2}
{"user_region":"Beijing"}	{"user_region":"Beijing","cnt":1}

Flink Upsert Kafka SQL Connector 介紹,大數(shù)據(jù)從入門到精通,flink,kafka文章來源地址http://www.zghlxwxcb.cn/news/detail-828927.html

到了這里,關(guān)于Flink Upsert Kafka SQL Connector 介紹的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【flink-sql實(shí)戰(zhàn)】flink 主鍵聲明與upsert功能實(shí)戰(zhàn)

    主鍵用作 Flink 優(yōu)化的一種提示信息。主鍵限制表明一張表或視圖的 某個(gè)(些)列 是唯一的 并且不包含 Null 值 。 主鍵聲明的列都是非 nullable 的。因此主鍵可以被用作表行級別的唯一標(biāo)識。 主鍵可以和列的定義一起聲明,也可以獨(dú)立聲明為表的限制屬性,不管是哪種方式,

    2024年02月03日
    瀏覽(26)
  • Flink 實(shí)時(shí)數(shù)倉關(guān)鍵技術(shù)解讀:Upsert Kafka 和 動態(tài)表(Dynamic Table)

    Flink 實(shí)時(shí)數(shù)倉關(guān)鍵技術(shù)解讀:Upsert Kafka 和 動態(tài)表(Dynamic Table)

    博主歷時(shí)三年精心創(chuàng)作的《大數(shù)據(jù)平臺架構(gòu)與原型實(shí)現(xiàn):數(shù)據(jù)中臺建設(shè)實(shí)戰(zhàn)》一書現(xiàn)已由知名IT圖書品牌電子工業(yè)出版社博文視點(diǎn)出版發(fā)行,點(diǎn)擊《重磅推薦:建大數(shù)據(jù)平臺太難了!給我發(fā)個(gè)工程原型吧!》了解圖書詳情,京東購書鏈接:https://item.jd.com/12677623.html,掃描左側(cè)

    2024年02月22日
    瀏覽(34)
  • Flink Kafka[輸入/輸出] Connector

    Flink Kafka[輸入/輸出] Connector

    本章重點(diǎn)介紹生產(chǎn)環(huán)境中最常用到的 Flink kafka connector 。使用 Flink 的同學(xué),一定會很熟悉 kafka ,它是一個(gè)分布式的、分區(qū)的、多副本的、 支持高吞吐的、發(fā)布訂閱消息系統(tǒng)。生產(chǎn)環(huán)境環(huán)境中也經(jīng)常會跟 kafka 進(jìn)行一些數(shù)據(jù)的交換,比如利用 kafka consumer 讀取數(shù)據(jù),然后進(jìn)行一系

    2024年02月04日
    瀏覽(19)
  • Iceberg從入門到精通系列之七:Flink SQL創(chuàng)建Catalog

    type:必須是iceberg catalog-type:內(nèi)置了hive和hadoop兩種catalog,也可以使用catalog-impl來自定義catalog。 catalog-impl:自定義catalog實(shí)現(xiàn)的全限定類名。如果未設(shè)置catalog-type,則必須設(shè)置。 property-version:描述屬性版本的版本號。此屬性可用于向后兼容,以防屬性格式更改。當(dāng)前屬性版本

    2024年02月11日
    瀏覽(24)
  • Iceberg從入門到精通系列之八:flink sql 創(chuàng)建Iceberg表

    建表命令支持最常用的flink建表語法,包括: PARTITION BY(column1,column2,…):配置分區(qū),apache flink不支持隱藏分區(qū)。 COMMENT ‘table document’:指定表的備注 WITH(‘key’=‘value’,…):設(shè)置表屬性

    2024年02月11日
    瀏覽(26)
  • (五)kafka從入門到精通之topic介紹

    (五)kafka從入門到精通之topic介紹

    Kafka是一個(gè)流行的分布式消息系統(tǒng),它的核心是一個(gè)由多個(gè)節(jié)點(diǎn)組成的分布式集群。在Kafka中,數(shù)據(jù)被分割成多個(gè)小塊,并通過一些復(fù)雜的算法在節(jié)點(diǎn)之間傳遞。這些小塊被稱為Kafka Topic。 一個(gè)Topic是一組具有相同主題的消息??梢詫opic看作是一個(gè)數(shù)據(jù)倉庫,在這個(gè)倉庫中存

    2024年02月12日
    瀏覽(16)
  • kerberos認(rèn)證Flink的kafka connector和kafka client配置

    kerberos認(rèn)證Flink的kafka connector和kafka client配置

    1. kafka配置文件 kafka jaas必須配置,如果缺少,則報(bào)一下錯(cuò)誤。 對于Flink只能通過配置 java.security.auth.login.config 的方式。 jaas配置 1.1 方式一: System.setProperty配置系統(tǒng)變量: kafka_client_jaas_keytab.conf文件內(nèi)容如下: 1.2 方法二:在IDEA中添加jvm參數(shù): 注意:將參數(shù)添加至kafka 的pr

    2024年02月04日
    瀏覽(24)
  • Flink SQL Hive Connector使用場景

    目錄 1.介紹 2.使用 2.1注冊HiveCatalog 2.2Hive Read 2.2.1流讀關(guān)鍵配置 2.2.2示例

    2024年02月06日
    瀏覽(24)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    ? ? ? ?今天一天爭取搞完最后這一部分,學(xué)完趕緊把 Kafka 和 Flume 學(xué)完,就要開始做實(shí)時(shí)數(shù)倉了。據(jù)說是應(yīng)屆生得把實(shí)時(shí)數(shù)倉搞個(gè) 80%~90% 才能差不多找個(gè)工作,太牛馬了。 ????????之前我們已經(jīng)用過了一些簡單的內(nèi)置連接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官網(wǎng):

    2024年01月24日
    瀏覽(51)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包