国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink CDC 基于mysql binlog 實時同步mysql表(無主鍵)

這篇具有很好參考價值的文章主要介紹了Flink CDC 基于mysql binlog 實時同步mysql表(無主鍵)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

環(huán)境說明:

flink 1.15.2

mysql 版本5.7 ? ?注意:需要開啟binlog,因為增量同步是基于binlog捕獲數(shù)據(jù)

windows11 IDEA 本地運行

具體前提設(shè)置,請看這篇,包含 binlog 設(shè)置、Maven......

Flink CDC 基于mysql binlog 實時同步mysql表_彩虹豆的博客-CSDN博客

經(jīng)過不懈努力,終于從阿里help頁面找到了支持無主鍵同步的參數(shù):

MySQL_實時計算 Flink版-阿里云幫助中心

Flink CDC 基于mysql binlog 實時同步mysql表(無主鍵),mysql,flink,數(shù)據(jù)庫

?然后就開始一頓模式,各種參數(shù)調(diào)試,終于達到了目的,無主鍵表實時同步,只不過在sink表關(guān)聯(lián)目標表時,要指定幾個字段為主鍵,這樣就不會有重復(fù)的覆蓋情況了,多給幾個字段作為主鍵,不就避免重復(fù)沖突了嘛。比如id+date+local等,具體看表字段。

demo如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class MysqlToMysqlNonePrimaryKey {

    public static void main(String[] args) {
        //1.獲取stream的執(zhí)行環(huán)境
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        //2.創(chuàng)建表執(zhí)行環(huán)境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv, settings);

        String sourceTable = "CREATE TABLE mysql_cdc_source (" +
                "  id INT,\n" +
                "  username STRING,\n" +
                "  password STRING\n" +
                ") WITH (\n" +
                "'connector' = 'mysql-cdc',\n" +
                "'hostname' = 'localhost',\n" +
                "'port' = '3306',\n" +
                "'username' = 'root',\n" +
                "'password' = 'root',\n" +
                "'database-name' = 'test_cdc',\n" +
                "'debezium.snapshot.mode' = 'initial',\n" +
                "'scan.incremental.snapshot.enabled' = 'false',\n" +
                //如果開啟增量快照,必須設(shè)置主鍵。
                //默認開啟增量快照。增量快照是一種讀取全量數(shù)據(jù)快照的新機制。與舊的快照讀取相比,增量快照有很多優(yōu)點,包括:
                //讀取全量數(shù)據(jù)時,Source可以是并行讀取。
                //讀取全量數(shù)據(jù)時,Source支持chunk粒度的檢查點。
                //讀取全量數(shù)據(jù)時,Source不需要獲取全局讀鎖(FLUSH TABLES WITH read lock)。
                //如果您希望Source支持并發(fā)讀取,每個并發(fā)的Reader需要有一個唯一的服務(wù)器ID,因此server-id必須是5400-6400這樣的范圍,并且范圍必須大于等于并發(fā)數(shù)。
                "'scan.incremental.snapshot.chunk.key-column' = 'id' ,\n" +
                //可以指定某一列作為快照階段切分分片的切分列。無主鍵表必填,選擇的列必須是非空類型(NOT NULL)。
                //有主鍵的表為選填,僅支持從主鍵中選擇一列。
                "  'table-name' = 'user'\n" +
                ")";
        tEnv.executeSql(sourceTable);
//        tEnv.executeSql("select * from mysql_cdc_source").print();
        String sinkTable = "CREATE TABLE mysql_cdc_sink (" +
                "  id INT,\n" +
                "  username STRING,\n" +
                "  password STRING\n" +
                "  ,PRIMARY KEY (id,username,password) NOT ENFORCED\n" +
                ") WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "'url' = 'jdbc:mysql://localhost:3306/" + "test_cdc" + "?rewriteBatchedStatements=true',\n" +
                "'username' = 'root',\n" +
                "'password' = 'root',\n" +
                "'table-name' = 'user_new'\n" +
                ")";
        tEnv.executeSql(sinkTable);
        tEnv.executeSql("insert into mysql_cdc_sink select id,username,password from mysql_cdc_source");
    }
}

