1、JDBC SQL 連接器
FlinkSQL允許使用 JDBC連接器,向任意類型的關(guān)系型數(shù)據(jù)庫讀取或者寫入數(shù)據(jù)
添加Maven依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.0-1.17</version>
</dependency>
注意:如果使用?sql-client客戶端,需保證?flink-1.17.1/lib 目錄下 存在相應(yīng)的jar包
?相關(guān)jar可以通過官網(wǎng)下載:JDBC SQL 連接器?
2、讀取 MySQL
FlinkSQL讀取MySQL表時(shí),為批式處理,在流式計(jì)算任務(wù)中,通常被做維表來使用
-- 在FlinkSQL中創(chuàng)建 MySQL Source 表
drop table mysql_source_table;
CREATE TABLE mysql_source_table (
`id` INT,
`title` STRING,
`author` STRING,
`price` DOUBLE,
`qty` INT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://worker01/flink',
'driver' = 'com.mysql.jdbc.Driver', -- 【可選】不設(shè)置時(shí),將自動(dòng)從url中推導(dǎo)
'username' = 'xxxx',
'password' = 'xxxx',
'table-name' = 'books'
);
-- 批式 sql,查看 JDBC 表中的數(shù)據(jù)
select * from mysql_source_table;
運(yùn)行結(jié)果:
3、寫入MySQL
3.1 何時(shí)批量寫入MySQL呢?
FlinkSQL往MySQL寫入數(shù)據(jù)時(shí),默認(rèn)會(huì)在客戶端緩存數(shù)據(jù),當(dāng)觸發(fā)設(shè)置的閾值后,才會(huì)向服務(wù)端發(fā)送數(shù)據(jù)
開啟checkpoint :
# TODO 開啟checkpoint,當(dāng)checkpoint后,會(huì)觸發(fā)jdbc的flush操作
set execution.checkpointing.interval=300sec;
設(shè)置 flush 前緩存記錄的最大值 、flush 間隔時(shí)間:
-- TODO 創(chuàng)建sink mysql table
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (
`id` INT,
`title` STRING,
`author` STRING,
`price` DOUBLE,
`qty` INT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',
'username' = 'xxxx',
'password' = 'xxxx',
'table-name' = 'books',
'sink.buffer-flush.max-rows' = '100', -- flush 前緩存記錄的最大值,默認(rèn)值為100,設(shè)置為0時(shí),表示不緩存數(shù)據(jù)(來一條寫入一條)
'sink.buffer-flush.interval' = '50s' -- flush 間隔時(shí)間,超過該時(shí)間后異步線程將 flush 數(shù)據(jù)。默認(rèn)為1s
);
使用說明:
FLinkSQL寫入MySQL時(shí),常通過?sink.buffer-flush.max-rows、sink.buffer-flush.interval 來控制寫入數(shù)據(jù)的延遲程度
? ? ? ? 當(dāng) 對寫入實(shí)時(shí)性要求較高時(shí),可以將?sink.buffer-flush.max-rows = 0 ,表示到來一條數(shù)據(jù)后立即寫入MySQL,但帶來的后果是 長時(shí)間占有mysql連接
? ? ? ? 當(dāng) 數(shù)據(jù)量大且對實(shí)時(shí)要求不高時(shí),可根據(jù)業(yè)務(wù)需求調(diào)大配置,可使實(shí)時(shí)行和性能最優(yōu)
3.2 sink mysql table 中主鍵的作用
在FLinkSQL中創(chuàng)建sink mysql table時(shí),如果表中定義了主鍵,則連接器將以 upsert 模式工作
否則連接器將以 append 模式工作
? ? ? ? ?upsert 模式:Flink 將根據(jù)主鍵判斷插入新行或者更新已存在的行
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?使用這種模式時(shí),確保MySQL中的底表定義主鍵和添加唯一性約束
? ? ? ?append 模式:對MySQL庫中底表做insert操作
?upsert 模式:
-- TODO 創(chuàng)建MySQL 表
CREATE TABLE `books` (
`id` int(11) NOT NULL,
`title` varchar(99) DEFAULT NULL,
`author` varchar(99) DEFAULT NULL,
`price` double DEFAULT NULL,
`qty` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- TODO 創(chuàng)建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (
`id` INT,
`title` STRING,
`author` STRING,
`price` DOUBLE,
`qty` INT,
PRIMARY KEY (id) NOT ENFORCED -- 指定主鍵字段
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',
'username' = 'root',
'password' = 'xxxx',
'table-name' = 'books',
'sink.buffer-flush.max-rows' = '0' -- 實(shí)時(shí)寫入
);
-- TODO 往 mysql中寫入數(shù)據(jù)(相同key的數(shù)據(jù)寫入后,會(huì)做upsert操作)
insert into mysql_sink_table
SELECT * FROM (VALUES
(5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);
append 模式:
-- TODO 創(chuàng)建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (
`id` INT,
`title` STRING,
`author` STRING,
`price` DOUBLE,
`qty` INT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',
'username' = 'root',
'password' = 'xxx',
'table-name' = 'books',
'sink.buffer-flush.max-rows' = '0' -- 實(shí)時(shí)寫入
);
-- TODO 往 mysql中寫入數(shù)據(jù)(相同key的數(shù)據(jù)寫入后,會(huì)做操作)
insert into mysql_sink_table
SELECT * FROM (VALUES
(5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);
注意:使用 append模式時(shí),如果MySQL底表中存在主鍵或唯一性約束時(shí),INSERT 插入可能會(huì)失敗
insert into 失?。?mark hidden color="red">文章來源:http://www.zghlxwxcb.cn/news/detail-735003.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-735003.html
到了這里,關(guān)于2.3 如何使用FlinkSQL讀取&寫入到JDBC(MySQL)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!