国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用flink的sql-client.sh,測(cè)試mysql-->kafka-->kafka-->mysql實(shí)時(shí)流

這篇具有很好參考價(jià)值的文章主要介紹了使用flink的sql-client.sh,測(cè)試mysql-->kafka-->kafka-->mysql實(shí)時(shí)流。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

目錄

1. 環(huán)境介紹

2. mysql建表

3. flinksql建表

3.1 進(jìn)入flinksql客戶(hù)端?

?3.2 配置輸出格式

?3.3 flink建表

3.4 任務(wù)流配置

4. 測(cè)試

4.1 插入測(cè)試數(shù)據(jù)

4.2 查看結(jié)果表數(shù)據(jù)?

4.3 新增測(cè)試數(shù)據(jù)

4.4 再次查看結(jié)果表數(shù)據(jù)


1. 環(huán)境介紹

服務(wù) 版本
zookeeper 3.8.0
kafka 3.3.1
flink 1.13.5
mysql 5.7.34
jdk 1.8
scala 2.12

連接器 作用
flink-sql-connector-upsert-kafka_2.11-1.13.6.jar 連接kafka,支持主鍵更新
flink-connector-mysql-cdc-2.0.2.jar 讀mysql
flink-connector-jdbc_2.11-1.13.6.jar 寫(xiě)mysql
mysql-connector-java-5.1.37.jar 連接mysql

2. mysql中建表

CREATE TABLE src_mysql_order(
 order_id BIGINT,
 store_id BIGINT,
 sales_amt double,
 PRIMARY KEY (`order_id`)
);

CREATE TABLE src_mysql_order_detail(
 order_id BIGINT,
 store_id BIGINT,
 goods_id BIGINT,
 sales_amt double,
 PRIMARY KEY (order_id,store_id,goods_id)
);

CREATE TABLE dim_store(
 store_id BIGINT,
 store_name varchar(100),
 PRIMARY KEY (`store_id`) 
);

CREATE TABLE dim_goods(
 goods_id BIGINT,
 goods_name varchar(100),
 PRIMARY KEY (`goods_id`)
);

CREATE TABLE dwa_mysql_order_analysis (
	store_id BIGINT,
	store_name varchar(100),
	sales_goods_distinct_nums bigint,
	sales_amt double,
	order_nums bigint,
	PRIMARY KEY (store_id,store_name)
);

3. flinksql建表

3.1 進(jìn)入flinksql客戶(hù)端?

sql-client.sh embedded

flink 連接mysql,flinksql,mysql,flink,kafka?3.2 配置輸出格式

SET sql-client.execution.result-mode=tableau;

flink 連接mysql,flinksql,mysql,flink,kafka?
3.3 flink建表

--mysql中的 訂單主表
CREATE TABLE src_mysql_order(
 order_id BIGINT,
 store_id BIGINT,
 sales_amt double,
 PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'hadoop002',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'src_mysql_order',
 'scan.incremental.snapshot.enabled' = 'false'
);

