国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink cdc數(shù)據(jù)同步,DataStream方式和SQL方式的簡(jiǎn)單使用

這篇具有很好參考價(jià)值的文章主要介紹了flink cdc數(shù)據(jù)同步,DataStream方式和SQL方式的簡(jiǎn)單使用。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

目錄

一、flink cdc介紹

1、什么是flink cdc

2、flink cdc能用來(lái)做什么

3、flink cdc的優(yōu)點(diǎn)

二、flink cdc基礎(chǔ)使用

1、使用flink cdc讀取txt文本數(shù)據(jù)

2、DataStream的使用方式

3、SQL的方式

總結(jié)


一、flink cdc介紹

1、什么是flink cdc

flink cdc是一個(gè)由阿里研發(fā)的,一個(gè)可以直接從MySQL、PostgreSQL等數(shù)據(jù)庫(kù)直接讀取全量數(shù)據(jù)和增量變更數(shù)據(jù)的source組件。

2、flink cdc能用來(lái)做什么

flink cdc能感知數(shù)據(jù)庫(kù)的所有修改、新增、刪除操作,并以流的形式,進(jìn)行實(shí)時(shí)的觸發(fā)和反饋。如:你想監(jiān)聽(tīng)一個(gè)表的數(shù)據(jù)是否有變動(dòng),并且需要把變動(dòng)的數(shù)據(jù)讀取出來(lái),插入到另外的表里,或者對(duì)該數(shù)據(jù)進(jìn)行其他處理。在我們傳統(tǒng)的開(kāi)發(fā)里,如果不使用cdc技術(shù),是不是就只能通過(guò)定時(shí)任務(wù)去定時(shí)的獲取數(shù)據(jù)?或者在執(zhí)行數(shù)據(jù)修改操作時(shí)調(diào)用指定的接口來(lái)進(jìn)行數(shù)據(jù)上報(bào)?并且還要拿新數(shù)據(jù)和舊數(shù)據(jù)進(jìn)行比較,才能得到自己想要的結(jié)果?flink cdc就是解決這種問(wèn)題的,它是cdc里面的佼佼者,它能在數(shù)據(jù)表被修改時(shí),進(jìn)行實(shí)時(shí)的反饋。

3、flink cdc的優(yōu)點(diǎn)

① 低延遲:毫秒級(jí)的延遲

② 高吞吐:每秒能處理數(shù)百萬(wàn)個(gè)事件

③ 高可用及結(jié)果的準(zhǔn)確性、良好的容錯(cuò)性,動(dòng)態(tài)擴(kuò)展、全天候24小時(shí)運(yùn)行

二、flink cdc基礎(chǔ)使用

1、使用flink cdc讀取txt文本數(shù)據(jù)

① 項(xiàng)目目錄

flink cdc sql,數(shù)據(jù)庫(kù),java,flink

② 需要用到的flink依賴(有些可以不用的,看實(shí)際需要使用哪些功能):

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-parser</artifactId>
            <version>1.13.0</version>
        </dependency>

③ 具體代碼(TestFlinkController)

package com.bug.controller;

import com.bug.util.flink.TextFlatUtil;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.springframework.web.bind.annotation.*;

/**
 * 1、flink讀取本地txt文件數(shù)據(jù)
 */
public class TestFlinkController {

    /**
     * 1、flink讀取本地txt文件數(shù)據(jù)
     * @param args args
     */
    public static void main(String[] args) throws Exception {
        String path = "D:\\javaprojects\\my_springboot1\\my_springboot1\\src\\main\\resources\\flinkText\\flinkTest.txt";
        //創(chuàng)建執(zhí)行環(huán)境
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        //讀取txt文件數(shù)據(jù)
        DataSet<String> dataSet = environment.readTextFile(path);
        //處理讀取的數(shù)據(jù)
        DataSet<Tuple3<String, String, String>> out = dataSet.flatMap(new TextFlatUtil());
        //輸出
        out.print();
    }

}

?TextFlatUtil代碼:

package com.bug.util.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

/**
 * 1、flink讀取本地txt文件數(shù)據(jù)
 */
public class TextFlatUtil implements FlatMapFunction<String, Tuple3<String, String, String>> {

    @Override
    public void flatMap(String value, Collector<Tuple3<String, String, String>> collector) {
        for(String word : value.split("\n")){
            String[] res = word.split("\t");
            collector.collect(new Tuple3<>(res[0],res[1],res[2]));
        }
    }
}

