国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink CDC 基于mysql binlog 實(shí)時(shí)同步mysql表

這篇具有很好參考價(jià)值的文章主要介紹了Flink CDC 基于mysql binlog 實(shí)時(shí)同步mysql表。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

環(huán)境說(shuō)明:

flink?1.15.2

mysql 版本5.7? ? 注意:需要開(kāi)啟binlog,因?yàn)樵隽客绞腔赽inlog捕獲數(shù)據(jù)

windows11 IDEA 本地運(yùn)行

先上官網(wǎng)使用說(shuō)明和案例:MySQL CDC Connector — Flink CDC documentation

1. mysql開(kāi)啟binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人測(cè)試是捕獲不到binlog日志的,增量相當(dāng)于沒(méi)用,不知道是不是ndbcluster 下的binlog 配置是否有問(wèn)題,但是同一集群下,InnoDB的表就可以捕獲到binlog日志。聽(tīng)朋友說(shuō),ndbcluster 是內(nèi)存型引擎,有可能不會(huì)實(shí)時(shí)寫日志到磁盤,所以捕獲不到.....)

# 判斷MySQL是否已經(jīng)開(kāi)啟binlog? ?on? 為打開(kāi)狀態(tài)
SHOW VARIABLES LIKE 'log_bin'; ? ?

# 查看MySQL的binlog模式
show global variables like "binlog%";

# 查看日志開(kāi)啟狀態(tài)?
show variables like 'log_%';

# 刷新log日志,立刻產(chǎn)生一個(gè)新編號(hào)的binlog日志文件,跟重啟一個(gè)效果?
flush logs;

# 清空所有binlog日志?
reset master;

2. 創(chuàng)建一個(gè)用戶,賦權(quán)

CREATE USER 'flink_cdc_user'@'%' IDENTIFIED BY 'flink@cdc';
GRANT ALL PRIVILEGES ON *.* TO 'flink_cdc_user'@'%';

3. maven依賴:

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.15.2</flink.version>
</properties>
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
            <!--            此標(biāo)簽會(huì)移除jar包,當(dāng)需要打包到集群運(yùn)行時(shí)加上此標(biāo)簽-->
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>1.15.2</version>
            <!--<scope>provided</scope>-->
            <!--此標(biāo)簽會(huì)移除jar包,當(dāng)需要打包到集群運(yùn)行時(shí)加上此標(biāo)簽-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

    </dependencies>

4. 若是打包到集群運(yùn)行,相關(guān)依賴要放開(kāi) provided,這樣就不會(huì)把依賴打入到j(luò)ar包里面,就不會(huì)和flink lib里面的jar包沖突。

lib 里面需要加入的包:從官網(wǎng)下載,放入即可

flink-connector-jdbc-1.15.4.jar

flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

flink-sql-connector-mysql-cdc-2.3.0.jar

mysql-connector-java-8.0.29.jar

commons-cli-1.5.0.jar

5.mysql建表如下:

#mysql建表:

CREATE TABLE `user` (
? `id` int(11) NOT NULL,
? `username` varchar(255) DEFAULT NULL,
? `password` varchar(255) DEFAULT NULL,
? PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `user_sink` (
? `id` int(11) NOT NULL,
? `username` varchar(255) DEFAULT NULL,
? `password` varchar(255) DEFAULT NULL,
? PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

6.測(cè)試demo如下:

package com.xgg.flink.stream.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToMysqlHavePrimaryKey {
    public static void main(String[] args) {
        //1.獲取stream的執(zhí)行環(huán)境
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        //2.創(chuàng)建表執(zhí)行環(huán)境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);

        String sourceTable = "CREATE TABLE mysql_cdc_source (" +
                "  id INT,\n" +
                "  username STRING,\n" +
                "  password STRING,\n" +
                "PRIMARY KEY(id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "'connector' = 'mysql-cdc',\n" +
                "'hostname' = 'localhost',\n" +
                "'port' = '3306',\n" +
                "'username' = 'root',\n" +
                "'password' = 'root',\n" +
                "'database-name' = 'test_cdc',\n" +
                "'debezium.snapshot.mode' = 'initial',\n" +
                "'table-name' = 'user'\n" +
                ")";
        tEnv.executeSql(sourceTable);
        String sinkTable = "CREATE TABLE mysql_cdc_sink (" +
                "  id INT,\n" +
                "  username STRING,\n" +
                "  password STRING,\n" +
                "PRIMARY KEY(id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "'url' = 'jdbc:mysql://localhost:3306/test_cdc?rewriteBatchedStatements=true',\n" +
                "'username' = 'root',\n" +
                "'password' = 'root',\n" +
                "'table-name' = 'user_sink'\n" +
                ")";

        tEnv.executeSql(sinkTable);
        tEnv.executeSql("insert into mysql_cdc_sink select id,username,password from mysql_cdc_source");
        tEnv.executeSql("select * from mysql_cdc_source").print();


    }
}

源表進(jìn)行操作,flink cdc 捕獲操作記錄進(jìn)行打印,然后插入到表中。(mysql的cdc可以一邊打印,一邊寫表,無(wú)問(wèn)題。oracle的cdc,如果有多個(gè)執(zhí)行操作,就會(huì)只執(zhí)行一個(gè),比如,先打印再寫表,oracle只能打印,寫表操作就不會(huì)觸發(fā)。如果不打印,只寫表,那就沒(méi)問(wèn)題。好像和senv.setParallelism(1);沒(méi)關(guān)系,應(yīng)該還是底層實(shí)現(xiàn)的問(wèn)題。)

flinkcdc mysql版本,flink,mysql,數(shù)據(jù)庫(kù)

user 源表和目標(biāo)表 user_sink,數(shù)據(jù)都如下。

flinkcdc mysql版本,flink,mysql,數(shù)據(jù)庫(kù)

?源表和目標(biāo)表都是在Mysql有主鍵的,所以找個(gè)參數(shù)雖然是初始化操作,后面插入也是 insert into ,但是不管執(zhí)行多少遍,都不會(huì)有重復(fù)的數(shù)據(jù)。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-691039.html

"'debezium.snapshot.mode' = 'initial',\n" +
?rewriteBatchedStatements=true 這個(gè)參數(shù)是開(kāi)啟批量寫,能加大寫速度。

到了這里,關(guān)于Flink CDC 基于mysql binlog 實(shí)時(shí)同步mysql表的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink CDC 基于Oracle log archiving 實(shí)時(shí)同步Oracle表到Mysql(無(wú)主鍵)

    環(huán)境說(shuō)明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地運(yùn)行 具體環(huán)境設(shè)置和maven依賴請(qǐng)看上篇:Flink CDC 基于Oracle log archiving 實(shí)時(shí)同步Oracle表到Mysql_彩虹豆的博客-CSDN博客 現(xiàn)在操作的是源表和目標(biāo)表都無(wú)主鍵數(shù)

    2024年02月15日
    瀏覽(30)
  • 基于 Flink CDC 的實(shí)時(shí)同步系統(tǒng)

    基于 Flink CDC 的實(shí)時(shí)同步系統(tǒng)

    摘要: 本文整理自科杰科技大數(shù)據(jù)架構(gòu)師張軍,在 FFA 2022 數(shù)據(jù)集成專場(chǎng)的分享。本篇內(nèi)容主要分為四個(gè)部分: 功能概述 架構(gòu)設(shè)計(jì) 技術(shù)挑戰(zhàn) 生產(chǎn)實(shí)踐 Tips: 點(diǎn)擊 「閱讀原文」 查看原文視頻演講 ppt 科杰科技是專門做大數(shù)據(jù)服務(wù)的供應(yīng)商,目前的客戶包括能源、金融、證券等

    2024年02月05日
    瀏覽(31)
  • FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2)

    FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2)

    本文介紹了? 來(lái)源單表-目標(biāo)源單表同步,多來(lái)源單表-目標(biāo)源單表同步。 注:1.16版本、1.17版本都可以使用火焰圖,生產(chǎn)上最好關(guān)閉,詳情見(jiàn)文章末尾 Flink版本:1.16.2 環(huán)境:Linux CentOS 7.0、jdk1.8 基礎(chǔ)文件: flink-1.16.2-bin-scala_2.12.tgz、 flink-connector-jdbc-3.0.0-1.16.jar、(maven倉(cāng)庫(kù)目錄:

    2024年02月11日
    瀏覽(22)
  • Flink CDC2.4 整庫(kù)實(shí)時(shí)同步MySql 到Doris

    ????????Flink 1.15.4? ? ? ? ? 目前有很多工具都支持無(wú)代碼實(shí)現(xiàn)Mysql - Doris 的實(shí)時(shí)同步 ? ? ? ? 如:SlectDB 已發(fā)布的功能包 ? ? ? ? ? ? ? ??Dinky?SeaTunnel?TIS?等等 ? ? ? ? ?不過(guò)好多要么不支持表結(jié)構(gòu)變動(dòng),要不不支持多sink,我們的業(yè)務(wù)必須支持對(duì)表結(jié)構(gòu)的實(shí)時(shí)級(jí)變動(dòng)

    2024年02月11日
    瀏覽(35)
  • 使用Flink CDC將Mysql中的數(shù)據(jù)實(shí)時(shí)同步到ES

    最近公司要搞搜索,需要把mysql中的數(shù)據(jù)同步到es中來(lái)進(jìn)行搜索,由于公司已經(jīng)搭建了flink集群,就打算用flink來(lái)做這個(gè)同步。本來(lái)以為很簡(jiǎn)單,跟著官網(wǎng)文檔走就好了,結(jié)果沒(méi)想到折騰了將近一周的時(shí)間…… 我也是沒(méi)想到,這玩意網(wǎng)上資源竟然這么少,找到的全部都是通過(guò)

    2024年02月11日
    瀏覽(25)
  • 基于Flink CDC實(shí)時(shí)同步PostgreSQL與Tidb【Flink SQL Client模式下親測(cè)可行,詳細(xì)教程】

    操作系統(tǒng):ubuntu-22.04,運(yùn)行于wsl 2【 注意,請(qǐng)務(wù)必使用wsl 2 ;wsl 1會(huì)出現(xiàn)各種各樣的問(wèn)題】 軟件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0 已有postgre的跳過(guò)此步 (1)pg安裝 https://zhuanlan.zhihu.com/p/143156636 (2)pg配置 可能出現(xiàn)的問(wèn)題 sudo -u postgres psql 報(bào)錯(cuò): psql: err

    2024年02月11日
    瀏覽(29)
  • 基于Flink SQL CDC Mysql to Mysql數(shù)據(jù)同步

    基于Flink SQL CDC Mysql to Mysql數(shù)據(jù)同步

    Flink CDC有兩種方式同步數(shù)據(jù)庫(kù): 一種是通過(guò)FlinkSQL直接輸入兩表數(shù)據(jù)庫(kù)映射進(jìn)行數(shù)據(jù)同步,缺點(diǎn)是只能單表進(jìn)行同步; 一種是通過(guò)DataStream開(kāi)發(fā)一個(gè)maven項(xiàng)目,打成jar包上傳到服務(wù)器運(yùn)行。 本方案使用FlinkSQL方法,同步兩表中的數(shù)據(jù)。 其中Flink應(yīng)用可以部署在具有公網(wǎng)IP的服務(wù)

    2023年04月11日
    瀏覽(27)
  • 基于大數(shù)據(jù)平臺(tái)(XSailboat)的計(jì)算管道實(shí)現(xiàn)MySQL數(shù)據(jù)源的CDC同步--flink CDC

    基于大數(shù)據(jù)平臺(tái)(XSailboat)的計(jì)算管道實(shí)現(xiàn)MySQL數(shù)據(jù)源的CDC同步--flink CDC

    筆者在先前的一篇文檔《數(shù)據(jù)標(biāo)簽設(shè)計(jì) – 大數(shù)據(jù)平臺(tái)(XSailboat)的數(shù)據(jù)標(biāo)簽?zāi)K》 提到了關(guān)于數(shù)據(jù)標(biāo)簽的模塊,現(xiàn)已實(shí)現(xiàn)并應(yīng)用于項(xiàng)目中。在項(xiàng)目中遇到這樣一種情形: 如果打標(biāo)信息和業(yè)務(wù)數(shù)據(jù)是在一個(gè)數(shù)據(jù)庫(kù)實(shí)例中,那么只需要連接兩張表進(jìn)行查詢即可。但是數(shù)據(jù)標(biāo)簽作為

    2024年01月17日
    瀏覽(35)
  • Flink CDC獲取mysql 主從分庫(kù),分庫(kù)分表的binlog

    Flink CDC可以獲取MySQL主從分庫(kù),分庫(kù)分表的binlog,但是需要注意以下幾點(diǎn): Flink CDC需要配置MySQL的binlog模式為row,以及開(kāi)啟GTID(全局事務(wù)標(biāo)識(shí)符),以便正確地識(shí)別和處理binlog事件 Flink CDC需要配置MySQL的主從復(fù)制關(guān)系,以及指定主庫(kù)或從庫(kù)的地址,以便正確地連接和讀取bin

    2024年02月11日
    瀏覽(19)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 數(shù)據(jù)到 Elasticsearch、Kafka

    基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 數(shù)據(jù)到 Elasticsearch、Kafka

    Dinky 是一個(gè)開(kāi)箱即用的一站式實(shí)時(shí)計(jì)算平臺(tái)以 Apache Flink 為基礎(chǔ),連接 OLAP 和數(shù)據(jù)湖等眾多框架致力于流批一體和湖倉(cāng)一體的建設(shè)與實(shí)踐。本文以此為FlinkSQL可視化工具。 Flink SQL 使得使用標(biāo)準(zhǔn) SQL 開(kāi)發(fā)流式應(yīng)用變得簡(jiǎn)單,免去代碼開(kāi)發(fā)。 Flink CDC 本文使用 MySQL CDC 連接器 允許從

    2024年02月16日
    瀏覽(19)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包