由于無主鍵,?debezium.snapshot.mode' = 'initial',這個參數(shù)會導(dǎo)致,程序運行幾次,源表數(shù)據(jù)就會同步幾次到目標表,并不會去重,如果想一直這個參數(shù)運行,需要在插入前先清空表,但是如果是數(shù)據(jù)量大的,推薦還是先用這個參數(shù)同步歷史數(shù)據(jù),完成后,再改為?schema_only,啟動程序,然后把上面一個程序干掉。

上面設(shè)置的主鍵是三個字段,id、username、password,這三個字段不能為null,如果有數(shù)據(jù)為null,程序在啟動的時候,就會報錯,雖然沒有打印到控制臺上,但是可以看到控制臺程序結(jié)束了,不是一直在運行,并且數(shù)據(jù)也是同步不過去的。所以挑選主鍵字段時一定要確定此字段一定不為null,如果為null的話,就需要能接受轉(zhuǎn)換處理,比如:varchar 類型 將null值轉(zhuǎn)換為空字符串

insert into mysql_cdc_sink select case when id is null then 0 else id end,case when username is null then '' else username? end,case when password is null then '' else password end from mysql_cdc_source

具體如何處理,還看業(yè)務(wù)需求。不過,在數(shù)據(jù)同步時,盡量要做到不對數(shù)據(jù)做任何變動。如果是可以加入清洗,那就隨便玩。

具體數(shù)據(jù)變化時同步的情況還需自行探索。文章來源地址http://www.zghlxwxcb.cn/news/detail-718830.html

