国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

實戰(zhàn):大數據Flink CDC同步Mysql數據到ElasticSearch

這篇具有很好參考價值的文章主要介紹了實戰(zhàn):大數據Flink CDC同步Mysql數據到ElasticSearch。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

前言

前面的博文我們分享了大數據分布式流處理計算框架Flink和其基礎環(huán)境的搭建,相信各位看官都已經搭建好了自己的運行環(huán)境。那么,今天就來實戰(zhàn)一把使用Flink CDC同步Mysql數據導Elasticsearch。

知識積累

CDC簡介

CDC 的全稱是 Change Data Capture(變更數據捕獲技術) ,在廣義的概念上,只要是能捕獲數據變更的技術,我們都可以稱之為 CDC 。目前通常描述的 CDC 技術主要面向數據庫的變更,是一種用于捕獲數據庫中數據變更的技術。
實戰(zhàn):大數據Flink CDC同步Mysql數據到ElasticSearch,大數據,flink,docker,大數據,flink,mysql,elasticsearch,cdc

CDC的種類

CDC 的技術方案非常多,目前業(yè)界主流的實現機制可以分為兩種:
基于查詢的 CDC:
◆離線調度查詢作業(yè),批處理。把一張表同步到其他系統(tǒng),每次通過查詢去獲取表中最新的數據;
◆無法保障數據一致性,查的過程中有可能數據已經發(fā)生了多次變更;
◆不保障實時性,基于離線調度存在天然的延遲。
基于日志的 CDC:
◆實時消費日志,流處理,例如 MySQL 的 binlog 日志完整記錄了數據庫中的變更,可以把 binlog 文件當作流的數據源;
◆保障數據一致性,因為 binlog 文件包含了所有歷史變更明細;
◆保障實時性,因為類似 binlog 的日志文件是可以流式消費的,提供的是實時數據。

常見的CDC方案比較

實戰(zhàn):大數據Flink CDC同步Mysql數據到ElasticSearch,大數據,flink,docker,大數據,flink,mysql,elasticsearch,cdc

Springboot接入Flink CDC

由于Flink官方提供了Java、Scala、Python語言接口用以開發(fā)Flink應用程序,故我們可以直接用Maven引入Flink依賴進行功能實現。

環(huán)境準備

1、SpringBoot 2.4.3
2、Flink 1.13.6
3、Scala 2.11
4、Maven 3.6.3
5、Java 8
6、mysql 8
7、es 7
Springboot、Flink、Scala版本一定要相匹配,也可以嚴格按照本博客進行配置。
注意:
如果只是本機測試玩玩,Maven依賴已經整合計算環(huán)境,不用額外搭建Flink環(huán)境;如果需要部署到Flink集群則需要額外搭建Flink集群。另外Scala 版本只是用于依賴選擇,不用關心Scala環(huán)境。

項目搭建

1、引入Flink CDC Maven依賴

pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.3</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>flink-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>flink-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
    <java.version>8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <flink.version>1.13.6</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>
    <!-- Flink CDC connector for MySQL -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-shaded-guava</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- 
    Flink CDC connector for ES 
    https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch7_2.11
    -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge_2.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner_2.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients_2.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java_2.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2、創(chuàng)建測試數據庫表users

users表結構

CREATE TABLE `users` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `name` varchar(50) NOT NULL COMMENT '名稱',
  `birthday` timestamp NULL DEFAULT NULL COMMENT '生日',
  `ts` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='用戶';

3、es索引操作

es操作命令
es索引會自動創(chuàng)建

#設置es分片與副本
curl -X PUT "10.10.22.174:9200/users" -u elastic:VaHcSC3mOFfovLWTqW6E   -H 'Content-Type: application/json' -d'
{
    "settings" : {
        "number_of_shards" : 3,
        "number_of_replicas" : 2
    }
}'

#查詢index下全部數據 
curl -X GET "http://10.10.22.174:9200/users/_search"  -u elastic:VaHcSC3mOFfovLWTqW6E -H 'Content-Type: application/json' 

#刪除index
curl -X DELETE "10.10.22.174:9200/users" -u elastic:VaHcSC3mOFfovLWTqW6E

本地運行

