国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink實戰(zhàn)-(6)FlinkSQL實現(xiàn)CDC

這篇具有很好參考價值的文章主要介紹了Flink實戰(zhàn)-(6)FlinkSQL實現(xiàn)CDC。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

FlinkSQL說明

  • Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設(shè)計的一套符合標(biāo)準(zhǔn) SQL 語義的開發(fā)語言。
  • 自 2015 年開始,阿里巴巴開始調(diào)研開源流計算引擎,最終決定基于 Flink 打造新一代計算引擎,針對 Flink 存在的不足進(jìn)行優(yōu)化和改進(jìn),并且在 2019 年初將最終代碼開源,也就是我們熟知的 Blink。Blink 在原來的 Flink 基礎(chǔ)上最顯著的一個貢獻(xiàn)就是 Flink SQL 的實現(xiàn)。
  • Flink SQL 是面向用戶的 API 層,在我們傳統(tǒng)的流式計算領(lǐng)域,比如 Storm、Spark Streaming 都會提供一些 Function 或者 Datastream API,用戶通過 Java 或 Scala 寫業(yè)務(wù)邏輯,這種方式雖然靈活,但有一些不足,比如具備一定門檻且調(diào)優(yōu)較難,隨著版本的不斷更新,API 也出現(xiàn)了很多不兼容的地方。
  • 在 flink sql 中,對表名、字段名、函數(shù)名等是嚴(yán)格區(qū)分大小寫的,為了兼容 hive 等其他倉庫,建議建表時,表名和字段名都采用下劃線連接單詞的方式,以避免大小寫問題。比如 hive ,是不區(qū)分大小寫的,所有大寫字母最終都會被系統(tǒng)轉(zhuǎn)化為小寫字母,此時使用 flink sql 去讀寫 hive ,出現(xiàn)大寫字母時,會出現(xiàn)找不到表或字段的錯誤。關(guān)鍵字是不區(qū)分大小寫的,比如 insert、select、create等。flink sql 中所有的字符串常量都需要使用英文單引號括起來,不要使用英文雙引號以及中文符號。

前期準(zhǔn)備

依賴的環(huán)境

環(huán)境:Linux(Centos7)
Flink : 1.13.6

進(jìn)入Flink的lib目錄

cd flink-1.13.6/lib

上傳相關(guān)的依賴包,這幾個包在網(wǎng)上很容易找到

flink-sql-connector-mysql-cdc-2.1.0.jar
mysql-connector-java-8.0.13.jar
flink-sql-connector-postgres-cdc-1.2.0.jar
postgresql-42.6.0.jar

啟動 Flink客戶端

./flink-1.13.1/bin/sql-client.sh

Flink-SQL腳本

1、postgresql ->postgresql

-- pg中映射表,source
CREATE TABLE cdc_pg_source (
 id INT,
 age INT,
 name STRING
) WITH (
 'connector' = 'postgres-cdc',
  'hostname' = '10.254.21.3',
  'port' = '54432',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'username' = 'gpadmin',
  'password' = 'xxxxxxx',
  'table-name' = 'cdc_pg_source',
  'decoding.plugin.name' = 'pgoutput',
  'debezium.slot.name' = 'cdc_pg_source');




-- pg中映射表,sink
CREATE TABLE cdc_pg_sink (
 id INT,
 age INT,
 name STRING,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://10.254.21.3:54432/postgres',
  'username' = 'gpadmin',
  'password' = 'xxxxxx',
  'table-name' = 'cdc_pg_sink',
  'sink.buffer-flush.max-rows' = '1');
 


-- flink job
INSERT INTO cdc_pg_sink select * from cdc_pg_source;

2、mysql -> mysql

CREATE TABLE t_test (
 id bigint,
 username string,
 password string,
 create_time time
) WITH (
 'connector' = 'mysql-cdc',
  'hostname' = '10.252.92.4',
  'port' = '3306',
  'database-name' = 'flink_cdc_test',
  'username' = 'root',
  'password' = 'xxxx',
  'table-name' = 't_test'
);

CREATE TABLE t_test_ods (
 id bigint primary key,
 username string,
 password string,
 create_time time
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.252.92.4:3306/flink_cdc_test_ods',
  'username' = 'root',
  'password' = 'xxxx',
  'table-name' = 't_test',
  'sink.buffer-flush.max-rows' = '1'
);

insert into t_test_ods select * from t_test;

遇到的問題

1、Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.? 或? Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print

解決方法:

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>

下載改JAR包,把它加到Flink下的lib路徑下,然后重啟sql-client;文章來源地址http://www.zghlxwxcb.cn/news/detail-426255.html

