国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)

這篇具有很好參考價(jià)值的文章主要介紹了16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Flink 系列文章

一、Flink 專欄

Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。

  • 2、Flink基礎(chǔ)系列
    本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。

  • 3、Flik Table API和SQL基礎(chǔ)系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫(kù)、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。

  • 4、Flik Table API和SQL提高與應(yīng)用系列
    本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開(kāi)發(fā)難度的內(nèi)容。

  • 5、Flink 監(jiān)控系列
    本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。

二、Flink 示例專欄

Flink 示例專欄是 Flink 專欄的輔助說(shuō)明,一般不會(huì)介紹知識(shí)點(diǎn)的信息,更多的是提供一個(gè)一個(gè)可以具體使用的示例。本專欄不再分目錄,通過(guò)鏈接即可看出介紹的內(nèi)容。

兩專欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引



本文簡(jiǎn)單的介紹了flink sql讀取外部系統(tǒng)的jdbc示例(每個(gè)示例均是驗(yàn)證通過(guò)的,并且具體給出了運(yùn)行環(huán)境的版本)。
本文依賴環(huán)境是hadoop、kafka、mysql環(huán)境好用,如果是ha環(huán)境則需要zookeeper的環(huán)境。

一、Table & SQL Connectors 示例:JDBC

1、maven依賴(java編碼依賴)

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>3.1.0-1.17</version>
</dependency>


在連接到具體數(shù)據(jù)庫(kù)時(shí),也需要對(duì)應(yīng)的驅(qū)動(dòng)依賴,目前支持的驅(qū)動(dòng)如下:
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4),# Flink專欄,flink,sql,大數(shù)據(jù),flink sql,flink jdbc,flink 流批一體化,flink connector
驅(qū)動(dòng)jar需放在flink的安裝目錄lib下,且需要重啟服務(wù)。
本示例jar包有

flink-connector-jdbc_2.11-1.13.6.jar
mysql-connector-java-5.1.5.jar 或
mysql-connector-java-6.0.6.jar(1.17版本中使用的mysql驅(qū)動(dòng),用上面mysql驅(qū)動(dòng)有異常信息)

2、創(chuàng)建 JDBC 表

JDBC table 可以按如下定義,以下示例中包含創(chuàng)建表、批量插入以及l(fā)eft join的維表。

1)、創(chuàng)建jdbc表,并插入、查詢

-- 在 Flink SQL 中注冊(cè)一張 MySQL 表 'users'
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);

-------------------具體事例----------------------------------
-- 在 Flink SQL 中注冊(cè)一張 MySQL 表 'user'
CREATE TABLE Alan_JDBC_User_Table (
  id BIGINT,
  name STRING,
  age INT,
  balance DOUBLE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://192.168.10.44:3306/test',
   'table-name' = 'user'
);

-- mysql中的數(shù)據(jù)
mysql> select * from user;
+----+-------------+------+---------+-----------------------+------------+
| id | name        | age  | balance | email                 | pwd        |
+----+-------------+------+---------+-----------------------+------------+
|  1 | aa6         |   61 |   60000 | 6@163.com             | 123456     |
|  2 | aa4         |   71 |   70000 | 7@163.com             | 7123       |
|  4 | test        | NULL |    NULL | NULL                  | NULL       |
|  5 | test2       | NULL |    NULL | NULL                  | NULL       |
|  7 | alanchanchn |   19 |     800 | alan.chan.chn@163.com | vx         |
|  8 | alanchan    |   19 |     800 | alan.chan.chn@163.com | sink mysql |
+----+-------------+------+---------+-----------------------+------------+
6 rows in set (0.00 sec)

---------在flink sql中建表并查詢--------
Flink SQL> CREATE TABLE Alan_JDBC_User_Table (
>   id BIGINT,
>   name STRING,
>   age INT,
>   balance DOUBLE,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://192.168.10.44:3306/test',
>    'table-name' = 'user'
> );
[INFO] Execute statement succeed.

Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op |                   id |                           name |         age |                        balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I |                    1 |                            aa6 |          61 |                        60000.0 |
| +I |                    2 |                            aa4 |          71 |                        70000.0 |
| +I |                    4 |                           test |      (NULL) |                         (NULL) |
| +I |                    5 |                          test2 |      (NULL) |                         (NULL) |
| +I |                    7 |                    alanchanchn |          19 |                          800.0 |
| +I |                    8 |                       alanchan |          19 |                          800.0 |
+----+----------------------+--------------------------------+-------------+--------------------------------+
Received a total of 6 rows