@SpringBootTest
class FlinkDemoApplicationTests {

    /**
     * flinkCDC
     * mysql to es
     * @author senfel
     * @date 2023/8/22 14:37 
     * @return void
     */
    @Test
    void flinkCDC() throws Exception{
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                //.useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // 數據源表
        String sourceDDL =
                "CREATE TABLE users (\n" +
                        "  id BIGINT PRIMARY KEY NOT ENFORCED ,\n" +
                        "  name STRING,\n" +
                        "  birthday TIMESTAMP(3),\n" +
                        "  ts TIMESTAMP(3)\n" +
                        ") WITH (\n" +
                        "      'connector' = 'mysql-cdc',\n" +
                        "      'hostname' = '10.10.10.202',\n" +
                        "      'port' = '6456',\n" +
                        "      'username' = 'root',\n" +
                        "      'password' = 'MyNewPass2021',\n" +
                        "      'server-time-zone' = 'Asia/Shanghai',\n" +
                        "      'database-name' = 'cdc',\n" +
                        "      'table-name' = 'users'\n" +
                        "      )";
        // 輸出目標表
        String sinkDDL =
                "CREATE TABLE users_sink_es\n" +
                        "(\n" +
                        "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                        "    name STRING,\n" +
                        "    birthday TIMESTAMP(3),\n" +
                        "    ts TIMESTAMP(3)\n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector' = 'elasticsearch-7',\n" +
                        "  'hosts' = 'http://10.10.22.174:9200',\n" +
                        "  'index' = 'users',\n" +
                        "  'username' = 'elastic',\n" +
                        "  'password' = 'VaHcSC3mOFfovLWTqW6E'\n" +
                        ")";
        // 簡單的聚合處理
        String transformSQL = "INSERT INTO users_sink_es SELECT * FROM users";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);
        result.print();
        env.execute("mysql-to-es");
    }

請求es用戶索引發(fā)現并無數據:

[root@bluejingyu-1 ~]# curl -X GET “http://10.10.22.174:9200/users/_search” -u elastic:VaHcSC3mOFfovLWTqW6E -H ‘Content-Type: application/json’
{“took”:0,“timed_out”:false,“_shards”:{“total”:3,“successful”:3,“skipped”:0,“failed”:0},“hits”:{“total”:{“value”:0,“relation”:“eq”},“max_score”:null,“hits”:[]}}

操作mysql數據庫新增多條數據

5 senfel 2023-08-30 15:02:28 2023-08-30 15:02:36
6 sebfel2 2023-08-30 15:02:43 2023-08-30 15:02:47

再次獲取es用戶索引查看數據

[root@bluejingyu-1 ~]# curl -X GET “http://10.10.22.174:9200/users/_search” -u elastic:VaHcSC3mOFfovLWTqW6E -H ‘Content-Type: application/json’
{“took”:67,“timed_out”:false,“_shards”:{“total”:3,“successful”:3,“skipped”:0,“failed”:0},“hits”:{“total”:{“value”:2,“relation”:“eq”},“max_score”:1.0,“hits”:[{“_index”:“users”,“_type”:“_doc”,“_id”:“5”,“_score”:1.0,“_source”:{“id”:5,“name”:“senfel”,“birthday”:“2023-08-30 15:02:28”,“ts”:“2023-08-30 15:02:36”}},{“_index”:“users”,“_type”:“_doc”,“_id”:“6”,“_score”:1.0,“_source”:{“id”:6,“name”:“sebfel2”,“birthday”:“2023-08-30 15:02:43”,“ts”:“2023-08-30 15:02:47”}}]}}

由上測試結果可知本地運行無異常。

集群運行

項目樹:
實戰(zhàn):大數據Flink CDC同步Mysql數據到ElasticSearch,大數據,flink,docker,大數據,flink,mysql,elasticsearch,cdc

1、創(chuàng)建集群運行代碼邏輯

/**
 * FlinkMysqlToEs
 * @author senfel
 * @version 1.0
 * @date 2023/8/22 14:56
 */