到了這里,關(guān)于Flink實戰(zhàn)-(6)FlinkSQL實現(xiàn)CDC的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink 優(yōu)化(六) --------- FlinkSQL 調(diào)優(yōu)

    Flink 優(yōu)化(六) --------- FlinkSQL 調(diào)優(yōu)

    FlinkSQL 官網(wǎng)配置參數(shù): https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html Flink SQL 新手有可能犯的錯誤,其中之一就是忘記設(shè)置空閑狀態(tài)保留時間導(dǎo)致狀態(tài)爆炸。列舉兩個場景: ? FlinkSQL 的 regular join(inner、left、right),左右表的數(shù)據(jù)都會一直保存在狀態(tài)里,不

    2024年02月14日
    瀏覽(21)
  • flinksql實時統(tǒng)計程序背壓延遲優(yōu)化

    flinksql實時統(tǒng)計程序背壓延遲優(yōu)化

    下面是實時延遲時的截圖: 下面是實時追上數(shù)據(jù)時的截圖: bingo:我開啟了MiniBatch配置,以5秒微批的方式做實時處理,程序性能直接飛速提升,6小時的延遲十幾分鐘就追上了。性能杠杠的!

    2024年02月12日
    瀏覽(15)
  • Flink:FlinkSql解析嵌套Json

    Flink:FlinkSql解析嵌套Json

    日常開發(fā)中都是用的簡便json格式,但是偶爾也會遇到嵌套json的時候,因此在用flinksql的時候就有點麻煩,下面用簡單例子簡單定義處理下 1,數(shù)據(jù)是網(wǎng)上摘抄,但包含里常用的大部分格式 { ?? ?\\\"afterColumns\\\": { ?? ??? ?\\\"created\\\": \\\"1589186680\\\", ?? ??? ?\\\"extra\\\": { ?? ??? ??? ?\\\"

    2023年04月09日
    瀏覽(24)
  • 實戰(zhàn)Flink Java api消費kafka實時數(shù)據(jù)落盤HDFS

    實戰(zhàn)Flink Java api消費kafka實時數(shù)據(jù)落盤HDFS

    在Java api中,使用flink本地模式,消費kafka主題,并直接將數(shù)據(jù)存入hdfs中。 flink版本1.13 kafka版本0.8 hadoop版本3.1.4 為了完成 Flink 從 Kafka 消費數(shù)據(jù)并實時寫入 HDFS 的需求,通常需要啟動以下組件: 確保 Zookeeper 在運行,因為 Flink 的 Kafka Consumer 需要依賴 Zookeeper。 確保 Kafka Serve

    2024年01月24日
    瀏覽(28)
  • flink學(xué)習(xí)35:flinkSQL查詢mysql

    flink學(xué)習(xí)35:flinkSQL查詢mysql

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions} object sqlQueryTable { ? def main(args: Array[String]): Unit = { ??? //create env ??? val env = StreamExecutionEnvironment.getExecutionEnv

    2023年04月23日
    瀏覽(19)
  • 【Flink系列七】TableAPI和FlinkSQL初體驗

    【Flink系列七】TableAPI和FlinkSQL初體驗

    Apache Flink 有兩種關(guān)系型 API 來做流批統(tǒng)一處理:Table API 和 SQL Table API 是用于 Scala 和 Java 語言的查詢API,它可以用一種非常直觀的方式來組合使用選取、過濾、join 等關(guān)系型算子。 ?Flink SQL 是基于?Apache Calcite?來實現(xiàn)的標(biāo)準(zhǔn) SQL。無論輸入是連續(xù)的(流式)還是有界的(批處理

    2024年02月03日
    瀏覽(22)
  • 【Flink】FlinkSQL中執(zhí)行計劃以及如何用代碼看執(zhí)行計劃

    FilnkSQL怎么查詢優(yōu)化 Apache Flink 使用并擴(kuò)展了 Apache Calcite 來執(zhí)行復(fù)雜的查詢優(yōu)化。 這包括一系列基于規(guī)則和成本的優(yōu)化,例如: ? 基于 Apache Calcite 的子查詢解相關(guān) ? 投影剪裁 ? 分區(qū)剪裁 ? 過濾器下推 ? 子計劃消除重復(fù)數(shù)據(jù)以避免重復(fù)計算 ? 特殊子查詢重寫,包括兩部

    2023年04月11日
    瀏覽(24)
  • 【Flink】FlinkSQL讀取Mysql表中時間字段相差13個小時

    問題:Flink版本1.13,在我們使用FlinkSQL讀取Mysql中數(shù)據(jù)的時候,發(fā)現(xiàn)讀取出來的時間字段中的數(shù)據(jù)和Mysql表中的數(shù)據(jù)相差13個小時,Mysql建表語句及插入的數(shù)據(jù)如下; CREATE TABLE `mysql_example` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT \\\'自增ID\\\', `name` varchar(64) DEFAULT NULL COMMENT \\\'姓名\\\'

    2024年01月19日
    瀏覽(17)
  • 07_Hudi案例實戰(zhàn)、Flink CDC 實時數(shù)據(jù)采集、Presto、FineBI 報表可視化等

    07_Hudi案例實戰(zhàn)、Flink CDC 實時數(shù)據(jù)采集、Presto、FineBI 報表可視化等

    7.第七章 Hudi案例實戰(zhàn) 7.1 案例架構(gòu) 7.2 業(yè)務(wù)數(shù)據(jù) 7.2.1 客戶信息表 7.2.2 客戶意向表 7.2.3 客戶線索表 7.2.4 線索申訴表 7.2.5 客戶訪問咨詢記錄表 7.3 Flink CDC 實時數(shù)據(jù)采集 7.3.1 開啟MySQL binlog 7.3.2 環(huán)境準(zhǔn)備 7.3.3 實時采集數(shù)據(jù) 7.3.3.1 客戶信息表 7.3.3.2 客戶意向表 7.3.3.3 客戶線索表 7

    2024年02月13日
    瀏覽(29)
  • FlinkSQL-- sql-client及源碼解析 -- flink-1.13.6

    FlinkSQL-- sql-client及源碼解析 -- flink-1.13.6

    本文基于flink-1.13.6 SQL Client: Init scripts and Statement Sets 這個版本極大地改進(jìn)了 SQL 客戶端的功能?,F(xiàn)在 SQL Client 和 SQL 腳本都支持 通過Java 應(yīng)用程序執(zhí)行的幾乎所有操作(從 TableEnvironment 以編程方式啟動查詢)。這意味著 SQL 用戶在 SQL 部署中需要的代碼少了很多。其中最核心的功能

    2023年04月27日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包