2)、批量插入表數(shù)據(jù)

-- 從另一張表 "T" 將數(shù)據(jù)寫入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;

---------創(chuàng)建數(shù)據(jù)表----------------------
CREATE TABLE source_table (
 userId INT,
 age INT,
 balance DOUBLE,
 userName STRING,
 t_insert_time AS localtimestamp,
 WATERMARK FOR t_insert_time AS t_insert_time
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.userId.kind'='sequence',
 'fields.userId.start'='1',
 'fields.userId.end'='5000',

 'fields.balance.kind'='random',
 'fields.balance.min'='1',
 'fields.balance.max'='100',

 'fields.age.min'='1',
 'fields.age.max'='1000',

 'fields.userName.length'='10'
);

-- 從另一張表 "source_table" 將數(shù)據(jù)寫入到 JDBC 表中
INSERT INTO Alan_JDBC_User_Table
SELECT userId,  userName, age, balance FROM source_table;

-- 查看 JDBC 表中的數(shù)據(jù)
select * from Alan_JDBC_User_Table;

---------------flink sql中查詢----------------------------------
Flink SQL> INSERT INTO Alan_JDBC_User_Table
> SELECT userId,  userName, age, balance FROM source_table;
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e91cd3c41ac20aaf8eab79f0094f9e46


Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op |                   id |                           name |         age |                        balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I |                    1 |                     ead5352794 |         513 |                            4.0 |
| +I |                    2 |                     728297a8d9 |         410 |                           35.0 |
| +I |                    3 |                     643c2226cd |         142 |                           80.0 |
......
-------------驗(yàn)證mysql中的數(shù)據(jù)是否寫入,此處只查總數(shù)----------------
mysql> select count(*) from user;
+----------+
| count(*) |
+----------+
|     2005 |
+----------+
1 row in set (0.00 sec)

3)、JDBC 表在時(shí)態(tài)表關(guān)聯(lián)中作為維表

-- 1、創(chuàng)建 JDBC 表在時(shí)態(tài)表關(guān)聯(lián)中作為維表
CREATE TABLE Alan_JDBC_User_Table (
  id BIGINT,
  name STRING,
  age INT,
  balance DOUBLE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://192.168.10.44:3306/test',
   'table-name' = 'user'
);
-----2、查詢表中的數(shù)據(jù)(實(shí)際數(shù)據(jù)是之前測(cè)試的結(jié)果)   -----
Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op |                   id |                           name |         age |                        balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I |                    1 |                     ead5352794 |         513 |                            4.0 |
| +I |                    2 |                     728297a8d9 |         410 |                           35.0 |
| +I |                    3 |                     643c2226cd |         142 |                           80.0 |
| +I |                    4 |                     6115f11f01 |         633 |                           69.0 |
| +I |                    5 |                     044ba5fa2f |          74 |                           71.0 |
| +I |                    6 |                     98a112dc87 |         729 |                           54.0 |
| +I |                    7 |                     705326a369 |         846 |                           99.0 |
| +I |                    8 |                     532692924f |         872 |                           79.0 |
| +I |                    9 |                     b816802948 |         475 |                           67.0 |
| +I |                   10 |                     06906bebb2 |         109 |                           57.0 |
......

