国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等)

這篇具有很好參考價值的文章主要介紹了30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Flink 系列文章

一、Flink 專欄

Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關基礎內(nèi)容。

  • 2、Flink基礎系列
    本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。

  • 3、Flik Table API和SQL基礎系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。

  • 4、Flik Table API和SQL提高與應用系列
    本部分是table api 和sql的應用部分,和實際的生產(chǎn)應用聯(lián)系更為密切,以及有一定開發(fā)難度的內(nèi)容。

  • 5、Flink 監(jiān)控系列
    本部分和實際的運維、監(jiān)控工作相關。

二、Flink 示例專欄

Flink 示例專欄是 Flink 專欄的輔助說明,一般不會介紹知識點的信息,更多的是提供一個一個可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內(nèi)容。

兩專欄的所有文章入口點擊:Flink 系列文章匯總索引



本文介紹了flink cli的啟動、使用以及通過創(chuàng)建kafka、filesystem等例子介紹了配置文件的使用,同時也簡單的介紹了視圖、臨時表等內(nèi)容。
本文依賴環(huán)境是flink、kafka環(huán)境可用,flink的版本是1.13.5、jdk8.

一、SQL客戶端

Flink 的 Table & SQL API 可以處理 SQL 語言編寫的查詢語句,但是這些查詢需要嵌入用 Java 或 Scala 編寫的表程序中。此外,這些程序在提交到集群前需要用構建工具打包。這或多或少限制了 Java/Scala 程序員對 Flink 的使用。

SQL 客戶端 的目的是提供一種簡單的方式來編寫、調(diào)試和提交表程序到 Flink 集群上,而無需寫一行 Java 或 Scala 代碼。SQL 客戶端命令行界面(CLI) 能夠在命令行中檢索和可視化分布式應用中實時產(chǎn)生的結果。
部署請參考:1、Flink1.12.7或1.13.5詳細介紹及本地安裝部署、驗證 和 2、Flink1.13.5二種部署方式(Standalone、Standalone HA )、四種提交任務方式(前兩種及session和per-job)驗證詳細步驟

1、啟動 SQL 客戶端命令行界面

SQL Client 腳本也位于 Flink 的 bin 目錄中。將來,用戶可以通過啟動嵌入式 standalone 進程或通過連接到遠程 SQL 客戶端網(wǎng)關來啟動 SQL 客戶端命令行界面。目前僅支持 embedded 模式??梢酝ㄟ^以下方式啟動 CLI:

#啟動flink sql客戶端
./bin/sql-client.sh embedded -d 配置文件
#或
./bin/sql-client.sh embedded
#或
./bin/sql-client.sh

#注意:需要使用部署flink集群的用戶啟動
[alanchan@server1 bin]$ sql-client.sh embedded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/flink-1.13.5/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/bigdata/hadoop-3.1.4/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
No default environment specified.
Searching for '/usr/local/flink-1.13.5/conf/sql-client-defaults.yaml'...not found.
Command history file path: /home/alanchan/.flink-sql-history

                                   ?▓██▓██?
                               ▓████??█▓?▓███▓?
                            ▓███▓??        ???▓██?  ?
                          ?██?   ??▓▓█▓▓??      ?████
                          ██?         ??▓███?    ?█?█?
                            ?▓█            ███   ▓??██
                              ▓█       ?????▓██▓???▓▓█
                            █? █   ???       ███▓▓█ ?█???
                            ████?   ?▓█▓      ██??? ▓███?
                         ??█▓▓██       ▓█?    ▓█?▓██▓ ?█?
                   ▓??▓████? ██         ?█    █▓??█???█?
                  ███▓?██▓  ▓█           █   █▓ ?▓█▓▓█?
                ?██▓  ?█?            █  █? ?█████▓? ██▓??
               ███? ? █?          ▓ ?█ █████???    ?█?▓  ▓?
              ██▓█ ??▓?          ▓███████▓?       ?█? ?▓ ▓██▓
           ?██▓ ▓█ █▓█       ??█████▓▓??         ██??  █ ?  ▓█?
           ▓█▓  ▓█ ██▓ ?▓▓▓▓▓▓▓?              ?██▓           ?█?
           ▓█    █ ▓███▓??              ?▓▓▓███▓          ??? ▓█
           ██▓    ██?    ??▓▓███▓▓▓▓▓██████▓?            ▓███  █
          ▓███? ███   ?▓▓???   ?▓████▓?                  ??▓?  █▓
          █▓??▓▓██  ??????????▓██▓?                            █▓
          ██ ▓??█   ▓▓▓▓???  ?█▓       ?▓▓██▓    ▓?          ??▓
          ▓█▓ ▓?█  █▓?  ??▓▓██?            ?▓█?   ??????▓█████?
           ██? ▓█?█?  ?▓▓?  ▓█                █?      ????   ?█?
           ▓█   ?█▓   ?     █?                ?█              █▓
            █▓   ██         █?                 ▓▓        ?█▓▓▓?█?
             █▓ ?▓██?       ▓?                  ▓█▓?????▓█?    ?█
              ██   ▓█▓?      ?                    ??█?██?      ▓▓
               ▓█?   ?█▓??                         ?? █?█▓?????██
                ?██?    ?▓▓?                     ▓██▓?█? ?▓▓▓▓?█▓
                  ?▓██?                          ▓?  ?█▓█  ?????
                      ?▓▓▓▓▓?????????????????????????▓▓  ▓??█?
          
    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.


