Preface
In earlier posts we introduced Flink, the distributed stream-processing framework for big data, and walked through setting up its basic environment — by now you should have a working setup. Today we put it into practice: using Flink CDC to sync MySQL data to Elasticsearch.
Background
What is CDC
CDC stands for Change Data Capture. In the broadest sense, any technique that can capture data changes can be called CDC. In practice, the term usually refers to capturing changes made to a database — a technique for capturing data changes in database tables.
Types of CDC
There are many CDC implementations; the mainstream approaches fall into two categories:
Query-based CDC:
◆ Offline scheduled query jobs, i.e. batch processing: a table is synced to another system by periodically querying it for the latest data;
◆ Data consistency is not guaranteed — the data may have changed (possibly several times) while the query was running;
◆ Real-time delivery is not guaranteed — offline scheduling introduces inherent latency.
Log-based CDC (see the sketch after this list):
◆ Consumes the log in real time, i.e. stream processing. For example, MySQL's binlog is a complete record of all changes made to the database, so the binlog file can be treated as a streaming data source;
◆ Guarantees data consistency, because the binlog contains the full history of changes;
◆ Guarantees real-time delivery, because logs like the binlog can be consumed as a stream.
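To make the log-based approach concrete, here is a minimal sketch of consuming the MySQL binlog as a Flink stream. This is a sketch only: it assumes the 2.x MySqlSource builder API from flink-connector-mysql-cdc, and the host, credentials, and table names are placeholders, not values from this post.

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BinlogStreamSketch {
    public static void main(String[] args) throws Exception {
        // treat the binlog as a streaming source: every INSERT/UPDATE/DELETE
        // arrives as a Debezium-style JSON change record
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")   // placeholder
                .port(3306)
                .databaseList("cdc")
                .tableList("cdc.users")
                .username("root")        // placeholder
                .password("***")         // placeholder
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-binlog")
                .print();                // inspect the raw change stream
        env.execute("binlog-stream-sketch");
    }
}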
Comparison of common CDC solutions
Integrating Flink CDC with Spring Boot
Flink officially provides Java, Scala, and Python APIs for building Flink applications, so we can simply pull in the Flink dependencies via Maven and implement everything directly.
Environment
1. Spring Boot 2.4.3
2. Flink 1.13.6
3. Scala 2.11
4. Maven 3.6.3
5. Java 8
6. MySQL 8
7. Elasticsearch 7
The Spring Boot, Flink, and Scala versions must be mutually compatible; to be safe, you can follow the versions in this post exactly.
Note:
If you only want to experiment on your local machine, the Maven dependencies already embed the execution runtime, so no separate Flink installation is needed; deploying to a Flink cluster, however, requires setting one up. Also, the Scala version only matters for selecting dependencies — no Scala environment is required.
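One prerequisite worth checking up front (an assumption on my part — the post does not spell it out): the MySQL CDC connector reads the binlog, so the binlog must be enabled in ROW format, and the connecting user needs replication privileges. A quick sanity check might look like this (user and host are placeholders):

-- verify the binlog is enabled and row-based (required by the CDC connector)
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
-- grant the privileges the connector needs (user/host here are placeholders)
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root'@'%';
FLUSH PRIVILEGES;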
Project setup
1. Add the Flink CDC Maven dependencies
pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>flink-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>flink-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<flink.version>1.13.6</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
<!-- Flink CDC connector for MySQL -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--
Flink CDC connector for ES
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch7_2.11
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge_2.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner_2.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients_2.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java_2.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2. Create the test table users
Schema of the users table
CREATE TABLE `users` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` varchar(50) NOT NULL COMMENT 'name',
`birthday` timestamp NULL DEFAULT NULL COMMENT 'birthday',
`ts` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'created at',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='users';
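To have something to sync, you can seed a couple of rows — for example (the values are arbitrary test data, chosen to match the results shown later):

INSERT INTO users (name, birthday) VALUES ('senfel', '2023-08-30 15:02:28');
INSERT INTO users (name, birthday) VALUES ('sebfel2', '2023-08-30 15:02:43');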
3. Elasticsearch index operations
Elasticsearch commands
The index itself is created automatically when the connector first writes to it; the commands below preset the shard/replica settings and help with inspection.
#set the number of shards and replicas
curl -X PUT "10.10.22.174:9200/users" -u elastic:VaHcSC3mOFfovLWTqW6E -H 'Content-Type: application/json' -d'
{
"settings" : {
"number_of_shards" : 3,
"number_of_replicas" : 2
}
}'
#query all documents in the index
curl -X GET "http://10.10.22.174:9200/users/_search" -u elastic:VaHcSC3mOFfovLWTqW6E -H 'Content-Type: application/json'
#delete the index
curl -X DELETE "10.10.22.174:9200/users" -u elastic:VaHcSC3mOFfovLWTqW6E
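If documents fail to show up later, it can also help to confirm the index exists and inspect the mapping the connector generated — a standard Elasticsearch API, shown here with the same placeholder host and credentials:

#inspect the generated index mapping
curl -X GET "http://10.10.22.174:9200/users/_mapping" -u elastic:VaHcSC3mOFfovLWTqW6E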
Running locally
@SpringBootTest
class FlinkDemoApplicationTests {

    /**
     * Flink CDC: MySQL to Elasticsearch
     * @author senfel
     * @date 2023/8/22 14:37
     * @return void
     */
    @Test
    void flinkCDC() throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                //.useBlinkPlanner() // Blink is already the default planner in Flink 1.13
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // source table: captures changes to the MySQL users table via the binlog
        String sourceDDL =
                "CREATE TABLE users (\n" +
                "  id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "  name STRING,\n" +
                "  birthday TIMESTAMP(3),\n" +
                "  ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = '10.10.10.202',\n" +
                "  'port' = '6456',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'MyNewPass2021',\n" +
                "  'server-time-zone' = 'Asia/Shanghai',\n" +
                "  'database-name' = 'cdc',\n" +
                "  'table-name' = 'users'\n" +
                ")";
        // sink table: writes the change stream into the Elasticsearch users index
        String sinkDDL =
                "CREATE TABLE users_sink_es (\n" +
                "  id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "  name STRING,\n" +
                "  birthday TIMESTAMP(3),\n" +
                "  ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "  'connector' = 'elasticsearch-7',\n" +
                "  'hosts' = 'http://10.10.22.174:9200',\n" +
                "  'index' = 'users',\n" +
                "  'username' = 'elastic',\n" +
                "  'password' = 'VaHcSC3mOFfovLWTqW6E'\n" +
                ")";
        // continuous sync: a plain INSERT ... SELECT (no aggregation involved)
        String transformSQL = "INSERT INTO users_sink_es SELECT * FROM users";
        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);
        // for an unbounded INSERT job, print() blocks until the job terminates,
        // which is what keeps this test — and the sync — running
        result.print();
        env.execute("mysql-to-es"); // the Table API job is already submitted by executeSql(); not reached while print() blocks
    }
}
Querying the ES users index shows no data yet:
[root@bluejingyu-1 ~]# curl -X GET "http://10.10.22.174:9200/users/_search" -u elastic:VaHcSC3mOFfovLWTqW6E -H 'Content-Type: application/json'
{"took":0,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"total":{"value":0,"relation":"eq"},"max_score":null,"hits":[]}}
Insert a few rows into the MySQL database:
5 senfel 2023-08-30 15:02:28 2023-08-30 15:02:36
6 sebfel2 2023-08-30 15:02:43 2023-08-30 15:02:47
Query the ES users index again:
[root@bluejingyu-1 ~]# curl -X GET "http://10.10.22.174:9200/users/_search" -u elastic:VaHcSC3mOFfovLWTqW6E -H 'Content-Type: application/json'
{"took":67,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"total":{"value":2,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"users","_type":"_doc","_id":"5","_score":1.0,"_source":{"id":5,"name":"senfel","birthday":"2023-08-30 15:02:28","ts":"2023-08-30 15:02:36"}},{"_index":"users","_type":"_doc","_id":"6","_score":1.0,"_source":{"id":6,"name":"sebfel2","birthday":"2023-08-30 15:02:43","ts":"2023-08-30 15:02:47"}}]}}
The results above show that the local run works as expected.
Running on a cluster
Project tree:
1. Create the job entry point for cluster execution
/**
 * FlinkMysqlToEs — cluster entry point for the MySQL-to-Elasticsearch sync job
 * @author senfel
 * @version 1.0
 * @date 2023/8/22 14:56
 */
public class FlinkMysqlToEs {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                //.useBlinkPlanner() // Blink is already the default planner in Flink 1.13
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // source table: captures changes to the MySQL users table via the binlog
        String sourceDDL =
                "CREATE TABLE users (\n" +
                "  id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "  name STRING,\n" +
                "  birthday TIMESTAMP(3),\n" +
                "  ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = '10.10.10.202',\n" +
                "  'port' = '6456',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'MyNewPass2021',\n" +
                "  'server-time-zone' = 'Asia/Shanghai',\n" +
                "  'database-name' = 'cdc',\n" +
                "  'table-name' = 'users'\n" +
                ")";
        // sink table: writes the change stream into the Elasticsearch users index
        String sinkDDL =
                "CREATE TABLE users_sink_es (\n" +
                "  id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "  name STRING,\n" +
                "  birthday TIMESTAMP(3),\n" +
                "  ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "  'connector' = 'elasticsearch-7',\n" +
                "  'hosts' = 'http://10.10.22.174:9200',\n" +
                "  'index' = 'users',\n" +
                "  'username' = 'elastic',\n" +
                "  'password' = 'VaHcSC3mOFfovLWTqW6E'\n" +
                ")";
        // continuous sync: a plain INSERT ... SELECT (no aggregation involved)
        String transformSQL = "INSERT INTO users_sink_es SELECT * FROM users";
        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);
        // for an unbounded INSERT job, print() blocks until the job terminates
        result.print();
        env.execute("mysql-to-es"); // the Table API job is already submitted by executeSql(); not reached while print() blocks
    }
}
2. To run on the cluster, the Flink program must be packaged — not as a regular jar, but as a shaded (fat) jar. Note the ServicesResourceTransformer below: it merges META-INF/services files, which Flink relies on to discover table connector factories via SPI.
<build>
<finalName>flink-demo</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>module-info.class</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.flinkdemo.FlinkMysqlToEs</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Package the project, upload the jar to the cluster, and launch it
1. Build the package
mvn package -Dmaven.test.skip=true
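Before uploading, it is worth sanity-checking the shaded jar — for instance that the manifest main class is set and that the merged SPI registrations for Flink's table factories made it in. A rough check with standard unzip tooling (the path assumes the build above):

#confirm the manifest main class
unzip -p target/flink-demo.jar META-INF/MANIFEST.MF | grep Main-Class
#confirm connector factories were merged by the ServicesResourceTransformer
unzip -p target/flink-demo.jar META-INF/services/org.apache.flink.table.factories.Factory | head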
2. Manually upload the jar to the server, copy it into the cluster, and run it:
/opt/flink/bin# ./flink run ../flink-demo.jar
3. Test by modifying the MySQL database
Delete the row with id = 6, leaving only the user with id = 5:
5 senfel000 2023-08-30 15:02:28 2023-08-30 15:02:36
4. Query the ES users index
[root@bluejingyu-1 ~]# curl -X GET "http://10.10.22.174:9200/users/_search" -u elastic:VaHcSC3mOFfovLWTqW6E -H 'Content-Type: application/json'
{"took":931,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"users","_type":"_doc","_id":"5","_score":1.0,"_source":{"id":5,"name":"senfel","birthday":"2023-08-30 15:02:28","ts":"2023-08-30 15:02:36"}}]}}
As shown above, only the document with id = 5 remains in ES.
Manual deployment to the cluster environment is confirmed working.
Deploying the jar to the Flink cluster remotely
1. Add a controller endpoint to trigger the job
/**
 * remote runTask
 * @author senfel
 * @date 2023/8/30 16:57
 * @return org.apache.flink.api.common.JobID
 */
@GetMapping("/runTask")
public JobID runTask() {
    try {
        // cluster connection info
        Configuration configuration = new Configuration();
        configuration.setString(JobManagerOptions.ADDRESS, "10.10.22.91");
        configuration.setInteger(JobManagerOptions.PORT, 6123);
        configuration.setInteger(RestOptions.PORT, 8081);
        RestClusterClient<StandaloneClusterId> client = new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
        // local path of the jar to submit; a jar stored on HDFS would also work
        File jarFile = new File("input/flink-demo.jar");
        SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
        // build the packaged program (entry point + jar + configuration)
        PackagedProgram program = PackagedProgram
                .newBuilder()
                .setConfiguration(configuration)
                .setEntryPointClassName("com.example.flinkdemo.FlinkMysqlToEs")
                .setJarFile(jarFile)
                .setSavepointRestoreSettings(savepointRestoreSettings).build();
        // compile the program into a job graph (parallelism 1)
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, 1, false);
        // submit the job to the cluster and return its JobID
        CompletableFuture<JobID> result = client.submitJob(jobGraph);
        return result.get();
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}
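For completeness, the same RestClusterClient can also report on or cancel a submitted job. A minimal sketch — the /jobStatus endpoint and its wiring are hypothetical additions, not part of the original project; getJobStatus and cancel are standard ClusterClient methods:

@GetMapping("/jobStatus")
public String jobStatus(@RequestParam String jobId) throws Exception {
    // same cluster connection info as in runTask above
    Configuration configuration = new Configuration();
    configuration.setString(JobManagerOptions.ADDRESS, "10.10.22.91");
    configuration.setInteger(RestOptions.PORT, 8081);
    RestClusterClient<StandaloneClusterId> client =
            new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
    // look up the job's current state, e.g. RUNNING or FINISHED
    JobStatus status = client.getJobStatus(JobID.fromHexString(jobId)).get();
    // client.cancel(JobID.fromHexString(jobId)).get(); // would stop the job
    return status.name();
}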
2. Start the Spring Boot application
3. Trigger the endpoint with Postman
4. Check the Flink cluster web UI
As the Flink console shows, the remote deployment completed successfully.
5. Test by modifying the MySQL database:
5 senfel000 2023-08-30 15:02:28 2023-08-30 15:02:36
7 eeeee 2023-08-30 17:12:00 2023-08-30 17:12:04
8 33333 2023-08-30 17:12:08 2023-08-30 17:12:11
6. Query the ES users index
[root@bluejingyu-1 ~]# curl -X GET "http://10.10.22.174:9200/users/_search" -u elastic:VaHcSC3mOFfovLWTqW6E -H 'Content-Type: application/json'
{"took":766,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"total":{"value":3,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"users","_type":"_doc","_id":"5","_score":1.0,"_source":{"id":5,"name":"senfel000","birthday":"2023-08-30 15:02:28","ts":"2023-08-30 15:02:36"}},{"_index":"users","_type":"_doc","_id":"7","_score":1.0,"_source":{"id":7,"name":"eeeee","birthday":"2023-08-30 17:12:00","ts":"2023-08-30 17:12:04"}},{"_index":"users","_type":"_doc","_id":"8","_score":1.0,"_source":{"id":8,"name":"33333","birthday":"2023-08-30 17:12:08","ts":"2023-08-30 17:12:11"}}]}}
As shown above, the two newly inserted rows (and the update to id = 5) now appear in ES.
Remote submission of the Flink task is confirmed working.
Closing thoughts
Setting up and testing Flink CDC sync from MySQL to Elasticsearch is fairly straightforward. A basic standalone cluster for learning and testing only supports one deployed task at a time; if you need multiple tasks, or are targeting production, consider a YARN deployment with job separation (per-job mode).