国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink學習13-Flink CDC

這篇具有很好參考價值的文章主要介紹了Flink學習13-Flink CDC。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、CDC簡介

cdc全稱 Change Data Capture 變更數(shù)據(jù)捕獲。通俗來講只要能捕獲到變更的數(shù)據(jù)的技術都可以稱為cdc。常見的開源技術有以下幾種:
canal:https://github.com/alibaba/canal
maxwell:https://github.com/zendesk/maxwell
Debezium:https://github.com/debezium/debezium
flink-cdc:https://github.com/ververica/flink-cdc-connectors
以下是幾種技術的橫向對比
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)

二、canal+maxwell

兩者實現(xiàn)原理類似。canal模擬mysql主從復制過程,把自己當做從庫。通過dump操作把binlog從主庫讀取到從庫,然后根據(jù)binlog進行數(shù)據(jù)還原。maxwell原理同理。兩者區(qū)別在于maxwell是一款輕量級框架,可拓展性較少。比如它支持處理json格式數(shù)據(jù),并把數(shù)據(jù)發(fā)送到kafka、redis等中。而canal可以自定義數(shù)據(jù)格式,而且并不局限于將數(shù)據(jù)發(fā)送到特定的數(shù)據(jù)存儲介質中。
下面以canal舉例說明:

2.1 安裝canal

安裝很簡單,選擇release版本下載解壓即可:https://github.com/alibaba/canal/releases?page=2。
這里需要說明的是deployer是個人開發(fā)版。
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)

2.2 配置canal

下載解壓完成之后如下:
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)
進入conf/example/instance.porperties中修改如下配置
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)
在canal安裝目錄下的conf下找到canal.properties,主要修改數(shù)據(jù)需要發(fā)送的介質。默認配置是tcp,這個可以自定義將數(shù)據(jù)寫入到其他地方,如果需要將canal采集到的數(shù)據(jù)寫入kafka topic就選擇第二種。
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)
配置完成之后啟動:/bin/startup.sh

2.3 配置mysql binlog

mysql數(shù)據(jù)源需要提前開啟binlog,找到my.cnf配置如下:
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)
然后重啟mysql服務。
登錄mysql執(zhí)行一下,然后查看是on就代表開啟成功
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)

2.4代碼實現(xiàn)

pom依賴

<dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
</dependency>

import com.alibaba.fastjson.JSON;
import com.alibaba.google.common.base.CaseFormat;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;

public class CanalClientApp {
    public static void main(String[] args) throws Exception{
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), //本地部署就用localhost,端口默認
                "example", null, null);
        //設置死循環(huán),一直訪問connector中數(shù)據(jù)
        while (true){
            connector.connect(); //進行鏈接
            connector.subscribe("test.*");//設置需要監(jiān)控的庫表
            Message message = connector.get(100);//設置獲取一批數(shù)據(jù)量大小

            List<CanalEntry.Entry> entries = message.getEntries(); //獲取一批消息的list集合
            if(entries.size() > 0){ //如果list中有數(shù)據(jù)就遍歷取出
                for (CanalEntry.Entry entry : entries) {
                    String tableName = entry.getHeader().getTableName(); //獲取header中請求到的表名

                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());//value值轉換成string類型
                    List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();

                    //insert update delete ...
                    CanalEntry.EventType eventType = rowChange.getEventType();

                    if(eventType == CanalEntry.EventType.INSERT){
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();

                            HashMap<String, String> map = new HashMap<>();
                            for (CanalEntry.Column column : afterColumnsList) {
                                String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                                map.put(key, column.getValue());
                            }

                            /**
                             * TODO...
                             * 到這一步,下面要做的事情就是把map的數(shù)據(jù)發(fā)送到想要的地方去...
                             */
                            System.out.println("tableName:"+ tableName + " , " + JSON.toJSONString(map));
                        }
                    }
                }
            }

        }
    }
}

mysql寫入數(shù)據(jù):
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)
idea實時打印輸出
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)

三、Debezium+Flink-CDC

Debezium是為Kafka Connect而建的一系列Source Connectors,每個Source Connector會根據(jù)對應數(shù)據(jù)庫特性來捕獲數(shù)據(jù)變更記錄。不像其他方法,例如,輪詢或者雙寫等。Debezium是基于日志進行捕獲變更的。而flink-cdc(1.x版本)和Debezium一脈相承。接下來通過案例簡單了解下flink-cdc的使用方式。