Flink SQL> select 1;


默認情況下,SQL 客戶端將從 ./conf/sql-client-defaults.yaml 中讀取配置。有關環(huán)境配置文件結構的更多信息,請參見本文配置部分。

2、執(zhí)行 SQL 查詢

命令行界面啟動后,你可以使用 HELP 命令列出所有可用的 SQL 語句。輸入第一條 SQL 查詢語句并按 Enter 鍵執(zhí)行,可以驗證你的設置及集群連接是否正確:

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> use default_database;
[INFO] Execute statement succeed.

Flink SQL> show tables;
Empty set

Flink SQL> SELECT 'Hello World';

30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算

該查詢不需要 table source,并且只產(chǎn)生一行結果。CLI 將從集群中檢索結果并將其可視化。按 Q 鍵退出結果視圖。

CLI 為維護和可視化結果提供三種模式。

  • 表格模式(table mode)在內(nèi)存中實體化結果,并將結果用規(guī)則的分頁表格可視化展示出來。執(zhí)行如下命令啟用:
SET execution.result-mode=table;

Flink SQL> SET execution.result-mode=table;
[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.
[INFO] Session property has been set.

  • 變更日志模式(changelog mode)不會實體化和可視化結果,而是由插入(+)和撤銷(-)組成的持續(xù)查詢產(chǎn)生結果流。
SET execution.result-mode=changelog;

Flink SQL> SET execution.result-mode=changelog;
[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.
[INFO] Session property has been set.

  • Tableau模式(tableau mode)更接近傳統(tǒng)的數(shù)據(jù)庫,會將執(zhí)行的結果以制表的形式直接打在屏幕之上。具體顯示的內(nèi)容會取決于作業(yè) 執(zhí)行模式的不同(execution.type):
SET execution.result-mode=tableau;

Flink SQL> SET execution.result-mode=tableau;
[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.
[INFO] Session property has been set.

注意當你使用這個模式運行一個流式查詢的時候,F(xiàn)link 會將結果持續(xù)的打印在當前的屏幕之上。如果這個流式查詢的輸入是有限的數(shù)據(jù)集, 那么Flink在處理完所有的數(shù)據(jù)之后,會自動的停止作業(yè),同時屏幕上的打印也會相應的停止。如果你想提前結束這個查詢,那么可以直接使用 CTRL-C 按鍵,這個會停掉作業(yè)同時停止屏幕上的打印。

你可以用如下查詢來查看三種結果模式的運行情況:


SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

此查詢執(zhí)行一個有限字數(shù)示例:

變更日志模式 下,看到的結果應該類似于:
30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算
表格模式 下,可視化結果表將不斷更新,直到表程序以如下內(nèi)容結束:
30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算

Tableau模式 下,如果這個查詢以流的方式執(zhí)行,那么將顯示以下內(nèi)容:
30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算

如果這個查詢以批的方式執(zhí)行,顯示的內(nèi)容如下:

±------±----+
| name | cnt |
±------±----+
| Alice | 1 |
| Bob | 2 |
| Greg | 1 |
±------±----+
3 rows in set

這幾種結果模式在 SQL 查詢的原型設計過程中都非常有用。這些模式的結果都存儲在 SQL 客戶端 的 Java 堆內(nèi)存中。為了保持 CLI 界面及時響應,變更日志模式僅顯示最近的 1000 個更改。表格模式支持瀏覽更大的結果,這些結果僅受可用主內(nèi)存和配置的最大行數(shù)(max-table-result-rows)的限制。

在批處理環(huán)境下執(zhí)行的查詢只能用表格模式或者Tableau模式進行檢索。

定義查詢語句后,可以將其作為長時間運行的獨立 Flink 作業(yè)提交給集群。為此,其目標系統(tǒng)需要使用 INSERT INTO 語句指定存儲結果。配置部分解釋如何聲明讀取數(shù)據(jù)的 table source,寫入數(shù)據(jù)的 sink 以及配置其他表程序屬性的方法。

3、配置

SQL 客戶端啟動時可以添加 CLI 選項,具體如下。

./bin/sql-client.sh embedded --help

Mode "embedded" submits Flink jobs from the local machine.

  Syntax: embedded [OPTIONS]
  "embedded" mode options:
     -d,--defaults <environment file>      The environment properties with which every new session is initialized.
                                           Properties might be overwritten by session properties.
     -e,--environment <environment file>   The environment properties to be  imported into the session. It might
                                           overwrite default environment properties.
     -h,--help                             Show the help message with descriptions of all options.
     -hist,--history <History file path>   The file which you want to save the command history into. If not
                                           specified, we will auto-generate one under your user's home directory.
     -j,--jar <JAR file>                   A JAR file to be imported into the
                                           session. The file might contain
                                           user-defined classes needed for the
                                           execution of statements such as
                                           functions, table sources, or sinks.
                                           Can be used multiple times.
     -l,--library <JAR directory>          A JAR file directory with which every
                                           new session is initialized. The files
                                           might contain user-defined classes
                                           needed for the execution of
                                           statements such as functions, table
                                           sources, or sinks. Can be used
                                           multiple times.
     -pyarch,--pyArchives <arg>            Add python archive files for job. The
                                           archive files will be extracted to
                                           the working directory of python UDF
                                           worker. Currently only zip-format is
                                           supported. For each archive file, a
                                           target directory be specified. If the
                                           target directory name is specified,
                                           the archive file will be extracted to
                                           a name can directory with the
                                           specified name. Otherwise, the
                                           archive file will be extracted to a
                                           directory with the same name of the
                                           archive file. The files uploaded via
                                           this option are accessible via
                                           relative path. '#' could be used as
                                           the separator of the archive file
                                           path and the target directory name.
                                           Comma (',') could be used as the
                                           separator to specify multiple archive
                                           files. This option can be used to
                                           upload the virtual environment, the
                                           data files used in Python UDF (e.g.:
                                           --pyArchives
                                           file:///tmp/py37.zip,file:///tmp/data
                                           .zip#data --pyExecutable
                                           py37.zip/py37/bin/python). The data
                                           files could be accessed in Python
                                           UDF, e.g.: f = open('data/data.txt',
                                           'r').
     -pyexec,--pyExecutable <arg>          Specify the path of the python
                                           interpreter used to execute the
                                           python UDF worker (e.g.:
                                           --pyExecutable
                                           /usr/local/bin/python3). The python
                                           UDF worker depends on Python 3.5+,
                                           Apache Beam (version == 2.23.0), Pip
                                           (version >= 7.1.0) and SetupTools
                                           (version >= 37.0.0). Please ensure
                                           that the specified environment meets
                                           the above requirements.
     -pyfs,--pyFiles <pythonFiles>         Attach custom python files for job.
                                           These files will be added to the
                                           PYTHONPATH of both the local client
                                           and the remote python UDF worker. The
                                           standard python resource file
                                           suffixes such as .py/.egg/.zip or
                                           directory are all supported. Comma
                                           (',') could be used as the separator
                                           to specify multiple files (e.g.:
                                           --pyFiles
                                           file:///tmp/myresource.zip,hdfs:///$n
                                           amenode_address/myresource2.zip).
     -pyreq,--pyRequirements <arg>         Specify a requirements.txt file which
                                           defines the third-party dependencies.
                                           These dependencies will be installed
                                           and added to the PYTHONPATH of the
                                           python UDF worker. A directory which
                                           contains the installation packages of
                                           these dependencies could be specified
                                           optionally. Use '#' as the separator
                                           if the optional parameter exists
                                           (e.g.: --pyRequirements
                                           file:///tmp/requirements.txt#file:///
                                           tmp/cached_dir).
     -s,--session <session identifier>     The identifier for a session.
                                           'default' is the default identifier.
     -u,--update <SQL update statement>    Experimental (for testing only!):
                                           Instructs the SQL Client to
                                           immediately execute the given update
                                           statement after starting up. The
                                           process is shut down after the
                                           statement has been submitted to the
                                           cluster and returns an appropriate
                                           return code. Currently, this feature
                                           is only supported for INSERT INTO
                                           statements that declare the target
                                           sink table.

1、環(huán)境配置文件

1、flink cli配置文件說明如下

默認情況下,SQL 客戶端將從 ./conf/sql-client-defaults.yaml 中讀取配置。
SQL 查詢執(zhí)行前需要配置相關環(huán)境變量。環(huán)境配置文件 定義了 catalog、table sources、table sinks、用戶自定義函數(shù)和其他執(zhí)行或部署所需屬性。
每個環(huán)境配置文件是常規(guī)的 YAML 文件,例子如下。

tables:
  - name: Users_Testing_alanchan
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/usr/local/bigdata/testdata/flink/users.csv"
    format:
      type: csv
      fields:
        - name: u_id
          data-type: INT
        - name: u_name
          data-type: VARCHAR
        - name: u_age
          data-type: INT
        - name: u_balance
          data-type: DOUBLE
      line-delimiter: "\n"
      comment-prefix: "#"

    schema:
      - name: u_id
        data-type: INT
      - name: u_name
        data-type: VARCHAR
      - name: u_age
        data-type: INT
      - name: u_balance
        data-type: DOUBLE

  - name: Users_Testing_alanchan_View
    type: view
    query: "SELECT u_id,u_name,u_age FROM Users_Testing_alanchan"

# 定義用戶自定義函數(shù)
# functions:
#   - name: myUDF
#     from: class
#     class: foo.bar.AggregateUDF
#     constructor:
#       - 7.6
#       - false

# 定義 catalogs
# catalogs:
#   - name: catalog_1
#      type: hive
#      property-version: 1
#      hive-conf-dir: ...
#    - name: catalog_2
#      type: hive
#      property-version: 1
#      default-database: mydb2
#      hive-conf-dir: ...


# 改變表程序基本的執(zhí)行行為屬性。
execution:
  planner: blink                    # 可選: 'blink' (默認)或 'old'
  type: streaming                   # 必選:執(zhí)行模式為 'batch' 或 'streaming'
  result-mode: table                # 必選:'table' 或 'changelog'
  max-table-result-rows: 1000000    # 可選:'table' 模式下可維護的最大行數(shù)(默認為 1000000,小于 1 則表示無限制)
  time-characteristic: event-time   # 可選: 'processing-time' 或 'event-time' (默認)
  parallelism: 1                    # 可選:Flink 的并行數(shù)量(默認為 1)
  periodic-watermarks-interval: 200 # 可選:周期性 watermarks 的間隔時間(默認 200 ms)
  max-parallelism: 16               # 可選:Flink 的最大并行數(shù)量(默認 128)
  min-idle-state-retention: 0       # 可選:表程序的最小空閑狀態(tài)時間
  max-idle-state-retention: 0       # 可選:表程序的最大空閑狀態(tài)時間
  current-catalog: default_catalog  # 可選:當前會話 catalog 的名稱(默認為 'default_catalog')
  current-database: default_database # 可選:當前 catalog 的當前數(shù)據(jù)庫名稱(默認為當前 catalog 的默認數(shù)據(jù)庫)
  restart-strategy:                 # 可選:重啟策略(restart-strategy)
    type: fallback                  #   默認情況下“回退”到全局重啟策略


# 用于調(diào)整和調(diào)優(yōu)表程序的配置選項。
# 在專用的”配置”頁面上可以找到完整的選項列表及其默認值。
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# 描述表程序提交集群的屬性。
deployment:
  response-timeout: 5000

上述配置:

  • 定義一個從 CSV 文件中讀取的 table source Users_Testing_alanchan 所需的環(huán)境,
  • 定義了一個視圖 Users_Testing_alanchan_View ,該視圖是用 SQL 查詢聲明的虛擬表,
  • 定義了一個用戶自定義函數(shù) myUDF,該函數(shù)可以使用類名和兩個構造函數(shù)參數(shù)進行實例化,已注釋掉
  • 連接到兩個 Hive catalogs 并用 catalog_1 來作為當前目錄,用 mydb1 來作為該目錄的當前數(shù)據(jù)庫,已注釋掉
  • streaming 模式下用 blink planner 來運行時間特征為 event-time 和并行度為 1 的語句,
  • 在 table 結果模式下運行試探性的(exploratory)的查詢,
  • 并通過配置選項對聯(lián)結(join)重排序和溢出進行一些計劃調(diào)整。

重新啟動flink cli,內(nèi)容如下:

  • users.csv文件內(nèi)容
    需要將該文件放在flink集群中的每臺機器上
1,alan,18,20
2,alanchan,19,30
3,alanchanchn,20,40
4,alan_chan,25,60
  • flink cli查詢

查詢表和視圖

Flink SQL> select * from Users_Testing_alanchan ;

  • 表顯示如下
    30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算
  • 視圖顯示如下
Flink SQL> select * from Users_Testing_alanchan_View;

30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算

根據(jù)使用情況,配置可以被拆分為多個文件。因此,一般情況下(用 --defaults 指定默認環(huán)境配置文件)以及基于每個會話(用 --environment 指定會話環(huán)境配置文件)來創(chuàng)建環(huán)境配置文件。每個 CLI 會話均會被屬于 session 屬性的默認屬性初始化。例如,默認環(huán)境配置文件可以指定在每個會話中都可用于查詢的所有 table source,而會話環(huán)境配置文件僅聲明特定的狀態(tài)保留時間和并行性。啟動 CLI 應用程序時,默認環(huán)境配置文件和會話環(huán)境配置文件都可以被指定。如果未指定默認環(huán)境配置文件,則 SQL 客戶端將在 Flink 的配置目錄中搜索 ./conf/sql-client-defaults.yaml。

在 CLI 會話中設置的屬性(如 SET 命令)優(yōu)先級最高:
CLI commands > session environment file > defaults environment file

2、重啟策略(Restart Strategies)

重啟策略控制 Flink 作業(yè)失敗時的重啟方式。與 Flink 集群的全局重啟策略相似,更細精度的重啟配置可以在環(huán)境配置文件中聲明。

Flink 支持以下策略:

execution:
  # 退回到 flink-conf.yaml 中定義的全局策略
  restart-strategy:
    type: fallback

  # 作業(yè)直接失敗并且不嘗試重啟
  restart-strategy:
    type: none

  # 最多重啟作業(yè)的給定次數(shù)
  restart-strategy:
    type: fixed-delay
    attempts: 3      # 作業(yè)被宣告失敗前的重試次數(shù)(默認:Integer.MAX_VALUE)
    delay: 10000     # 重試之間的間隔時間,以毫秒為單位(默認:10 秒)

  # 只要不超過每個時間間隔的最大故障數(shù)就繼續(xù)嘗試
  restart-strategy:
    type: failure-rate
    max-failures-per-interval: 1   # 每個間隔重試的最大次數(shù)(默認:1)
    failure-rate-interval: 60000   # 監(jiān)測失敗率的間隔時間,以毫秒為單位
    delay: 10000                   # 重試之間的間隔時間,以毫秒為單位(默認:10 秒)
    

2)、依賴

SQL 客戶端不要求用 Maven 或者 SBT 設置 Java 項目。相反,你可以以常規(guī)的 JAR 包給集群提交依賴項。你也可以分別(用 --jar)指定每一個 JAR 包或者(用 --library)定義整個 library 依賴庫。為連接擴展系統(tǒng)(如 Apache Kafka)和相應的數(shù)據(jù)格式(如 JSON),F(xiàn)link提供了開箱即用型 JAR 捆綁包(ready-to-use JAR bundles)。這些 JAR 包各個發(fā)行版都可以從 Maven 中央庫中下載到。

更多的關于外部連接系統(tǒng)示例參考:
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache HBase示例(5)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)

如下例子展示了從 Apache Kafka 中讀取 csv 文件并作為 table source 的環(huán)境配置文件。

# kafka表
tables:
  - name: Test_Flink_Kafka_Source_SQL
    type: source-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "universal"
      topic: t_kafkasource_testing
      startup-mode: earliest-offset
      properties:
        bootstrap.servers: "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092"
        group.id: flink_alan_kafka
    format:
      property-version: 1
      type: csv
      schema: "ROW<t_id LONG, t_name STRING, t_balance DOUBLE, t_age INT, t_insert_time TIMESTAMP>"
    schema:
      - name: t_id
        data-type: BIGINT
      - name: t_name
        data-type: STRING
      - name: t_balance
        data-type: DOUBLE
      - name: t_age
        data-type: INT
      - name: rowTime
        data-type: TIMESTAMP(3)
        rowtime:
          timestamps:
            type: "from-field"
            from: "t_insert_time"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"
      - name: procTime
        data-type: TIMESTAMP(3)
        proctime: true

# 改變表程序基本的執(zhí)行行為屬性。
execution:
  planner: blink                    # 可選: 'blink' (默認)或 'old'
  type: streaming                   # 必選:執(zhí)行模式為 'batch' 或 'streaming'
  result-mode: table                # 必選:'table' 或 'changelog'
  max-table-result-rows: 1000000    # 可選:'table' 模式下可維護的最大行數(shù)(默認為 1000000,小于 1 則表示無限制)
  time-characteristic: event-time   # 可選: 'processing-time' 或 'event-time' (默認)
  parallelism: 1                    # 可選:Flink 的并行數(shù)量(默認為 1)
  periodic-watermarks-interval: 200 # 可選:周期性 watermarks 的間隔時間(默認 200 ms)
  max-parallelism: 16               # 可選:Flink 的最大并行數(shù)量(默認 128)
  min-idle-state-retention: 0       # 可選:表程序的最小空閑狀態(tài)時間
  max-idle-state-retention: 0       # 可選:表程序的最大空閑狀態(tài)時間
  current-catalog: default_catalog  # 可選:當前會話 catalog 的名稱(默認為 'default_catalog')
  current-database: default_database # 可選:當前 catalog 的當前數(shù)據(jù)庫名稱(默認為當前 catalog 的默認數(shù)據(jù)庫)
  restart-strategy:                 # 可選:重啟策略(restart-strategy)
    type: fallback                  #   默認情況下“回退”到全局重啟策略


# 用于調(diào)整和調(diào)優(yōu)表程序的配置選項。
# 在專用的”配置”頁面上可以找到完整的選項列表及其默認值。
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# 描述表程序提交集群的屬性。
deployment:
  response-timeout: 5000
        

運行上面的配置需要在flink的lib文件夾下增加關于kafka的依賴包,上述例子中增加了如下包
本文的依賴環(huán)境是flink 1.13.5

flink-sql-connector-kafka_2.11-1.13.5.jar
# 要重啟集群

Test_Flink_Kafka_Source_SQL 表的結果格式與絕大多數(shù)的 csv 格式相似。此外,它還添加了 rowtime 屬性 rowTime 和 processing-time 屬性 procTime。
connector 和 format 都允許定義屬性版本(當前版本為 1 )以便將來向后兼容。

  • 驗證
# kafka主題操作
kafka-topics.sh --delete --topic t_kafkasource_testing --bootstrap-server server1:9092
kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_kafkasource_testing --partitions 1 --replication-factor 1
kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource_testing

# 測試數(shù)據(jù)
1,alan,15,18,2022-08-07 09:18:25
2,alanchan,20,19,2022-08-07 09:20:25
3,alanchanchn,25,20,2022-08-07 09:22:25
4,alan_chan,30,21,2022-08-07 09:24:25
5,alan_chan_chn,45,22,2022-08-07 09:26:25

# kafka客戶端操作
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource_testing
>1,alan,15,18,2022-08-07 09:18:25
>2,alanchan,20,19,2022-08-07 09:20:25
>3,alanchanchn,25,20,2022-08-07 09:22:25
>4,alan_chan,30,21,2022-08-07 09:24:25
>5,alan_chan_chn,45,22,2022-08-07 09:26:25


  • 查詢結果
    30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算

3)、自定義函數(shù)(User-defined Functions)

