国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用finksql方式將mysql數(shù)據(jù)同步到kafka中,每次只能同步一張表

這篇具有很好參考價(jià)值的文章主要介紹了使用finksql方式將mysql數(shù)據(jù)同步到kafka中,每次只能同步一張表。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

使用finksql方式將mysql數(shù)據(jù)同步到kafka中,每次只能同步一張表文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-680930.html

package flink;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class FlinkSQL_CDC {

    public static void main(String[] args) throws Exception {

//
//        Configuration conf = new Configuration();
//        conf.setInteger("rest.port",3335);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        //1.創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        //2.創(chuàng)建Flink-MySQL-CDC的Source
        TableResult tableResult = tableEnv.executeSql("CREATE TABLE table_name (" +
                "  id INT primary key," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'hadoop102'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = 'xxxx'," +
                "  'database-name' = 'student'," +
                "  'table-name' = 'table_name'," +
                "'server-time-zone' = 'Asia/Shanghai'," +
                "'scan.startup.mode' = 'initial'" +
                ")"
        );

        // 2. 注冊(cè)SinkTable: sink_sensor
//        tableEnv.executeSql("" +
//                "CREATE TABLE kafka_binlog ( " +
//                "  user_id INT, " +
//                "  user_name STRING, " +
//                "`proc_time` as PROCTIME()" +
//                ") WITH ( " +
//                "  'connector' = 'kafka', " +
//                "  'topic' = 'test2', " +
//                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +
//                "  'format' = 'json' " +
//                ")" +
//                "");

        //upsert-kafka
        tableEnv.executeSql("" +
                "CREATE TABLE kafka_binlog ( " +
                "  user_id INT, " +
                "  user_name STRING, " +
                "`proc_time` as PROCTIME()," +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") WITH ( " +
                "  'connector' = 'upsert-kafka', " +
                "  'topic' = 'test2', " +
                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +
                "  'key.format' = 'json' ," +
                "  'value.format' = 'json' " +
                ")" +
                "");


        // 3. 從SourceTable 查詢數(shù)據(jù), 并寫(xiě)入到 SinkTable
         tableEnv.executeSql("insert into kafka_binlog select * from table_name");

         tableEnv.executeSql("select * from kafka_binlog").print();

        env.execute();
    }

}

