FlinkSQL說明
- Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設(shè)計的一套符合標(biāo)準(zhǔn) SQL 語義的開發(fā)語言。
- 自 2015 年開始,阿里巴巴開始調(diào)研開源流計算引擎,最終決定基于 Flink 打造新一代計算引擎,針對 Flink 存在的不足進(jìn)行優(yōu)化和改進(jìn),并且在 2019 年初將最終代碼開源,也就是我們熟知的 Blink。Blink 在原來的 Flink 基礎(chǔ)上最顯著的一個貢獻(xiàn)就是 Flink SQL 的實現(xiàn)。
- Flink SQL 是面向用戶的 API 層,在我們傳統(tǒng)的流式計算領(lǐng)域,比如 Storm、Spark Streaming 都會提供一些 Function 或者 Datastream API,用戶通過 Java 或 Scala 寫業(yè)務(wù)邏輯,這種方式雖然靈活,但有一些不足,比如具備一定門檻且調(diào)優(yōu)較難,隨著版本的不斷更新,API 也出現(xiàn)了很多不兼容的地方。
- 在 flink sql 中,對表名、字段名、函數(shù)名等是嚴(yán)格區(qū)分大小寫的,為了兼容 hive 等其他倉庫,建議建表時,表名和字段名都采用下劃線連接單詞的方式,以避免大小寫問題。比如 hive ,是不區(qū)分大小寫的,所有大寫字母最終都會被系統(tǒng)轉(zhuǎn)化為小寫字母,此時使用 flink sql 去讀寫 hive ,出現(xiàn)大寫字母時,會出現(xiàn)找不到表或字段的錯誤。關(guān)鍵字是不區(qū)分大小寫的,比如 insert、select、create等。flink sql 中所有的字符串常量都需要使用英文單引號括起來,不要使用英文雙引號以及中文符號。
前期準(zhǔn)備
依賴的環(huán)境
環(huán)境:Linux(Centos7)
Flink : 1.13.6
進(jìn)入Flink的lib目錄
cd flink-1.13.6/lib
上傳相關(guān)的依賴包,這幾個包在網(wǎng)上很容易找到
flink-sql-connector-mysql-cdc-2.1.0.jar
mysql-connector-java-8.0.13.jar
flink-sql-connector-postgres-cdc-1.2.0.jar
postgresql-42.6.0.jar
啟動 Flink客戶端
./flink-1.13.1/bin/sql-client.sh
Flink-SQL腳本
1、postgresql ->postgresql
-- pg中映射表,source
CREATE TABLE cdc_pg_source (
id INT,
age INT,
name STRING
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '10.254.21.3',
'port' = '54432',
'database-name' = 'postgres',
'schema-name' = 'public',
'username' = 'gpadmin',
'password' = 'xxxxxxx',
'table-name' = 'cdc_pg_source',
'decoding.plugin.name' = 'pgoutput',
'debezium.slot.name' = 'cdc_pg_source');
-- pg中映射表,sink
CREATE TABLE cdc_pg_sink (
id INT,
age INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://10.254.21.3:54432/postgres',
'username' = 'gpadmin',
'password' = 'xxxxxx',
'table-name' = 'cdc_pg_sink',
'sink.buffer-flush.max-rows' = '1');
-- flink job
INSERT INTO cdc_pg_sink select * from cdc_pg_source;
2、mysql -> mysql
CREATE TABLE t_test (
id bigint,
username string,
password string,
create_time time
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.252.92.4',
'port' = '3306',
'database-name' = 'flink_cdc_test',
'username' = 'root',
'password' = 'xxxx',
'table-name' = 't_test'
);
CREATE TABLE t_test_ods (
id bigint primary key,
username string,
password string,
create_time time
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.252.92.4:3306/flink_cdc_test_ods',
'username' = 'root',
'password' = 'xxxx',
'table-name' = 't_test',
'sink.buffer-flush.max-rows' = '1'
);
insert into t_test_ods select * from t_test;
遇到的問題
1、Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.? 或? Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
解決方法:文章來源:http://www.zghlxwxcb.cn/news/detail-426255.html
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.4.0</version>
</dependency>
下載改JAR包,把它加到Flink下的lib路徑下,然后重啟sql-client;文章來源地址http://www.zghlxwxcb.cn/news/detail-426255.html
到了這里,關(guān)于Flink實戰(zhàn)-(6)FlinkSQL實現(xiàn)CDC的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!