一、背景
隨著公司的業務量越來越大，查詢需求越來越複雜，MySQL 已經無法支持變化多樣的複雜查詢了。
於是，使用 CDC 捕獲 MySQL 的數據變化，同步到 ES 中，進行數據的檢索。
一、環境準備
1、創建ES索引
// Create the course index with an explicit mapping: id is a keyword field; name, label and content are text fields
PUT /course
{
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "text"
},
"label": {
"type": "text"
},
"content": {
"type": "text"
}
}
}
}
// Query all documents in the course index (kept for reference)
GET /course/_search
// Delete the index together with its data (kept for reference)
DELETE /course
2、創建mysql數據表
-- Source table for the CDC job; its columns (id, name, label, content) match the fields of the ES course index mapping.
CREATE TABLE `course` (
`id` varchar(32) NOT NULL,
`name` varchar(255) DEFAULT NULL,
`label` varchar(255) DEFAULT NULL,
`content` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
二、使用FlinkCDC同步數據
1、導包
<!-- Maven dependencies for the Flink CDC demo. All org.apache.flink artifacts are pinned to 1.18.0. -->
<!-- NOTE(review): the demo code also imports com.alibaba.fastjson and uses an ElasticSearchUtil helper; -->
<!-- neither is declared in this list, so confirm they are provided elsewhere in the project. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.18.0</version>
</dependency>
<!-- MySQL CDC source connector; supplies the com.ververica.cdc classes imported by the demo -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>3.0.0</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>1.18.0</version>
</dependency>
2、demo
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
/**
 * Flink CDC demo job: reads change events of the MySQL table {@code mytest.course}
 * as Debezium-style JSON strings and mirrors them into the Elasticsearch index
 * {@code course}.
 */
public class CDCTest {
    /**
     * Builds the MySQL CDC source, attaches the Elasticsearch sink and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.56.10")
                .port(3306)
                .databaseList("mytest")
                .tableList("mytest.course")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing (interval argument: 3000)
        env.enableCheckpointing(3000);
        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // Run the source as a single parallel task
                .setParallelism(1)
                .addSink(new RichSinkFunction<String>() {
                    // Created lazily on first use instead of as a static field: a non-constant
                    // static member inside an anonymous class does not compile before Java 16,
                    // and a transient field keeps the client out of the serialized function.
                    private transient ElasticSearchUtil es;

                    @Override
                    public void invoke(String value, Context context) throws Exception {
                        super.invoke(value, context);
                        JSONObject jsonObject = JSON.parseObject(value);
                        JSONObject source = jsonObject.getJSONObject("source");
                        String op = jsonObject.getString("op");
                        if (source == null || op == null) {
                            // Without source metadata or an operation code the event cannot be routed; skip it.
                            return;
                        }
                        JSONObject before = jsonObject.getJSONObject("before");
                        JSONObject after = jsonObject.getJSONObject("after");

                        DataInfo dataInfo = new DataInfo();
                        dataInfo.setOp(op);
                        dataInfo.setBefore(before);
                        dataInfo.setAfter(after);
                        dataInfo.setDb(source.getString("db"));
                        dataInfo.setTable(source.getString("table"));

                        if (!"mytest".equals(dataInfo.getDb()) || !"course".equals(dataInfo.getTable())) {
                            return;
                        }

                        boolean isDelete = "d".equals(op);
                        // A delete event has a null "after"; the removed row (and its id) is in "before".
                        JSONObject row = isDelete ? before : after;
                        if (row == null || row.get("id") == null) {
                            throw new IllegalStateException("CDC event has no row image with an id: " + value);
                        }
                        String id = row.get("id").toString();

                        if (es == null) {
                            es = new ElasticSearchUtil("192.168.56.10");
                        }
                        if (isDelete) {
                            es.deleteById("course", id);
                        } else {
                            es.put(dataInfo.getAfter(), "course", id);
                        }
                    }
                })
                .setParallelism(1); // Use parallelism 1 for the sink to keep message order
        env.execute("Print MySQL Snapshot + Binlog");
    }
}
```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.Map;
/**
 * Holder for one change event collected from the CDC stream: the operation code,
 * the originating database and table, and the row images before and after the change.
 *
 * @author cuixiangfei
 * @since 2024-03-20
 */
public class DataInfo {
    // Operation code: c = create, u = update, d = delete, r = read
    private String op;
    private String db;
    private String table;
    private Map<String, Object> before;
    private Map<String, Object> after;

    public String getOp() {
        return op;
    }

    public void setOp(String op) {
        this.op = op;
    }

    public String getDb() {
        return db;
    }

    public void setDb(String db) {
        this.db = db;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public Map<String, Object> getBefore() {
        return before;
    }

    public void setBefore(Map<String, Object> before) {
        this.before = before;
    }

    public Map<String, Object> getAfter() {
        return after;
    }

    public void setAfter(Map<String, Object> after) {
        this.after = after;
    }

    /**
     * Reports whether this event is anything other than a read ({@code "r"}).
     *
     * @return {@code false} when the operation code is {@code "r"}; {@code true} otherwise,
     *         including when no operation code has been set (previously that case threw
     *         a {@link NullPointerException})
     */
    public boolean checkOpt() {
        return !"r".equals(this.op);
    }

    @Override
    public String toString() {
        return "DataInfo{" +
                "op='" + op + '\'' +
                ", db='" + db + '\'' +
                ", table='" + table + '\'' +
                ", before=" + before +
                ", after=" + after +
                '}';
    }

    /**
     * Manual smoke test: parses a sample update event and prints its op, before/after
     * row images, and the source database and table.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String value = "{\"before\":{\"id\":\"333\",\"name\":\"333\",\"label\":\"333\",\"content\":\"3333\"},\"after\":{\"id\":\"333\",\"name\":\"33322\",\"label\":\"333\",\"content\":\"3333\"},\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1710923957000,\"snapshot\":\"false\",\"db\":\"mytest\",\"sequence\":null,\"table\":\"course\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000008\",\"pos\":1318,\"row\":0,\"thread\":9,\"query\":null},\"op\":\"u\",\"ts_ms\":1710923957825,\"transaction\":null}";
        JSONObject jsonObject = JSON.parseObject(value);
        System.out.println(jsonObject.get("op"));
        System.out.println(jsonObject.get("before"));
        System.out.println(jsonObject.get("after"));
        System.out.println(jsonObject.getJSONObject("source").get("db"));
        System.out.println(jsonObject.getJSONObject("source").get("table"));
    }
}
3、es工具類
demo 中使用的 ElasticSearchUtil 請參考：springboot集成elasticSearch（附帶工具類）。文章來源：http://www.zghlxwxcb.cn/news/detail-850257.html
三、測試
1、先創建幾條數據
-- Seed three rows into mytest.course before starting the CDC job.
INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('1', '11', '111', '1111');
INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('2', '22 33', '222 333', '2222 3333');
INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('3', '33 44', '33 444', '3333 4444');
2、啟動cdc
3、查詢es
文章來源地址http://www.zghlxwxcb.cn/news/detail-850257.html
4、增刪改幾條數據進行測驗
到了這裡，關於使用FlinkCDC從mysql同步數據到ES，並實現數據檢索的文章就介紹完了。如果您還想了解更多內容，請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章，希望大家以後多多支持TOY模板網！