到了這里,關(guān)于使用finksql方式將mysql數(shù)據(jù)同步到kafka中,每次只能同步一張表的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Canal+Kafka實(shí)現(xiàn)Mysql數(shù)據(jù)同步

    Canal+Kafka實(shí)現(xiàn)Mysql數(shù)據(jù)同步

    canal [k?\\\'n?l] ,譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi) canal可以用來(lái)監(jiān)控?cái)?shù)據(jù)庫(kù)數(shù)據(jù)的變化,從而獲得新增數(shù)據(jù),或者修改的數(shù)據(jù)。 canal是應(yīng)阿里巴巴存在杭州和美國(guó)的雙機(jī)房部署,存在跨機(jī)房同步的業(yè)務(wù)需求而提出的。

    2024年02月12日
    瀏覽(89)
  • cancel框架同步mysql數(shù)據(jù)到kafka

    1、下載cancel 2、修改conf文件夾下的canal.properties配置文件 3、修改conf/example文件夾下的instance.properties配置文件 在sql查詢show binary logs語(yǔ)句得到binlog日志 4、啟動(dòng) 在bin目錄下執(zhí)行 啟動(dòng)程序 注:MySQL需要?jiǎng)?chuàng)建新用戶

    2024年02月15日
    瀏覽(17)
  • 從 MySQL 到 DolphinDB,Debezium + Kafka 數(shù)據(jù)同步實(shí)戰(zhàn)

    從 MySQL 到 DolphinDB,Debezium + Kafka 數(shù)據(jù)同步實(shí)戰(zhàn)

    Debezium 是一個(gè)開(kāi)源的分布式平臺(tái),用于實(shí)時(shí)捕獲和發(fā)布數(shù)據(jù)庫(kù)更改事件。它可以將關(guān)系型數(shù)據(jù)庫(kù)(如 MySQL、PostgreSQL、Oracle 等)的變更事件轉(zhuǎn)化為可觀察的流數(shù)據(jù),以供其他應(yīng)用程序?qū)崟r(shí)消費(fèi)和處理。 本文中我們將采用 Debezium 與 Kafka 組合的方式來(lái)實(shí)現(xiàn)從 MySQL 到 DolphinDB 的數(shù)

    2024年02月02日
    瀏覽(26)
  • 通過(guò)kafka connector實(shí)現(xiàn)mysql數(shù)據(jù)自動(dòng)同步es

    整體思路: 1、使用?io.debezium.connector.mysql.MySqlConnector 自動(dòng)同步數(shù)據(jù)到kafka消息隊(duì)列 2、通過(guò)listener監(jiān)聽(tīng)消息隊(duì)列,代碼控制數(shù)據(jù)插入es ps:其實(shí)有更簡(jiǎn)單的方式:在此基礎(chǔ)上使用ElasticsearchSinkConnector、ksql,完成數(shù)據(jù)的轉(zhuǎn)換與自動(dòng)同步es,全程無(wú)需代碼控制,后續(xù)本地跑通流程后

    2024年02月08日
    瀏覽(24)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 數(shù)據(jù)到 Elasticsearch、Kafka

    基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 數(shù)據(jù)到 Elasticsearch、Kafka

    Dinky 是一個(gè)開(kāi)箱即用的一站式實(shí)時(shí)計(jì)算平臺(tái)以 Apache Flink 為基礎(chǔ),連接 OLAP 和數(shù)據(jù)湖等眾多框架致力于流批一體和湖倉(cāng)一體的建設(shè)與實(shí)踐。本文以此為FlinkSQL可視化工具。 Flink SQL 使得使用標(biāo)準(zhǔn) SQL 開(kāi)發(fā)流式應(yīng)用變得簡(jiǎn)單,免去代碼開(kāi)發(fā)。 Flink CDC 本文使用 MySQL CDC 連接器 允許從

    2024年02月16日
    瀏覽(19)
  • Doris:MySQL數(shù)據(jù)同步到Doris的N種方式

    Doris:MySQL數(shù)據(jù)同步到Doris的N種方式

    目錄 1.CSV文件方式 1.1 導(dǎo)出mysql數(shù)據(jù) 1.2 導(dǎo)入數(shù)據(jù) 2.JDBC 編碼方式 3.JDBC Catalog 方式 3.1 上傳mysql驅(qū)動(dòng)包 3.2 創(chuàng)建mysql catalog 3.3. 插入數(shù)據(jù) 4.Binlog Load 方式 ????????當(dāng)mysql與doris服務(wù)之間無(wú)法通過(guò)網(wǎng)絡(luò)互聯(lián)時(shí),可以通過(guò)將mysql數(shù)據(jù)導(dǎo)出成csv文件,然后再在doris服務(wù)器導(dǎo)入csv文件的方

    2024年02月04日
    瀏覽(21)
  • 成功解決VScode每次只能打開(kāi)一個(gè)文件,即只能打開(kāi)一個(gè)編輯窗口。

    成功解決VScode每次只能打開(kāi)一個(gè)文件,即只能打開(kāi)一個(gè)編輯窗口。

    點(diǎn)擊文件 -- 首選項(xiàng) -- 設(shè)置 -- 工作臺(tái) -- 編輯管理 -- 取消勾選Enable Preview 如下圖所示: 下拉,取消勾選Enable Preview

    2024年02月16日
    瀏覽(21)
  • Flink讀取數(shù)據(jù)的5種方式(文件,Socket,Kafka,MySQL,自定義數(shù)據(jù)源)

    這是最簡(jiǎn)單的數(shù)據(jù)讀取方式。當(dāng)需要進(jìn)行功能測(cè)試時(shí),可以將數(shù)據(jù)保存在文件中,讀取后驗(yàn)證流處理的邏輯是否符合預(yù)期。 程序代碼: 輸出結(jié)果 用于驗(yàn)證一些通過(guò)Socket傳輸數(shù)據(jù)的場(chǎng)景非常方便。 程序代碼: 測(cè)試時(shí),需要先在 172.16.3.6 的服務(wù)器上啟動(dòng) nc ,然后再啟動(dòng)Flink讀

    2024年02月16日
    瀏覽(21)
  • flink cdc數(shù)據(jù)同步,DataStream方式和SQL方式的簡(jiǎn)單使用

    flink cdc數(shù)據(jù)同步,DataStream方式和SQL方式的簡(jiǎn)單使用

    目錄 一、flink cdc介紹 1、什么是flink cdc 2、flink cdc能用來(lái)做什么 3、flink cdc的優(yōu)點(diǎn) 二、flink cdc基礎(chǔ)使用 1、使用flink cdc讀取txt文本數(shù)據(jù) 2、DataStream的使用方式 3、SQL的方式 總結(jié) flink cdc是一個(gè)由阿里研發(fā)的,一個(gè)可以直接從MySQL、PostgreSQL等數(shù)據(jù)庫(kù)直接讀取全量數(shù)據(jù)和增量變更數(shù)

    2024年02月13日
    瀏覽(26)
  • TiDB數(shù)據(jù)庫(kù)從入門(mén)到精通系列之六:使用 TiCDC 將 TiDB 的數(shù)據(jù)同步到 Apache Kafka

    TiDB數(shù)據(jù)庫(kù)從入門(mén)到精通系列之六:使用 TiCDC 將 TiDB 的數(shù)據(jù)同步到 Apache Kafka

    快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群 創(chuàng)建 changefeed,將 TiDB 增量數(shù)據(jù)輸出至 Kafka 使用 go-tpc 寫(xiě)入數(shù)據(jù)到上游 TiDB 使用 Kafka console consumer 觀察數(shù)據(jù)被寫(xiě)入到指定的 Topic (可選)配置 Flink 集群消費(fèi) Kafka 內(nèi)數(shù)據(jù) 部署包含 TiCDC 的 TiDB 集群 在實(shí)驗(yàn)或測(cè)試環(huán)境中,可以使用 TiU

    2024年02月12日
    瀏覽(20)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包