-----3、創(chuàng)建事實(shí)表,以kafka表作為代表   -----
CREATE TABLE Alan_KafkaTable_3 (
    user_id BIGINT, -- 用戶id
    item_id BIGINT, -- 商品id
    action STRING,  -- 用戶行為
    ts     BIGINT,  -- 用戶行為發(fā)生的時(shí)間戳
    proctime as PROCTIME(),   -- 通過(guò)計(jì)算列產(chǎn)生一個(gè)處理時(shí)間列
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',-- 事件時(shí)間
    WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND  -- 在eventTime上定義watermark
) WITH (
  'connector' = 'kafka',
  'topic' = 'testtopic',
  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

-----4、發(fā)送kafka消息,同時(shí)觀察事實(shí)表中的數(shù)據(jù)   -----
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic testtopic
>1,1001,"login",1692593500222
>2,1002,"p_read",1692593502242
>

Flink SQL> select * from Alan_KafkaTable_3;

+----+----------------------+----------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| op |              user_id |              item_id |                         action |                   ts |                proctime |              event_time |
+----+----------------------+----------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| +I |                    1 |                 1001 |                          login |        1692593500222 | 2023-08-22 05:33:38.830 | 2023-08-22 05:39:54.439 |
| +I |                    2 |                 1002 |                         p_read |        1692593502242 | 2023-08-22 05:33:38.833 | 2023-08-22 05:40:41.284 |
Query terminated, received a total of 2 rows



-----5、以jdbc的維表進(jìn)行關(guān)聯(lián)查詢事實(shí)表數(shù)據(jù)-----
SELECT
  kafkamessage.user_id, 
  kafkamessage.item_id,
  kafkamessage.action,  
  jdbc_dim_table.name,
  jdbc_dim_table.age,
  jdbc_dim_table.balance
FROM Alan_KafkaTable_3 AS kafkamessage 
LEFT JOIN Alan_JDBC_User_Table FOR SYSTEM_TIME AS OF kafkamessage.proctime AS jdbc_dim_table ON kafkamessage.user_id = jdbc_dim_table.id;

Flink SQL> SELECT
>   kafkamessage.user_id, 
>   kafkamessage.item_id,
>   kafkamessage.action,  
>   jdbc_dim_table.name,
>   jdbc_dim_table.age,
>   jdbc_dim_table.balance
> FROM Alan_KafkaTable_3 AS kafkamessage 
> LEFT JOIN Alan_JDBC_User_Table FOR SYSTEM_TIME AS OF kafkamessage.proctime AS jdbc_dim_table ON kafkamessage.user_id = jdbc_dim_table.id;

+----+----------------------+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+
| op |              user_id |              item_id |                         action |                           name |         age |                        balance |
+----+----------------------+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+
| +I |                    1 |                 1001 |                          login |                     ead5352794 |         513 |                            4.0 |
| +I |                    2 |                 1002 |                         p_read |                     728297a8d9 |         410 |                           35.0 |


  • java
    該部分示例僅僅是以java實(shí)現(xiàn)創(chuàng)建表及查詢,簡(jiǎn)單示例。
// 注冊(cè)名為 “jdbcOutputTable” 的JDBC表
		String sinkDDL = "create table jdbcOutputTable (" + 
									 "id bigint not null, " + 
									 "name varchar(20) , " + 
									 "age int ,"+
									 "balance bigint,"+
									 "pwd varchar(20),"+
									 "email varchar(20) , PRIMARY KEY (id) NOT ENFORCED" +
									 ") with (" + 
									 " 'connector.type' = 'jdbc', "				+ 
									 " 'connector.url' = 'jdbc:mysql://192.168.10.44:3306/test', " + 
									 " 'connector.table' = 'user', " + 
									 " 'connector.driver' = 'com.mysql.jdbc.Driver', "				+ 
									 " 'connector.username' = 'root', " + 
									 " 'connector.password' = '123456' )";

		tenv.executeSql(sinkDDL);
		
		String sql = "SELECT *  FROM jdbcOutputTable ";
		String sql2 = "SELECT *  FROM jdbcOutputTable  where name like '%alan%'";
		Table table = tenv.sqlQuery(sql2);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();
		env.execute();

//運(yùn)行結(jié)果
(
  `id` BIGINT NOT NULL,
  `name` VARCHAR(20),
  `age` INT,
  `balance` BIGINT,
  `pwd` VARCHAR(20),
  `email` VARCHAR(20)
)


15> (true,+I[7, alanchanchn, 19, 800, vx, alan.chan.chn@163.com])
15> (true,+I[8, alanchan, 19, 800, sink mysql, alan.chan.chn@163.com])

3、連接器參數(shù)

16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4),# Flink專欄,flink,sql,大數(shù)據(jù),flink sql,flink jdbc,flink 流批一體化,flink connector
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4),# Flink專欄,flink,sql,大數(shù)據(jù),flink sql,flink jdbc,flink 流批一體化,flink connector

4、已棄用的配置

這些棄用配置已經(jīng)被上述的新配置代替,而且最終會(huì)被棄用。請(qǐng)優(yōu)先考慮使用新配置。
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4),# Flink專欄,flink,sql,大數(shù)據(jù),flink sql,flink jdbc,flink 流批一體化,flink connector

