Flink 系列文章
一、Flink 專欄
Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。
-
1、Flink 部署系列
本部分介紹Flink的部署、配置相關基礎內(nèi)容。 -
2、Flink基礎系列
本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 -
3、Flik Table API和SQL基礎系列
本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。 -
4、Flik Table API和SQL提高與應用系列
本部分是table api 和sql的應用部分,和實際的生產(chǎn)應用聯(lián)系更為密切,以及有一定開發(fā)難度的內(nèi)容。 -
5、Flink 監(jiān)控系列
本部分和實際的運維、監(jiān)控工作相關。
二、Flink 示例專欄
Flink 示例專欄是 Flink 專欄的輔助說明,一般不會介紹知識點的信息,更多的是提供一個一個可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內(nèi)容。
兩專欄的所有文章入口點擊:Flink 系列文章匯總索引
本文介紹了flink cli的啟動、使用以及通過創(chuàng)建kafka、filesystem等例子介紹了配置文件的使用,同時也簡單的介紹了視圖、臨時表等內(nèi)容。
本文依賴環(huán)境是flink、kafka環(huán)境可用,flink的版本是1.13.5、jdk8.
一、SQL客戶端
Flink 的 Table & SQL API 可以處理 SQL 語言編寫的查詢語句,但是這些查詢需要嵌入用 Java 或 Scala 編寫的表程序中。此外,這些程序在提交到集群前需要用構建工具打包。這或多或少限制了 Java/Scala 程序員對 Flink 的使用。
SQL 客戶端 的目的是提供一種簡單的方式來編寫、調(diào)試和提交表程序到 Flink 集群上,而無需寫一行 Java 或 Scala 代碼。SQL 客戶端命令行界面(CLI) 能夠在命令行中檢索和可視化分布式應用中實時產(chǎn)生的結果。
部署請參考:1、Flink1.12.7或1.13.5詳細介紹及本地安裝部署、驗證 和 2、Flink1.13.5二種部署方式(Standalone、Standalone HA )、四種提交任務方式(前兩種及session和per-job)驗證詳細步驟
1、啟動 SQL 客戶端命令行界面
SQL Client 腳本也位于 Flink 的 bin 目錄中。將來,用戶可以通過啟動嵌入式 standalone 進程或通過連接到遠程 SQL 客戶端網(wǎng)關來啟動 SQL 客戶端命令行界面。目前僅支持 embedded 模式??梢酝ㄟ^以下方式啟動 CLI:
#啟動flink sql客戶端
./bin/sql-client.sh embedded -d 配置文件
#或
./bin/sql-client.sh embedded
#或
./bin/sql-client.sh
#注意:需要使用部署flink集群的用戶啟動
[alanchan@server1 bin]$ sql-client.sh embedded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/flink-1.13.5/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/bigdata/hadoop-3.1.4/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
No default environment specified.
Searching for '/usr/local/flink-1.13.5/conf/sql-client-defaults.yaml'...not found.
Command history file path: /home/alanchan/.flink-sql-history
?▓██▓██?
▓████??█▓?▓███▓?
▓███▓?? ???▓██? ?
?██? ??▓▓█▓▓?? ?████
██? ??▓███? ?█?█?
?▓█ ███ ▓??██
▓█ ?????▓██▓???▓▓█
█? █ ??? ███▓▓█ ?█???
████? ?▓█▓ ██??? ▓███?
??█▓▓██ ▓█? ▓█?▓██▓ ?█?
▓??▓████? ██ ?█ █▓??█???█?
███▓?██▓ ▓█ █ █▓ ?▓█▓▓█?
?██▓ ?█? █ █? ?█████▓? ██▓??
███? ? █? ▓ ?█ █████??? ?█?▓ ▓?
██▓█ ??▓? ▓███████▓? ?█? ?▓ ▓██▓
?██▓ ▓█ █▓█ ??█████▓▓?? ██?? █ ? ▓█?
▓█▓ ▓█ ██▓ ?▓▓▓▓▓▓▓? ?██▓ ?█?
▓█ █ ▓███▓?? ?▓▓▓███▓ ??? ▓█
██▓ ██? ??▓▓███▓▓▓▓▓██████▓? ▓███ █
▓███? ███ ?▓▓??? ?▓████▓? ??▓? █▓
█▓??▓▓██ ??????????▓██▓? █▓
██ ▓??█ ▓▓▓▓??? ?█▓ ?▓▓██▓ ▓? ??▓
▓█▓ ▓?█ █▓? ??▓▓██? ?▓█? ??????▓█████?
██? ▓█?█? ?▓▓? ▓█ █? ???? ?█?
▓█ ?█▓ ? █? ?█ █▓
█▓ ██ █? ▓▓ ?█▓▓▓?█?
█▓ ?▓██? ▓? ▓█▓?????▓█? ?█
██ ▓█▓? ? ??█?██? ▓▓
▓█? ?█▓?? ?? █?█▓?????██
?██? ?▓▓? ▓██▓?█? ?▓▓▓▓?█▓
?▓██? ▓? ?█▓█ ?????
?▓▓▓▓▓?????????????????????????▓▓ ▓??█?
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Flink SQL> select 1;
默認情況下,SQL 客戶端將從 ./conf/sql-client-defaults.yaml 中讀取配置。有關環(huán)境配置文件結構的更多信息,請參見本文配置部分。
2、執(zhí)行 SQL 查詢
命令行界面啟動后,你可以使用 HELP 命令列出所有可用的 SQL 語句。輸入第一條 SQL 查詢語句并按 Enter 鍵執(zhí)行,可以驗證你的設置及集群連接是否正確:
Flink SQL> show databases;
+------------------+
| database name |
+------------------+
| default_database |
+------------------+
1 row in set
Flink SQL> use default_database;
[INFO] Execute statement succeed.
Flink SQL> show tables;
Empty set
Flink SQL> SELECT 'Hello World';
該查詢不需要 table source,并且只產(chǎn)生一行結果。CLI 將從集群中檢索結果并將其可視化。按 Q 鍵退出結果視圖。
CLI 為維護和可視化結果提供三種模式。
- 表格模式(table mode)在內(nèi)存中實體化結果,并將結果用規(guī)則的分頁表格可視化展示出來。執(zhí)行如下命令啟用:
SET execution.result-mode=table;
Flink SQL> SET execution.result-mode=table;
[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.
[INFO] Session property has been set.
- 變更日志模式(changelog mode)不會實體化和可視化結果,而是由插入(+)和撤銷(-)組成的持續(xù)查詢產(chǎn)生結果流。
SET execution.result-mode=changelog;
Flink SQL> SET execution.result-mode=changelog;
[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.
[INFO] Session property has been set.
- Tableau模式(tableau mode)更接近傳統(tǒng)的數(shù)據(jù)庫,會將執(zhí)行的結果以制表的形式直接打在屏幕之上。具體顯示的內(nèi)容會取決于作業(yè) 執(zhí)行模式的不同(execution.type):
SET execution.result-mode=tableau;
Flink SQL> SET execution.result-mode=tableau;
[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.
[INFO] Session property has been set.
注意當你使用這個模式運行一個流式查詢的時候,F(xiàn)link 會將結果持續(xù)的打印在當前的屏幕之上。如果這個流式查詢的輸入是有限的數(shù)據(jù)集, 那么Flink在處理完所有的數(shù)據(jù)之后,會自動的停止作業(yè),同時屏幕上的打印也會相應的停止。如果你想提前結束這個查詢,那么可以直接使用 CTRL-C 按鍵,這個會停掉作業(yè)同時停止屏幕上的打印。
你可以用如下查詢來查看三種結果模式的運行情況:
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
此查詢執(zhí)行一個有限字數(shù)示例:
變更日志模式 下,看到的結果應該類似于:
表格模式 下,可視化結果表將不斷更新,直到表程序以如下內(nèi)容結束:
Tableau模式 下,如果這個查詢以流的方式執(zhí)行,那么將顯示以下內(nèi)容:
如果這個查詢以批的方式執(zhí)行,顯示的內(nèi)容如下:
±------±----+
| name | cnt |
±------±----+
| Alice | 1 |
| Bob | 2 |
| Greg | 1 |
±------±----+
3 rows in set
這幾種結果模式在 SQL 查詢的原型設計過程中都非常有用。這些模式的結果都存儲在 SQL 客戶端 的 Java 堆內(nèi)存中。為了保持 CLI 界面及時響應,變更日志模式僅顯示最近的 1000 個更改。表格模式支持瀏覽更大的結果,這些結果僅受可用主內(nèi)存和配置的最大行數(shù)(max-table-result-rows)的限制。
在批處理環(huán)境下執(zhí)行的查詢只能用表格模式或者Tableau模式進行檢索。
定義查詢語句后,可以將其作為長時間運行的獨立 Flink 作業(yè)提交給集群。為此,其目標系統(tǒng)需要使用 INSERT INTO 語句指定存儲結果。配置部分解釋如何聲明讀取數(shù)據(jù)的 table source,寫入數(shù)據(jù)的 sink 以及配置其他表程序屬性的方法。
3、配置
SQL 客戶端啟動時可以添加 CLI 選項,具體如下。
./bin/sql-client.sh embedded --help
Mode "embedded" submits Flink jobs from the local machine.
Syntax: embedded [OPTIONS]
"embedded" mode options:
-d,--defaults <environment file> The environment properties with which every new session is initialized.
Properties might be overwritten by session properties.
-e,--environment <environment file> The environment properties to be imported into the session. It might
overwrite default environment properties.
-h,--help Show the help message with descriptions of all options.
-hist,--history <History file path> The file which you want to save the command history into. If not
specified, we will auto-generate one under your user's home directory.
-j,--jar <JAR file> A JAR file to be imported into the
session. The file might contain
user-defined classes needed for the
execution of statements such as
functions, table sources, or sinks.
Can be used multiple times.
-l,--library <JAR directory> A JAR file directory with which every
new session is initialized. The files
might contain user-defined classes
needed for the execution of
statements such as functions, table
sources, or sinks. Can be used
multiple times.
-pyarch,--pyArchives <arg> Add python archive files for job. The
archive files will be extracted to
the working directory of python UDF
worker. Currently only zip-format is
supported. For each archive file, a
target directory be specified. If the
target directory name is specified,
the archive file will be extracted to
a name can directory with the
specified name. Otherwise, the
archive file will be extracted to a
directory with the same name of the
archive file. The files uploaded via
this option are accessible via
relative path. '#' could be used as
the separator of the archive file
path and the target directory name.
Comma (',') could be used as the
separator to specify multiple archive
files. This option can be used to
upload the virtual environment, the
data files used in Python UDF (e.g.:
--pyArchives
file:///tmp/py37.zip,file:///tmp/data
.zip#data --pyExecutable
py37.zip/py37/bin/python). The data
files could be accessed in Python
UDF, e.g.: f = open('data/data.txt',
'r').
-pyexec,--pyExecutable <arg> Specify the path of the python
interpreter used to execute the
python UDF worker (e.g.:
--pyExecutable
/usr/local/bin/python3). The python
UDF worker depends on Python 3.5+,
Apache Beam (version == 2.23.0), Pip
(version >= 7.1.0) and SetupTools
(version >= 37.0.0). Please ensure
that the specified environment meets
the above requirements.
-pyfs,--pyFiles <pythonFiles> Attach custom python files for job.
These files will be added to the
PYTHONPATH of both the local client
and the remote python UDF worker. The
standard python resource file
suffixes such as .py/.egg/.zip or
directory are all supported. Comma
(',') could be used as the separator
to specify multiple files (e.g.:
--pyFiles
file:///tmp/myresource.zip,hdfs:///$n
amenode_address/myresource2.zip).
-pyreq,--pyRequirements <arg> Specify a requirements.txt file which
defines the third-party dependencies.
These dependencies will be installed
and added to the PYTHONPATH of the
python UDF worker. A directory which
contains the installation packages of
these dependencies could be specified
optionally. Use '#' as the separator
if the optional parameter exists
(e.g.: --pyRequirements
file:///tmp/requirements.txt#file:///
tmp/cached_dir).
-s,--session <session identifier> The identifier for a session.
'default' is the default identifier.
-u,--update <SQL update statement> Experimental (for testing only!):
Instructs the SQL Client to
immediately execute the given update
statement after starting up. The
process is shut down after the
statement has been submitted to the
cluster and returns an appropriate
return code. Currently, this feature
is only supported for INSERT INTO
statements that declare the target
sink table.
1、環(huán)境配置文件
1、flink cli配置文件說明如下
默認情況下,SQL 客戶端將從 ./conf/sql-client-defaults.yaml 中讀取配置。
SQL 查詢執(zhí)行前需要配置相關環(huán)境變量。環(huán)境配置文件 定義了 catalog、table sources、table sinks、用戶自定義函數(shù)和其他執(zhí)行或部署所需屬性。
每個環(huán)境配置文件是常規(guī)的 YAML 文件,例子如下。
tables:
- name: Users_Testing_alanchan
type: source-table
update-mode: append
connector:
type: filesystem
path: "/usr/local/bigdata/testdata/flink/users.csv"
format:
type: csv
fields:
- name: u_id
data-type: INT
- name: u_name
data-type: VARCHAR
- name: u_age
data-type: INT
- name: u_balance
data-type: DOUBLE
line-delimiter: "\n"
comment-prefix: "#"
schema:
- name: u_id
data-type: INT
- name: u_name
data-type: VARCHAR
- name: u_age
data-type: INT
- name: u_balance
data-type: DOUBLE
- name: Users_Testing_alanchan_View
type: view
query: "SELECT u_id,u_name,u_age FROM Users_Testing_alanchan"
# 定義用戶自定義函數(shù)
# functions:
# - name: myUDF
# from: class
# class: foo.bar.AggregateUDF
# constructor:
# - 7.6
# - false
# 定義 catalogs
# catalogs:
# - name: catalog_1
# type: hive
# property-version: 1
# hive-conf-dir: ...
# - name: catalog_2
# type: hive
# property-version: 1
# default-database: mydb2
# hive-conf-dir: ...
# 改變表程序基本的執(zhí)行行為屬性。
execution:
planner: blink # 可選: 'blink' (默認)或 'old'
type: streaming # 必選:執(zhí)行模式為 'batch' 或 'streaming'
result-mode: table # 必選:'table' 或 'changelog'
max-table-result-rows: 1000000 # 可選:'table' 模式下可維護的最大行數(shù)(默認為 1000000,小于 1 則表示無限制)
time-characteristic: event-time # 可選: 'processing-time' 或 'event-time' (默認)
parallelism: 1 # 可選:Flink 的并行數(shù)量(默認為 1)
periodic-watermarks-interval: 200 # 可選:周期性 watermarks 的間隔時間(默認 200 ms)
max-parallelism: 16 # 可選:Flink 的最大并行數(shù)量(默認 128)
min-idle-state-retention: 0 # 可選:表程序的最小空閑狀態(tài)時間
max-idle-state-retention: 0 # 可選:表程序的最大空閑狀態(tài)時間
current-catalog: default_catalog # 可選:當前會話 catalog 的名稱(默認為 'default_catalog')
current-database: default_database # 可選:當前 catalog 的當前數(shù)據(jù)庫名稱(默認為當前 catalog 的默認數(shù)據(jù)庫)
restart-strategy: # 可選:重啟策略(restart-strategy)
type: fallback # 默認情況下“回退”到全局重啟策略
# 用于調(diào)整和調(diào)優(yōu)表程序的配置選項。
# 在專用的”配置”頁面上可以找到完整的選項列表及其默認值。
configuration:
table.optimizer.join-reorder-enabled: true
table.exec.spill-compression.enabled: true
table.exec.spill-compression.block-size: 128kb
# 描述表程序提交集群的屬性。
deployment:
response-timeout: 5000
上述配置:
- 定義一個從 CSV 文件中讀取的 table source Users_Testing_alanchan 所需的環(huán)境,
- 定義了一個視圖 Users_Testing_alanchan_View ,該視圖是用 SQL 查詢聲明的虛擬表,
- 定義了一個用戶自定義函數(shù) myUDF,該函數(shù)可以使用類名和兩個構造函數(shù)參數(shù)進行實例化,已注釋掉
- 連接到兩個 Hive catalogs 并用 catalog_1 來作為當前目錄,用 mydb1 來作為該目錄的當前數(shù)據(jù)庫,已注釋掉
- streaming 模式下用 blink planner 來運行時間特征為 event-time 和并行度為 1 的語句,
- 在 table 結果模式下運行試探性的(exploratory)的查詢,
- 并通過配置選項對聯(lián)結(join)重排序和溢出進行一些計劃調(diào)整。
重新啟動flink cli,內(nèi)容如下:
- users.csv文件內(nèi)容
需要將該文件放在flink集群中的每臺機器上
1,alan,18,20
2,alanchan,19,30
3,alanchanchn,20,40
4,alan_chan,25,60
- flink cli查詢
查詢表和視圖
Flink SQL> select * from Users_Testing_alanchan ;
- 表顯示如下
- 視圖顯示如下
Flink SQL> select * from Users_Testing_alanchan_View;
根據(jù)使用情況,配置可以被拆分為多個文件。因此,一般情況下(用 --defaults 指定默認環(huán)境配置文件)以及基于每個會話(用 --environment 指定會話環(huán)境配置文件)來創(chuàng)建環(huán)境配置文件。每個 CLI 會話均會被屬于 session 屬性的默認屬性初始化。例如,默認環(huán)境配置文件可以指定在每個會話中都可用于查詢的所有 table source,而會話環(huán)境配置文件僅聲明特定的狀態(tài)保留時間和并行性。啟動 CLI 應用程序時,默認環(huán)境配置文件和會話環(huán)境配置文件都可以被指定。如果未指定默認環(huán)境配置文件,則 SQL 客戶端將在 Flink 的配置目錄中搜索 ./conf/sql-client-defaults.yaml。
在 CLI 會話中設置的屬性(如 SET 命令)優(yōu)先級最高:
CLI commands > session environment file > defaults environment file
2、重啟策略(Restart Strategies)
重啟策略控制 Flink 作業(yè)失敗時的重啟方式。與 Flink 集群的全局重啟策略相似,更細精度的重啟配置可以在環(huán)境配置文件中聲明。
Flink 支持以下策略:
execution:
# 退回到 flink-conf.yaml 中定義的全局策略
restart-strategy:
type: fallback
# 作業(yè)直接失敗并且不嘗試重啟
restart-strategy:
type: none
# 最多重啟作業(yè)的給定次數(shù)
restart-strategy:
type: fixed-delay
attempts: 3 # 作業(yè)被宣告失敗前的重試次數(shù)(默認:Integer.MAX_VALUE)
delay: 10000 # 重試之間的間隔時間,以毫秒為單位(默認:10 秒)
# 只要不超過每個時間間隔的最大故障數(shù)就繼續(xù)嘗試
restart-strategy:
type: failure-rate
max-failures-per-interval: 1 # 每個間隔重試的最大次數(shù)(默認:1)
failure-rate-interval: 60000 # 監(jiān)測失敗率的間隔時間,以毫秒為單位
delay: 10000 # 重試之間的間隔時間,以毫秒為單位(默認:10 秒)
2)、依賴
SQL 客戶端不要求用 Maven 或者 SBT 設置 Java 項目。相反,你可以以常規(guī)的 JAR 包給集群提交依賴項。你也可以分別(用 --jar)指定每一個 JAR 包或者(用 --library)定義整個 library 依賴庫。為連接擴展系統(tǒng)(如 Apache Kafka)和相應的數(shù)據(jù)格式(如 JSON),F(xiàn)link提供了開箱即用型 JAR 捆綁包(ready-to-use JAR bundles)。這些 JAR 包各個發(fā)行版都可以從 Maven 中央庫中下載到。
更多的關于外部連接系統(tǒng)示例參考:
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache HBase示例(5)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)
如下例子展示了從 Apache Kafka 中讀取 csv 文件并作為 table source 的環(huán)境配置文件。
# kafka表
tables:
- name: Test_Flink_Kafka_Source_SQL
type: source-table
update-mode: append
connector:
property-version: 1
type: kafka
version: "universal"
topic: t_kafkasource_testing
startup-mode: earliest-offset
properties:
bootstrap.servers: "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092"
group.id: flink_alan_kafka
format:
property-version: 1
type: csv
schema: "ROW<t_id LONG, t_name STRING, t_balance DOUBLE, t_age INT, t_insert_time TIMESTAMP>"
schema:
- name: t_id
data-type: BIGINT
- name: t_name
data-type: STRING
- name: t_balance
data-type: DOUBLE
- name: t_age
data-type: INT
- name: rowTime
data-type: TIMESTAMP(3)
rowtime:
timestamps:
type: "from-field"
from: "t_insert_time"
watermarks:
type: "periodic-bounded"
delay: "60000"
- name: procTime
data-type: TIMESTAMP(3)
proctime: true
# 改變表程序基本的執(zhí)行行為屬性。
execution:
planner: blink # 可選: 'blink' (默認)或 'old'
type: streaming # 必選:執(zhí)行模式為 'batch' 或 'streaming'
result-mode: table # 必選:'table' 或 'changelog'
max-table-result-rows: 1000000 # 可選:'table' 模式下可維護的最大行數(shù)(默認為 1000000,小于 1 則表示無限制)
time-characteristic: event-time # 可選: 'processing-time' 或 'event-time' (默認)
parallelism: 1 # 可選:Flink 的并行數(shù)量(默認為 1)
periodic-watermarks-interval: 200 # 可選:周期性 watermarks 的間隔時間(默認 200 ms)
max-parallelism: 16 # 可選:Flink 的最大并行數(shù)量(默認 128)
min-idle-state-retention: 0 # 可選:表程序的最小空閑狀態(tài)時間
max-idle-state-retention: 0 # 可選:表程序的最大空閑狀態(tài)時間
current-catalog: default_catalog # 可選:當前會話 catalog 的名稱(默認為 'default_catalog')
current-database: default_database # 可選:當前 catalog 的當前數(shù)據(jù)庫名稱(默認為當前 catalog 的默認數(shù)據(jù)庫)
restart-strategy: # 可選:重啟策略(restart-strategy)
type: fallback # 默認情況下“回退”到全局重啟策略
# 用于調(diào)整和調(diào)優(yōu)表程序的配置選項。
# 在專用的”配置”頁面上可以找到完整的選項列表及其默認值。
configuration:
table.optimizer.join-reorder-enabled: true
table.exec.spill-compression.enabled: true
table.exec.spill-compression.block-size: 128kb
# 描述表程序提交集群的屬性。
deployment:
response-timeout: 5000
運行上面的配置需要在flink的lib文件夾下增加關于kafka的依賴包,上述例子中增加了如下包
本文的依賴環(huán)境是flink 1.13.5
flink-sql-connector-kafka_2.11-1.13.5.jar
# 要重啟集群
Test_Flink_Kafka_Source_SQL 表的結果格式與絕大多數(shù)的 csv 格式相似。此外,它還添加了 rowtime 屬性 rowTime 和 processing-time 屬性 procTime。
connector 和 format 都允許定義屬性版本(當前版本為 1 )以便將來向后兼容。
- 驗證
# kafka主題操作
kafka-topics.sh --delete --topic t_kafkasource_testing --bootstrap-server server1:9092
kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_kafkasource_testing --partitions 1 --replication-factor 1
kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource_testing
# 測試數(shù)據(jù)
1,alan,15,18,2022-08-07 09:18:25
2,alanchan,20,19,2022-08-07 09:20:25
3,alanchanchn,25,20,2022-08-07 09:22:25
4,alan_chan,30,21,2022-08-07 09:24:25
5,alan_chan_chn,45,22,2022-08-07 09:26:25
# kafka客戶端操作
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource_testing
>1,alan,15,18,2022-08-07 09:18:25
>2,alanchan,20,19,2022-08-07 09:20:25
>3,alanchanchn,25,20,2022-08-07 09:22:25
>4,alan_chan,30,21,2022-08-07 09:24:25
>5,alan_chan_chn,45,22,2022-08-07 09:26:25
- 查詢結果
3)、自定義函數(shù)(User-defined Functions)
SQL 客戶端允許用戶創(chuàng)建用戶自定義的函數(shù)來進行 SQL 查詢。當前,這些自定義函數(shù)僅限于 Java/Scala 編寫的類以及 Python 文件。
為提供 Java/Scala 的自定義函數(shù),你首先需要實現(xiàn)和編譯函數(shù)類,該函數(shù)繼承自 ScalarFunction、 AggregateFunction 或 TableFunction(19、Flink 的table api與sql之內(nèi)置函數(shù): Table API 和 SQL 中的內(nèi)置函數(shù))。一個或多個函數(shù)可以打包到 SQL 客戶端的 JAR 依賴中。
所有函數(shù)在被調(diào)用之前,必須在環(huán)境配置文件中提前聲明。functions 列表中每個函數(shù)類都必須指定
- 用來注冊函數(shù)的 name,
- 函數(shù)的來源 from(目前僅限于 class(Java/Scala UDF),
Java/Scala UDF 必須指定:
- 聲明了全限定名的函數(shù)類 class 以及用于實例化的 constructor 參數(shù)的可選列表。
functions:
- name: java_udf # required: name of the function
from: class # required: source of the function
class: ... # required: fully qualified class name of the function
constructor: # optional: constructor parameters of the function class
- ... # optional: a literal parameter with implicit type
- class: ... # optional: full class name of the parameter
constructor: # optional: constructor parameters of the parameter's class
- type: ... # optional: type of the literal parameter
value: ... # optional: value of the literal parameter
- name: python_udf # required: name of the function
from: python # required: source of the function
fully-qualified-name: ... # required: fully qualified class name of the function
對于 Java/Scala UDF,要確保函數(shù)類指定的構造參數(shù)順序和類型都要嚴格匹配。
1、構造函數(shù)參數(shù)
根據(jù)用戶自定義函數(shù)可知,在用到 SQL 語句中之前,有必要將構造參數(shù)匹配對應的類型。
如上述示例所示,當聲明一個用戶自定義函數(shù)時,可以使用構造參數(shù)來配置相應的類,有以下三種方式:
- 隱式類型的文本值:SQL 客戶端將自動根據(jù)文本推導對應的類型。目前,只支持 BOOLEAN、INT、 DOUBLE 和 VARCHAR 。
如果自動推導的類型與期望不符(例如,你需要 VARCHAR 類型的 false),可以改用顯式類型。
- true # -> BOOLEAN (case sensitive)
- 42 # -> INT
- 1234.222 # -> DOUBLE
- foo # -> VARCHAR
- 顯式類型的文本值:為保證類型安全,需明確聲明 type 和 value 屬性的參數(shù)。
- type: DECIMAL
value: 11111111111111111
下表列出支持的 Java 參數(shù)類型和與之相對應的 SQL 類型。
- (嵌套)類實例:除了文本值外,還可以通過指定構造參數(shù)的 class 和 constructor 屬性來創(chuàng)建(嵌套)類實例。這個過程可以遞歸執(zhí)行,直到最后的構造參數(shù)是用文本值來描述的。
- class: foo.bar.paramClass
constructor:
- StarryName
- class: java.lang.Integer
constructor:
- class: java.lang.String
constructor:
- type: VARCHAR
value: 3
4、Catalogs
Catalogs 可以由 YAML 屬性集合定義,并且在 SQL 客戶端啟動之前自動注冊到運行環(huán)境中。
用戶可以指定在 SQL CLI 中哪些 catalog 要被作為當前的 catalog,以及哪個數(shù)據(jù)庫的 catalog 可以用于當前數(shù)據(jù)庫。
catalogs:
- name: catalog_1
type: hive
property-version: 1
default-database: mydb2
hive-conf-dir: <path of Hive conf directory>
- name: catalog_2
type: hive
property-version: 1
hive-conf-dir: <path of Hive conf directory>
execution:
...
current-catalog: catalog_1
current-database: mydb1
更多關于 catalog 的內(nèi)容,參考 24、Flink 的table api與sql之Catalogs。
5、分離的 SQL 查詢
為定義端到端的 SQL 管道,SQL 的 INSERT INTO 語句可以向 Flink 集群提交長時間運行的分離查詢。查詢產(chǎn)生的結果輸出到除 SQL 客戶端外的擴展系統(tǒng)中。這樣可以應對更高的并發(fā)和更多的數(shù)據(jù)。CLI 自身在提交后不對分離查詢做任何控制。
INSERT INTO MyTableSink SELECT * FROM MyTableSource
sink MyTableSink 必須在環(huán)境配置文件中聲明。查看更多關于 Flink 支持的外部系統(tǒng)及其配置信息,參見
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache HBase示例(5)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)
如下展示 Apache Kafka 的 sink 示例。
CREATE TABLE Table_Sink_Kafka (
t_id BIGINT,
t_name STRING,
t_balance DOUBLE,
t_age INT,
t_insert_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 't_kafka_sink_flink',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'format' = 'csv'
);
Flink SQL> CREATE TABLE Table_Sink_Kafka (
> t_id BIGINT,
> t_name STRING,
> t_balance DOUBLE,
> t_age INT,
> t_insert_time TIMESTAMP(3)
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 't_kafka_sink_flink',
> 'scan.startup.mode' = 'earliest-offset',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.
Flink SQL> select * from Test_Flink_Kafka_Source_SQL;
[INFO] Result retrieval cancelled.
-- 查詢結果如下
SQL 客戶端要確保語句成功提交到集群上。一旦提交查詢,CLI 將展示關于 Flink 作業(yè)的相關信息。
Flink SQL> INSERT INTO Table_Sink_Kafka SELECT t_id,t_name,t_balance,t_age,rowTime FROM Test_Flink_Kafka_Source_SQL;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 40769216d90fa12fd0696e8e0a6dec2d
Flink SQL> select * from Table_Sink_Kafka;
-- 插入后的數(shù)據(jù)如下
提交后,SQL 客戶端不追蹤正在運行的 Flink 作業(yè)狀態(tài)。提交后可以關閉 CLI 進程,并且不會影響分離的查詢。Flink 的重啟策略負責容錯。取消查詢可以用 Flink 的 web 接口、命令行或 REST API 。
6、SQL 視圖
視圖是一張?zhí)摂M表,允許通過 SQL 查詢來定義。視圖的定義會被立即解析與驗證。然而,提交常規(guī) INSERT INTO 或 SELECT 語句后不會立即執(zhí)行,在訪問視圖時才會真正執(zhí)行。
視圖可以用環(huán)境配置文件或者 CLI 會話來定義。
下例展示如何在一個文件里定義多張視圖。視圖注冊的順序和定義它們的環(huán)境配置文件一致。支持諸如 視圖 A 依賴視圖 B ,視圖 B 依賴視圖 C 的引用鏈。
tables:
# filesystem表
- name: Users_Testing_view_alanchan
type: source-table
update-mode: append
connector:
type: filesystem
path: "/usr/local/bigdata/testdata/flink/users.csv"
format:
type: csv
fields:
- name: u_id
data-type: INT
- name: u_name
data-type: VARCHAR
- name: u_age
data-type: INT
- name: u_balance
data-type: DOUBLE
line-delimiter: "\n"
comment-prefix: "#"
schema:
- name: u_id
data-type: INT
- name: u_name
data-type: VARCHAR
- name: u_age
data-type: INT
- name: u_balance
data-type: DOUBLE
- name: Users_Testing_alanchan_View1
type: view
query: "SELECT u_id,u_name,u_age FROM Users_Testing_view_alanchan"
- name: Users_Testing_alanchan_View2
type: view
query: "SELECT u_id,u_name,u_balance FROM Users_Testing_view_alanchan"
相較于 table soruce 和 sink,會話環(huán)境配置文件中定義的視圖具有最高優(yōu)先級。
-驗證
Flink SQL> select * from Users_Testing_view_alanchan;
Flink SQL> select * from Users_Testing_alanchan_View2;
視圖還可以在 CLI 會話中用 CREATE VIEW 語句來創(chuàng)建:
CREATE VIEW MyNewView AS select u_name from Users_Testing_view_alanchan;
Flink SQL> select * from MyNewView;
視圖能在 CLI 會話中創(chuàng)建,也能用 DROP VIEW 語句刪除:
Flink SQL> DROP VIEW MyNewView;
[INFO] Execute statement succeed.
Flink SQL> show tables;
+------------------------------+
| table name |
+------------------------------+
| Users_Testing_alanchan_View1 |
| Users_Testing_alanchan_View2 |
| Users_Testing_view_alanchan |
+------------------------------+
3 rows in set
CLI 中視圖的定義僅限于上述語法。將來版本會支持定義視圖結構以及在表名中加入轉義的空格。
7、臨時表(Temporal Table)
臨時表是在變化的歷史記錄表上的(參數(shù)化)視圖,該視圖在某個特定時間點返回表的內(nèi)容。這對于在特定的時間戳將一張表的內(nèi)容聯(lián)結另一張表是非常有用的。更多信息見15、Flink 的table api與sql之流式概念-詳解的介紹了動態(tài)表、時間屬性配置(如何處理更新結果)、時態(tài)表、流上的join、流上的確定性以及查詢配置頁面。
下例展示如何定義一張臨時表 SourceTemporalTable:
tables:
# 定義包含對臨時表的更新的 table source (或視圖)
- name: HistorySource
type: source-table
update-mode: append
connector: # ...
format: # ...
schema:
- name: integerField
data-type: INT
- name: stringField
data-type: STRING
- name: rowtimeField
data-type: TIMESTAMP(3)
rowtime:
timestamps:
type: from-field
from: rowtimeField
watermarks:
type: from-source
# 在具有時間屬性和主鍵的變化歷史記錄表上定義臨時表
- name: SourceTemporalTable
type: temporal-table
history-table: HistorySource
primary-key: integerField
time-attribute: rowtimeField # could also be a proctime field
如例子中所示,table source,視圖和臨時表的定義可以相互混合。它們按照在環(huán)境配置文件中定義的順序進行注冊。例如,臨時表可以引用一個視圖,該視圖依賴于另一個視圖或 table source。
8、局限與未來
當前的 SQL 客戶端僅支持嵌入式模式。在1.17版本后提供基于 REST 的 SQL 客戶端網(wǎng)關(Gateway) 的功能。文章來源:http://www.zghlxwxcb.cn/news/detail-634607.html
以上,介紹了flink cli的啟動、使用以及通過創(chuàng)建kafka、filesystem等例子介紹了配置文件的使用,同時也簡單的介紹了視圖、臨時表等內(nèi)容。文章來源地址http://www.zghlxwxcb.cn/news/detail-634607.html
到了這里,關于30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!