SQL 客戶端允許用戶創(chuàng)建用戶自定義的函數(shù)來進行 SQL 查詢。當前,這些自定義函數(shù)僅限于 Java/Scala 編寫的類以及 Python 文件。

為提供 Java/Scala 的自定義函數(shù),你首先需要實現(xiàn)和編譯函數(shù)類,該函數(shù)繼承自 ScalarFunction、 AggregateFunction 或 TableFunction(19、Flink 的table api與sql之內(nèi)置函數(shù): Table API 和 SQL 中的內(nèi)置函數(shù))。一個或多個函數(shù)可以打包到 SQL 客戶端的 JAR 依賴中。

所有函數(shù)在被調(diào)用之前,必須在環(huán)境配置文件中提前聲明。functions 列表中每個函數(shù)類都必須指定

  • 用來注冊函數(shù)的 name,
  • 函數(shù)的來源 from(目前僅限于 class(Java/Scala UDF),

Java/Scala UDF 必須指定:

  • 聲明了全限定名的函數(shù)類 class 以及用于實例化的 constructor 參數(shù)的可選列表。
functions:
  - name: java_udf               # required: name of the function
    from: class                  # required: source of the function
    class: ...                   # required: fully qualified class name of the function
    constructor:                 # optional: constructor parameters of the function class
      - ...                      # optional: a literal parameter with implicit type
      - class: ...               # optional: full class name of the parameter
        constructor:             # optional: constructor parameters of the parameter's class
          - type: ...            # optional: type of the literal parameter
            value: ...           # optional: value of the literal parameter
  - name: python_udf             # required: name of the function
    from: python                 # required: source of the function 
    fully-qualified-name: ...    # required: fully qualified class name of the function    

對于 Java/Scala UDF,要確保函數(shù)類指定的構造參數(shù)順序和類型都要嚴格匹配。

1、構造函數(shù)參數(shù)

根據(jù)用戶自定義函數(shù)可知,在用到 SQL 語句中之前,有必要將構造參數(shù)匹配對應的類型。
如上述示例所示,當聲明一個用戶自定義函數(shù)時,可以使用構造參數(shù)來配置相應的類,有以下三種方式:

  • 隱式類型的文本值:SQL 客戶端將自動根據(jù)文本推導對應的類型。目前,只支持 BOOLEAN、INT、 DOUBLE 和 VARCHAR 。

如果自動推導的類型與期望不符(例如,你需要 VARCHAR 類型的 false),可以改用顯式類型。

- true         # -> BOOLEAN (case sensitive)
- 42           # -> INT
- 1234.222     # -> DOUBLE
- foo          # -> VARCHAR
  • 顯式類型的文本值:為保證類型安全,需明確聲明 type 和 value 屬性的參數(shù)。
- type: DECIMAL
  value: 11111111111111111

下表列出支持的 Java 參數(shù)類型和與之相對應的 SQL 類型。
30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算

  • (嵌套)類實例:除了文本值外,還可以通過指定構造參數(shù)的 class 和 constructor 屬性來創(chuàng)建(嵌套)類實例。這個過程可以遞歸執(zhí)行,直到最后的構造參數(shù)是用文本值來描述的。
- class: foo.bar.paramClass
  constructor:
    - StarryName
    - class: java.lang.Integer
      constructor:
        - class: java.lang.String
          constructor:
            - type: VARCHAR
              value: 3

4、Catalogs

Catalogs 可以由 YAML 屬性集合定義,并且在 SQL 客戶端啟動之前自動注冊到運行環(huán)境中。

用戶可以指定在 SQL CLI 中哪些 catalog 要被作為當前的 catalog,以及哪個數(shù)據(jù)庫的 catalog 可以用于當前數(shù)據(jù)庫。

catalogs:
   - name: catalog_1
     type: hive
     property-version: 1
     default-database: mydb2
     hive-conf-dir: <path of Hive conf directory>
   - name: catalog_2
     type: hive
     property-version: 1
     hive-conf-dir: <path of Hive conf directory>

execution:
   ...
   current-catalog: catalog_1
   current-database: mydb1

更多關于 catalog 的內(nèi)容,參考 24、Flink 的table api與sql之Catalogs。

5、分離的 SQL 查詢

為定義端到端的 SQL 管道,SQL 的 INSERT INTO 語句可以向 Flink 集群提交長時間運行的分離查詢。查詢產(chǎn)生的結果輸出到除 SQL 客戶端外的擴展系統(tǒng)中。這樣可以應對更高的并發(fā)和更多的數(shù)據(jù)。CLI 自身在提交后不對分離查詢做任何控制。

INSERT INTO MyTableSink SELECT * FROM MyTableSource

sink MyTableSink 必須在環(huán)境配置文件中聲明。查看更多關于 Flink 支持的外部系統(tǒng)及其配置信息,參見
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache HBase示例(5)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)