5、特性

1)、鍵處理

當(dāng)寫入數(shù)據(jù)到外部數(shù)據(jù)庫(kù)時(shí),F(xiàn)link 會(huì)使用 DDL 中定義的主鍵。如果定義了主鍵,則連接器將以 upsert 模式工作,否則連接器將以 append 模式工作。

在 upsert 模式下,F(xiàn)link 將根據(jù)主鍵判斷插入新行或者更新已存在的行,這種方式可以確保冪等性。為了確保輸出結(jié)果是符合預(yù)期的,推薦為表定義主鍵并且確保主鍵是底層數(shù)據(jù)庫(kù)中表的唯一鍵或主鍵。在 append 模式下,F(xiàn)link 會(huì)把所有記錄解釋為 INSERT 消息,如果違反了底層數(shù)據(jù)庫(kù)中主鍵或者唯一約束,INSERT 插入可能會(huì)失敗。

有關(guān) PRIMARY KEY 語(yǔ)法的更多詳細(xì)信息,請(qǐng)參見(jiàn) 22、Flink 的table api與sql之創(chuàng)建表的DDL。

2)、分區(qū)掃描

為了在并行 Source task 實(shí)例中加速讀取數(shù)據(jù),F(xiàn)link 為 JDBC table 提供了分區(qū)掃描的特性。

如果下述分區(qū)掃描參數(shù)中的任一項(xiàng)被指定,則下述所有的分區(qū)掃描參數(shù)必須都被指定。這些參數(shù)描述了在多個(gè) task 并行讀取數(shù)據(jù)時(shí)如何對(duì)表進(jìn)行分區(qū)。 scan.partition.column 必須是相關(guān)表中的數(shù)字、日期或時(shí)間戳列。

scan.partition.lower-bound 和 scan.partition.upper-bound 用于決定分區(qū)的起始位置和過(guò)濾表中的數(shù)據(jù)。如果是批處理作業(yè),也可以在提交 flink 作業(yè)之前獲取最大值和最小值。

  • scan.partition.column:輸入用于進(jìn)行分區(qū)的列名。
  • scan.partition.num:分區(qū)數(shù)。
  • scan.partition.lower-bound:第一個(gè)分區(qū)的最小值。
  • scan.partition.upper-bound:最后一個(gè)分區(qū)的最大值。

3)、Lookup Cache

JDBC 連接器可以用在時(shí)態(tài)表關(guān)聯(lián)中作為一個(gè)可 lookup 的 source (又稱為維表),當(dāng)前只支持同步的查找模式。

默認(rèn)情況下,lookup cache 是未啟用的,你可以將 lookup.cache 設(shè)置為 PARTIAL 參數(shù)來(lái)啟用。

lookup cache 的主要目的是用于提高時(shí)態(tài)表關(guān)聯(lián) JDBC 連接器的性能。
默認(rèn)情況下,lookup cache 不開(kāi)啟,所以所有請(qǐng)求都會(huì)發(fā)送到外部數(shù)據(jù)庫(kù)。
當(dāng) lookup cache 被啟用時(shí),每個(gè)進(jìn)程(即 TaskManager)將維護(hù)一個(gè)緩存。Flink 將優(yōu)先查找緩存,只有當(dāng)緩存未查找到時(shí)才向外部數(shù)據(jù)庫(kù)發(fā)送請(qǐng)求,并使用返回的數(shù)據(jù)更新緩存。 當(dāng)緩存命中最大緩存行 lookup.partial-cache.max-rows 或當(dāng)行超過(guò) lookup.partial-cache.expire-after-write 或 lookup.partial-cache.expire-after-access 指定的最大存活時(shí)間時(shí),緩存中的行將被設(shè)置為已過(guò)期。 緩存中的記錄可能不是最新的,用戶可以將緩存記錄超時(shí)設(shè)置為一個(gè)更小的值以獲得更好的刷新數(shù)據(jù),但這可能會(huì)增加發(fā)送到數(shù)據(jù)庫(kù)的請(qǐng)求數(shù)。

所以要做好吞吐量和正確性之間的平衡。

默認(rèn)情況下,flink 會(huì)緩存主鍵的空查詢結(jié)果,你可以通過(guò)將 lookup.partial-cache.cache-missing-key 設(shè)置為 false 來(lái)切換行為。

4)、冪等寫入