到了這里,關(guān)于Flink CDC 基于mysql binlog 實時同步mysql表(無主鍵)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 基于 Flink CDC 構(gòu)建 MySQL 到 Databend 的 實時數(shù)據(jù)同步

    基于 Flink CDC 構(gòu)建 MySQL 到 Databend 的 實時數(shù)據(jù)同步

    這篇教程將展示如何基于 Flink CDC 快速構(gòu)建 MySQL 到 Databend 的實時數(shù)據(jù)同步。本教程的演示都將在 Flink SQL CLI 中進行,只涉及 SQL,無需一行 Java/Scala 代碼,也無需安裝 IDE。 假設(shè)我們有電子商務(wù)業(yè)務(wù),商品的數(shù)據(jù)存儲在 MySQL ,我們需要實時把它同步到 Databend 中。 接下來的內(nèi)容

    2024年02月10日
    瀏覽(29)
  • Flink CDC 基于Oracle log archiving 實時同步Oracle表到Mysql

    Flink CDC 基于Oracle log archiving 實時同步Oracle表到Mysql

    環(huán)境說明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地運行 先上官網(wǎng)使用說明和案例:Oracle CDC Connector — Flink CDC documentation 1. Oracle 開啟 log archiving (1).啟用 log archiving ?? ??? ?a:以DBA用戶連接數(shù)據(jù)庫? ??

    2024年02月11日
    瀏覽(44)
  • 基于 Flink CDC 的實時同步系統(tǒng)

    基于 Flink CDC 的實時同步系統(tǒng)

    摘要: 本文整理自科杰科技大數(shù)據(jù)架構(gòu)師張軍,在 FFA 2022 數(shù)據(jù)集成專場的分享。本篇內(nèi)容主要分為四個部分: 功能概述 架構(gòu)設(shè)計 技術(shù)挑戰(zhàn) 生產(chǎn)實踐 Tips: 點擊 「閱讀原文」 查看原文視頻演講 ppt 科杰科技是專門做大數(shù)據(jù)服務(wù)的供應(yīng)商,目前的客戶包括能源、金融、證券等

    2024年02月05日
    瀏覽(31)
  • Flink CDC2.4 整庫實時同步MySql 到Doris

    ????????Flink 1.15.4? ? ? ? ? 目前有很多工具都支持無代碼實現(xiàn)Mysql - Doris 的實時同步 ? ? ? ? 如:SlectDB 已發(fā)布的功能包 ? ? ? ? ? ? ? ??Dinky?SeaTunnel?TIS?等等 ? ? ? ? ?不過好多要么不支持表結(jié)構(gòu)變動,要不不支持多sink,我們的業(yè)務(wù)必須支持對表結(jié)構(gòu)的實時級變動

    2024年02月11日
    瀏覽(35)
  • 使用Flink CDC將Mysql中的數(shù)據(jù)實時同步到ES

    最近公司要搞搜索,需要把mysql中的數(shù)據(jù)同步到es中來進行搜索,由于公司已經(jīng)搭建了flink集群,就打算用flink來做這個同步。本來以為很簡單,跟著官網(wǎng)文檔走就好了,結(jié)果沒想到折騰了將近一周的時間…… 我也是沒想到,這玩意網(wǎng)上資源竟然這么少,找到的全部都是通過

    2024年02月11日
    瀏覽(25)
  • 基于Flink CDC實時同步PostgreSQL與Tidb【Flink SQL Client模式下親測可行,詳細教程】

    操作系統(tǒng):ubuntu-22.04,運行于wsl 2【 注意,請務(wù)必使用wsl 2 ;wsl 1會出現(xiàn)各種各樣的問題】 軟件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0 已有postgre的跳過此步 (1)pg安裝 https://zhuanlan.zhihu.com/p/143156636 (2)pg配置 可能出現(xiàn)的問題 sudo -u postgres psql 報錯: psql: err

    2024年02月11日
    瀏覽(30)
  • 基于Flink SQL CDC Mysql to Mysql數(shù)據(jù)同步

    基于Flink SQL CDC Mysql to Mysql數(shù)據(jù)同步

    Flink CDC有兩種方式同步數(shù)據(jù)庫: 一種是通過FlinkSQL直接輸入兩表數(shù)據(jù)庫映射進行數(shù)據(jù)同步,缺點是只能單表進行同步; 一種是通過DataStream開發(fā)一個maven項目,打成jar包上傳到服務(wù)器運行。 本方案使用FlinkSQL方法,同步兩表中的數(shù)據(jù)。 其中Flink應(yīng)用可以部署在具有公網(wǎng)IP的服務(wù)

    2023年04月11日
    瀏覽(27)
  • 基于大數(shù)據(jù)平臺(XSailboat)的計算管道實現(xiàn)MySQL數(shù)據(jù)源的CDC同步--flink CDC

    基于大數(shù)據(jù)平臺(XSailboat)的計算管道實現(xiàn)MySQL數(shù)據(jù)源的CDC同步--flink CDC

    筆者在先前的一篇文檔《數(shù)據(jù)標簽設(shè)計 – 大數(shù)據(jù)平臺(XSailboat)的數(shù)據(jù)標簽?zāi)K》 提到了關(guān)于數(shù)據(jù)標簽的模塊,現(xiàn)已實現(xiàn)并應(yīng)用于項目中。在項目中遇到這樣一種情形: 如果打標信息和業(yè)務(wù)數(shù)據(jù)是在一個數(shù)據(jù)庫實例中,那么只需要連接兩張表進行查詢即可。但是數(shù)據(jù)標簽作為

    2024年01月17日
    瀏覽(35)
  • Flink CDC獲取mysql 主從分庫,分庫分表的binlog

    Flink CDC可以獲取MySQL主從分庫,分庫分表的binlog,但是需要注意以下幾點: Flink CDC需要配置MySQL的binlog模式為row,以及開啟GTID(全局事務(wù)標識符),以便正確地識別和處理binlog事件 Flink CDC需要配置MySQL的主從復(fù)制關(guān)系,以及指定主庫或從庫的地址,以便正確地連接和讀取bin

    2024年02月11日
    瀏覽(19)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 數(shù)據(jù)到 Elasticsearch、Kafka

    基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 數(shù)據(jù)到 Elasticsearch、Kafka

    Dinky 是一個開箱即用的一站式實時計算平臺以 Apache Flink 為基礎(chǔ),連接 OLAP 和數(shù)據(jù)湖等眾多框架致力于流批一體和湖倉一體的建設(shè)與實踐。本文以此為FlinkSQL可視化工具。 Flink SQL 使得使用標準 SQL 開發(fā)流式應(yīng)用變得簡單,免去代碼開發(fā)。 Flink CDC 本文使用 MySQL CDC 連接器 允許從

    2024年02月16日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包