如下展示 Apache Kafka 的 sink 示例。

CREATE TABLE Table_Sink_Kafka (
    t_id BIGINT, 
    t_name STRING, 
    t_balance DOUBLE, 
    t_age INT, 
    t_insert_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 't_kafka_sink_flink',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
    'format' = 'csv'
);

Flink SQL> CREATE TABLE Table_Sink_Kafka (
>     t_id BIGINT, 
>     t_name STRING, 
>     t_balance DOUBLE, 
>     t_age INT, 
>     t_insert_time TIMESTAMP(3)
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 't_kafka_sink_flink',
>     'scan.startup.mode' = 'earliest-offset',
>     'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>     'format' = 'csv'
> );
[INFO] Execute statement succeed.

Flink SQL> select * from Test_Flink_Kafka_Source_SQL;
[INFO] Result retrieval cancelled.

-- 查詢結果如下

30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算
SQL 客戶端要確保語句成功提交到集群上。一旦提交查詢,CLI 將展示關于 Flink 作業(yè)的相關信息。

Flink SQL> INSERT INTO Table_Sink_Kafka SELECT t_id,t_name,t_balance,t_age,rowTime FROM Test_Flink_Kafka_Source_SQL;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 40769216d90fa12fd0696e8e0a6dec2d

Flink SQL> select * from Table_Sink_Kafka;
-- 插入后的數(shù)據(jù)如下