--mysql中的 訂單明細(xì)表
CREATE TABLE src_mysql_order_detail(
 order_id BIGINT,
 store_id BIGINT,
 goods_id BIGINT,
 sales_amt double,
 PRIMARY KEY (order_id,store_id,goods_id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'hadoop002',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'src_mysql_order_detail',
 'scan.incremental.snapshot.enabled' = 'false'
);

--mysql中的 商店維表
CREATE TABLE dim_store(
 store_id BIGINT,
 store_name varchar(100),
 PRIMARY KEY (`store_id`) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'hadoop002',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'dim_store',
 'scan.incremental.snapshot.enabled' = 'false'
);

--mysql中的 商品維表
CREATE TABLE dim_goods(
 goods_id BIGINT,
 goods_name varchar(100),
 PRIMARY KEY (`goods_id`) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'hadoop002',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'dim_goods',
 'scan.incremental.snapshot.enabled' = 'false'
);

--kafka中的 ods層 訂單表
CREATE TABLE ods_kafka_order (
 order_id BIGINT,
 store_id BIGINT,
 sales_amt double,
 PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'ods_kafka_order',
 'properties.bootstrap.servers' = 'hadoop001:9092',
 'properties.group.id' = 'ods_group1',
  'key.format' = 'json',
 'value.format' = 'json'
);

----kafka中的 ods層 訂單明細(xì)表
CREATE TABLE ods_kafka_order_detail (
 order_id BIGINT,
 store_id BIGINT,
 goods_id BIGINT,
 sales_amt double,
 PRIMARY KEY (order_id,store_id,goods_id) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'ods_kafka_order_detail',
 'properties.bootstrap.servers' = 'hadoop001:9092',
 'properties.group.id' = 'ods_group1',
  'key.format' = 'json',
 'value.format' = 'json'
);

--kafka中的 dwd層 訂單表
CREATE TABLE dwd_kafka_order (
 order_id BIGINT,
 store_id BIGINT,
 sales_amt double,
 PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'dwd_kafka_order',
 'properties.bootstrap.servers' = 'hadoop001:9092',
 'properties.group.id' = 'dwd_group1',
  'key.format' = 'json',
 'value.format' = 'json'
);

--kafka中的 dwd層 訂單明細(xì)表
CREATE TABLE dwd_kafka_order_detail (
 order_id BIGINT,
 store_id BIGINT,
 goods_id BIGINT,
 sales_amt double,
 PRIMARY KEY (order_id,store_id,goods_id) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'dwd_kafka_order_detail',
 'properties.bootstrap.servers' = 'hadoop001:9092',
 'properties.group.id' = 'dwd_group1',
  'key.format' = 'json',
 'value.format' = 'json'
);

--mysql中的dwa 訂單指標(biāo)統(tǒng)計(jì)
CREATE TABLE dwa_mysql_order_analysis (
	store_id BIGINT,
	store_name varchar(100),
	sales_goods_distinct_nums bigint,
	sales_amt double,
	order_nums bigint,
	PRIMARY KEY (store_id,store_name) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://hadoop002:3306/test',
   'table-name' = 'dwa_mysql_order_analysis',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = 'root',
	'sink.buffer-flush.max-rows' = '10'
);

3.4 任務(wù)流配置

--任務(wù)流配置
insert into ods_kafka_order select * from src_mysql_order;
insert into ods_kafka_order_detail select * from src_mysql_order_detail;
insert into dwd_kafka_order select * from ods_kafka_order;
insert into dwd_kafka_order_detail select * from ods_kafka_order_detail;

insert into dwa_mysql_order_analysis
select 
	orde.store_id as store_id
	,store.store_name as store_name
	,count(distinct order_detail.goods_id) as sales_goods_distinct_nums
	,sum(order_detail.sales_amt) as sales_amt
	,count(distinct orde.order_id) as order_nums
from dwd_kafka_order 		as orde
join dwd_kafka_order_detail	as order_detail
on 	 orde.order_id = order_detail.order_id
join dim_store 				as store 
on 	 orde.store_id = store.store_id 
group by 
	orde.store_id
	,store.store_name 
;

查看flink管理界面,可以看到有5個(gè)正在運(yùn)行的任務(wù),實(shí)時(shí)流就配置好了

flink 連接mysql,flinksql,mysql,flink,kafka?

4. 測(cè)試

4.1 插入測(cè)試數(shù)據(jù)

insert into src_mysql_order values 
(20221210001,10000,50),
(20221210002,10000,20),
(20221210003,10001,10);

insert into src_mysql_order_detail values 
(20221210001,10000,100000,30),
(20221210001,10000,100001,20),
(20221210002,10000,100001,20),
(20221210003,10001,100000,10);

insert into dim_store values 
(10000, '宇唐總店'),
(10001, '宇唐一店'),
(10002, '宇唐二店'),
(10003, '宇唐三店');

insert into dim_goods values 
(100000, '天獅達(dá)特濃縮棗漿'),
(100001, '蜜煉柚子茶');

4.2 查看結(jié)果表數(shù)據(jù)
flink 連接mysql,flinksql,mysql,flink,kafka?

4.3 新增測(cè)試數(shù)據(jù)

insert into src_mysql_order values  
(20221210004,10002,50), 
(20221210005,10003,30);

insert into src_mysql_order_detail values 
(20221210004,10002,100000,30),
(20221210004,10002,100001,20),
(20221210005,10003,100000,10),
(20221210005,10003,100001,20);

?4.4 再次查看結(jié)果表數(shù)據(jù)

flink 連接mysql,flinksql,mysql,flink,kafka?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-674164.html

到了這里,關(guān)于使用flink的sql-client.sh,測(cè)試mysql-->kafka-->kafka-->mysql實(shí)時(shí)流的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink SQL和Table API實(shí)現(xiàn)消費(fèi)kafka寫(xiě)入mysql

    Flink SQL和Table API實(shí)現(xiàn)消費(fèi)kafka寫(xiě)入mysql

    1、構(gòu)建 table環(huán)境 2、構(gòu)建source kafka 方式一:API 方式二:Flink SQL 3、構(gòu)建sink mysql? 4、寫(xiě)入將source表寫(xiě)入sink表 方式一:API 方式二:Flink SQL 5、手動(dòng)執(zhí)行 6、測(cè)試 (1)連接kafka生產(chǎn)者 (2)造數(shù)據(jù) (3)mysql查看入庫(kù)情況

    2024年01月16日
    瀏覽(24)
  • kerberos認(rèn)證Flink的kafka connector和kafka client配置

    kerberos認(rèn)證Flink的kafka connector和kafka client配置

    1. kafka配置文件 kafka jaas必須配置,如果缺少,則報(bào)一下錯(cuò)誤。 對(duì)于Flink只能通過(guò)配置 java.security.auth.login.config 的方式。 jaas配置 1.1 方式一: System.setProperty配置系統(tǒng)變量: kafka_client_jaas_keytab.conf文件內(nèi)容如下: 1.2 方法二:在IDEA中添加jvm參數(shù): 注意:將參數(shù)添加至kafka 的pr

    2024年02月04日
    瀏覽(24)
  • 20、Flink SQL之SQL Client: 不用編寫(xiě)代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務(wù)到集群上

    20、Flink SQL之SQL Client: 不用編寫(xiě)代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務(wù)到集群上

    一、Flink 專(zhuān)欄 Flink 專(zhuān)欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月11日
    瀏覽(23)
  • 【flink番外篇】21、Flink 通過(guò)SQL client 和 table api注冊(cè)catalog示例

    一、Flink 專(zhuān)欄 Flink 專(zhuān)欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月21日
    瀏覽(25)
  • 30、Flink SQL之SQL 客戶(hù)端(通過(guò)kafka和filesystem的例子介紹了配置文件使用-表、視圖等)

    30、Flink SQL之SQL 客戶(hù)端(通過(guò)kafka和filesystem的例子介紹了配置文件使用-表、視圖等)

    一、Flink 專(zhuān)欄 Flink 專(zhuān)欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月14日
    瀏覽(38)
  • 使用Flink實(shí)現(xiàn)Kafka到MySQL的數(shù)據(jù)流轉(zhuǎn)換:一個(gè)基于Flink的實(shí)踐指南

    使用Flink實(shí)現(xiàn)Kafka到MySQL的數(shù)據(jù)流轉(zhuǎn)換:一個(gè)基于Flink的實(shí)踐指南

    在現(xiàn)代數(shù)據(jù)處理架構(gòu)中,Kafka和MySQL是兩種非常流行的技術(shù)。Kafka作為一個(gè)高吞吐量的分布式消息系統(tǒng),常用于構(gòu)建實(shí)時(shí)數(shù)據(jù)流管道。而MySQL則是廣泛使用的關(guān)系型數(shù)據(jù)庫(kù),適用于存儲(chǔ)和查詢(xún)數(shù)據(jù)。在某些場(chǎng)景下,我們需要將Kafka中的數(shù)據(jù)實(shí)時(shí)地寫(xiě)入到MySQL數(shù)據(jù)庫(kù)中,本文將介紹

    2024年04月15日
    瀏覽(25)
  • 基于Flink CDC實(shí)時(shí)同步PostgreSQL與Tidb【Flink SQL Client模式下親測(cè)可行,詳細(xì)教程】

    操作系統(tǒng):ubuntu-22.04,運(yùn)行于wsl 2【 注意,請(qǐng)務(wù)必使用wsl 2 ;wsl 1會(huì)出現(xiàn)各種各樣的問(wèn)題】 軟件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0 已有postgre的跳過(guò)此步 (1)pg安裝 https://zhuanlan.zhihu.com/p/143156636 (2)pg配置 可能出現(xiàn)的問(wèn)題 sudo -u postgres psql 報(bào)錯(cuò): psql: err

    2024年02月11日
    瀏覽(30)
  • 使用Flink MySQL cdc分別sink到ES、Kafka、Hudi

    使用Flink MySQL cdc分別sink到ES、Kafka、Hudi

    [flink-1.13.1-bin-scala_2.11.tgz](https://archive.apache.org/dist/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.11.tgz) [hadoop-2.7.3.tar.gz](https://archive.apache.org/dist/hadoop/common/hadoop-2.7.3/hadoop-2.7.3.tar.gz) [flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors)(git clone源碼編譯) [hudi](https://github.com/apache/hudi)(git

    2024年02月03日
    瀏覽(56)
  • 【flink sql】kafka連接器

    Kafka 連接器提供從 Kafka topic 中消費(fèi)和寫(xiě)入數(shù)據(jù)的能力。 前面已經(jīng)介紹了flink sql創(chuàng)建表的語(yǔ)法及說(shuō)明:【flink sql】創(chuàng)建表 這篇博客聊聊怎么通過(guò)flink sql連接kafka 以下的連接器元數(shù)據(jù)可以在表定義中通過(guò)元數(shù)據(jù)列的形式獲取。 R/W 列定義了一個(gè)元數(shù)據(jù)是可讀的(R)還是可寫(xiě)的(

    2024年02月08日
    瀏覽(22)
  • Flink Upsert Kafka SQL Connector 介紹

    Flink Upsert Kafka SQL Connector 介紹

    在某些場(chǎng)景中,比方GROUP BY聚合之后的后果,須要去更新之前的結(jié)果值。這個(gè)時(shí)候,須要將 Kafka 記錄的 key 當(dāng)成主鍵解決,用來(lái)確定一條數(shù)據(jù)是應(yīng)該作為插入、刪除還是更新記錄來(lái)解決。在 Flink1.11 中,能夠通過(guò) flink-cdc-connectors 項(xiàng)目提供的 changelog-json format 來(lái)實(shí)現(xiàn)該性能。 在

    2024年02月20日
    瀏覽(22)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包