国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink CDC介紹和簡單實用

這篇具有很好參考價值的文章主要介紹了Flink CDC介紹和簡單實用。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

簡介

CDC是Change Data Capture(變更數(shù)據(jù)獲?。┑暮喎Q。核心思想是,監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù)或數(shù)據(jù)表的插入、更新以及刪除等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務(wù)進行訂閱及消費。

種類

基于查詢和基于binlog
Flink CDC介紹和簡單實用

基于日志的 CDC 方案介紹

從 ETL 的角度進行分析,一般采集的都是業(yè)務(wù)庫數(shù)據(jù),這里使用 MySQL 作為需要采集的數(shù)據(jù)庫,通過 Debezium 把 MySQL Binlog 進行采集后發(fā)送至 Kafka 消息隊列,然后對接一些實時計算引擎或者 APP 進行消費后把數(shù)據(jù)傳輸入 OLAP 系統(tǒng)或者其他存儲介質(zhì)。
Flink 希望打通更多數(shù)據(jù)源,發(fā)揮完整的計算能力。我們生產(chǎn)中主要來源于業(yè)務(wù)日志和數(shù)據(jù)庫日志,F(xiàn)link 在業(yè)務(wù)日志的支持上已經(jīng)非常完善,但是在數(shù)據(jù)庫日志支持方面在 Flink 1.11 前還屬于一片空白,這就是為什么要集成 CDC 的原因之一。
Flink SQL 內(nèi)部支持了完整的 changelog 機制,所以 Flink 對接 CDC 數(shù)據(jù)只需要把CDC 數(shù)據(jù)轉(zhuǎn)換成 Flink 認識的數(shù)據(jù),所以在 Flink 1.11 里面重構(gòu)了 TableSource 接口,以便更好支持和集成 CDC。
Flink CDC介紹和簡單實用
Flink CDC介紹和簡單實用
重構(gòu)后的 TableSource 輸出的都是 RowData 數(shù)據(jù)結(jié)構(gòu),代表了一行的數(shù)據(jù)。在RowData 上面會有一個元數(shù)據(jù)的信息,我們稱為 RowKind 。RowKind 里面包括了插入、更新前、更新后、刪除,這樣和數(shù)據(jù)庫里面的 binlog 概念十分類似。通過 Debezium 采集的 JSON 格式,包含了舊數(shù)據(jù)和新數(shù)據(jù)行以及原數(shù)據(jù)信息,op 的 u表示是 update 更新操作標識符,ts_ms 表示同步的時間戳。因此,對接 Debezium JSON 的數(shù)據(jù),其實就是將這種原始的 JSON 數(shù)據(jù)轉(zhuǎn)換成 Flink 認識的 RowData。

flink作為etl工具

原工作原理
Flink CDC介紹和簡單實用
優(yōu)化后
Flink CDC介紹和簡單實用
Flink SQL 采集+計算+傳輸(ETL)一體化優(yōu)點:
? 開箱即用,簡單易上手
? 減少維護的組件,簡化實時鏈路,減輕部署成本
? 減小端到端延遲
? Flink 自身支持 Exactly Once 的讀取和計算
? 數(shù)據(jù)不落地,減少存儲成本
? 支持全量和增量流式讀取
? binlog 采集位點可回溯

應(yīng)用場景

? 實時數(shù)據(jù)同步,數(shù)據(jù)備份,數(shù)據(jù)遷移,數(shù)倉構(gòu)建
優(yōu)勢:豐富的上下游(E & L),強大的計算(T),易用的 API(SQL),流式計算低延遲
? 數(shù)據(jù)庫之上的實時物化視圖、流式數(shù)據(jù)分析
? 索引構(gòu)建和實時維護
? 業(yè)務(wù) cache 刷新
? 審計跟蹤
? 微服務(wù)的解耦,讀寫分離
? 基于 CDC 的維表關(guān)聯(lián)

開源地址

https://github.com/ververica/flink-cdc-connectors

最新flink cdc官方文檔分享

https://flink-learning.org.cn/article/detail/eed4549f80e80cc30c69c406cb08b59a

流程圖

個人理解作圖
Flink CDC介紹和簡單實用