3.1 flink-cdc解析

Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)
官方社區(qū)的解析已經很清晰明了,通過兩種方式對比發(fā)現(xiàn),flink-cdc節(jié)省了很大一部分運維成本。傳統(tǒng)etl中flink只負責計算,而flink-cdc將采集計算為一體。當然看到了flink-cdc的優(yōu)勢,也需要了解當前版本(1.x)的局限性。因為flink-cdc底層采用了Debezium框架,數(shù)據(jù)讀取分為全量+增量模式。在全量讀取數(shù)據(jù)的時候為了保證數(shù)據(jù)一致性會加上一個全局鎖,如果數(shù)據(jù)量非常大讀取數(shù)據(jù)會以小時級別計算。切如果在全量讀取階段任務運行失敗是無法進行checkpoint的。
簡單來說,flink-cdc第一階段讀取全量數(shù)據(jù)時默認會加一個全局鎖,會拒絕其他事務提交update操作,這樣可以保證數(shù)據(jù)一致性,但數(shù)據(jù)量特別大時,可能會導致數(shù)據(jù)庫hang住。
于是官方又更新了flink-cdc 2.x版本。這個版本主要解決鎖還有無法checkpoint的問題。主要原理是chunk算法+SourceEnumerator組件實現(xiàn)。
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)
flink-cdc 2.x版本讀取流程如上圖所述,首先根據(jù)chunk算法對binlog進行切片。每個i切片分區(qū)內數(shù)據(jù)不重合。正在讀寫的切片如果有數(shù)據(jù)更新的話,會將更新后的數(shù)據(jù)輸出。從而實現(xiàn)不加鎖的方式下保證數(shù)據(jù)讀的一致性。

3.2 flink-cdc讀取mysql binlog數(shù)據(jù)

實現(xiàn)方式分為兩種:https://ververica.github.io/flink-cdc-connectors/master/content/overview/cdc-connectors.html#usage-for-datastream-api
Datastream Api
pom依賴里需要添加如下:

<dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.3.0</version>
            <scope>provided</scope>
        </dependency>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .databaseList("yourDatabaseName") // set captured database
            .tableList("yourDatabaseName.yourTableName") // set captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // 默認json反序列化器,進階版可以用自定義反序列化器
            .build();
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // enable checkpoint
    env.enableCheckpointing(3000);
    
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    
    env.execute("Print MySQL Snapshot + Binlog");
  }
}

Table Api

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkTableCDCApp {
    public static void main(String[] args) throws  Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableenv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        tableenv.executeSql("create table mysql_bin(" +
                "id INT primary key, " +
                "name STRING" +
                ") with(" +
                "'connector' = 'mysql-cdc'," +
                "'hostname' = 'ip'," +
                "'port' = '3306'," +
                "'username' = 'yourUsername'," +
                "'password' = 'yourPassword'," +
                "'database-name' = 'yourDatabaseName'," +
                "'table-oname' = 'yourTableName'" +
                ")"
        );
        Table table = tableenv.sqlQuery("select * from mysql_bin");
        tableenv.toRetractStream(table, Row.class).print();

        env.execute();
    }
}

這里需要注意的是當前只是本地編譯,沒有提交flink。如果任務上環(huán)境提交運行需要提前將此jar放在FLINK_HOME/lib/目錄下去。
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)
還需要有一點說明的是,我在本地編譯的時候選擇flink-cdc的版本是1.x,如上圖。實際上不同版本的flink-cdc對應的flink版本都不同。以下版本對應關系。
Flink學習13-Flink CDC,flink,學習,大數(shù)據(jù)文章來源地址http://www.zghlxwxcb.cn/news/detail-794492.html