30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算

提交后,SQL 客戶端不追蹤正在運行的 Flink 作業(yè)狀態(tài)。提交后可以關閉 CLI 進程,并且不會影響分離的查詢。Flink 的重啟策略負責容錯。取消查詢可以用 Flink 的 web 接口、命令行或 REST API 。

6、SQL 視圖

視圖是一張?zhí)摂M表,允許通過 SQL 查詢來定義。視圖的定義會被立即解析與驗證。然而,提交常規(guī) INSERT INTO 或 SELECT 語句后不會立即執(zhí)行,在訪問視圖時才會真正執(zhí)行。

視圖可以用環(huán)境配置文件或者 CLI 會話來定義。

下例展示如何在一個文件里定義多張視圖。視圖注冊的順序和定義它們的環(huán)境配置文件一致。支持諸如 視圖 A 依賴視圖 B ,視圖 B 依賴視圖 C 的引用鏈。

tables:
# filesystem表
  - name: Users_Testing_view_alanchan
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/usr/local/bigdata/testdata/flink/users.csv"
    format:
      type: csv
      fields:
        - name: u_id
          data-type: INT
        - name: u_name
          data-type: VARCHAR
        - name: u_age
          data-type: INT
        - name: u_balance
          data-type: DOUBLE
      line-delimiter: "\n"
      comment-prefix: "#"

    schema:
      - name: u_id
        data-type: INT
      - name: u_name
        data-type: VARCHAR
      - name: u_age
        data-type: INT
      - name: u_balance
        data-type: DOUBLE
  - name: Users_Testing_alanchan_View1
    type: view
    query: "SELECT u_id,u_name,u_age FROM Users_Testing_view_alanchan"

  - name: Users_Testing_alanchan_View2
    type: view
    query: "SELECT u_id,u_name,u_balance FROM Users_Testing_view_alanchan"

