
Real-Time Data Synchronization with Flink CDC (MySQL to MySQL)

This article introduces real-time data synchronization based on Flink CDC, from MySQL to MySQL. I hope it is useful; if anything is wrong or incomplete, corrections are welcome.

1. Environment

  • JDK 8
  • Flink 1.16.1 (deployed on a remote server: 192.168.137.99)
  • Flink CDC 2.3.0
  • MySQL 8.0 (installed locally: 192.168.3.31)
    (installation and deployment steps omitted; see the binlog check below)
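
One prerequisite worth checking: the mysql-cdc connector reads the MySQL binlog, so binary logging must be enabled with row format (both are defaults in MySQL 8.0). A quick sanity check on the source instance:

-- Run against the source MySQL (192.168.3.31)
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW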

2. Preparation

Prepare three databases: flink_source, flink_sink, and flink_sink_second.
The goal is to synchronize the flink_source.source_test table in real time to the sink_test tables in both flink_sink and flink_sink_second.
(Database and table creation are omitted; a sketch follows.)
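
For reference, a minimal sketch of the omitted DDL, with the schema inferred from the Flink table definitions used later (two string columns, user_id as the primary key):

CREATE DATABASE flink_source;
CREATE DATABASE flink_sink;
CREATE DATABASE flink_sink_second;

CREATE TABLE flink_source.source_test (
  user_id   VARCHAR(64) PRIMARY KEY,
  user_name VARCHAR(255)
);

-- Both sinks use the same structure:
CREATE TABLE flink_sink.sink_test LIKE flink_source.source_test;
CREATE TABLE flink_sink_second.sink_test LIKE flink_source.source_test;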

3. Developing the SQL Job in the Flink SQL Client

The workflow can follow the examples on the Flink CDC website (for instance, "Streaming ETL for MySQL and Postgres with Flink CDC").
What we build here is MySQL-to-MySQL synchronization, which differs slightly: you also need the JDBC SQL connector dependency jar (download link and usage instructions are in the Flink docs), placed under {flink-1.16.1}/lib/, so the JDBC connector can write to the MySQL sink databases. If the JDBC sink then fails to find com.mysql.cj.jdbc.Driver, also drop the MySQL driver jar (mysql-connector-java) into lib/.

Note the difference between the following two jars: flink-sql-connector-mysql-cdc-2.3.0.jar is the one that must go into {flink-1.16.1}/lib/. Due to network restrictions, I initially pulled flink-connector-mysql-cdc-2.3.0.jar via Maven instead, which led to all kinds of ClassNotFoundException errors.
flink-sql-connector-mysql-cdc-2.3.0.jar (fat jar, bundles its dependencies)
flink-connector-mysql-cdc-2.3.0.jar (thin jar, no bundled dependencies)

flink-sql-connector-mysql-cdc-2.3.0.jar:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
</dependency>

flink-connector-mysql-cdc-2.3.0.jar:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
</dependency>

Start the Flink SQL CLI with the following command:

./bin/sql-client.sh

Then use the DDL below to create the Flink tables and synchronize flink_source.source_test in real time to the sink_test tables in flink_sink and flink_sink_second. (The checkpoint interval is set first, since the CDC source commits its binlog progress on checkpoints.)

Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.

Flink SQL> CREATE TABLE source_test (
>   user_id STRING,
>   user_name STRING,
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>    'connector' = 'mysql-cdc',
>    'hostname' = '192.168.3.31',
>    'port' = '3306',
>    'username' = 'root',
>    'password' = '******',
>    'database-name' = 'flink_source',
>    'table-name' = 'source_test'
> );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE sink_test (
>   user_id STRING,
>   user_name STRING,
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink',
>    'driver' = 'com.mysql.cj.jdbc.Driver',
>    'username' = 'root',
>    'password' = '******',
>    'table-name' = 'sink_test'
> );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE sink_test_second (
>   user_id STRING,
>   user_name STRING,
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink_second',
>    'driver' = 'com.mysql.cj.jdbc.Driver',
>    'username' = 'root',
>    'password' = '******',
>    'table-name' = 'sink_test'
> );
[INFO] Execute statement succeed.

Flink SQL> insert into sink_test select * from source_test;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 0c49758cc251699f0b4acd6c9f735e6e


Flink SQL> insert into sink_test_second select * from source_test;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: ecea685a715d7d40ee1a94aac3236c18


Flink SQL> 
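As an aside, the two INSERT statements above are submitted as two independent jobs, each reading the MySQL binlog separately. Flink SQL can also bundle them into a single job with a statement set; a minimal sketch using the same tables:

EXECUTE STATEMENT SET
BEGIN
  INSERT INTO sink_test SELECT * FROM source_test;
  INSERT INTO sink_test_second SELECT * FROM source_test;
END;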

Open the Flink web UI at http://192.168.137.99:8081/#/job/running; the two newly created jobs are visible there.
(Screenshot: Flink web UI showing the two running jobs)
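
To verify the pipeline end to end, change some rows in the source table and watch both sinks. A minimal sketch, with made-up test values (u001/Alice):

-- Run against the source MySQL (192.168.3.31)
INSERT INTO flink_source.source_test (user_id, user_name) VALUES ('u001', 'Alice');
UPDATE flink_source.source_test SET user_name = 'Alice Smith' WHERE user_id = 'u001';

-- Each change should appear in both sinks within a few seconds:
SELECT * FROM flink_sink.sink_test;
SELECT * FROM flink_sink_second.sink_test;

Deletes propagate the same way, since the CDC source emits the full changelog.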

4. Developing the SQL Job with the Local Execution Environment

The code follows.
pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.1</flink.version>
        <flink-cdc.version>2.3.0</flink-cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- mysql-cdc fat jar -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <!--<dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>-->

    </dependencies>

</project>
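
One note on dependencies: the JDBC sink loads com.mysql.cj.jdbc.Driver at runtime. The fat CDC jar above bundles a MySQL driver, but if the driver class cannot be found in your setup, it may be safer to declare it explicitly (the version matches the one seen in the classpath log later):

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>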

FlinkDemo.java:

package org.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env;
        // Local execution environment
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Remote execution environment (see section 5)
//        env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.137.99", 8081);
        env.enableCheckpointing(3000L).setParallelism(2);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        // CREATE TABLE source_test
        tableEnv.executeSql(sourceDDL());
        // CREATE TABLE sink_test
        tableEnv.executeSql(sinkDDL());
        // CREATE TABLE sink_test_second
        tableEnv.executeSql(sinkDDLOfSecondDb());
        // Sync source_test to sink_test and sink_test_second
        tableEnv.getConfig().set("pipeline.name", "Flink Demo - To sink_test");         // set the job name
        tableEnv.executeSql("insert into sink_test select * from source_test;");
        tableEnv.getConfig().set("pipeline.name", "Flink Demo - To sink_test_second");  // set the job name
        tableEnv.executeSql("insert into sink_test_second select * from source_test;");

    }

    public static String sourceDDL() {
        String sourceDDL = "CREATE TABLE source_test (\n" +
                "  user_id STRING,\n" +
                "  user_name STRING,\n" +
                "  PRIMARY KEY (user_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "   'connector' = 'mysql-cdc',\n" +
                "   'hostname' = '192.168.3.31',\n" +
                "   'port' = '3306',\n" +
                "   'username' = 'root',\n" +
                "   'password' = '******',\n" +
                "   'database-name' = 'flink_source',\n" +
                "   'table-name' = 'source_test'\n" +
                ");";
        return sourceDDL;
    }

    public static String sinkDDL() {
        String sinkDDL = "CREATE TABLE sink_test (\n" +
                "  user_id STRING,\n" +
                "  user_name STRING,\n" +
                "  PRIMARY KEY (user_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "   'connector' = 'jdbc',\n" +
                "   'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink',\n" +
                "   'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "   'username' = 'root',\n" +
                "   'password' = '******',\n" +
                "   'table-name' = 'sink_test'\n" +
                ");";
        return sinkDDL;
    }

    public static String sinkDDLOfSecondDb() {
        String sinkDDL = "CREATE TABLE sink_test_second (\n" +
                "  user_id STRING,\n" +
                "  user_name STRING,\n" +
                "  PRIMARY KEY (user_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "   'connector' = 'jdbc',\n" +
                "   'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink_second',\n" +
                "   'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "   'username' = 'root',\n" +
                "   'password' = '******',\n" +
                "   'table-name' = 'sink_test'\n" +
                ");";
        return sinkDDL;
    }
}
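
Each executeSql("insert ...") call above submits a separate job, so the binlog is read twice. As an alternative sketch (beyond what this article does), the Table API's statement set bundles both INSERTs into one job; it reuses the DDL helpers from FlinkDemo, and the class name FlinkDemoStatementSet is hypothetical:

package org.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkDemoStatementSet {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000L).setParallelism(2);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the same three tables as FlinkDemo above
        tableEnv.executeSql(FlinkDemo.sourceDDL());
        tableEnv.executeSql(FlinkDemo.sinkDDL());
        tableEnv.executeSql(FlinkDemo.sinkDDLOfSecondDb());

        // Both INSERTs in one statement set: a single job reads the binlog once
        // and fans out to both sinks
        tableEnv.getConfig().set("pipeline.name", "Flink Demo - StatementSet");
        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsertSql("insert into sink_test select * from source_test");
        statementSet.addInsertSql("insert into sink_test_second select * from source_test");
        statementSet.execute();
    }
}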

5. Developing the SQL Job with the Remote Execution Environment

Submitting the SQL job to a remote Flink cluster requires creating the execution environment like this:

StreamExecutionEnvironment.createRemoteEnvironment("192.168.137.99", 8081);

while local execution uses:

StreamExecutionEnvironment.getExecutionEnvironment();

After switching the execution environment in the FlinkDemo.java above, run the code to submit the jobs:
(Screenshot: the submitted jobs in the Flink web UI of the remote cluster)

Also note that for remote execution you must use flink-sql-connector-mysql-cdc; flink-connector-mysql-cdc does not work and fails with: Caused by: java.io.StreamCorruptedException: unexpected block data
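
Relatedly, createRemoteEnvironment also has an overload that ships jars to the cluster along with the job, as an alternative to copying connector jars into {flink-1.16.1}/lib/ on the server; a hedged sketch with hypothetical paths:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "192.168.137.99", 8081,
        "target/FlinkDemo-1.0-SNAPSHOT.jar",
        "flink-sql-connector-mysql-cdc-2.3.0.jar");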

"C:\Program Files\Java\jdk1.8.0_351\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2022.3.3\lib\idea_rt.jar=53061:C:\Program Files\JetBrains\IntelliJ IDEA 2022.3.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_351\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\rt.jar;E:\java\code\FlinkDemo\target\classes;E:\java\.m2\repository\org\apache\flink\flink-java\1.16.1\flink-java-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-core\1.16.1\flink-core-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-annotations\1.16.1\flink-annotations-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-metrics-core\1.16.1\flink-metrics-core-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-shaded-asm-9\9.2-15.0\flink-shaded-asm-9-9.2-15.0.jar;E:\java\.m2\repository\org\apache\flink\flink-shaded-jackson\2.12.4-15.0\flink-shaded-jackson-2.12.4-15.0.jar;E:\java\.m2\repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;E:\java\.m2\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;E:\java\.m2\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;E:\java\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;E:\java\.m2\repository\org\apache\commons\commons-compress\1.21\commons-compress-1.21.jar;E:\java\.m2\repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;E:\java\.m2\repository\org\apache\commons\commons-math3\3.6.1\commons-math3-3.6.1.jar;E:\java\.m2\repository\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;E:\java\.m2\repository\org\slf4j\slf4j-api\1.7.32\slf4j-api-1.7.32.jar;E:\java\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;E:\java\.m2\repository\org\apache\flink\flink-shaded-force-shading\15.0\flink-shaded-force-shading-15.0.jar;E:\java\.m2\repository\org\apache\flink\flink-clients\1.16.1\flink-clients-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-runtime\1.16.1\flink-runtime-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-rpc-core\1.16.1\flink-rpc-core-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-rpc-akka-loader\1.16.1\flink-rpc-akka-loader-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-queryable-state-client-java\1.16.1\flink-queryable-state-client-java-1.16.1.jar;E:\java\.m
2\repository\org\apache\flink\flink-hadoop-fs\1.16.1\flink-hadoop-fs-1.16.1.jar;E:\java\.m2\repository\commons-io\commons-io\2.11.0\commons-io-2.11.0.jar;E:\java\.m2\repository\org\apache\flink\flink-shaded-netty\4.1.70.Final-15.0\flink-shaded-netty-4.1.70.Final-15.0.jar;E:\java\.m2\repository\org\apache\flink\flink-shaded-zookeeper-3\3.5.9-15.0\flink-shaded-zookeeper-3-3.5.9-15.0.jar;E:\java\.m2\repository\org\javassist\javassist\3.24.0-GA\javassist-3.24.0-GA.jar;E:\java\.m2\repository\org\xerial\snappy\snappy-java\1.1.8.3\snappy-java-1.1.8.3.jar;E:\java\.m2\repository\org\lz4\lz4-java\1.8.0\lz4-java-1.8.0.jar;E:\java\.m2\repository\org\apache\flink\flink-optimizer\1.16.1\flink-optimizer-1.16.1.jar;E:\java\.m2\repository\commons-cli\commons-cli\1.5.0\commons-cli-1.5.0.jar;E:\java\.m2\repository\org\apache\flink\flink-streaming-java\1.16.1\flink-streaming-java-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-file-sink-common\1.16.1\flink-file-sink-common-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-shaded-guava\30.1.1-jre-15.0\flink-shaded-guava-30.1.1-jre-15.0.jar;E:\java\.m2\repository\org\apache\flink\flink-table-api-java-bridge\1.16.1\flink-table-api-java-bridge-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-table-api-java\1.16.1\flink-table-api-java-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-table-api-bridge-base\1.16.1\flink-table-api-bridge-base-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-table-planner-loader\1.16.1\flink-table-planner-loader-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-table-runtime\1.16.1\flink-table-runtime-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-table-common\1.16.1\flink-table-common-1.16.1.jar;E:\java\.m2\repository\com\ibm\icu\icu4j\67.1\icu4j-67.1.jar;E:\java\.m2\repository\org\apache\flink\flink-cep\1.16.1\flink-cep-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-connector-base\1.16.1\flink-connector-base-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-connector-jdbc\1.16.1\flink-connector-jdbc-1.16.1.jar;E:\java\.m2\repository\com\ververica\flink-connector-mysql-cdc\2.3.0\flink-connector-mysql-cdc-2.3.0.jar;E:\java\.m2\repository\com\ververica\flink-connector-debezium\2.3.0\flink-connector-debezium-2.3.0.jar;E:\java\.m2\repository\io\debezium\debezium-api\1.6.4.Final\debezium-api-1.6.4.Final.jar;E:\java\.m2\repository\io\debezium\debezium-embedded\1.6.4.Final\debezium-embedded-1.6.4.Final.jar;E:\java\.m2\repository\org\apache\kafka\connect-api\2.7.1\connect-api-2.7.1.jar;E:\java\.m2\repository\org\apache\kafka\kafka-clients\2.7.1\kafka-clients-2.7.1.jar;E:\java\.m2\repository\com\github\luben\zstd-jni\1.4.5-6\zstd-jni-1.4.5-6.jar;E:\java\.m2\repository\javax\ws\rs\javax.ws.rs-api\2.1.1\javax.ws.rs-api-2.1.1.jar;E:\java\.m2\repository\org\apache\kafka\connect-runtime\2.7.1\connect-runtime-2.7.1.jar;E:\java\.m2\repository\org\apache\kafka\kafka-tools\2.7.1\kafka-tools-2.7.1.jar;E:\java\.m2\repository\net\sourceforge\argparse4j\argparse4j\0.7.0\argparse4j-0.7.0.jar;E:\java\.m2\repository\org\apache\kafka\connect-transforms\2.7.1\connect-transforms-2.7.1.jar;E:\java\.m2\repository\com\fasterxml\jackson\jaxrs\jackson-jaxrs-json-provider\2.10.5\jackson-jaxrs-json-provider-2.10.5.jar;E:\java\.m2\repository\com\fasterxml\jackson\jaxrs\jackson-jaxrs-base\2.10.5\jackson-jaxrs-base-2.10.5.jar;E:\java\.m2\repository\com\fasterxml\jackson\module\jackson-module-jaxb-annotations\2.10.5\jackson-module-jaxb-annotations-2.10.5.jar;E:\java\.m2\repository\jakarta\xml\bind\jak
arta.xml.bind-api\2.3.2\jakarta.xml.bind-api-2.3.2.jar;E:\java\.m2\repository\jakarta\activation\jakarta.activation-api\1.2.1\jakarta.activation-api-1.2.1.jar;E:\java\.m2\repository\org\glassfish\jersey\containers\jersey-container-servlet\2.31\jersey-container-servlet-2.31.jar;E:\java\.m2\repository\org\glassfish\jersey\containers\jersey-container-servlet-core\2.31\jersey-container-servlet-core-2.31.jar;E:\java\.m2\repository\org\glassfish\hk2\external\jakarta.inject\2.6.1\jakarta.inject-2.6.1.jar;E:\java\.m2\repository\jakarta\ws\rs\jakarta.ws.rs-api\2.1.6\jakarta.ws.rs-api-2.1.6.jar;E:\java\.m2\repository\org\glassfish\jersey\inject\jersey-hk2\2.31\jersey-hk2-2.31.jar;E:\java\.m2\repository\org\glassfish\hk2\hk2-locator\2.6.1\hk2-locator-2.6.1.jar;E:\java\.m2\repository\org\glassfish\hk2\external\aopalliance-repackaged\2.6.1\aopalliance-repackaged-2.6.1.jar;E:\java\.m2\repository\org\glassfish\hk2\hk2-api\2.6.1\hk2-api-2.6.1.jar;E:\java\.m2\repository\org\glassfish\hk2\hk2-utils\2.6.1\hk2-utils-2.6.1.jar;E:\java\.m2\repository\javax\xml\bind\jaxb-api\2.3.0\jaxb-api-2.3.0.jar;E:\java\.m2\repository\javax\activation\activation\1.1.1\activation-1.1.1.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-server\9.4.38.v20210224\jetty-server-9.4.38.v20210224.jar;E:\java\.m2\repository\javax\servlet\javax.servlet-api\3.1.0\javax.servlet-api-3.1.0.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-http\9.4.38.v20210224\jetty-http-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-io\9.4.38.v20210224\jetty-io-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-servlet\9.4.38.v20210224\jetty-servlet-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-security\9.4.38.v20210224\jetty-security-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-util-ajax\9.4.38.v20210224\jetty-util-ajax-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-servlets\9.4.38.v20210224\jetty-servlets-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-continuation\9.4.38.v20210224\jetty-continuation-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-util\9.4.38.v20210224\jetty-util-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-client\9.4.38.v20210224\jetty-client-9.4.38.v20210224.jar;E:\java\.m2\repository\org\reflections\reflections\0.9.12\reflections-0.9.12.jar;E:\java\.m2\repository\org\apache\maven\maven-artifact\3.6.3\maven-artifact-3.6.3.jar;E:\java\.m2\repository\org\codehaus\plexus\plexus-utils\3.2.1\plexus-utils-3.2.1.jar;E:\java\.m2\repository\org\apache\kafka\connect-json\2.7.1\connect-json-2.7.1.jar;E:\java\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.10.5\jackson-datatype-jdk8-2.10.5.jar;E:\java\.m2\repository\org\apache\kafka\connect-file\2.7.1\connect-file-2.7.1.jar;E:\java\.m2\repository\io\debezium\debezium-connector-mysql\1.6.4.Final\debezium-connector-mysql-1.6.4.Final.jar;E:\java\.m2\repository\io\debezium\debezium-core\1.6.4.Final\debezium-core-1.6.4.Final.jar;E:\java\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.10.5\jackson-core-2.10.5.jar;E:\java\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.10.5.1\jackson-databind-2.10.5.1.jar;E:\java\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.10.5\jackson-annotations-2.10.5.jar;E:\java\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.5\jackson-datatype-jsr310-2.10.5.jar;E:\java\.m2\repository\com\google\guava\guava\30.0-jre\guava-30.0-jr
e.jar;E:\java\.m2\repository\com\google\guava\failureaccess\1.0.1\failureaccess-1.0.1.jar;E:\java\.m2\repository\com\google\guava\listenablefuture\9999.0-empty-to-avoid-conflict-with-guava\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;E:\java\.m2\repository\io\debezium\debezium-ddl-parser\1.6.4.Final\debezium-ddl-parser-1.6.4.Final.jar;E:\java\.m2\repository\org\antlr\antlr4-runtime\4.8\antlr4-runtime-4.8.jar;E:\java\.m2\repository\com\zendesk\mysql-binlog-connector-java\0.25.3\mysql-binlog-connector-java-0.25.3.jar;E:\java\.m2\repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar;E:\java\.m2\repository\com\esri\geometry\esri-geometry-api\2.2.0\esri-geometry-api-2.2.0.jar;E:\java\.m2\repository\com\zaxxer\HikariCP\4.0.3\HikariCP-4.0.3.jar;E:\java\.m2\repository\org\awaitility\awaitility\4.0.1\awaitility-4.0.1.jar;E:\java\.m2\repository\org\hamcrest\hamcrest\2.1\hamcrest-2.1.jar" org.example.FlinkDemo
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.flink.table.api.TableException: Failed to execute sql
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:867)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:827)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
	at org.example.FlinkDemo.main(FlinkDemo.java:29)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Flink Demo - To sink_test'.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
	at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:850)
	... 4 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: source_test[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> Sink: sink_test[3]
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
	... 3 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: source_test[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> Sink: sink_test[3]
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	... 3 more
Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: source_test[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> Sink: sink_test[3]
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:229)
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
	... 4 more
Caused by: java.io.StreamCorruptedException: unexpected block data
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1685)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2310)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2310)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2310)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2118)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1656)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
	at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:488)
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
	... 20 more

Process finished with exit code 1

That concludes this walkthrough of real-time data synchronization with Flink CDC (MySQL to MySQL).


