一、CDC簡介
cdc全稱 Change Data Capture 變更數(shù)據(jù)捕獲。通俗來講只要能捕獲到變更的數(shù)據(jù)的技術都可以稱為cdc。常見的開源技術有以下幾種:
canal:https://github.com/alibaba/canal
maxwell:https://github.com/zendesk/maxwell
Debezium:https://github.com/debezium/debezium
flink-cdc:https://github.com/ververica/flink-cdc-connectors
以下是幾種技術的橫向對比
二、canal+maxwell
兩者實現(xiàn)原理類似。canal模擬mysql主從復制過程,把自己當做從庫。通過dump操作把binlog從主庫讀取到從庫,然后根據(jù)binlog進行數(shù)據(jù)還原。maxwell原理同理。兩者區(qū)別在于maxwell是一款輕量級框架,可拓展性較少。比如它支持處理json格式數(shù)據(jù),并把數(shù)據(jù)發(fā)送到kafka、redis等中。而canal可以自定義數(shù)據(jù)格式,而且并不局限于將數(shù)據(jù)發(fā)送到特定的數(shù)據(jù)存儲介質中。
下面以canal舉例說明:
2.1 安裝canal
安裝很簡單,選擇release版本下載解壓即可:https://github.com/alibaba/canal/releases?page=2。
這里需要說明的是deployer是個人開發(fā)版。
2.2 配置canal
下載解壓完成之后如下:
進入conf/example/instance.porperties中修改如下配置
在canal安裝目錄下的conf下找到canal.properties,主要修改數(shù)據(jù)需要發(fā)送的介質。默認配置是tcp,這個可以自定義將數(shù)據(jù)寫入到其他地方,如果需要將canal采集到的數(shù)據(jù)寫入kafka topic就選擇第二種。
配置完成之后啟動:/bin/startup.sh
2.3 配置mysql binlog
mysql數(shù)據(jù)源需要提前開啟binlog,找到my.cnf配置如下:
然后重啟mysql服務。
登錄mysql執(zhí)行一下,然后查看是on就代表開啟成功
2.4代碼實現(xiàn)
pom依賴
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
import com.alibaba.fastjson.JSON;
import com.alibaba.google.common.base.CaseFormat;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
public class CanalClientApp {
public static void main(String[] args) throws Exception{
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), //本地部署就用localhost,端口默認
"example", null, null);
//設置死循環(huán),一直訪問connector中數(shù)據(jù)
while (true){
connector.connect(); //進行鏈接
connector.subscribe("test.*");//設置需要監(jiān)控的庫表
Message message = connector.get(100);//設置獲取一批數(shù)據(jù)量大小
List<CanalEntry.Entry> entries = message.getEntries(); //獲取一批消息的list集合
if(entries.size() > 0){ //如果list中有數(shù)據(jù)就遍歷取出
for (CanalEntry.Entry entry : entries) {
String tableName = entry.getHeader().getTableName(); //獲取header中請求到的表名
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());//value值轉換成string類型
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
//insert update delete ...
CanalEntry.EventType eventType = rowChange.getEventType();
if(eventType == CanalEntry.EventType.INSERT){
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
HashMap<String, String> map = new HashMap<>();
for (CanalEntry.Column column : afterColumnsList) {
String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
map.put(key, column.getValue());
}
/**
* TODO...
* 到這一步,下面要做的事情就是把map的數(shù)據(jù)發(fā)送到想要的地方去...
*/
System.out.println("tableName:"+ tableName + " , " + JSON.toJSONString(map));
}
}
}
}
}
}
}
mysql寫入數(shù)據(jù):
idea實時打印輸出
三、Debezium+Flink-CDC
Debezium是為Kafka Connect而建的一系列Source Connectors,每個Source Connector會根據(jù)對應數(shù)據(jù)庫特性來捕獲數(shù)據(jù)變更記錄。不像其他方法,例如,輪詢或者雙寫等。Debezium是基于日志進行捕獲變更的。而flink-cdc(1.x版本)和Debezium一脈相承。接下來通過案例簡單了解下flink-cdc的使用方式。
3.1 flink-cdc解析
官方社區(qū)的解析已經很清晰明了,通過兩種方式對比發(fā)現(xiàn),flink-cdc節(jié)省了很大一部分運維成本。傳統(tǒng)etl中flink只負責計算,而flink-cdc將采集計算為一體。當然看到了flink-cdc的優(yōu)勢,也需要了解當前版本(1.x)的局限性。因為flink-cdc底層采用了Debezium框架,數(shù)據(jù)讀取分為全量+增量模式。在全量讀取數(shù)據(jù)的時候為了保證數(shù)據(jù)一致性會加上一個全局鎖,如果數(shù)據(jù)量非常大讀取數(shù)據(jù)會以小時級別計算。切如果在全量讀取階段任務運行失敗是無法進行checkpoint的。
簡單來說,flink-cdc第一階段讀取全量數(shù)據(jù)時默認會加一個全局鎖,會拒絕其他事務提交update操作,這樣可以保證數(shù)據(jù)一致性,但數(shù)據(jù)量特別大時,可能會導致數(shù)據(jù)庫hang住。
于是官方又更新了flink-cdc 2.x版本。這個版本主要解決鎖還有無法checkpoint的問題。主要原理是chunk算法+SourceEnumerator組件實現(xiàn)。
flink-cdc 2.x版本讀取流程如上圖所述,首先根據(jù)chunk算法對binlog進行切片。每個i切片分區(qū)內數(shù)據(jù)不重合。正在讀寫的切片如果有數(shù)據(jù)更新的話,會將更新后的數(shù)據(jù)輸出。從而實現(xiàn)不加鎖的方式下保證數(shù)據(jù)讀的一致性。
3.2 flink-cdc讀取mysql binlog數(shù)據(jù)
實現(xiàn)方式分為兩種:https://ververica.github.io/flink-cdc-connectors/master/content/overview/cdc-connectors.html#usage-for-datastream-api
Datastream Api
pom依賴里需要添加如下:
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.3.0</version>
<scope>provided</scope>
</dependency>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlBinlogSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // 默認json反序列化器,進階版可以用自定義反序列化器
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print MySQL Snapshot + Binlog");
}
}
Table Api文章來源:http://www.zghlxwxcb.cn/news/detail-794492.html
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class FlinkTableCDCApp {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableenv = StreamTableEnvironment.create(env);
env.setParallelism(1);
tableenv.executeSql("create table mysql_bin(" +
"id INT primary key, " +
"name STRING" +
") with(" +
"'connector' = 'mysql-cdc'," +
"'hostname' = 'ip'," +
"'port' = '3306'," +
"'username' = 'yourUsername'," +
"'password' = 'yourPassword'," +
"'database-name' = 'yourDatabaseName'," +
"'table-oname' = 'yourTableName'" +
")"
);
Table table = tableenv.sqlQuery("select * from mysql_bin");
tableenv.toRetractStream(table, Row.class).print();
env.execute();
}
}
這里需要注意的是當前只是本地編譯,沒有提交flink。如果任務上環(huán)境提交運行需要提前將此jar放在FLINK_HOME/lib/目錄下去。
還需要有一點說明的是,我在本地編譯的時候選擇flink-cdc的版本是1.x,如上圖。實際上不同版本的flink-cdc對應的flink版本都不同。以下版本對應關系。文章來源地址http://www.zghlxwxcb.cn/news/detail-794492.html
到了這里,關于Flink學習13-Flink CDC的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!