public class FlinkMysqlToEs {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                //.useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // 數據源表
        String sourceDDL =
                "CREATE TABLE users (\n" +
                        "  id BIGINT PRIMARY KEY NOT ENFORCED ,\n" +
                        "  name STRING,\n" +
                        "  birthday TIMESTAMP(3),\n" +
                        "  ts TIMESTAMP(3)\n" +
                        ") WITH (\n" +
                        "      'connector' = 'mysql-cdc',\n" +
                        "      'hostname' = '10.10.10.202',\n" +
                        "      'port' = '6456',\n" +
                        "      'username' = 'root',\n" +
                        "      'password' = 'MyNewPass2021',\n" +
                        "      'server-time-zone' = 'Asia/Shanghai',\n" +
                        "      'database-name' = 'cdc',\n" +
                        "      'table-name' = 'users'\n" +
                        "      )";
        // 輸出目標表
        String sinkDDL =
                "CREATE TABLE users_sink_es\n" +
                        "(\n" +
                        "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                        "    name STRING,\n" +
                        "    birthday TIMESTAMP(3),\n" +
                        "    ts TIMESTAMP(3)\n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector' = 'elasticsearch-7',\n" +
                        "  'hosts' = 'http://10.10.22.174:9200',\n" +
                        "  'index' = 'users',\n" +
                        "  'username' = 'elastic',\n" +
                        "  'password' = 'VaHcSC3mOFfovLWTqW6E'\n" +
                        ")";
        // 簡單的聚合處理
        String transformSQL = "INSERT INTO users_sink_es SELECT * FROM users";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);
        result.print();
        env.execute("mysql-to-es");
    }
}

2、集群運行需要將Flink程序打包,不同于普通的jar包,這里必須采用shade

<build>
    <finalName>flink-demo</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <createDependencyReducedPom>false</createDependencyReducedPom>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>module-info.class</exclude>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.handlers</resource>
                                <resource>reference.conf</resource>
                            </transformer>
                            <transformer
                                    implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                <resource>META-INF/spring.factories</resource>
                            </transformer>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.schemas</resource>
                            </transformer>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.example.flinkdemo.FlinkMysqlToEs</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

將項目打包將包傳入集群啟動

1、項目打包
mvn package -Dmaven.test.skip=true

2、手動上傳到服務器拷貝如集群內部運行:
/opt/flink/bin# ./flink run …/flink-demo.jar

3、測試操作mysql數據庫

刪除id =6只剩下id=5的用戶

5 senfel000 2023-08-30 15:02:28 2023-08-30 15:02:36

4、查詢es用戶索引

