国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink CDC 實時mysql到mysql

這篇具有很好參考價值的文章主要介紹了Flink CDC 實時mysql到mysql。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

CDC?的全稱是?Change Data Capture?,在廣義的概念上,只要是能捕獲數(shù)據(jù)變更的技術(shù),我們都可以稱之為?CDC?。目前通常描述的?CDC?技術(shù)主要面向數(shù)據(jù)庫的變更,是一種用于捕獲數(shù)據(jù)庫中數(shù)據(jù)變更的技術(shù)。

mysqlcdc需要mysql開啟binlog,找到my.cnf,在[mysqld]中加入如下信息

[mysqld]

server-id=1

log-bin=mysql-bin

binlog-format=row

重啟數(shù)據(jù)庫。

2.創(chuàng)建springboot項目,pom添加依賴

<properties>
<java.version>1.8</java.version>
<flink.version>1.13.6</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.13.6</version>
<!-- <scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.17</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.13.1</version>
</dependency>

</dependencies>

<build>
<plugins>
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Flink cdc實現(xiàn)mysql到mysql代碼

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkMysqlToMysql {

public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 創(chuàng)建Table環(huán)境
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// 注冊源表和目標(biāo)表
tEnv.executeSql("create table sourceTable(id bigint,organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表連接器一定得是mysql-cdc
"'connector' = 'mysql-cdc'," +
"'hostname' = 'localhost',\n" +
" 'port' = '3306',\n" +
" 'database-name' = 'quarant_db',\n" +
" 'table-name' = 'organization_info',\n" +
" 'username' = 'root',\n" +
" 'password' = 'admin'\n" +
")");
// Table result = tEnv.sqlQuery("SELECT id, name,card_num,phone,address FROM quarantine");
// tEnv.registerTable("sourceTable",result);
tEnv.executeSql("create table targetTable(id bigint,organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目標(biāo)表連接器是jdbc
"'connector' = 'jdbc'," +
"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +
" 'table-name' = 'organization_info',\n" +
" 'username' = 'root',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'password' = 'admin'\n" +
")");
// 執(zhí)行CDC過程
String query = "INSERT INTO targetTable SELECT * FROM sourceTable";
tEnv.executeSql(query).print();
}
}

運行Main方法

flink實時讀取mysql數(shù)據(jù),flink,大數(shù)據(jù)

Flink會同步源表數(shù)據(jù)到目標(biāo)表,后續(xù)源表的增刪改都會實時同步至目標(biāo)表中。

3.將程序打包成flink jar

idea使用快捷鍵control+alt+shift+s,點擊Artifacts->JAR

flink實時讀取mysql數(shù)據(jù),flink,大數(shù)據(jù)

?選擇Main class,點擊ok

flink實時讀取mysql數(shù)據(jù),flink,大數(shù)據(jù)

?然后選擇上面菜單欄Build Artifacts

flink實時讀取mysql數(shù)據(jù),flink,大數(shù)據(jù)

?點擊build

flink實時讀取mysql數(shù)據(jù),flink,大數(shù)據(jù)

?生成的jar在項目目錄下面有個out目錄

flink實時讀取mysql數(shù)據(jù),flink,大數(shù)據(jù)

至此,flink?jar程序就寫好了,可以把jar丟到flink上運行了?文章來源地址http://www.zghlxwxcb.cn/news/detail-523256.html