到了這里,關于Flink學習13-Flink CDC的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯(lián)網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • flink cdc多種數(shù)據(jù)源安裝、配置與驗證
flink cdc多種數(shù)據(jù)源安裝、配置與驗證

    flink cdc多種數(shù)據(jù)源安裝、配置與驗證 flink cdc多種數(shù)據(jù)源安裝、配置與驗證

    ? 搜索 文章目錄 1. 前言 2. 數(shù)據(jù)源安裝與配置 2.1 MySQL 2.1.1 安裝 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安裝 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安裝 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安裝 2.4.2 CDC 配置 3. 驗證 3.1 Flink版本與CDC版本的對應關系 3.2 下載相關包 3.3 添加cdc jar 至lib目錄 3.4 驗證 本文目錄結構

    2024年02月09日
    瀏覽(48)
  • Flink CDC數(shù)據(jù)同步

    Flink CDC數(shù)據(jù)同步

    一、什么是FLink Apache?Flink?是一個框架和分布式處理引擎,用于在無邊界和有邊界數(shù)據(jù)流上進行有狀態(tài)的計算。Flink?能在所有常見集群環(huán)境中運行,并能以內存速度和任意規(guī)模進行計算。 接下來,我們來介紹一下?Flink?架構中的重要方面。 任何類型的數(shù)據(jù)都可以形成一種事

    2024年02月08日
    瀏覽(20)
  • 【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進行實時數(shù)據(jù)同步

    【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進行實時數(shù)據(jù)同步

    CDC是Change Data Capture的縮寫,中文意思是 變更數(shù)據(jù)獲取 ,flink-cdc的作用是,通過flink捕獲數(shù)據(jù)源的事務變動操作記錄,包括數(shù)據(jù)的增刪改操作等,根據(jù)這些記錄可作用于對目標端進行實時數(shù)據(jù)同步。 下圖是flink-cdc最新支持的數(shù)據(jù)源類型: kafka的數(shù)據(jù)源要通過flink-cdc進行實時數(shù)

    2024年02月12日
    瀏覽(36)
  • 【大數(shù)據(jù)】Flink 詳解(十):SQL 篇 Ⅲ(Flink SQL CDC)

    【大數(shù)據(jù)】Flink 詳解(十):SQL 篇 Ⅲ(Flink SQL CDC)

    《 Flink 詳解 》系列(已完結),共包含以下 10 10 10 篇文章: 【大數(shù)據(jù)】Flink 詳解(一):基礎篇(架構、并行度、算子) 【大數(shù)據(jù)】Flink 詳解(二):核心篇 Ⅰ(窗口、WaterMark) 【大數(shù)據(jù)】Flink 詳解(三):核心篇 Ⅱ(狀態(tài) State) 【大數(shù)據(jù)】Flink 詳解(四):核心篇

    2024年01月25日
    瀏覽(54)
  • Flink CDC 新一代數(shù)據(jù)集成框架

    Flink CDC 新一代數(shù)據(jù)集成框架

    前言: 主要講解了技術原理,入門與生產實踐,主要功能:全增量一體化數(shù)據(jù)集成、實時數(shù)據(jù)入庫入倉、最詳細的教程。Flink CDC 是Apache Flink的一個重要組件,主要使用了CDC技術從各種數(shù)據(jù)庫中獲取變更流并接入到Flink中,Apache Flink作為一款非常優(yōu)秀的流處理引擎,其SQL API又

    2024年02月13日
    瀏覽(32)
  • 基于 Flink CDC 的現(xiàn)代數(shù)據(jù)棧實踐

    基于 Flink CDC 的現(xiàn)代數(shù)據(jù)棧實踐

    摘要:本文整理自阿里云技術專家,Apache Flink PMC Member Committer, Flink CDC Maintainer 徐榜江和阿里云高級研發(fā)工程師,Apache Flink Contributor Flink CDC Maintainer 阮航,在 Flink Forward Asia 2022 數(shù)據(jù)集成專場的分享。本篇內容主要分為四個部分: 1.深入解讀 Flink CDC 2.3 版本 2.基于 Flink CDC 構建

    2024年02月09日
    瀏覽(20)
  • 大數(shù)據(jù)技術之 Flink-CDC

    大數(shù)據(jù)技術之 Flink-CDC

    CDC 是 Change Data Capture(變更數(shù)據(jù)獲?。┑暮喎Q。在廣義的概念上,只要是能捕獲數(shù)據(jù)變更的技術,我們都可以稱之為 CDC 。核心思想是,監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù)或數(shù)據(jù)表的插入、更新以及刪除等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以

    2024年02月05日
    瀏覽(21)
  • 【實戰(zhàn)-01】flink cdc 實時數(shù)據(jù)同步利器

    【實戰(zhàn)-01】flink cdc 實時數(shù)據(jù)同步利器

    cdc github源碼地址 cdc官方文檔 對很多初入門的人來說是無法理解cdc到底是什么個東西。 有這樣一個需求,比如在mysql數(shù)據(jù)庫中存在很多數(shù)據(jù),但是公司要把mysql中的數(shù)據(jù)同步到數(shù)據(jù)倉庫(starrocks), 數(shù)據(jù)倉庫你可以理解為存儲了各種各樣來自不同數(shù)據(jù)庫中表。 數(shù)據(jù)的同步目前對

    2023年04月08日
    瀏覽(94)
  • 【大數(shù)據(jù)】Flink CDC 的概覽和使用

    【大數(shù)據(jù)】Flink CDC 的概覽和使用

    CDC ( Change Data Capture , 數(shù)據(jù)變更抓取 )是一種用于跟蹤數(shù)據(jù)庫中數(shù)據(jù)更改的技術。它用于監(jiān)視數(shù)據(jù)庫中的變化,并捕獲這些變化,以便實時或定期將變化的數(shù)據(jù)同步到其他系統(tǒng)、數(shù)據(jù)倉庫或分析平臺。CDC 技術通常用于數(shù)據(jù)復制、數(shù)據(jù)倉庫更新、實時報告和數(shù)據(jù)同步等場景。

    2024年01月24日
    瀏覽(32)
  • Flink CDC 實時抽取 Oracle 數(shù)據(jù)-排錯&調優(yōu)

    Flink CDC 實時抽取 Oracle 數(shù)據(jù)-排錯&調優(yōu)

    Flink CDC 于 2021 年 11 月 15 日發(fā)布了最新版本 2.1,該版本通過引入內置 Debezium 組件,增加了對 Oracle 的支持。對該版本進行試用并成功實現(xiàn)了對 Oracle 的實時數(shù)據(jù)捕獲以及性能調優(yōu),現(xiàn)將試用過程中的一些關鍵細節(jié)進行分享。 Oracle:11.2.0.4.0(RAC 部署) Flink:1.13.1 Hadoop:3.2.1

    2024年01月16日
    瀏覽(34)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包