相較于 table soruce 和 sink,會話環(huán)境配置文件中定義的視圖具有最高優(yōu)先級。
-驗證

Flink SQL> select * from Users_Testing_view_alanchan;

Flink SQL> select * from Users_Testing_alanchan_View2;

30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算

30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算

視圖還可以在 CLI 會話中用 CREATE VIEW 語句來創(chuàng)建:

CREATE VIEW MyNewView AS select u_name from Users_Testing_view_alanchan;

Flink SQL> select * from MyNewView;

30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等),# Flink專欄,flink,sql,kafka,flink 流批一體化,flink sql,flink tabapi,flink 實時計算

視圖能在 CLI 會話中創(chuàng)建,也能用 DROP VIEW 語句刪除:

Flink SQL> DROP VIEW MyNewView;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+------------------------------+
|                   table name |
+------------------------------+
| Users_Testing_alanchan_View1 |
| Users_Testing_alanchan_View2 |
|  Users_Testing_view_alanchan |
+------------------------------+
3 rows in set

CLI 中視圖的定義僅限于上述語法。將來版本會支持定義視圖結構以及在表名中加入轉義的空格。

7、臨時表(Temporal Table)

臨時表是在變化的歷史記錄表上的(參數(shù)化)視圖,該視圖在某個特定時間點返回表的內(nèi)容。這對于在特定的時間戳將一張表的內(nèi)容聯(lián)結另一張表是非常有用的。更多信息見15、Flink 的table api與sql之流式概念-詳解的介紹了動態(tài)表、時間屬性配置(如何處理更新結果)、時態(tài)表、流上的join、流上的確定性以及查詢配置頁面。