1.X痛點

Flink CDC介紹和簡單實用所以設(shè)計目標
Flink CDC介紹和簡單實用
設(shè)計實現(xiàn)上
在對于有主鍵的表做初始化模式,整體的流程主要分為5個階段:
1.Chunk切分;2.Chunk分配;(實現(xiàn)并行讀取數(shù)據(jù)&CheckPoint)
3.Chunk讀取;(實現(xiàn)無鎖讀取)
4.Chunk匯報;
5.Chunk分配。
對于并發(fā)線程
會對比各個讀取切分的最高和最低的位置區(qū)間,超過區(qū)間進行更新

目前支持開發(fā)方式

個人理解作圖
Flink CDC介紹和簡單實用

開發(fā)測試大致流程

個人理解作圖
Flink CDC介紹和簡單實用

使用

mysql開啟binlog

vi /etc/my.cnf 底部追加

server_id=2
log_bin=mysql-bin
binlog_format=ROW
# 下面這行可寫可不寫  監(jiān)控對應(yīng)的數(shù)據(jù)庫
binlog_do_db=elebap_bak

重啟mysqld服務(wù), 并啟動mysql

systemctl restart mysqld
或者
bin/mysqld --initialize --user=root --basedir=/usr/local/mysql --datadir=/data/mysql
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      154 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+

mysql> show variables like '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON                             |
+---------------------------------+--------------------------------+
6 rows in set (0.01 sec)

log_bin顯示ON開啟狀態(tài)。
mysql的建表以及插入數(shù)據(jù):

CREATE TABLE study(
	ID INT NOT NULL PRIMARY KEY AUTO_INCREMENT ,
	NAME VARCHAR(20) NOT NULL,
	AGE INT(10)
);

INSERT INTO study VALUES(1 , 'a' , 10);
INSERT INTO study VALUES(2 , 'b' , 11);
INSERT INTO study VALUES(3 , 'c' , 12);
INSERT INTO study VALUES(4 , 'd' , 13);
INSERT INTO study VALUES(5 , 'e' , 14);
INSERT INTO study VALUES(6 , 'f' , 15);

代碼

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>


import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * @author wyi
 * @date 2022/8/18 11:06
 * @description
 */
public class flinkcdcTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties pops = new Properties();
        pops.setProperty("debezium.snapshot.locking.mode", "none");

        DebeziumSourceFunction<JSONObject> mysqlSource = MySQLSource.<JSONObject>builder()
                .hostname("192.168.80.161")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList(BussinessConstant.DATABASE_LIST)
                .tableList(BussinessConstant.ABLE_LIST_ALARM_CONFIG_CAP_UNBALANCE)
                .deserializer(new TestRuleDeserialization())
                .build();
        SingleOutputStreamOperator<Object> map = env.addSource(mysqlSource).map(new MapFunction<JSONObject, Object>() {
            @Override
            public Object map(JSONObject jsonObject) throws Exception {
                return jsonObject.toString();
            }
        });
        map.print();
        env.execute("CdcMysqlSource");
    }
}

自定義的序列化類

package com.cosmosource.da.cdc;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

/**
 * @author wyi
 * @date 2022/8/18 10:32
 * @description 這是一個demo,測試flink-cdc連接mysql的反序列化類
 */
public class TestRuleDeserialization implements DebeziumDeserializationSchema<JSONObject> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<JSONObject> collector) throws Exception {
        //獲取主題
        String topic = sourceRecord.topic();
        String[] arr = topic.split("\\.");
        String db = arr[1];
        String tableName = arr[2];

        System.out.println(arr[1]);
        System.out.println(arr[2]);

        //獲取操作類型 READ DELETE UPDATE CREATE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        //獲取值信息并轉(zhuǎn)換為Struct類型
        Struct value = (Struct) sourceRecord.value();

        System.out.println("value:"+value);
        //獲取變化后的數(shù)據(jù)
        Struct after = value.getStruct("after");

        //創(chuàng)建JSON對象用于存儲數(shù)據(jù)信息
        JSONObject data = new JSONObject();
        for (Field field : after.schema().fields()) {
            Object o = after.get(field);
            data.put(field.name(), o);
        }