[root@bluejingyu-1 ~]# curl -X GET “http://10.10.22.174:9200/users/_search” -u elastic:VaHcSC3mOFfovLWTqW6E -H ‘Content-Type: application/json’
{“took”:931,“timed_out”:false,“_shards”:{“total”:3,“successful”:3,“skipped”:0,“failed”:0},“hits”:{“total”:{“value”:1,“relation”:“eq”},“max_score”:1.0,“hits”:[{“_index”:“users”,“_type”:“_doc”,“_id”:“5”,“_score”:1.0,“_source”:{“id”:5,“name”:“senfel”,“birthday”:“2023-08-30 15:02:28”,“ts”:“2023-08-30 15:02:36”}}]}}[

如上所示es中只剩下了id==5的數據;
經測試手動部署到集群環(huán)境成功。

遠程將包部署到flink集群

1、新增controller觸發(fā)接口

/**
 * remote runTask
 * @author senfel
 * @date 2023/8/30 16:57 
 * @return org.apache.flink.api.common.JobID
 */
@GetMapping("/runTask")
public JobID runTask() {
    try {
        // 集群信息
        Configuration configuration = new Configuration();
        configuration.setString(JobManagerOptions.ADDRESS, "10.10.22.91");
        configuration.setInteger(JobManagerOptions.PORT, 6123);
        configuration.setInteger(RestOptions.PORT, 8081);
        RestClusterClient<StandaloneClusterId>  client = new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
        //jar包存放路徑,也可以直接調用hdfs中的jar
        File jarFile = new File("input/flink-demo.jar");
        SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
        //構建提交任務參數
        PackagedProgram program = PackagedProgram
                .newBuilder()
                .setConfiguration(configuration)
                .setEntryPointClassName("com.example.flinkdemo.FlinkMysqlToEs")
                .setJarFile(jarFile)
                .setSavepointRestoreSettings(savepointRestoreSettings).build();
        //創(chuàng)建任務
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, 1, false);
        //提交任務
        CompletableFuture<JobID> result = client.submitJob(jobGraph);
        return result.get();
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}

2、啟動Springboot項目
實戰(zhàn):大數據Flink CDC同步Mysql數據到ElasticSearch,大數據,flink,docker,大數據,flink,mysql,elasticsearch,cdc

3、postman請求
實戰(zhàn):大數據Flink CDC同步Mysql數據到ElasticSearch,大數據,flink,docker,大數據,flink,mysql,elasticsearch,cdc
4、查看Fink集群控制臺
實戰(zhàn):大數據Flink CDC同步Mysql數據到ElasticSearch,大數據,flink,docker,大數據,flink,mysql,elasticsearch,cdc

由上圖所示已將遠程部署完成。

5、測試操作mysql數據庫

5 senfel000 2023-08-30 15:02:28 2023-08-30 15:02:36
7 eeeee 2023-08-30 17:12:00 2023-08-30 17:12:04
8 33333 2023-08-30 17:12:08 2023-08-30 17:12:11

6、查詢es用戶索引

[root@bluejingyu-1 ~]# curl -X GET “http://10.10.22.174:9200/users/_search” -u elastic:VaHcSC3mOFfovLWTqW6E -H ‘Content-Type: application/json’
{“took”:766,“timed_out”:false,“_shards”:{“total”:3,“successful”:3,“skipped”:0,“failed”:0},“hits”:{“total”:{“value”:3,“relation”:“eq”},“max_score”:1.0,“hits”:[{“_index”:“users”,“_type”:“_doc”,“_id”:“5”,“_score”:1.0,“_source”:{“id”:5,“name”:“senfel000”,“birthday”:“2023-08-30 15:02:28”,“ts”:“2023-08-30 15:02:36”}},{“_index”:“users”,“_type”:“_doc”,“_id”:“7”,“_score”:1.0,“_source”:{“id”:7,“name”:“eeeee”,“birthday”:“2023-08-30 17:12:00”,“ts”:“2023-08-30 17:12:04”}},{“_index”:“users”,“_type”:“_doc”,“_id”:“8”,“_score”:1.0,“_source”:{“id”:8,“name”:“33333”,“birthday”:“2023-08-30 17:12:08”,“ts”:“2023-08-30 17:12:11”}}]}}

如上所以es中新增了兩條數據;
經測試遠程發(fā)布Flink Task完成。

寫在最后

大數據Flink CDC同步Mysql數據到ElasticSearch搭建與測試運行較為簡單,對于基礎的學習測試環(huán)境獨立集群目前只支持單個任務部署,如果需要多個任務或者運用于生產可以采用Yarn與Job分離模式進行部署。文章來源地址http://www.zghlxwxcb.cn/news/detail-698983.html

到了這里,關于實戰(zhàn):大數據Flink CDC同步Mysql數據到ElasticSearch的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Spring Boot+Flink CDC —— MySQL 同步 Elasticsearch (DataStream方式)

    對于自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下 授權鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant 基于jdk1.8 + springboot2.7.x + elasticsearch7.x 到此就大功告成啦!代碼地址:https://gitee.com/qianxkun/lakudouzi-components/tree/

    2024年02月16日
    瀏覽(25)
  • 【實戰(zhàn)-01】flink cdc 實時數據同步利器

    【實戰(zhàn)-01】flink cdc 實時數據同步利器

    cdc github源碼地址 cdc官方文檔 對很多初入門的人來說是無法理解cdc到底是什么個東西。 有這樣一個需求,比如在mysql數據庫中存在很多數據,但是公司要把mysql中的數據同步到數據倉庫(starrocks), 數據倉庫你可以理解為存儲了各種各樣來自不同數據庫中表。 數據的同步目前對

    2023年04月08日
    瀏覽(94)
  • 基于Flink CDC實時同步數據(MySQL到MySQL)

    基于Flink CDC實時同步數據(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在遠程服務器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安裝在本地:192.168.3.31) (安裝部署過程略) 準備三個數據庫:flink_source、flink_sink、flink_sink_second。 將flink_source.source_test表實時同步到flink_sink和flink_sink_second的sink_test表。 (建庫建表過程略) 開發(fā)過程

    2024年02月06日
    瀏覽(28)
  • 基于Flink SQL CDC Mysql to Mysql數據同步

    基于Flink SQL CDC Mysql to Mysql數據同步

    Flink CDC有兩種方式同步數據庫: 一種是通過FlinkSQL直接輸入兩表數據庫映射進行數據同步,缺點是只能單表進行同步; 一種是通過DataStream開發(fā)一個maven項目,打成jar包上傳到服務器運行。 本方案使用FlinkSQL方法,同步兩表中的數據。 其中Flink應用可以部署在具有公網IP的服務

    2023年04月11日
    瀏覽(27)
  • 基于 Flink CDC 構建 MySQL 到 Databend 的 實時數據同步

    基于 Flink CDC 構建 MySQL 到 Databend 的 實時數據同步

    這篇教程將展示如何基于 Flink CDC 快速構建 MySQL 到 Databend 的實時數據同步。本教程的演示都將在 Flink SQL CLI 中進行,只涉及 SQL,無需一行 Java/Scala 代碼,也無需安裝 IDE。 假設我們有電子商務業(yè)務,商品的數據存儲在 MySQL ,我們需要實時把它同步到 Databend 中。 接下來的內容

    2024年02月10日
    瀏覽(29)
  • Flink DataStream API CDC同步MySQL數據到StarRocks

    Flink DataStream API CDC同步MySQL數據到StarRocks

    Flink:1.16.1 pom文件如下 Java代碼 SourceAndSinkInfo 類,用于定義source和sink的IP、端口、賬號、密碼信息 DataCenterShine實體類,字段與數據庫一一對應。 StarRocksPrimary 實體類 FieldInfo注解類,用于標記字段序號、是否為主鍵、是否為空,后續(xù)生成TableSchema需要使用到。 TableName 注解類,

    2024年02月03日
    瀏覽(32)
  • 使用Flink CDC將Mysql中的數據實時同步到ES

    最近公司要搞搜索,需要把mysql中的數據同步到es中來進行搜索,由于公司已經搭建了flink集群,就打算用flink來做這個同步。本來以為很簡單,跟著官網文檔走就好了,結果沒想到折騰了將近一周的時間…… 我也是沒想到,這玩意網上資源竟然這么少,找到的全部都是通過

    2024年02月11日
    瀏覽(25)
  • 基于大數據平臺(XSailboat)的計算管道實現MySQL數據源的CDC同步--flink CDC

    基于大數據平臺(XSailboat)的計算管道實現MySQL數據源的CDC同步--flink CDC

    筆者在先前的一篇文檔《數據標簽設計 – 大數據平臺(XSailboat)的數據標簽模塊》 提到了關于數據標簽的模塊,現已實現并應用于項目中。在項目中遇到這樣一種情形: 如果打標信息和業(yè)務數據是在一個數據庫實例中,那么只需要連接兩張表進行查詢即可。但是數據標簽作為

    2024年01月17日
    瀏覽(35)
  • Flink CDC MySQL同步MySQL錯誤記錄

    Flink CDC MySQL同步MySQL錯誤記錄

    0、相關Jar包 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.16/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/ 或者從mvnrepository.com下載 https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc https://mvnrepository.com/artifact/org.apache.flink/flink-connector-

    2024年02月03日
    瀏覽(21)
  • 最新版Flink CDC MySQL同步MySQL(一)_flink 連接mysql(1)

    最新版Flink CDC MySQL同步MySQL(一)_flink 連接mysql(1)

    下載 連接器 SQL jar (或 自行構建 )。 將下載的jar包放在FLINK_HOME/lib/. 重啟Flink集群。 注意 :目前2.4以上版本需要進行自行編譯構建。本文筆者自行進行構建上傳的 6.使用 Flink CDC 對 MySQL 進行流式 ETL 本教程將展示如何使用 Flink CDC 快速構建 MySQL的流式 ETL。 假設我們將產品數

    2024年04月26日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包