flinkTest.txt文件值:?

801165935581855745	小明1	年齡1
801165936156475393	小明3	年齡3
801165936567517185	小明5	年齡5
801165936991141889	小明7	年齡7
801165937460903937	小明9	年齡9

④ 輸出效果

flink cdc sql,數(shù)據(jù)庫(kù),java,flink

2、DataStream的使用方式

① 數(shù)據(jù)庫(kù)修改配置my.cnf文件:binlog_format=row

flink cdc sql,數(shù)據(jù)庫(kù),java,flink

?② 直接上代碼

package com.bug.flinkcdc;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 2、DataStream的方式
 */
public class TestFlinkStream {
    /**
     * 2、DataStream的方式
     */
    public static void main(String[] args) throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);//線程數(shù)
        //開(kāi)啟ck
//        env.enableCheckpointing(60*1000);//60秒啟動(dòng)一次checkpoint
//        env.getCheckpointConfig().setCheckpointTimeout(30*1000);//設(shè)置超時(shí)時(shí)間,默認(rèn)是10min
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//Checkpoint級(jí)別,EXACTLY_ONCE精準(zhǔn)一次,AT_LEAST_ONCE最多一次
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);//設(shè)置兩次checkpoint的最小時(shí)間間隔
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//允許的最大checkpoint并行度
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));//設(shè)置checkpoint的地址
        //構(gòu)建sourceFunction環(huán)境,正式開(kāi)發(fā)可以把一些配置提取出來(lái)寫(xiě)成公共配置即可
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("***.***.***.***")//ip地址
                .port(***)//端口號(hào)
                .username("***")//用戶名
                .password("***")//密碼
                .databaseList("xiaobug")//數(shù)據(jù)庫(kù)名稱
                .tableList("xiaobug.test_flink")//表名稱
                .deserializer(new StringDebeziumDeserializationSchema())//反序列化
                .startupOptions(StartupOptions.initial())//同步方式,initial全量和增量,latest增量
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        //數(shù)據(jù)輸出
        dataStreamSource.print();
        //啟動(dòng)
        env.execute();
    }

}

③ 效果

flink cdc sql,數(shù)據(jù)庫(kù),java,flink

?3、SQL的方式

① 數(shù)據(jù)庫(kù)修改配置my.cnf文件:binlog_format=row

?flink cdc sql,數(shù)據(jù)庫(kù),java,flink

?② 代碼

package com.bug.flinkcdc;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * 3、SQL的方式
 */
public class TestFlinkSQL {
    /**
     * 3、SQL的方式
     */
    public static void main(String[] args) throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);//線程數(shù)
        StreamTableEnvironment tev = StreamTableEnvironment.create(env);
        //正式開(kāi)發(fā)時(shí)可以把這些語(yǔ)句做成單獨(dú)的sql文件,更方便管理和維護(hù),with的配置也可以做成公共的,然后讀取即可
        tev.executeSql("CREATE TABLE test_flink (" +
                " userid String primary key," +
                " username String," +
                " userAge String," +
                " userCardid String" +
                " ) with ( " +
                " 'connector' = 'mysql-cdc'," +     //別名
                " 'hostname' = '***.***.***.***'," +  //數(shù)據(jù)庫(kù)ip地址
                " 'port' = '***'," +               //端口號(hào)
                " 'username' = '***'," +           //用戶名
                " 'password' = '***'," +           //密碼
                " 'database-name' = 'xiaobug'," +   //數(shù)據(jù)庫(kù)名稱
                " 'table-name' = 'test_flink' " +   //表名稱
                ")");
        //查詢數(shù)據(jù)sql,也可以寫(xiě)在單獨(dú)的文件里,然后引用即可,復(fù)雜的連表查詢也是可以的,但需要其他表也進(jìn)行加載
        Table table = tev.sqlQuery("select * from test_flink");
        //輸出,正式開(kāi)發(fā)可以用sql語(yǔ)句的insert into進(jìn)行插入,直接實(shí)現(xiàn)表到表的同步
        DataStream<Tuple2<Boolean, Row>> dataStream = tev.toRetractStream(table,Row.class);
        dataStream.print();
        //啟動(dòng)
        env.execute("FlinkSQLCDC");
    }

}

?③ 效果

flink cdc sql,數(shù)據(jù)庫(kù),java,flink

?

總結(jié)

