簡(jiǎn)介
本文介紹 Flink SQL如何流式寫(xiě)入 Apache Doris,分為一下幾個(gè)部分:
- Flink Doris connector
- Doris FE 節(jié)點(diǎn)配置
- Flink SQL 寫(xiě) Doris
Flink Doris connector
Flink Doris connector 本質(zhì)是通過(guò)Stream Load來(lái)時(shí)實(shí)現(xiàn)數(shù)據(jù)的查詢和寫(xiě)入功能。
支持二階段提交,可實(shí)現(xiàn)Exatly Once的寫(xiě)入。
Doris FE 節(jié)點(diǎn)配置
1)需在 apache-doris/fe/fe.conf 配置文件添加如下配置:
enable_http_server_v2 = true
- 重啟 FE 節(jié)點(diǎn)
apache-doris/fe/bin/stop_fe.sh
apache-doris/fe/bin/start_fe.sh --daemon
Doris BE 節(jié)點(diǎn)配置
1)需在 apache-doris/be/be.conf 配置文件添加如下配置:
enable_stream_load_record = true
- 重啟 BE 節(jié)點(diǎn)
apache-doris/be/bin/stop_be.sh
apache-doris/be/bin/start_be.sh --daemon
Doris 建表語(yǔ)句
CREATE TABLE order_info (
order_date date NOT NULL COMMENT '下單日期',
order_id int(11) NOT NULL COMMENT '訂單id',
buy_num tinyint(4) NULL COMMENT '購(gòu)買件數(shù)',
user_id int(11) NULL COMMENT '[-9223372036854775808, 9223372036854775807]',
create_time datetime NULL COMMENT '創(chuàng)建時(shí)間',
update_time datetime NULL COMMENT '更新時(shí)間'
) ENGINE=OLAP
DUPLICATE KEY(order_date, order_id)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(order_id) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
本例使用的明細(xì)模型,僅insert操作,如需update/delete,則需選擇Unique模型
生成測(cè)試數(shù)據(jù)
通過(guò)Flink SQL自帶的datagen生成測(cè)試數(shù)據(jù):
CREATE TABLE order_info_source (
order_date DATE,
order_id INT,
buy_num INT,
user_id INT,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.order_id.min' = '30001',
'fields.order_id.max' = '30500',
'fields.user_id.min' = '10001',
'fields.user_id.max' = '20001',
'fields.buy_num.min' = '10',
'fields.buy_num.max' = '20',
'number-of-rows' = '100'
)
datagen參數(shù):'rows-per-second' = '10'
: 每秒發(fā)送10條數(shù)據(jù)'fields.order_id.min' = '30001'
: order_id最小值為30001'fields.order_id.max' = '30500'
: order_id最大值為30500'fields.user_id.min' = '10001'
: user_id最小值為10001'fields.user_id.max' = '20001'
: user_id最大值為20001'fields.buy_num.min' = '10'
: buy_num最小值為10'fields.buy_num.max' = '20'
: buy_num最大值為20'number-of-rows' = '100'
: 共發(fā)送100條數(shù)據(jù), 不設(shè)置的話會(huì)無(wú)限量發(fā)送數(shù)據(jù)
更多細(xì)節(jié),請(qǐng)前往官網(wǎng)DataGen SQL Connector
注冊(cè)Doris Sink表
CREATE TABLE order_info_sink (
order_date DATE,
order_id INT,
buy_num INT,
user_id INT,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3)
)
WITH (
'connector' = 'doris',
'fenodes' = '192.168.56.104:8030',
'table.identifier' = 'test.order_info_example',
'username' = 'test',
'password' = 'password123',
'sink.label-prefix' = 'sink_doris_label_8'
)
寫(xiě)入Doris Sink表
insert into order_info_sink select * from order_info_source
通過(guò)Mysql客戶端查看Doris表的數(shù)據(jù)文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-403945.html
mysql> select * from test.order_info_example limit 10;
+------------+----------+---------+---------+---------------------+---------------------+
| order_date | order_id | buy_num | user_id | create_time | update_time |
+------------+----------+---------+---------+---------------------+---------------------+
| 2022-09-22 | 30007 | 10 | 10560 | 2022-09-22 07:42:21 | 2022-09-22 07:42:21 |
| 2022-09-22 | 30125 | 16 | 17591 | 2022-09-22 07:42:26 | 2022-09-22 07:42:26 |
| 2022-09-22 | 30176 | 17 | 10871 | 2022-09-22 07:42:24 | 2022-09-22 07:42:24 |
| 2022-09-22 | 30479 | 16 | 19847 | 2022-09-22 07:42:25 | 2022-09-22 07:42:25 |
| 2022-09-22 | 30128 | 16 | 19807 | 2022-09-22 07:42:24 | 2022-09-22 07:42:24 |
| 2022-09-22 | 30039 | 13 | 18237 | 2022-09-22 07:42:28 | 2022-09-22 07:42:28 |
| 2022-09-22 | 30060 | 10 | 18309 | 2022-09-22 07:42:24 | 2022-09-22 07:42:24 |
| 2022-09-22 | 30246 | 18 | 10855 | 2022-09-22 07:42:24 | 2022-09-22 07:42:24 |
| 2022-09-22 | 30288 | 19 | 12347 | 2022-09-22 07:42:26 | 2022-09-22 07:42:26 |
| 2022-09-22 | 30449 | 17 | 11488 | 2022-09-22 07:42:23 | 2022-09-22 07:42:23 |
+------------+----------+---------+---------+---------------------+---------------------+
10 rows in set (0.05 sec)
完整代碼
src/main/java/FlinkSQLSinkExample.java文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-403945.html
到了這里,關(guān)于Apache Doris 系列: 基礎(chǔ)篇-Flink SQL寫(xiě)入Doris的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!