到了這里,關(guān)于Flink CDC 實時mysql到mysql的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 使用 Flink CDC 實現(xiàn) MySQL 數(shù)據(jù),表結(jié)構(gòu)實時入 Apache Doris

    現(xiàn)有數(shù)據(jù)庫:mysql 數(shù)據(jù):庫表較多,每個企業(yè)用戶一個分庫,每個企業(yè)下的表均不同,無法做到聚合,且表可以被用戶隨意改動,增刪改列等,增加表 分析:用戶自定義分析,通過拖拽定義圖卡,要求實時,點擊確認即出現(xiàn)相應(yīng)結(jié)果,其中有無法預(yù)判的過濾 問題:隨業(yè)務(wù)增長

    2023年04月08日
    瀏覽(24)
  • Flink CDC 實時mysql到mysql

    Flink CDC 實時mysql到mysql

    CDC?的全稱是?Change Data Capture?,在廣義的概念上,只要是能捕獲數(shù)據(jù)變更的技術(shù),我們都可以稱之為?CDC?。目前通常描述的?CDC?技術(shù)主要面向數(shù)據(jù)庫的變更,是一種用于捕獲數(shù)據(jù)庫中數(shù)據(jù)變更的技術(shù)。 mysqlcdc需要mysql開啟binlog,找到my.cnf,在 [mysqld] 中加入如下信息 [mysqld]

    2024年02月12日
    瀏覽(27)
  • Flink CDC 基于mysql binlog 實時同步mysql表

    Flink CDC 基于mysql binlog 實時同步mysql表

    環(huán)境說明: flink?1.15.2 mysql 版本5.7? ? 注意:需要開啟binlog,因為增量同步是基于binlog捕獲數(shù)據(jù) windows11 IDEA 本地運行 先上官網(wǎng)使用說明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql開啟binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人測試是捕獲不到binlog日志的,增量相

    2024年02月10日
    瀏覽(24)
  • 【Flink CDC(一)】實現(xiàn)mysql整表與增量讀取

    【Flink CDC(一)】實現(xiàn)mysql整表與增量讀取

    MySQL CDC 連接器允許從 MySQL 數(shù)據(jù)庫讀取快照數(shù)據(jù)( 比如:flink任務(wù)消費時刻的整表數(shù)據(jù) )和增量數(shù)據(jù)。本文描述了如何設(shè)置 MySQL CDC 連接器來對 MySQL 數(shù)據(jù)庫運行 SQL 查詢。 本篇只關(guān)注mysql整表與增量讀取的實現(xiàn),對于并發(fā)讀取等能力后續(xù)再探索。 ? 1.1. Maven dependency ? 1.2. SQL

    2024年03月27日
    瀏覽(16)
  • Flink CDC 基于mysql binlog 實時同步mysql表(無主鍵)

    Flink CDC 基于mysql binlog 實時同步mysql表(無主鍵)

    環(huán)境說明: flink 1.15.2 mysql 版本5.7 ? ?注意:需要開啟binlog,因為增量同步是基于binlog捕獲數(shù)據(jù) windows11 IDEA 本地運行 具體前提設(shè)置,請看這篇,包含 binlog 設(shè)置、Maven...... Flink CDC 基于mysql binlog 實時同步mysql表_彩虹豆的博客-CSDN博客 經(jīng)過不懈努力,終于從阿里help頁面找到了支

    2024年02月08日
    瀏覽(28)
  • Flink CDC2.4 整庫實時同步MySql 到Doris

    ????????Flink 1.15.4? ? ? ? ? 目前有很多工具都支持無代碼實現(xiàn)Mysql - Doris 的實時同步 ? ? ? ? 如:SlectDB 已發(fā)布的功能包 ? ? ? ? ? ? ? ??Dinky?SeaTunnel?TIS?等等 ? ? ? ? ?不過好多要么不支持表結(jié)構(gòu)變動,要不不支持多sink,我們的業(yè)務(wù)必須支持對表結(jié)構(gòu)的實時級變動

    2024年02月11日
    瀏覽(35)
  • 【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進行實時數(shù)據(jù)同步

    【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進行實時數(shù)據(jù)同步

    CDC是Change Data Capture的縮寫,中文意思是 變更數(shù)據(jù)獲取 ,flink-cdc的作用是,通過flink捕獲數(shù)據(jù)源的事務(wù)變動操作記錄,包括數(shù)據(jù)的增刪改操作等,根據(jù)這些記錄可作用于對目標(biāo)端進行實時數(shù)據(jù)同步。 下圖是flink-cdc最新支持的數(shù)據(jù)源類型: kafka的數(shù)據(jù)源要通過flink-cdc進行實時數(shù)

    2024年02月12日
    瀏覽(36)
  • 【實戰(zhàn)-01】flink cdc 實時數(shù)據(jù)同步利器

    【實戰(zhàn)-01】flink cdc 實時數(shù)據(jù)同步利器

    cdc github源碼地址 cdc官方文檔 對很多初入門的人來說是無法理解cdc到底是什么個東西。 有這樣一個需求,比如在mysql數(shù)據(jù)庫中存在很多數(shù)據(jù),但是公司要把mysql中的數(shù)據(jù)同步到數(shù)據(jù)倉庫(starrocks), 數(shù)據(jù)倉庫你可以理解為存儲了各種各樣來自不同數(shù)據(jù)庫中表。 數(shù)據(jù)的同步目前對

    2023年04月08日
    瀏覽(94)
  • Flink CDC實時同步PG數(shù)據(jù)庫

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git? 1、更改配置文件postgresql.conf # 更改wal日志方式為logical wal_level = logical # minimal, replica, or logical # 更改solts最大數(shù)量(默認值為10),flink-cdc默認一張表占用一個slots max_replication_slots = 20 # m

    2024年02月13日
    瀏覽(35)
  • Flink CDC 實時抽取 Oracle 數(shù)據(jù)-排錯&調(diào)優(yōu)

    Flink CDC 實時抽取 Oracle 數(shù)據(jù)-排錯&調(diào)優(yōu)

    Flink CDC 于 2021 年 11 月 15 日發(fā)布了最新版本 2.1,該版本通過引入內(nèi)置 Debezium 組件,增加了對 Oracle 的支持。對該版本進行試用并成功實現(xiàn)了對 Oracle 的實時數(shù)據(jù)捕獲以及性能調(diào)優(yōu),現(xiàn)將試用過程中的一些關(guān)鍵細節(jié)進行分享。 Oracle:11.2.0.4.0(RAC 部署) Flink:1.13.1 Hadoop:3.2.1

    2024年01月16日
    瀏覽(34)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包