下例展示如何定義一張臨時表 SourceTemporalTable:

tables:

  # 定義包含對臨時表的更新的 table source (或視圖)

  - name: HistorySource
    type: source-table
    update-mode: append
    connector: # ...
    format: # ...
    schema:
      - name: integerField
        data-type: INT
      - name: stringField
        data-type: STRING
      - name: rowtimeField
        data-type: TIMESTAMP(3)
        rowtime:
          timestamps:
            type: from-field
            from: rowtimeField
          watermarks:
            type: from-source

  # 在具有時間屬性和主鍵的變化歷史記錄表上定義臨時表
  - name: SourceTemporalTable
    type: temporal-table
    history-table: HistorySource
    primary-key: integerField
    time-attribute: rowtimeField  # could also be a proctime field

如例子中所示,table source,視圖和臨時表的定義可以相互混合。它們按照在環(huán)境配置文件中定義的順序進行注冊。例如,臨時表可以引用一個視圖,該視圖依賴于另一個視圖或 table source。

8、局限與未來

當前的 SQL 客戶端僅支持嵌入式模式。在1.17版本后提供基于 REST 的 SQL 客戶端網(wǎng)關(Gateway) 的功能。

以上,介紹了flink cli的啟動、使用以及通過創(chuàng)建kafka、filesystem等例子介紹了配置文件的使用,同時也簡單的介紹了視圖、臨時表等內(nèi)容。文章來源地址http://www.zghlxwxcb.cn/news/detail-634607.html