如果在 DDL 中定義了主鍵,JDBC sink 將使用 upsert 語(yǔ)義而不是普通的 INSERT 語(yǔ)句。upsert 語(yǔ)義指的是如果底層數(shù)據(jù)庫(kù)中存在違反唯一性約束,則原子地添加新行或更新現(xiàn)有行,這種方式確保了冪等性。

如果出現(xiàn)故障,F(xiàn)link 作業(yè)會(huì)從上次成功的 checkpoint 恢復(fù)并重新處理,這可能導(dǎo)致在恢復(fù)過(guò)程中重復(fù)處理消息。強(qiáng)烈推薦使用 upsert 模式,因?yàn)槿绻枰貜?fù)處理記錄,它有助于避免違反數(shù)據(jù)庫(kù)主鍵約束和產(chǎn)生重復(fù)數(shù)據(jù)。

除了故障恢復(fù)場(chǎng)景外,數(shù)據(jù)源(kafka topic)也可能隨著時(shí)間的推移自然地包含多個(gè)具有相同主鍵的記錄,這使得 upsert 模式是用戶期待的。

由于 upsert 沒(méi)有標(biāo)準(zhǔn)的語(yǔ)法,因此下表描述了不同數(shù)據(jù)庫(kù)的 DML 語(yǔ)法:
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4),# Flink專欄,flink,sql,大數(shù)據(jù),flink sql,flink jdbc,flink 流批一體化,flink connector

5、JDBC Catalog

JdbcCatalog 允許用戶通過(guò) JDBC 協(xié)議將 Flink 連接到關(guān)系數(shù)據(jù)庫(kù)。

目前,JDBC Catalog 有兩個(gè)實(shí)現(xiàn),即 Postgres Catalog 和 MySQL Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。

// Postgres Catalog & MySQL Catalog 支持的方法
databaseExists(String databaseName);
listDatabases();
getDatabase(String databaseName);
listTables(String databaseName);
getTable(ObjectPath tablePath);
tableExists(ObjectPath tablePath);
其他的 Catalog 方法現(xiàn)在尚不支持。

1)、JDBC Catalog 的使用

本小節(jié)主要描述如果創(chuàng)建并使用 Postgres Catalog 或 MySQL Catalog。
本處描述的版本是flink 1.17,flink1.13版本只支持postgresql,在1.13版本中執(zhí)行會(huì)出現(xiàn)如下異常:

[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Catalog for 'org.apache.flink.connector.jdbc.dialect.MySQLDialect@1bc49bc5' is not supported yet.

JDBC catalog 支持以下參數(shù):

  • name:必填,catalog 的名稱。

  • default-database:必填,默認(rèn)要連接的數(shù)據(jù)庫(kù)。

  • username:必填,Postgres/MySQL 賬戶的用戶名。

  • password:必填,賬戶的密碼。

  • base-url:必填,(不應(yīng)該包含數(shù)據(jù)庫(kù)名)
    對(duì)于 Postgres Catalog base-url 應(yīng)為 “jdbc:postgresql://:” 的格式。
    對(duì)于 MySQL Catalog base-url 應(yīng)為 “jdbc:mysql://:” 的格式。

  • sql

---需要將mysql-connector-java-6.0.6.jar、flink-connector-jdbc-3.1.0-1.17.jar放在flink的lib目錄,并重啟flink集群
CREATE CATALOG alan_catalog WITH(
    'type' = 'jdbc',
    'default-database' = 'test',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://192.168.10.44:3306'
);


USE CATALOG alan_catalog;
---------------------------------------------------
Flink SQL> CREATE CATALOG alan_catalog WITH(
>     'type' = 'jdbc',
>     'default-database' = 'test?useSSL=false',
>     'username' = 'root',
>     'password' = '123456',
>     'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.


Flink SQL> show CATALOGS;
+-----------------+
|    catalog name |
+-----------------+
|    alan_catalog |
| default_catalog |
+-----------------+
2 rows in set

Flink SQL> use CATALOG alan_catalog;
[INFO] Execute statement succeed.

  • java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "my_catalog";
String defaultDatabase = "mydb";
String username        = "...";
String password        = "...";
String baseUrl         = "..."

JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("my_catalog", catalog);

// 設(shè)置 JdbcCatalog 為會(huì)話的當(dāng)前 catalog
tableEnv.useCatalog("my_catalog");
  • yaml
execution:
    ...
    current-catalog: alan_catalog  # 設(shè)置目標(biāo) JdbcCatalog 為會(huì)話的當(dāng)前 catalog
    current-database: test

catalogs:
   - name:alan_catalog
     type: jdbc
     default-database: test
     username: ...
     password: ...
     base-url: ...

2)、JDBC Catalog for PostgreSQL

  • PostgreSQL 元空間映射
    除了數(shù)據(jù)庫(kù)之外,postgreSQL 還有一個(gè)額外的命名空間 schema。一個(gè) Postgres 實(shí)例可以擁有多個(gè)數(shù)據(jù)庫(kù),每個(gè)數(shù)據(jù)庫(kù)可以擁有多個(gè) schema,其中一個(gè) schema 默認(rèn)名為 “public”,每個(gè) schema 可以包含多張表。 在 Flink 中,當(dāng)查詢由 Postgres catalog 注冊(cè)的表時(shí),用戶可以使用 schema_name.table_name 或只有 table_name,其中 schema_name 是可選的,默認(rèn)值為 “public”。

因此,F(xiàn)link Catalog 和 Postgres 之間的元空間映射如下:
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4),# Flink專欄,flink,sql,大數(shù)據(jù),flink sql,flink jdbc,flink 流批一體化,flink connector
Flink 中的 Postgres 表的完整路徑應(yīng)該是 “..<schema.table>”。如果指定了 schema,請(qǐng)注意需要轉(zhuǎn)義 <schema.table>。

