目錄
一、flink cdc介紹
1、什么是flink cdc
2、flink cdc能用來(lái)做什么
3、flink cdc的優(yōu)點(diǎn)
二、flink cdc基礎(chǔ)使用
1、使用flink cdc讀取txt文本數(shù)據(jù)
2、DataStream的使用方式
3、SQL的方式
總結(jié)
一、flink cdc介紹
1、什么是flink cdc
flink cdc是一個(gè)由阿里研發(fā)的,一個(gè)可以直接從MySQL、PostgreSQL等數(shù)據(jù)庫(kù)直接讀取全量數(shù)據(jù)和增量變更數(shù)據(jù)的source組件。
2、flink cdc能用來(lái)做什么
flink cdc能感知數(shù)據(jù)庫(kù)的所有修改、新增、刪除操作,并以流的形式,進(jìn)行實(shí)時(shí)的觸發(fā)和反饋。如:你想監(jiān)聽(tīng)一個(gè)表的數(shù)據(jù)是否有變動(dòng),并且需要把變動(dòng)的數(shù)據(jù)讀取出來(lái),插入到另外的表里,或者對(duì)該數(shù)據(jù)進(jìn)行其他處理。在我們傳統(tǒng)的開(kāi)發(fā)里,如果不使用cdc技術(shù),是不是就只能通過(guò)定時(shí)任務(wù)去定時(shí)的獲取數(shù)據(jù)?或者在執(zhí)行數(shù)據(jù)修改操作時(shí)調(diào)用指定的接口來(lái)進(jìn)行數(shù)據(jù)上報(bào)?并且還要拿新數(shù)據(jù)和舊數(shù)據(jù)進(jìn)行比較,才能得到自己想要的結(jié)果?flink cdc就是解決這種問(wèn)題的,它是cdc里面的佼佼者,它能在數(shù)據(jù)表被修改時(shí),進(jìn)行實(shí)時(shí)的反饋。
3、flink cdc的優(yōu)點(diǎn)
① 低延遲:毫秒級(jí)的延遲
② 高吞吐:每秒能處理數(shù)百萬(wàn)個(gè)事件
③ 高可用及結(jié)果的準(zhǔn)確性、良好的容錯(cuò)性,動(dòng)態(tài)擴(kuò)展、全天候24小時(shí)運(yùn)行
二、flink cdc基礎(chǔ)使用
1、使用flink cdc讀取txt文本數(shù)據(jù)
① 項(xiàng)目目錄
② 需要用到的flink依賴(有些可以不用的,看實(shí)際需要使用哪些功能):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-parser</artifactId>
<version>1.13.0</version>
</dependency>
③ 具體代碼(TestFlinkController)
package com.bug.controller;
import com.bug.util.flink.TextFlatUtil;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.springframework.web.bind.annotation.*;
/**
* 1、flink讀取本地txt文件數(shù)據(jù)
*/
public class TestFlinkController {
/**
* 1、flink讀取本地txt文件數(shù)據(jù)
* @param args args
*/
public static void main(String[] args) throws Exception {
String path = "D:\\javaprojects\\my_springboot1\\my_springboot1\\src\\main\\resources\\flinkText\\flinkTest.txt";
//創(chuàng)建執(zhí)行環(huán)境
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
//讀取txt文件數(shù)據(jù)
DataSet<String> dataSet = environment.readTextFile(path);
//處理讀取的數(shù)據(jù)
DataSet<Tuple3<String, String, String>> out = dataSet.flatMap(new TextFlatUtil());
//輸出
out.print();
}
}
?TextFlatUtil代碼:
package com.bug.util.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
/**
* 1、flink讀取本地txt文件數(shù)據(jù)
*/
public class TextFlatUtil implements FlatMapFunction<String, Tuple3<String, String, String>> {
@Override
public void flatMap(String value, Collector<Tuple3<String, String, String>> collector) {
for(String word : value.split("\n")){
String[] res = word.split("\t");
collector.collect(new Tuple3<>(res[0],res[1],res[2]));
}
}
}
flinkTest.txt文件值:?
801165935581855745 小明1 年齡1
801165936156475393 小明3 年齡3
801165936567517185 小明5 年齡5
801165936991141889 小明7 年齡7
801165937460903937 小明9 年齡9
④ 輸出效果
2、DataStream的使用方式
① 數(shù)據(jù)庫(kù)修改配置my.cnf文件:binlog_format=row
?② 直接上代碼
package com.bug.flinkcdc;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 2、DataStream的方式
*/
public class TestFlinkStream {
/**
* 2、DataStream的方式
*/
public static void main(String[] args) throws Exception {
//創(chuàng)建執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//線程數(shù)
//開(kāi)啟ck
// env.enableCheckpointing(60*1000);//60秒啟動(dòng)一次checkpoint
// env.getCheckpointConfig().setCheckpointTimeout(30*1000);//設(shè)置超時(shí)時(shí)間,默認(rèn)是10min
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//Checkpoint級(jí)別,EXACTLY_ONCE精準(zhǔn)一次,AT_LEAST_ONCE最多一次
// env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);//設(shè)置兩次checkpoint的最小時(shí)間間隔
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//允許的最大checkpoint并行度
// env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));//設(shè)置checkpoint的地址
//構(gòu)建sourceFunction環(huán)境,正式開(kāi)發(fā)可以把一些配置提取出來(lái)寫(xiě)成公共配置即可
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname("***.***.***.***")//ip地址
.port(***)//端口號(hào)
.username("***")//用戶名
.password("***")//密碼
.databaseList("xiaobug")//數(shù)據(jù)庫(kù)名稱
.tableList("xiaobug.test_flink")//表名稱
.deserializer(new StringDebeziumDeserializationSchema())//反序列化
.startupOptions(StartupOptions.initial())//同步方式,initial全量和增量,latest增量
.build();
DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
//數(shù)據(jù)輸出
dataStreamSource.print();
//啟動(dòng)
env.execute();
}
}
③ 效果
?3、SQL的方式
① 數(shù)據(jù)庫(kù)修改配置my.cnf文件:binlog_format=row
?
?② 代碼
package com.bug.flinkcdc;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* 3、SQL的方式
*/
public class TestFlinkSQL {
/**
* 3、SQL的方式
*/
public static void main(String[] args) throws Exception {
//創(chuàng)建執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//線程數(shù)
StreamTableEnvironment tev = StreamTableEnvironment.create(env);
//正式開(kāi)發(fā)時(shí)可以把這些語(yǔ)句做成單獨(dú)的sql文件,更方便管理和維護(hù),with的配置也可以做成公共的,然后讀取即可
tev.executeSql("CREATE TABLE test_flink (" +
" userid String primary key," +
" username String," +
" userAge String," +
" userCardid String" +
" ) with ( " +
" 'connector' = 'mysql-cdc'," + //別名
" 'hostname' = '***.***.***.***'," + //數(shù)據(jù)庫(kù)ip地址
" 'port' = '***'," + //端口號(hào)
" 'username' = '***'," + //用戶名
" 'password' = '***'," + //密碼
" 'database-name' = 'xiaobug'," + //數(shù)據(jù)庫(kù)名稱
" 'table-name' = 'test_flink' " + //表名稱
")");
//查詢數(shù)據(jù)sql,也可以寫(xiě)在單獨(dú)的文件里,然后引用即可,復(fù)雜的連表查詢也是可以的,但需要其他表也進(jìn)行加載
Table table = tev.sqlQuery("select * from test_flink");
//輸出,正式開(kāi)發(fā)可以用sql語(yǔ)句的insert into進(jìn)行插入,直接實(shí)現(xiàn)表到表的同步
DataStream<Tuple2<Boolean, Row>> dataStream = tev.toRetractStream(table,Row.class);
dataStream.print();
//啟動(dòng)
env.execute("FlinkSQLCDC");
}
}
?③ 效果
?
總結(jié)
搞定啦,就是這么簡(jiǎn)單!flinkcdc的進(jìn)階:怎樣確保數(shù)據(jù)的一致性、可靠性、不重復(fù)、不丟失,后面有時(shí)間再寫(xiě)啦。
測(cè)試的時(shí)候還碰到了一個(gè)jar包版本的問(wèn)題,sql的方式一定要使用1.13.0以上的版本,不然會(huì)報(bào)錯(cuò)!
還有flink-sql-parser的這個(gè)包也一定要添加,不然會(huì)出現(xiàn)下面這個(gè)提示:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-644983.html
org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList;文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-644983.html
到了這里,關(guān)于flink cdc數(shù)據(jù)同步,DataStream方式和SQL方式的簡(jiǎn)單使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!