搞定啦,就是這么簡(jiǎn)單!flinkcdc的進(jìn)階:怎樣確保數(shù)據(jù)的一致性、可靠性、不重復(fù)、不丟失,后面有時(shí)間再寫(xiě)啦。

測(cè)試的時(shí)候還碰到了一個(gè)jar包版本的問(wèn)題,sql的方式一定要使用1.13.0以上的版本,不然會(huì)報(bào)錯(cuò)!

還有flink-sql-parser的這個(gè)包也一定要添加,不然會(huì)出現(xiàn)下面這個(gè)提示:

org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList;文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-644983.html

到了這里,關(guān)于flink cdc數(shù)據(jù)同步,DataStream方式和SQL方式的簡(jiǎn)單使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 使用Flink CDC將Mysql中的數(shù)據(jù)實(shí)時(shí)同步到ES

    最近公司要搞搜索,需要把mysql中的數(shù)據(jù)同步到es中來(lái)進(jìn)行搜索,由于公司已經(jīng)搭建了flink集群,就打算用flink來(lái)做這個(gè)同步。本來(lái)以為很簡(jiǎn)單,跟著官網(wǎng)文檔走就好了,結(jié)果沒(méi)想到折騰了將近一周的時(shí)間…… 我也是沒(méi)想到,這玩意網(wǎng)上資源竟然這么少,找到的全部都是通過(guò)

    2024年02月11日
    瀏覽(25)
  • Flink CDC-Oracle CDC配置及DataStream API實(shí)現(xiàn)代碼...可實(shí)現(xiàn)監(jiān)控采集一個(gè)數(shù)據(jù)庫(kù)的多個(gè)表

    使用sysdba角色登錄到Oracle數(shù)據(jù)庫(kù) 確保Oracle歸檔日志(Archive Log)已啟用 若未啟用歸檔日志, 需運(yùn)行以下命令啟用歸檔日志 設(shè)置歸檔日志存儲(chǔ)大小及位置 設(shè)置數(shù)據(jù)庫(kù)恢復(fù)文件存儲(chǔ)區(qū)域的大小(如歸檔重做日志文件、控制文件備份等) 設(shè)置恢復(fù)文件的實(shí)際物理存儲(chǔ)路徑;scope=spfile參數(shù)

    2024年02月05日
    瀏覽(25)
  • 業(yè)務(wù)數(shù)據(jù)同步工具介紹和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    業(yè)務(wù)數(shù)據(jù)同步工具介紹和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    介紹 Sqoop : SQ L-to-Had oop ( Apache已經(jīng)終止Sqoop項(xiàng)目 ) 用途:把關(guān)系型數(shù)據(jù)庫(kù)的數(shù)據(jù)轉(zhuǎn)移到HDFS(Hive、Hbase)(重點(diǎn)使用的場(chǎng)景);Hadoop中的數(shù)據(jù)轉(zhuǎn)移到關(guān)系型數(shù)據(jù)庫(kù)中。Sqoop是java語(yǔ)言開(kāi)發(fā)的,底層使用 mapreduce 。 需要注意的是,Sqoop主要使用的是Map,是數(shù)據(jù)塊的轉(zhuǎn)移,沒(méi)有使

    2024年02月15日
    瀏覽(44)
  • flink cdc DataStream api 時(shí)區(qū)問(wèn)題

    flink cdc DataStream api 時(shí)區(qū)問(wèn)題

    以postgrsql 作為數(shù)據(jù)源時(shí),Date和timesatmp等類型cdc同步讀出來(lái)時(shí),會(huì)發(fā)現(xiàn)一下幾個(gè)問(wèn)題: 源表: sink 表: 解決方案:在自定義序列化時(shí)進(jìn)行處理。 java code scala code mysql cdc也會(huì)出現(xiàn)上述時(shí)區(qū)問(wèn)題,Debezium默認(rèn)將MySQL中datetime類型轉(zhuǎn)成UTC的時(shí)間戳({@link io.debezium.time.Timestamp}),時(shí)區(qū)是

    2024年02月07日
    瀏覽(32)
  • Flink CDC數(shù)據(jù)同步

    Flink CDC數(shù)據(jù)同步

    一、什么是FLink Apache?Flink?是一個(gè)框架和分布式處理引擎,用于在無(wú)邊界和有邊界數(shù)據(jù)流上進(jìn)行有狀態(tài)的計(jì)算。Flink?能在所有常見(jiàn)集群環(huán)境中運(yùn)行,并能以內(nèi)存速度和任意規(guī)模進(jìn)行計(jì)算。 接下來(lái),我們來(lái)介紹一下?Flink?架構(gòu)中的重要方面。 任何類型的數(shù)據(jù)都可以形成一種事

    2024年02月08日
    瀏覽(20)
  • 基于Flink CDC實(shí)時(shí)同步PostgreSQL與Tidb【Flink SQL Client模式下親測(cè)可行,詳細(xì)教程】

    操作系統(tǒng):ubuntu-22.04,運(yùn)行于wsl 2【 注意,請(qǐng)務(wù)必使用wsl 2 ;wsl 1會(huì)出現(xiàn)各種各樣的問(wèn)題】 軟件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0 已有postgre的跳過(guò)此步 (1)pg安裝 https://zhuanlan.zhihu.com/p/143156636 (2)pg配置 可能出現(xiàn)的問(wèn)題 sudo -u postgres psql 報(bào)錯(cuò): psql: err

    2024年02月11日
    瀏覽(30)
  • 【實(shí)戰(zhàn)-01】flink cdc 實(shí)時(shí)數(shù)據(jù)同步利器

    【實(shí)戰(zhàn)-01】flink cdc 實(shí)時(shí)數(shù)據(jù)同步利器

    cdc github源碼地址 cdc官方文檔 對(duì)很多初入門的人來(lái)說(shuō)是無(wú)法理解cdc到底是什么個(gè)東西。 有這樣一個(gè)需求,比如在mysql數(shù)據(jù)庫(kù)中存在很多數(shù)據(jù),但是公司要把mysql中的數(shù)據(jù)同步到數(shù)據(jù)倉(cāng)庫(kù)(starrocks), 數(shù)據(jù)倉(cāng)庫(kù)你可以理解為存儲(chǔ)了各種各樣來(lái)自不同數(shù)據(jù)庫(kù)中表。 數(shù)據(jù)的同步目前對(duì)

    2023年04月08日
    瀏覽(94)
  • Flink CDC實(shí)時(shí)同步PG數(shù)據(jù)庫(kù)

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git? 1、更改配置文件postgresql.conf # 更改wal日志方式為logical wal_level = logical # minimal, replica, or logical # 更改solts最大數(shù)量(默認(rèn)值為10),flink-cdc默認(rèn)一張表占用一個(gè)slots max_replication_slots = 20 # m

    2024年02月13日
    瀏覽(35)
  • 【FLINK】Kafka數(shù)據(jù)源通過(guò)Flink-cdc進(jìn)行實(shí)時(shí)數(shù)據(jù)同步

    【FLINK】Kafka數(shù)據(jù)源通過(guò)Flink-cdc進(jìn)行實(shí)時(shí)數(shù)據(jù)同步

    CDC是Change Data Capture的縮寫(xiě),中文意思是 變更數(shù)據(jù)獲取 ,flink-cdc的作用是,通過(guò)flink捕獲數(shù)據(jù)源的事務(wù)變動(dòng)操作記錄,包括數(shù)據(jù)的增刪改操作等,根據(jù)這些記錄可作用于對(duì)目標(biāo)端進(jìn)行實(shí)時(shí)數(shù)據(jù)同步。 下圖是flink-cdc最新支持的數(shù)據(jù)源類型: kafka的數(shù)據(jù)源要通過(guò)flink-cdc進(jìn)行實(shí)時(shí)數(shù)

    2024年02月12日
    瀏覽(36)
  • 實(shí)戰(zhàn):大數(shù)據(jù)Flink CDC同步Mysql數(shù)據(jù)到ElasticSearch

    實(shí)戰(zhàn):大數(shù)據(jù)Flink CDC同步Mysql數(shù)據(jù)到ElasticSearch

    前面的博文我們分享了大數(shù)據(jù)分布式流處理計(jì)算框架Flink和其基礎(chǔ)環(huán)境的搭建,相信各位看官都已經(jīng)搭建好了自己的運(yùn)行環(huán)境。那么,今天就來(lái)實(shí)戰(zhàn)一把使用Flink CDC同步Mysql數(shù)據(jù)導(dǎo)Elasticsearch。 CDC簡(jiǎn)介 CDC 的全稱是 Change Data Capture(變更數(shù)據(jù)捕獲技術(shù)) ,在廣義的概念上,只要

    2024年02月09日
    瀏覽(22)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包