這里提供了一些訪問(wèn) Postgres 表的例子:

-- 掃描 'public' schema(即默認(rèn) schema)中的 'test_table' 表,schema 名稱可以省略
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

-- 掃描 'custom_schema' schema 中的 'test_table2' 表,
-- 自定義 schema 不能省略,并且必須與表一起轉(zhuǎn)義。
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

3)、JDBC Catalog for MySQL

  • MySQL 元空間映射
    MySQL 實(shí)例中的數(shù)據(jù)庫(kù)與 MySQL Catalog 注冊(cè)的 catalog 下的數(shù)據(jù)庫(kù)處于同一個(gè)映射層級(jí)。一個(gè) MySQL 實(shí)例可以擁有多個(gè)數(shù)據(jù)庫(kù),每個(gè)數(shù)據(jù)庫(kù)可以包含多張表。 在 Flink 中,當(dāng)查詢由 MySQL catalog 注冊(cè)的表時(shí),用戶可以使用 database.table_name 或只使用 table_name,其中 database 是可選的,默認(rèn)值為創(chuàng)建 MySQL Catalog 時(shí)指定的默認(rèn)數(shù)據(jù)庫(kù)。

因此,F(xiàn)link Catalog 和 MySQL catalog 之間的元空間映射如下:
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4),# Flink專欄,flink,sql,大數(shù)據(jù),flink sql,flink jdbc,flink 流批一體化,flink connector
Flink 中的 MySQL 表的完整路徑應(yīng)該是 “<catalog>.<db>.<table>”。

這里提供了一些訪問(wèn) MySQL 表的例子(在版本1.17中完成):

-- 掃描 默認(rèn)數(shù)據(jù)庫(kù)(test)中的 'person' 表
select * from alan_catalog.test.person;
select * from test.person;
select * from person;

-- 掃描 'cdhhive' 數(shù)據(jù)庫(kù)中的 'version' 表,
select * from alan_catalog.cdhhive.version;
select * from cdhhive.version;
select * from version;

---------------具體操作詳見(jiàn)下文------------------
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Execute statement succeed.

