Contents
1. Environment
2. Creating the MySQL tables
3. Creating the Flink SQL tables
3.1 Starting the Flink SQL client
3.2 Setting the output format
3.3 Creating the Flink tables
3.4 Wiring up the pipeline
4. Testing
4.1 Inserting test data
4.2 Viewing the result table
4.3 Adding more test data
4.4 Viewing the result table again
1. Environment
Service | Version |
---|---|
zookeeper | 3.8.0 |
kafka | 3.3.1 |
flink | 1.13.5 |
mysql | 5.7.34 |
jdk | 1.8 |
scala | 2.12 |
Connector | Purpose |
---|---|
flink-sql-connector-upsert-kafka_2.11-1.13.6.jar | Connects to Kafka, with primary-key upsert support |
flink-connector-mysql-cdc-2.0.2.jar | Reads from MySQL (CDC source) |
flink-connector-jdbc_2.11-1.13.6.jar | Writes to MySQL (JDBC sink) |
mysql-connector-java-5.1.37.jar | MySQL JDBC driver |
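These connector jars must be visible to both the SQL client and the Flink cluster. A minimal sketch of one way to deploy them, assuming a standalone cluster and that $FLINK_HOME points to the Flink installation (the paths are illustrative):

# copy the connector jars into Flink's lib directory
cp flink-sql-connector-upsert-kafka_2.11-1.13.6.jar \
   flink-connector-mysql-cdc-2.0.2.jar \
   flink-connector-jdbc_2.11-1.13.6.jar \
   mysql-connector-java-5.1.37.jar \
   $FLINK_HOME/lib/
# restart the cluster so the new jars are picked up
$FLINK_HOME/bin/stop-cluster.sh && $FLINK_HOME/bin/start-cluster.sh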
2. Creating the MySQL tables
CREATE TABLE src_mysql_order(
order_id BIGINT,
store_id BIGINT,
sales_amt double,
PRIMARY KEY (`order_id`)
);
CREATE TABLE src_mysql_order_detail(
order_id BIGINT,
store_id BIGINT,
goods_id BIGINT,
sales_amt double,
PRIMARY KEY (order_id,store_id,goods_id)
);
CREATE TABLE dim_store(
store_id BIGINT,
store_name varchar(100),
PRIMARY KEY (`store_id`)
);
CREATE TABLE dim_goods(
goods_id BIGINT,
goods_name varchar(100),
PRIMARY KEY (`goods_id`)
);
CREATE TABLE dwa_mysql_order_analysis (
store_id BIGINT,
store_name varchar(100),
sales_goods_distinct_nums bigint,
sales_amt double,
order_nums bigint,
PRIMARY KEY (store_id,store_name)
);
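The mysql-cdc connector used in the next section reads the MySQL binlog, so binary logging must be enabled in ROW format on this MySQL instance (normally configured in my.cnf). A quick way to verify it before continuing:

-- run in the MySQL client
SHOW VARIABLES LIKE 'log_bin';        -- expected: ON
SHOW VARIABLES LIKE 'binlog_format';  -- expected: ROW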
3. Creating the Flink SQL tables
3.1 Starting the Flink SQL client
sql-client.sh embedded
3.2 Setting the output format
SET sql-client.execution.result-mode=tableau;
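If you also want periodic checkpoints for the long-running insert jobs, they can be enabled in the same session; the 3-second interval below is only an illustration, adjust to your environment:

SET execution.checkpointing.interval=3s;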
3.3 Creating the Flink tables
-- order header table in MySQL (CDC source)
CREATE TABLE src_mysql_order(
order_id BIGINT,
store_id BIGINT,
sales_amt double,
PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop002',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'test',
'table-name' = 'src_mysql_order',
'scan.incremental.snapshot.enabled' = 'false'
);
-- order detail table in MySQL (CDC source)
CREATE TABLE src_mysql_order_detail(
order_id BIGINT,
store_id BIGINT,
goods_id BIGINT,
sales_amt double,
PRIMARY KEY (order_id,store_id,goods_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop002',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'test',
'table-name' = 'src_mysql_order_detail',
'scan.incremental.snapshot.enabled' = 'false'
);
-- store dimension table in MySQL (CDC source)
CREATE TABLE dim_store(
store_id BIGINT,
store_name varchar(100),
PRIMARY KEY (`store_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop002',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'test',
'table-name' = 'dim_store',
'scan.incremental.snapshot.enabled' = 'false'
);
-- goods dimension table in MySQL (CDC source)
CREATE TABLE dim_goods(
goods_id BIGINT,
goods_name varchar(100),
PRIMARY KEY (`goods_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop002',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'test',
'table-name' = 'dim_goods',
'scan.incremental.snapshot.enabled' = 'false'
);
-- ODS-layer order table in Kafka
CREATE TABLE ods_kafka_order (
order_id BIGINT,
store_id BIGINT,
sales_amt double,
PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'ods_kafka_order',
'properties.bootstrap.servers' = 'hadoop001:9092',
'properties.group.id' = 'ods_group1',
'key.format' = 'json',
'value.format' = 'json'
);
-- ODS-layer order detail table in Kafka
CREATE TABLE ods_kafka_order_detail (
order_id BIGINT,
store_id BIGINT,
goods_id BIGINT,
sales_amt double,
PRIMARY KEY (order_id,store_id,goods_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'ods_kafka_order_detail',
'properties.bootstrap.servers' = 'hadoop001:9092',
'properties.group.id' = 'ods_group1',
'key.format' = 'json',
'value.format' = 'json'
);
-- DWD-layer order table in Kafka
CREATE TABLE dwd_kafka_order (
order_id BIGINT,
store_id BIGINT,
sales_amt double,
PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'dwd_kafka_order',
'properties.bootstrap.servers' = 'hadoop001:9092',
'properties.group.id' = 'dwd_group1',
'key.format' = 'json',
'value.format' = 'json'
);
-- DWD-layer order detail table in Kafka
CREATE TABLE dwd_kafka_order_detail (
order_id BIGINT,
store_id BIGINT,
goods_id BIGINT,
sales_amt double,
PRIMARY KEY (order_id,store_id,goods_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'dwd_kafka_order_detail',
'properties.bootstrap.servers' = 'hadoop001:9092',
'properties.group.id' = 'dwd_group1',
'key.format' = 'json',
'value.format' = 'json'
);
-- DWA-layer order metrics table in MySQL (JDBC sink)
CREATE TABLE dwa_mysql_order_analysis (
store_id BIGINT,
store_name varchar(100),
sales_goods_distinct_nums bigint,
sales_amt double,
order_nums bigint,
PRIMARY KEY (store_id,store_name) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop002:3306/test',
'table-name' = 'dwa_mysql_order_analysis',
	'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.max-rows' = '10'
);
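After running all of the DDL above, nine tables should be registered in the SQL client session; a quick sanity check before wiring up the pipeline:

show tables;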
3.4 Wiring up the pipeline
-- pipeline: MySQL CDC -> ODS (Kafka) -> DWD (Kafka) -> DWA (MySQL)
insert into ods_kafka_order select * from src_mysql_order;
insert into ods_kafka_order_detail select * from src_mysql_order_detail;
insert into dwd_kafka_order select * from ods_kafka_order;
insert into dwd_kafka_order_detail select * from ods_kafka_order_detail;
insert into dwa_mysql_order_analysis
select
orde.store_id as store_id
,store.store_name as store_name
,count(distinct order_detail.goods_id) as sales_goods_distinct_nums
,sum(order_detail.sales_amt) as sales_amt
,count(distinct orde.order_id) as order_nums
from dwd_kafka_order as orde
join dwd_kafka_order_detail as order_detail
on orde.order_id = order_detail.order_id
join dim_store as store
on orde.store_id = store.store_id
group by
orde.store_id
,store.store_name
;
Open the Flink web UI and you should see 5 running jobs; the real-time pipeline is now fully wired up.
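The same check can be done from the command line (again assuming $FLINK_HOME points to the Flink installation):

$FLINK_HOME/bin/flink list    # lists running and scheduled jobs; the 5 insert jobs should show as RUNNING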
4. Testing
4.1 Inserting test data
insert into src_mysql_order values
(20221210001,10000,50),
(20221210002,10000,20),
(20221210003,10001,10);
insert into src_mysql_order_detail values
(20221210001,10000,100000,30),
(20221210001,10000,100001,20),
(20221210002,10000,100001,20),
(20221210003,10001,100000,10);
insert into dim_store values
(10000, '宇唐總店'),
(10001, '宇唐一店'),
(10002, '宇唐二店'),
(10003, '宇唐三店');
insert into dim_goods values
(100000, '天獅達(dá)特濃縮棗漿'),
(100001, '蜜煉柚子茶');
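Once these rows land in MySQL, the CDC and upsert-kafka jobs should start populating the intermediate topics. One way to peek at them, assuming the Kafka CLI scripts are on the PATH of a broker host:

# print both the upsert key and the value for the ODS order topic
kafka-console-consumer.sh --bootstrap-server hadoop001:9092 \
  --topic ods_kafka_order --from-beginning \
  --property print.key=true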
4.2 Viewing the result table
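The original post showed a screenshot here; the equivalent check is simply querying the sink table in MySQL. Based on the test data inserted in 4.1, the aggregates should come out roughly as follows:

-- run in the MySQL client
select * from dwa_mysql_order_analysis;
-- expected (derived from the rows inserted in 4.1):
-- store_id | store_name | sales_goods_distinct_nums | sales_amt | order_nums
-- 10000    | 宇唐總店    | 2                         | 70        | 2
-- 10001    | 宇唐一店    | 1                         | 10        | 1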
4.3 Adding more test data
insert into src_mysql_order values
(20221210004,10002,50),
(20221210005,10003,30);
insert into src_mysql_order_detail values
(20221210004,10002,100000,30),
(20221210004,10002,100001,20),
(20221210005,10003,100000,10),
(20221210005,10003,100001,20);
4.4 Viewing the result table again
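Querying the sink table again should now show two additional stores; roughly (derived from the rows inserted in 4.3):

-- run in the MySQL client
select * from dwa_mysql_order_analysis;
-- new rows expected:
-- 10002 | 宇唐二店 | 2 | 50 | 1
-- 10003 | 宇唐三店 | 2 | 30 | 1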
That concludes this walkthrough of using Flink's sql-client.sh to test a mysql --> kafka --> kafka --> mysql real-time pipeline.