Flink 系列文章
一、Flink 專欄
Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。
-
1、Flink 部署系列
本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 -
2、Flink基礎(chǔ)系列
本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 -
3、Flik Table API和SQL基礎(chǔ)系列
本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫(kù)、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。 -
4、Flik Table API和SQL提高與應(yīng)用系列
本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開(kāi)發(fā)難度的內(nèi)容。 -
5、Flink 監(jiān)控系列
本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。
二、Flink 示例專欄
Flink 示例專欄是 Flink 專欄的輔助說(shuō)明,一般不會(huì)介紹知識(shí)點(diǎn)的信息,更多的是提供一個(gè)一個(gè)可以具體使用的示例。本專欄不再分目錄,通過(guò)鏈接即可看出介紹的內(nèi)容。
兩專欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引
本文簡(jiǎn)單的介紹了flink sql讀取外部系統(tǒng)的jdbc示例(每個(gè)示例均是驗(yàn)證通過(guò)的,并且具體給出了運(yùn)行環(huán)境的版本)。
本文依賴環(huán)境是hadoop、kafka、mysql環(huán)境好用,如果是ha環(huán)境則需要zookeeper的環(huán)境。
一、Table & SQL Connectors 示例:JDBC
1、maven依賴(java編碼依賴)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.0-1.17</version>
</dependency>
在連接到具體數(shù)據(jù)庫(kù)時(shí),也需要對(duì)應(yīng)的驅(qū)動(dòng)依賴,目前支持的驅(qū)動(dòng)如下:
驅(qū)動(dòng)jar需放在flink的安裝目錄lib下,且需要重啟服務(wù)。
本示例jar包有
flink-connector-jdbc_2.11-1.13.6.jar
mysql-connector-java-5.1.5.jar 或
mysql-connector-java-6.0.6.jar(1.17版本中使用的mysql驅(qū)動(dòng),用上面mysql驅(qū)動(dòng)有異常信息)
2、創(chuàng)建 JDBC 表
JDBC table 可以按如下定義,以下示例中包含創(chuàng)建表、批量插入以及l(fā)eft join的維表。
1)、創(chuàng)建jdbc表,并插入、查詢
-- 在 Flink SQL 中注冊(cè)一張 MySQL 表 'users'
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
-------------------具體事例----------------------------------
-- 在 Flink SQL 中注冊(cè)一張 MySQL 表 'user'
CREATE TABLE Alan_JDBC_User_Table (
id BIGINT,
name STRING,
age INT,
balance DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.10.44:3306/test',
'table-name' = 'user'
);
-- mysql中的數(shù)據(jù)
mysql> select * from user;
+----+-------------+------+---------+-----------------------+------------+
| id | name | age | balance | email | pwd |
+----+-------------+------+---------+-----------------------+------------+
| 1 | aa6 | 61 | 60000 | 6@163.com | 123456 |
| 2 | aa4 | 71 | 70000 | 7@163.com | 7123 |
| 4 | test | NULL | NULL | NULL | NULL |
| 5 | test2 | NULL | NULL | NULL | NULL |
| 7 | alanchanchn | 19 | 800 | alan.chan.chn@163.com | vx |
| 8 | alanchan | 19 | 800 | alan.chan.chn@163.com | sink mysql |
+----+-------------+------+---------+-----------------------+------------+
6 rows in set (0.00 sec)
---------在flink sql中建表并查詢--------
Flink SQL> CREATE TABLE Alan_JDBC_User_Table (
> id BIGINT,
> name STRING,
> age INT,
> balance DOUBLE,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://192.168.10.44:3306/test',
> 'table-name' = 'user'
> );
[INFO] Execute statement succeed.
Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op | id | name | age | balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I | 1 | aa6 | 61 | 60000.0 |
| +I | 2 | aa4 | 71 | 70000.0 |
| +I | 4 | test | (NULL) | (NULL) |
| +I | 5 | test2 | (NULL) | (NULL) |
| +I | 7 | alanchanchn | 19 | 800.0 |
| +I | 8 | alanchan | 19 | 800.0 |
+----+----------------------+--------------------------------+-------------+--------------------------------+
Received a total of 6 rows
2)、批量插入表數(shù)據(jù)
-- 從另一張表 "T" 將數(shù)據(jù)寫入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;
---------創(chuàng)建數(shù)據(jù)表----------------------
CREATE TABLE source_table (
userId INT,
age INT,
balance DOUBLE,
userName STRING,
t_insert_time AS localtimestamp,
WATERMARK FOR t_insert_time AS t_insert_time
) WITH (
'connector' = 'datagen',
'rows-per-second'='5',
'fields.userId.kind'='sequence',
'fields.userId.start'='1',
'fields.userId.end'='5000',
'fields.balance.kind'='random',
'fields.balance.min'='1',
'fields.balance.max'='100',
'fields.age.min'='1',
'fields.age.max'='1000',
'fields.userName.length'='10'
);
-- 從另一張表 "source_table" 將數(shù)據(jù)寫入到 JDBC 表中
INSERT INTO Alan_JDBC_User_Table
SELECT userId, userName, age, balance FROM source_table;
-- 查看 JDBC 表中的數(shù)據(jù)
select * from Alan_JDBC_User_Table;
---------------flink sql中查詢----------------------------------
Flink SQL> INSERT INTO Alan_JDBC_User_Table
> SELECT userId, userName, age, balance FROM source_table;
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e91cd3c41ac20aaf8eab79f0094f9e46
Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op | id | name | age | balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I | 1 | ead5352794 | 513 | 4.0 |
| +I | 2 | 728297a8d9 | 410 | 35.0 |
| +I | 3 | 643c2226cd | 142 | 80.0 |
......
-------------驗(yàn)證mysql中的數(shù)據(jù)是否寫入,此處只查總數(shù)----------------
mysql> select count(*) from user;
+----------+
| count(*) |
+----------+
| 2005 |
+----------+
1 row in set (0.00 sec)
3)、JDBC 表在時(shí)態(tài)表關(guān)聯(lián)中作為維表
-- 1、創(chuàng)建 JDBC 表在時(shí)態(tài)表關(guān)聯(lián)中作為維表
CREATE TABLE Alan_JDBC_User_Table (
id BIGINT,
name STRING,
age INT,
balance DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.10.44:3306/test',
'table-name' = 'user'
);
-----2、查詢表中的數(shù)據(jù)(實(shí)際數(shù)據(jù)是之前測(cè)試的結(jié)果) -----
Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op | id | name | age | balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I | 1 | ead5352794 | 513 | 4.0 |
| +I | 2 | 728297a8d9 | 410 | 35.0 |
| +I | 3 | 643c2226cd | 142 | 80.0 |
| +I | 4 | 6115f11f01 | 633 | 69.0 |
| +I | 5 | 044ba5fa2f | 74 | 71.0 |
| +I | 6 | 98a112dc87 | 729 | 54.0 |
| +I | 7 | 705326a369 | 846 | 99.0 |
| +I | 8 | 532692924f | 872 | 79.0 |
| +I | 9 | b816802948 | 475 | 67.0 |
| +I | 10 | 06906bebb2 | 109 | 57.0 |
......
-----3、創(chuàng)建事實(shí)表,以kafka表作為代表 -----
CREATE TABLE Alan_KafkaTable_3 (
user_id BIGINT, -- 用戶id
item_id BIGINT, -- 商品id
action STRING, -- 用戶行為
ts BIGINT, -- 用戶行為發(fā)生的時(shí)間戳
proctime as PROCTIME(), -- 通過(guò)計(jì)算列產(chǎn)生一個(gè)處理時(shí)間列
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',-- 事件時(shí)間
WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND -- 在eventTime上定義watermark
) WITH (
'connector' = 'kafka',
'topic' = 'testtopic',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
-----4、發(fā)送kafka消息,同時(shí)觀察事實(shí)表中的數(shù)據(jù) -----
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic testtopic
>1,1001,"login",1692593500222
>2,1002,"p_read",1692593502242
>
Flink SQL> select * from Alan_KafkaTable_3;
+----+----------------------+----------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| op | user_id | item_id | action | ts | proctime | event_time |
+----+----------------------+----------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| +I | 1 | 1001 | login | 1692593500222 | 2023-08-22 05:33:38.830 | 2023-08-22 05:39:54.439 |
| +I | 2 | 1002 | p_read | 1692593502242 | 2023-08-22 05:33:38.833 | 2023-08-22 05:40:41.284 |
Query terminated, received a total of 2 rows
-----5、以jdbc的維表進(jìn)行關(guān)聯(lián)查詢事實(shí)表數(shù)據(jù)-----
SELECT
kafkamessage.user_id,
kafkamessage.item_id,
kafkamessage.action,
jdbc_dim_table.name,
jdbc_dim_table.age,
jdbc_dim_table.balance
FROM Alan_KafkaTable_3 AS kafkamessage
LEFT JOIN Alan_JDBC_User_Table FOR SYSTEM_TIME AS OF kafkamessage.proctime AS jdbc_dim_table ON kafkamessage.user_id = jdbc_dim_table.id;
Flink SQL> SELECT
> kafkamessage.user_id,
> kafkamessage.item_id,
> kafkamessage.action,
> jdbc_dim_table.name,
> jdbc_dim_table.age,
> jdbc_dim_table.balance
> FROM Alan_KafkaTable_3 AS kafkamessage
> LEFT JOIN Alan_JDBC_User_Table FOR SYSTEM_TIME AS OF kafkamessage.proctime AS jdbc_dim_table ON kafkamessage.user_id = jdbc_dim_table.id;
+----+----------------------+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+
| op | user_id | item_id | action | name | age | balance |
+----+----------------------+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+
| +I | 1 | 1001 | login | ead5352794 | 513 | 4.0 |
| +I | 2 | 1002 | p_read | 728297a8d9 | 410 | 35.0 |
- java
該部分示例僅僅是以java實(shí)現(xiàn)創(chuàng)建表及查詢,簡(jiǎn)單示例。
// 注冊(cè)名為 “jdbcOutputTable” 的JDBC表
String sinkDDL = "create table jdbcOutputTable (" +
"id bigint not null, " +
"name varchar(20) , " +
"age int ,"+
"balance bigint,"+
"pwd varchar(20),"+
"email varchar(20) , PRIMARY KEY (id) NOT ENFORCED" +
") with (" +
" 'connector.type' = 'jdbc', " +
" 'connector.url' = 'jdbc:mysql://192.168.10.44:3306/test', " +
" 'connector.table' = 'user', " +
" 'connector.driver' = 'com.mysql.jdbc.Driver', " +
" 'connector.username' = 'root', " +
" 'connector.password' = '123456' )";
tenv.executeSql(sinkDDL);
String sql = "SELECT * FROM jdbcOutputTable ";
String sql2 = "SELECT * FROM jdbcOutputTable where name like '%alan%'";
Table table = tenv.sqlQuery(sql2);
table.printSchema();
DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
result.print();
env.execute();
//運(yùn)行結(jié)果
(
`id` BIGINT NOT NULL,
`name` VARCHAR(20),
`age` INT,
`balance` BIGINT,
`pwd` VARCHAR(20),
`email` VARCHAR(20)
)
15> (true,+I[7, alanchanchn, 19, 800, vx, alan.chan.chn@163.com])
15> (true,+I[8, alanchan, 19, 800, sink mysql, alan.chan.chn@163.com])
3、連接器參數(shù)
4、已棄用的配置
這些棄用配置已經(jīng)被上述的新配置代替,而且最終會(huì)被棄用。請(qǐng)優(yōu)先考慮使用新配置。
5、特性
1)、鍵處理
當(dāng)寫入數(shù)據(jù)到外部數(shù)據(jù)庫(kù)時(shí),F(xiàn)link 會(huì)使用 DDL 中定義的主鍵。如果定義了主鍵,則連接器將以 upsert 模式工作,否則連接器將以 append 模式工作。
在 upsert 模式下,F(xiàn)link 將根據(jù)主鍵判斷插入新行或者更新已存在的行,這種方式可以確保冪等性。為了確保輸出結(jié)果是符合預(yù)期的,推薦為表定義主鍵并且確保主鍵是底層數(shù)據(jù)庫(kù)中表的唯一鍵或主鍵。在 append 模式下,F(xiàn)link 會(huì)把所有記錄解釋為 INSERT 消息,如果違反了底層數(shù)據(jù)庫(kù)中主鍵或者唯一約束,INSERT 插入可能會(huì)失敗。
有關(guān) PRIMARY KEY 語(yǔ)法的更多詳細(xì)信息,請(qǐng)參見(jiàn) 22、Flink 的table api與sql之創(chuàng)建表的DDL。
2)、分區(qū)掃描
為了在并行 Source task 實(shí)例中加速讀取數(shù)據(jù),F(xiàn)link 為 JDBC table 提供了分區(qū)掃描的特性。
如果下述分區(qū)掃描參數(shù)中的任一項(xiàng)被指定,則下述所有的分區(qū)掃描參數(shù)必須都被指定。這些參數(shù)描述了在多個(gè) task 并行讀取數(shù)據(jù)時(shí)如何對(duì)表進(jìn)行分區(qū)。 scan.partition.column 必須是相關(guān)表中的數(shù)字、日期或時(shí)間戳列。
scan.partition.lower-bound 和 scan.partition.upper-bound 用于決定分區(qū)的起始位置和過(guò)濾表中的數(shù)據(jù)。如果是批處理作業(yè),也可以在提交 flink 作業(yè)之前獲取最大值和最小值。
- scan.partition.column:輸入用于進(jìn)行分區(qū)的列名。
- scan.partition.num:分區(qū)數(shù)。
- scan.partition.lower-bound:第一個(gè)分區(qū)的最小值。
- scan.partition.upper-bound:最后一個(gè)分區(qū)的最大值。
3)、Lookup Cache
JDBC 連接器可以用在時(shí)態(tài)表關(guān)聯(lián)中作為一個(gè)可 lookup 的 source (又稱為維表),當(dāng)前只支持同步的查找模式。
默認(rèn)情況下,lookup cache 是未啟用的,你可以將 lookup.cache 設(shè)置為 PARTIAL 參數(shù)來(lái)啟用。
lookup cache 的主要目的是用于提高時(shí)態(tài)表關(guān)聯(lián) JDBC 連接器的性能。
默認(rèn)情況下,lookup cache 不開(kāi)啟,所以所有請(qǐng)求都會(huì)發(fā)送到外部數(shù)據(jù)庫(kù)。
當(dāng) lookup cache 被啟用時(shí),每個(gè)進(jìn)程(即 TaskManager)將維護(hù)一個(gè)緩存。Flink 將優(yōu)先查找緩存,只有當(dāng)緩存未查找到時(shí)才向外部數(shù)據(jù)庫(kù)發(fā)送請(qǐng)求,并使用返回的數(shù)據(jù)更新緩存。 當(dāng)緩存命中最大緩存行 lookup.partial-cache.max-rows 或當(dāng)行超過(guò) lookup.partial-cache.expire-after-write 或 lookup.partial-cache.expire-after-access 指定的最大存活時(shí)間時(shí),緩存中的行將被設(shè)置為已過(guò)期。 緩存中的記錄可能不是最新的,用戶可以將緩存記錄超時(shí)設(shè)置為一個(gè)更小的值以獲得更好的刷新數(shù)據(jù),但這可能會(huì)增加發(fā)送到數(shù)據(jù)庫(kù)的請(qǐng)求數(shù)。
所以要做好吞吐量和正確性之間的平衡。
默認(rèn)情況下,flink 會(huì)緩存主鍵的空查詢結(jié)果,你可以通過(guò)將 lookup.partial-cache.cache-missing-key 設(shè)置為 false 來(lái)切換行為。
4)、冪等寫入
如果在 DDL 中定義了主鍵,JDBC sink 將使用 upsert 語(yǔ)義而不是普通的 INSERT 語(yǔ)句。upsert 語(yǔ)義指的是如果底層數(shù)據(jù)庫(kù)中存在違反唯一性約束,則原子地添加新行或更新現(xiàn)有行,這種方式確保了冪等性。
如果出現(xiàn)故障,F(xiàn)link 作業(yè)會(huì)從上次成功的 checkpoint 恢復(fù)并重新處理,這可能導(dǎo)致在恢復(fù)過(guò)程中重復(fù)處理消息。強(qiáng)烈推薦使用 upsert 模式,因?yàn)槿绻枰貜?fù)處理記錄,它有助于避免違反數(shù)據(jù)庫(kù)主鍵約束和產(chǎn)生重復(fù)數(shù)據(jù)。
除了故障恢復(fù)場(chǎng)景外,數(shù)據(jù)源(kafka topic)也可能隨著時(shí)間的推移自然地包含多個(gè)具有相同主鍵的記錄,這使得 upsert 模式是用戶期待的。
由于 upsert 沒(méi)有標(biāo)準(zhǔn)的語(yǔ)法,因此下表描述了不同數(shù)據(jù)庫(kù)的 DML 語(yǔ)法:
5、JDBC Catalog
JdbcCatalog 允許用戶通過(guò) JDBC 協(xié)議將 Flink 連接到關(guān)系數(shù)據(jù)庫(kù)。
目前,JDBC Catalog 有兩個(gè)實(shí)現(xiàn),即 Postgres Catalog 和 MySQL Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。
// Postgres Catalog & MySQL Catalog 支持的方法
databaseExists(String databaseName);
listDatabases();
getDatabase(String databaseName);
listTables(String databaseName);
getTable(ObjectPath tablePath);
tableExists(ObjectPath tablePath);
其他的 Catalog 方法現(xiàn)在尚不支持。
1)、JDBC Catalog 的使用
本小節(jié)主要描述如果創(chuàng)建并使用 Postgres Catalog 或 MySQL Catalog。
本處描述的版本是flink 1.17,flink1.13版本只支持postgresql,在1.13版本中執(zhí)行會(huì)出現(xiàn)如下異常:
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Catalog for 'org.apache.flink.connector.jdbc.dialect.MySQLDialect@1bc49bc5' is not supported yet.
JDBC catalog 支持以下參數(shù):
-
name:必填,catalog 的名稱。
-
default-database:必填,默認(rèn)要連接的數(shù)據(jù)庫(kù)。
-
username:必填,Postgres/MySQL 賬戶的用戶名。
-
password:必填,賬戶的密碼。
-
base-url:必填,(不應(yīng)該包含數(shù)據(jù)庫(kù)名)
對(duì)于 Postgres Catalog base-url 應(yīng)為 “jdbc:postgresql://:” 的格式。
對(duì)于 MySQL Catalog base-url 應(yīng)為 “jdbc:mysql://:” 的格式。 -
sql
---需要將mysql-connector-java-6.0.6.jar、flink-connector-jdbc-3.1.0-1.17.jar放在flink的lib目錄,并重啟flink集群
CREATE CATALOG alan_catalog WITH(
'type' = 'jdbc',
'default-database' = 'test',
'username' = 'root',
'password' = '123456',
'base-url' = 'jdbc:mysql://192.168.10.44:3306'
);
USE CATALOG alan_catalog;
---------------------------------------------------
Flink SQL> CREATE CATALOG alan_catalog WITH(
> 'type' = 'jdbc',
> 'default-database' = 'test?useSSL=false',
> 'username' = 'root',
> 'password' = '123456',
> 'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.
Flink SQL> show CATALOGS;
+-----------------+
| catalog name |
+-----------------+
| alan_catalog |
| default_catalog |
+-----------------+
2 rows in set
Flink SQL> use CATALOG alan_catalog;
[INFO] Execute statement succeed.
- java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "my_catalog";
String defaultDatabase = "mydb";
String username = "...";
String password = "...";
String baseUrl = "..."
JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("my_catalog", catalog);
// 設(shè)置 JdbcCatalog 為會(huì)話的當(dāng)前 catalog
tableEnv.useCatalog("my_catalog");
- yaml
execution:
...
current-catalog: alan_catalog # 設(shè)置目標(biāo) JdbcCatalog 為會(huì)話的當(dāng)前 catalog
current-database: test
catalogs:
- name:alan_catalog
type: jdbc
default-database: test
username: ...
password: ...
base-url: ...
2)、JDBC Catalog for PostgreSQL
- PostgreSQL 元空間映射
除了數(shù)據(jù)庫(kù)之外,postgreSQL 還有一個(gè)額外的命名空間 schema。一個(gè) Postgres 實(shí)例可以擁有多個(gè)數(shù)據(jù)庫(kù),每個(gè)數(shù)據(jù)庫(kù)可以擁有多個(gè) schema,其中一個(gè) schema 默認(rèn)名為 “public”,每個(gè) schema 可以包含多張表。 在 Flink 中,當(dāng)查詢由 Postgres catalog 注冊(cè)的表時(shí),用戶可以使用 schema_name.table_name 或只有 table_name,其中 schema_name 是可選的,默認(rèn)值為 “public”。
因此,F(xiàn)link Catalog 和 Postgres 之間的元空間映射如下:
Flink 中的 Postgres 表的完整路徑應(yīng)該是 “..<schema.table>
”。如果指定了 schema,請(qǐng)注意需要轉(zhuǎn)義 <schema.table>。
這里提供了一些訪問(wèn) Postgres 表的例子:
-- 掃描 'public' schema(即默認(rèn) schema)中的 'test_table' 表,schema 名稱可以省略
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
-- 掃描 'custom_schema' schema 中的 'test_table2' 表,
-- 自定義 schema 不能省略,并且必須與表一起轉(zhuǎn)義。
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
3)、JDBC Catalog for MySQL
- MySQL 元空間映射
MySQL 實(shí)例中的數(shù)據(jù)庫(kù)與 MySQL Catalog 注冊(cè)的 catalog 下的數(shù)據(jù)庫(kù)處于同一個(gè)映射層級(jí)。一個(gè) MySQL 實(shí)例可以擁有多個(gè)數(shù)據(jù)庫(kù),每個(gè)數(shù)據(jù)庫(kù)可以包含多張表。 在 Flink 中,當(dāng)查詢由 MySQL catalog 注冊(cè)的表時(shí),用戶可以使用 database.table_name 或只使用 table_name,其中 database 是可選的,默認(rèn)值為創(chuàng)建 MySQL Catalog 時(shí)指定的默認(rèn)數(shù)據(jù)庫(kù)。
因此,F(xiàn)link Catalog 和 MySQL catalog 之間的元空間映射如下:
Flink 中的 MySQL 表的完整路徑應(yīng)該是 “<catalog>
.<db>
.<table>
”。
這里提供了一些訪問(wèn) MySQL 表的例子(在版本1.17中完成):
-- 掃描 默認(rèn)數(shù)據(jù)庫(kù)(test)中的 'person' 表
select * from alan_catalog.test.person;
select * from test.person;
select * from person;
-- 掃描 'cdhhive' 數(shù)據(jù)庫(kù)中的 'version' 表,
select * from alan_catalog.cdhhive.version;
select * from cdhhive.version;
select * from version;
---------------具體操作詳見(jiàn)下文------------------
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Execute statement succeed.
Flink SQL> CREATE CATALOG alan_catalog WITH(
> 'type' = 'jdbc',
> 'default-database' = 'test?useSSL=false',
> 'username' = 'root',
> 'password' = '123456',
> 'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.
Flink SQL> show catalogs;
+-----------------+
| catalog name |
+-----------------+
| alan_catalog |
| default_catalog |
+-----------------+
2 rows in set
Flink SQL> select * from alan_catalog.test.person;
+----+-------------+--------------------------------+-------------+
| op | id | name | age |
+----+-------------+--------------------------------+-------------+
| +I | 11 | 測(cè)試修改go語(yǔ)言 | 30 |
| +I | 13 | NameUpdate | 22 |
| +I | 14 | updatejson | 23 |
| +I | 189 | 再試一試 | 12 |
| +I | 191 | test-full-update | 3333 |
| +I | 889 | zhangsanswagger2 | 88 |
| +I | 892 | update | 189 |
| +I | 1001 | testupdate | 19 |
| +I | 1002 | 測(cè)試go語(yǔ)言 | 23 |
| +I | 1013 | slene | 0 |
| +I | 1014 | testing | 0 |
| +I | 1015 | testing | 18 |
| +I | 1016 | astaxie | 19 |
| +I | 1017 | alan | 18 |
| +I | 1018 | chan | 19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows
Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.
Flink SQL> select * from test.person;
+----+-------------+--------------------------------+-------------+
| op | id | name | age |
+----+-------------+--------------------------------+-------------+
| +I | 11 | 測(cè)試修改go語(yǔ)言 | 30 |
| +I | 13 | NameUpdate | 22 |
| +I | 14 | updatejson | 23 |
| +I | 189 | 再試一試 | 12 |
| +I | 191 | test-full-update | 3333 |
| +I | 889 | zhangsanswagger2 | 88 |
| +I | 892 | update | 189 |
| +I | 1001 | testupdate | 19 |
| +I | 1002 | 測(cè)試go語(yǔ)言 | 23 |
| +I | 1013 | slene | 0 |
| +I | 1014 | testing | 0 |
| +I | 1015 | testing | 18 |
| +I | 1016 | astaxie | 19 |
| +I | 1017 | alan | 18 |
| +I | 1018 | chan | 19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows
Flink SQL> use alan_catalog.test;
[INFO] Execute statement succeed.
Flink SQL> select * from person;
+----+-------------+--------------------------------+-------------+
| op | id | name | age |
+----+-------------+--------------------------------+-------------+
| +I | 11 | 測(cè)試修改go語(yǔ)言 | 30 |
| +I | 13 | NameUpdate | 22 |
| +I | 14 | updatejson | 23 |
| +I | 189 | 再試一試 | 12 |
| +I | 191 | test-full-update | 3333 |
| +I | 889 | zhangsanswagger2 | 88 |
| +I | 892 | update | 189 |
| +I | 1001 | testupdate | 19 |
| +I | 1002 | 測(cè)試go語(yǔ)言 | 23 |
| +I | 1013 | slene | 0 |
| +I | 1014 | testing | 0 |
| +I | 1015 | testing | 18 |
| +I | 1016 | astaxie | 19 |
| +I | 1017 | alan | 18 |
| +I | 1018 | chan | 19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows
Flink SQL> select * from alan_catalog.cdhhive.version;
+----+----------------------+--------------------------------+--------------------------------+
| op | VER_ID | SCHEMA_VERSION | VERSION_COMMENT |
+----+----------------------+--------------------------------+--------------------------------+
| +I | 1 | 2.1.1 | Hive release version 2.1.1 |
+----+----------------------+--------------------------------+--------------------------------+
Received a total of 1 row
Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.
Flink SQL> select * from cdhhive.version;
+----+----------------------+--------------------------------+--------------------------------+
| op | VER_ID | SCHEMA_VERSION | VERSION_COMMENT |
+----+----------------------+--------------------------------+--------------------------------+
| +I | 1 | 2.1.1 | Hive release version 2.1.1 |
+----+----------------------+--------------------------------+--------------------------------+
Received a total of 1 row
Flink SQL> use alan_catalog.cdhhive;
[INFO] Execute statement succeed.
Flink SQL> select * from version;
+----+----------------------+--------------------------------+--------------------------------+
| op | VER_ID | SCHEMA_VERSION | VERSION_COMMENT |
+----+----------------------+--------------------------------+--------------------------------+
| +I | 1 | 2.1.1 | Hive release version 2.1.1 |
+----+----------------------+--------------------------------+--------------------------------+
Received a total of 1 row
6、數(shù)據(jù)類型映射
Flink 支持連接到多個(gè)使用方言(dialect)的數(shù)據(jù)庫(kù),如 MySQL、Oracle、PostgreSQL、Derby 等。其中,Derby 通常是用于測(cè)試目的。下表列出了從關(guān)系數(shù)據(jù)庫(kù)數(shù)據(jù)類型到 Flink SQL 數(shù)據(jù)類型的類型映射,映射表可以使得在 Flink 中定義 JDBC 表更加簡(jiǎn)單。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-694412.html
以上,簡(jiǎn)單的介紹了flink sql讀取外部系統(tǒng)的jdbc示例。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-694412.html
到了這里,關(guān)于16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!