Flink SQL> CREATE CATALOG alan_catalog WITH(
>     'type' = 'jdbc',
>     'default-database' = 'test?useSSL=false',
>     'username' = 'root',
>     'password' = '123456',
>     'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
|    alan_catalog |
| default_catalog |
+-----------------+
2 rows in set

Flink SQL> select * from alan_catalog.test.person;

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 測(cè)試修改go語(yǔ)言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再試一試 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     測(cè)試go語(yǔ)言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows

Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.

Flink SQL> select * from test.person;

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 測(cè)試修改go語(yǔ)言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再試一試 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     測(cè)試go語(yǔ)言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows

Flink SQL> use alan_catalog.test;
[INFO] Execute statement succeed.

Flink SQL> select * from person;

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 測(cè)試修改go語(yǔ)言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再試一試 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     測(cè)試go語(yǔ)言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows



Flink SQL> select * from alan_catalog.cdhhive.version;

+----+----------------------+--------------------------------+--------------------------------+
| op |               VER_ID |                 SCHEMA_VERSION |                VERSION_COMMENT |
+----+----------------------+--------------------------------+--------------------------------+
| +I |                    1 |                          2.1.1 |     Hive release version 2.1.1 |
+----+----------------------+--------------------------------+--------------------------------+
Received a total of 1 row

Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.

Flink SQL> select * from cdhhive.version;

+----+----------------------+--------------------------------+--------------------------------+
| op |               VER_ID |                 SCHEMA_VERSION |                VERSION_COMMENT |
+----+----------------------+--------------------------------+--------------------------------+
| +I |                    1 |                          2.1.1 |     Hive release version 2.1.1 |
+----+----------------------+--------------------------------+--------------------------------+
Received a total of 1 row

Flink SQL> use alan_catalog.cdhhive;
[INFO] Execute statement succeed.

Flink SQL> select * from version;

+----+----------------------+--------------------------------+--------------------------------+
| op |               VER_ID |                 SCHEMA_VERSION |                VERSION_COMMENT |
+----+----------------------+--------------------------------+--------------------------------+
| +I |                    1 |                          2.1.1 |     Hive release version 2.1.1 |
+----+----------------------+--------------------------------+--------------------------------+
Received a total of 1 row

6、數(shù)據(jù)類型映射

Flink 支持連接到多個(gè)使用方言(dialect)的數(shù)據(jù)庫(kù),如 MySQL、Oracle、PostgreSQL、Derby 等。其中,Derby 通常是用于測(cè)試目的。下表列出了從關(guān)系數(shù)據(jù)庫(kù)數(shù)據(jù)類型到 Flink SQL 數(shù)據(jù)類型的類型映射,映射表可以使得在 Flink 中定義 JDBC 表更加簡(jiǎn)單。

16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4),# Flink專欄,flink,sql,大數(shù)據(jù),flink sql,flink jdbc,flink 流批一體化,flink connector
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4),# Flink專欄,flink,sql,大數(shù)據(jù),flink sql,flink jdbc,flink 流批一體化,flink connector
以上,簡(jiǎn)單的介紹了flink sql讀取外部系統(tǒng)的jdbc示例。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-694412.html