到了這里,關于30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • kafka 02——三個重要的kafka客戶端

    kafka 02——三個重要的kafka客戶端

    請參考下面的文章: Kafka 01——Kafka的安裝及簡單入門使用. AdminClient API: 允許管理和檢測Topic、Broker以及其他Kafka對象。 Producer API: 發(fā)布消息到一個或多個API。 Consumer API: 訂閱一個或多個Topic,并處理產(chǎn)生的消息。 如下: 完整的pom 關于配置,可參考官網(wǎng): https://kafka.apa

    2024年02月13日
    瀏覽(26)
  • kafka客戶端應用參數(shù)詳解

    kafka客戶端應用參數(shù)詳解

    Kafka提供了非常簡單的客戶端API。只需要引入一個Maven依賴即可: 1、消息發(fā)送者主流程? 然后可以使用Kafka提供的Producer類,快速發(fā)送消息。 ? 整體來說,構建Producer分為三個步驟: 設置Producer核心屬性 ?:Producer可選的屬性都可以由ProducerConfig類管理。比如ProducerConfig.BOOTST

    2024年02月07日
    瀏覽(26)
  • kafka客戶端工具(Kafka Tool)的安裝

    kafka客戶端工具(Kafka Tool)的安裝

    官方下載 根據(jù)不同的系統(tǒng)下載對應的版本,點擊下載后雙擊,如何一直下一步,安裝 kafka環(huán)境搭建請參考:CentOS 搭建Kafka集群 (1)連接kafka (2)簡單使用 ?

    2024年04月23日
    瀏覽(35)
  • kafka之java客戶端實戰(zhàn)

    kafka之java客戶端實戰(zhàn)

    ????????Kafka提供了兩套客戶端API, HighLevel API和LowLevel API 。 HighLevel API封裝了kafka的運行細節(jié),使用起來比較簡單,是企業(yè)開發(fā)過程中最常用的客戶端API。 而LowLevel API則需要客戶端自己管理Kafka的運行細節(jié),Partition,Offset這些數(shù)據(jù)都由客戶端自行管理。這層API功能更靈活,

    2024年01月17日
    瀏覽(22)
  • Flink 客戶端操作命令及可視化工具

    Flink 客戶端操作命令及可視化工具

    Flink 提供了豐富的客戶端操作來提交任務和與任務進行交互。下面主要從 Flink 命令行、 Scala Shell 、 SQL Client 、 Restful API 和 Web 五個方面進行整理。 在 Flink 安裝目錄的 bin 目錄下可以看到 flink , start-scala-shell.sh 和 sql-client.sh 等文件,這些都是客戶端操作的入口。 run 運行任務

    2024年02月04日
    瀏覽(1279)
  • 自定義kafka客戶端消費topic

    使用自定義的KafkaConsumer給spring進行管理,之后在注入topic的set方法中,開單線程主動訂閱和讀取該topic的消息。 后端服務不需要啟動時就開始監(jiān)聽消費,而是根據(jù)啟動的模塊或者用戶自定義監(jiān)聽需要監(jiān)聽或者停止的topic 使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中沒

    2024年02月02日
    瀏覽(19)
  • python-kafka客戶端封裝

    本文對python的kafka包做簡單封裝,方便kafka初學者使用。包安裝: kafka_helper.py kafka_test.py Kafka入門,這一篇就夠了(安裝,topic,生產(chǎn)者,消費者)

    2024年02月09日
    瀏覽(20)
  • kafka:java集成 kafka(springboot集成、客戶端集成)

    kafka:java集成 kafka(springboot集成、客戶端集成)

    摘要 對于java的kafka集成,一般選用springboot集成kafka,但可能由于對接方kafka老舊、kafka不安全等問題導致kafak版本與spring版本不兼容,這個時候就得自己根據(jù)kafka客戶端api集成了。 一、springboot集成kafka 具體官方文檔地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    瀏覽(94)
  • c#客戶端Kafka的使用方法

    c#客戶端Kafka的使用方法

    Apache Kafka是一個分布式流處理平臺,最初由LinkedIn開發(fā),現(xiàn)在是Apache軟件基金會的頂級項目之一。Kafka能夠處理大規(guī)模的實時數(shù)據(jù)流,支持高可靠性、高可擴展性、低延遲和高吞吐量。它主要用于構建實時數(shù)據(jù)管道和流式處理應用程序。 Kafka的核心概念包括:Producer(生產(chǎn)者)

    2024年02月12日
    瀏覽(22)
  • Kafka客戶端程序無法連接到Kafka集群的解決方法

    Kafka是一個高性能、分布式的流式數(shù)據(jù)平臺,廣泛用于構建實時數(shù)據(jù)流處理應用程序。然而,有時候我們可能會遇到Kafka客戶端程序無法連接到Kafka集群的問題。在本文中,我將介紹一些可能導致連接問題的常見原因,并提供相應的解決方案。 網(wǎng)絡配置問題 首先,確保Kafka集群

    2024年01月21日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包