        //創(chuàng)建JSON對象用于封裝最終返回值數(shù)據(jù)信息
        JSONObject result = new JSONObject();
        result.put("operation", operation.toString().toLowerCase());
        result.put("data", data);
        result.put("database", db);
        result.put("table", tableName);

        //發(fā)送數(shù)據(jù)至下游
        collector.collect(result);


    }

    @Override
    public TypeInformation<JSONObject> getProducedType() {

        return TypeInformation.of(JSONObject.class);
    }
}

結(jié)果:文章來源地址http://www.zghlxwxcb.cn/news/detail-461224.html

……………………………………
……………………………………
……………………………………

wy
study
value:Struct{after=Struct{ID=9,NAME=1,AGE=15},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=wy,table=study,server_id=0,file=mysql-bin.000001,pos=3128,row=0},op=c,ts_ms=1660793058775}
八月 18, 2022 11:24:19 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to 192.168.80.161:3306 at mysql-bin.000001/3128 (sid:5501, cid:24)
5> {"database":"wy","data":{"ID":4,"NAME":"d","AGE":13},"operation":"create","table":"study"}
1> {"database":"wy","data":{"ID":8,"NAME":"1","AGE":15},"operation":"create","table":"study"}
8> {"database":"wy","data":{"ID":7,"NAME":"f","AGE":15},"operation":"create","table":"study"}
4> {"database":"wy","data":{"ID":3,"NAME":"c","AGE":12},"operation":"create","table":"study"}
2> {"database":"wy","data":{"ID":1,"NAME":"a","AGE":10},"operation":"create","table":"study"}
2> {"database":"wy","data":{"ID":9,"NAME":"1","AGE":15},"operation":"create","table":"study"}
3> {"database":"wy","data":{"ID":2,"NAME":"b","AGE":11},"operation":"create","table":"study"}
6> {"database":"wy","data":{"ID":5,"NAME":"e","AGE":14},"operation":"create","table":"study"}
7> {"database":"wy","data":{"ID":6,"NAME":"f","AGE":15},"operation":"create","table":"study"}