到了這里,關(guān)于16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【flink sql】kafka連接器

    Kafka 連接器提供從 Kafka topic 中消費(fèi)和寫入數(shù)據(jù)的能力。 前面已經(jīng)介紹了flink sql創(chuàng)建表的語(yǔ)法及說(shuō)明:【flink sql】創(chuàng)建表 這篇博客聊聊怎么通過(guò)flink sql連接kafka 以下的連接器元數(shù)據(jù)可以在表定義中通過(guò)元數(shù)據(jù)列的形式獲取。 R/W 列定義了一個(gè)元數(shù)據(jù)是可讀的(R)還是可寫的(

    2024年02月08日
    瀏覽(22)
  • Flink系列之:Elasticsearch SQL 連接器

    Sink: Batch Sink: Streaming Append Upsert Mode Elasticsearch 連接器允許將數(shù)據(jù)寫入到 Elasticsearch 引擎的索引中。本文檔描述運(yùn)行 SQL 查詢時(shí)如何設(shè)置 Elasticsearch 連接器。 連接器可以工作在 upsert 模式,使用 DDL 中定義的主鍵與外部系統(tǒng)交換 UPDATE/DELETE 消息。 如果 DDL 中沒(méi)有定義主鍵,那么

    2024年02月04日
    瀏覽(22)
  • Flink系列之:JDBC SQL 連接器

    Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append Upsert Mode JDBC 連接器允許使用 JDBC 驅(qū)動(dòng)向任意類型的關(guān)系型數(shù)據(jù)庫(kù)讀取或者寫入數(shù)據(jù)。本文檔描述了針對(duì)關(guān)系型數(shù)據(jù)庫(kù)如何通過(guò)建立 JDBC 連接器來(lái)執(zhí)行 SQL 查詢。 如果在 DDL 中定義了主鍵,JDBC sink 將以 upsert 模式與外

    2024年02月02日
    瀏覽(24)
  • Flink系列之:Apache Kafka SQL 連接器

    Scan Source: Unbounded Sink: Streaming Append Mode Kafka 連接器提供從 Kafka topic 中消費(fèi)和寫入數(shù)據(jù)的能力。 以下示例展示了如何創(chuàng)建 Kafka 表: 以下的連接器元數(shù)據(jù)可以在表定義中通過(guò)元數(shù)據(jù)列的形式獲取。 R/W 列定義了一個(gè)元數(shù)據(jù)是可讀的(R)還是可寫的(W)。 只讀列必須聲明為 VI

    2024年02月01日
    瀏覽(29)
  • Flink系列之:Upsert Kafka SQL 連接器

    Scan Source: Unbounded 、 Sink: Streaming Upsert Mode Upsert Kafka 連接器支持以 upsert 方式從 Kafka topic 中讀取數(shù)據(jù)并將數(shù)據(jù)寫入 Kafka topic。 作為 source,upsert-kafka 連接器生產(chǎn) changelog 流,其中每條數(shù)據(jù)記錄代表一個(gè)更新或刪除事件。更準(zhǔn)確地說(shuō),數(shù)據(jù)記錄中的 value 被解釋為同一 key 的最后一

    2024年01月16日
    瀏覽(26)
  • 【Flink實(shí)戰(zhàn)】Flink hint更靈活、更細(xì)粒度的設(shè)置Flink sql行為與簡(jiǎn)化hive連接器參數(shù)設(shè)置

    SQL 提示(SQL Hints)是和 SQL 語(yǔ)句一起使用來(lái)改變執(zhí)行計(jì)劃的。本章介紹如何使用 SQL 提示來(lái)實(shí)現(xiàn)各種干預(yù)。 SQL 提示一般可以用于以下: 增強(qiáng) planner:沒(méi)有完美的 planner, SQL 提示讓用戶更好地控制執(zhí)行; 增加元數(shù)據(jù)(或者統(tǒng)計(jì)信息):如\\\"已掃描的表索引\\\"和\\\"一些混洗鍵(shu

    2024年04月25日
    瀏覽(25)
  • Flink 讀寫MySQL數(shù)據(jù)(DataStream和Table API)

    Flink 讀寫MySQL數(shù)據(jù)(DataStream和Table API)

    Flink提供了基于JDBC的方式,可以將讀取到的數(shù)據(jù)寫入到MySQL中;本文通過(guò)兩種方式將數(shù)據(jù)下入到MySQL數(shù)據(jù)庫(kù),其他的基于JDBC的數(shù)據(jù)庫(kù)類似,另外,Table API方式的Catalog指定為Hive Catalog方式,持久化DDL操作。 另外,JDBC 連接器允許使用 JDBC 驅(qū)動(dòng)程序從任何關(guān)系數(shù)據(jù)庫(kù)讀取數(shù)據(jù)并將

    2023年04月09日
    瀏覽(33)
  • flink-sql讀寫hive-1.16

    本文檔內(nèi)容基于 flink-1.16.x ,其他版本的整理,請(qǐng)查看本人博客的 flink 專欄其他文章。 Apache Hive 已經(jīng)成為了數(shù)據(jù)倉(cāng)庫(kù)生態(tài)系統(tǒng)中的核心。它不僅僅是一個(gè)用于大數(shù)據(jù)分析和ETL場(chǎng)景的SQL引擎,同樣也是一個(gè)數(shù)據(jù)管理平臺(tái),可用于發(fā)現(xiàn),定義,和演化數(shù)據(jù)。 Flink 與 Hive 的集成包

    2024年02月16日
    瀏覽(33)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通過(guò)Table API和SQL創(chuàng)建表

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月17日
    瀏覽(24)
  • Flink Table API 與 SQL 編程整理

    Flink Table API 與 SQL 編程整理

    Flink API 總共分為 4 層這里主要整理 Table API 的使用 Table API 是流處理和批處理通用的關(guān)系型 API , Table API 可以基于流輸入或者批輸入來(lái)運(yùn)行而不需要進(jìn)行任何修改。 Table API 是 SQL 語(yǔ)言的超集并專門為 Apache Flink 設(shè)計(jì)的, Table API 是 Scala 和 Java 語(yǔ)言集成式的 API 。與常規(guī) SQL 語(yǔ)言

    2024年02月04日
    瀏覽(26)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包