到了這里,關(guān)于Flink CDC介紹和簡單實用的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 【Flink-CDC】Flink CDC 介紹和原理概述

    【Flink-CDC】Flink CDC 介紹和原理概述

    CDC是( Change Data Capture 變更數(shù)據(jù)獲取 )的簡稱。 核心思想是, 監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù) 或 數(shù)據(jù)表的插入INSERT、更新UPDATE、刪除DELETE等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務(wù)進行訂閱及消費。 CDC 主要分為基于查詢和基于

    2024年01月20日
    瀏覽(25)
  • Flink CDC介紹

    Flink CDC介紹

    1.CDC概述 CDC(Change Data Capture)是一種用于捕獲和處理數(shù)據(jù)源中的變化的技術(shù)。它允許實時地監(jiān)視數(shù)據(jù)庫或數(shù)據(jù)流中發(fā)生的數(shù)據(jù)變動,并將這些變動抽取出來,以便進行進一步的處理和分析。 傳統(tǒng)上,數(shù)據(jù)源的變化通常通過周期性地輪詢整個數(shù)據(jù)集進行檢查來實現(xiàn)。但是,這

    2024年02月11日
    瀏覽(16)
  • Flink CDC介紹及原理

    CDC (Change Data Capture) 是一種用于 捕捉數(shù)據(jù)庫變更數(shù)據(jù) 的技術(shù),F(xiàn)link 從 1.11 版本開始原生支持 CDC 數(shù)據(jù)(changelog)的處理,目前已經(jīng)是非常成熟的變更數(shù)據(jù)處理方案。 Flink CDC Connectors 是 Flink 的一組 Source 連接器,是 Flink CDC 的核心組件,這些連接器負責從? MySQL、PostgreSQL、Ora

    2024年02月11日
    瀏覽(15)
  • flink cdc數(shù)據(jù)同步,DataStream方式和SQL方式的簡單使用

    flink cdc數(shù)據(jù)同步,DataStream方式和SQL方式的簡單使用

    目錄 一、flink cdc介紹 1、什么是flink cdc 2、flink cdc能用來做什么 3、flink cdc的優(yōu)點 二、flink cdc基礎(chǔ)使用 1、使用flink cdc讀取txt文本數(shù)據(jù) 2、DataStream的使用方式 3、SQL的方式 總結(jié) flink cdc是一個由阿里研發(fā)的,一個可以直接從MySQL、PostgreSQL等數(shù)據(jù)庫直接讀取全量數(shù)據(jù)和增量變更數(shù)

    2024年02月13日
    瀏覽(26)
  • 60、Flink CDC 入門介紹及Streaming ELT示例(同步Mysql數(shù)據(jù)庫數(shù)據(jù)到Elasticsearch)-CDC Connector介紹及示例 (1)

    60、Flink CDC 入門介紹及Streaming ELT示例(同步Mysql數(shù)據(jù)庫數(shù)據(jù)到Elasticsearch)-CDC Connector介紹及示例 (1)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月19日
    瀏覽(21)
  • 業(yè)務(wù)數(shù)據(jù)同步工具介紹和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    業(yè)務(wù)數(shù)據(jù)同步工具介紹和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    介紹 Sqoop : SQ L-to-Had oop ( Apache已經(jīng)終止Sqoop項目 ) 用途:把關(guān)系型數(shù)據(jù)庫的數(shù)據(jù)轉(zhuǎn)移到HDFS(Hive、Hbase)(重點使用的場景);Hadoop中的數(shù)據(jù)轉(zhuǎn)移到關(guān)系型數(shù)據(jù)庫中。Sqoop是java語言開發(fā)的,底層使用 mapreduce 。 需要注意的是,Sqoop主要使用的是Map,是數(shù)據(jù)塊的轉(zhuǎn)移,沒有使

    2024年02月15日
    瀏覽(44)
  • SQL server開啟變更數(shù)據(jù)捕獲(CDC)

    SQL server開啟變更數(shù)據(jù)捕獲(CDC)

    多多點贊,會變好看! 多多留言,會變有錢! 變更數(shù)據(jù)捕獲(Change Data Capture ,簡稱 CDC):記錄 SQL Server 表的插入、更新和刪除操作。開啟cdc的源表在插入、更新和刪除操作時會插入數(shù)據(jù)到日志表中。cdc通過捕獲進程將變更數(shù)據(jù)捕獲到變更表中,通過cdc提供的查詢函數(shù),可

    2024年02月11日
    瀏覽(20)
  • Flink作業(yè)任務(wù)的9種狀態(tài)簡單介紹

    Flink作業(yè)任務(wù)的9種狀態(tài)簡單介紹

    ? 當創(chuàng)建一個Flink任務(wù)后,該任務(wù)可能會經(jīng)歷多種狀態(tài)。目前Flink給任務(wù)共定義了9種狀態(tài),包括: Created , Running , Finished , Cancelling , Canceled , Restarting , Failing , Failed , Suspended 。下面這張圖詳細展示了一個Job可能會經(jīng)歷的所有狀態(tài)。 最簡單的一種狀態(tài)就是:作業(yè)啟動

    2024年02月02日
    瀏覽(20)
  • 33、Flink之hive介紹與簡單示例

    33、Flink之hive介紹與簡單示例

    1、Flink 部署、概念介紹、source、transformation、sink使用示例、四大基石介紹和示例等系列綜合文章鏈接 13、Flink 的table api與sql的基本概念、通用api介紹及入門示例 14、Flink 的table api與sql之數(shù)據(jù)類型: 內(nèi)置數(shù)據(jù)類型以及它們的屬性 15、Flink 的table api與sql之流式概念-詳解的介紹了動

    2024年02月10日
    瀏覽(16)
  • flink如何監(jiān)聽kafka主題配置變更

    flink如何監(jiān)聽kafka主題配置變更

    從前一篇文章我們知道flink消費kafka主題時是采用的手動assign指定分區(qū)的方式,這種消費方式是不處理主題的rebalance操作的,也就是消費者組中即使有消費者退出或者進入也是不會觸發(fā)消費者所消費的分區(qū)的,那么疑問就來了,那是否比如kafka主題分區(qū)變多,或者新增了滿足

    2024年02月14日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包