整體思路:
1、使用?io.debezium.connector.mysql.MySqlConnector 自動同步數(shù)據(jù)到kafka消息隊(duì)列
2、通過listener監(jiān)聽消息隊(duì)列,代碼控制數(shù)據(jù)插入es
ps:其實(shí)有更簡單的方式:在此基礎(chǔ)上使用ElasticsearchSinkConnector、ksql,完成數(shù)據(jù)的轉(zhuǎn)換與自動同步es,全程無需代碼控制,后續(xù)本地跑通流程后再來記錄
一、連接器的下載與配置
下載debezium mysql connector
在kafka中建立connect文件夾,并解壓連接器
在kafka/config下的connect-distributed.properties文件中,修改plugin.path=連接器地址
啟動連接器:
bin/connect-distributed.sh -daemon config/connect-distributed.properties
postman查詢連接器是否配置成功
http://localhost:8083/connector-plugins
如果返回連接器,則表示配置成功
[
{
"class": "io.debezium.connector.mysql.MySqlConnector",
"type": "source",
"version": "2.1.2.Final"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "3.3.2"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "3.3.2"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "3.3.2"
}
]
二、創(chuàng)建同步連接器實(shí)例
post請求地址:
http://localhost:8083/connectors
請求體:
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "127.0.0.1", // 數(shù)據(jù)庫ip
"database.port": "3306",
"database.user": "root", // 數(shù)據(jù)庫登陸用戶名
"database.password": "123456", // 登陸密碼
"database.server.id": "2",
"database.server.name": "hc",
"database.include.list": "store", // 需要同步的庫
"table.include.list": "store.product", // 需要同步的表
"database.history.kafka.bootstrap.servers": "localhost:9092", // kafka地址
"database.history.kafka.topic": "schema-changes-inventory",
"topic.prefix": "pro",
"include.schema.changes": "true",
"transforms": "unwrap,Cast",
"transforms.Cast.type":
"org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "amount:float64,unit_price:float64",
"transforms.unwrap.type":
"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}
查看是否建立成功:
get請求:
http://localhost:8083/connectors
返回結(jié)果:
[
"mysql-connector"
]
三、代碼里監(jiān)聽消息
@Component
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class HcCustomerListener {
private final EsSearchService esSearchService;
private final IEsRepository esRepository;
private final String INDEX = "product";
/**
* 監(jiān)聽產(chǎn)品表
* @param record
*/
@KafkaListener(topics = "test.store.product")
public void onMessage(ConsumerRecord<String, String> record) {
String kafkaMessage = record.value();
if (StrUtil.isBlank(kafkaMessage)) {
return;
}
// 檢查索引是否存在,沒有則新建
if (!esRepository.checkIndex(INDEX)) {
if (!esRepository.createIndex(INDEX)) {
log.error("建立索引失?。∷饕? + INDEX);
}
}
// 數(shù)據(jù)轉(zhuǎn)換為要存儲的對象
Product item = JSONObject.toJavaObject(JSONObject.parseObject(kafkaMessage), Product.class);
// 數(shù)據(jù)同步
if (!esRepository.dataSync(Product, INDEX, QueryBuilders.termQuery("code", Product.getCode()))) {
log.error("產(chǎn)品信息同步es失?。‘a(chǎn)品編號:" + customer.getCode());
}
}
}
ps:關(guān)于數(shù)據(jù)存儲es,數(shù)據(jù)查詢es的具體方法,會寫一篇專門的文章記錄
中間也遇到了一些坑,比如建connector的時(shí)候一直報(bào)錯(cuò)缺少什么值,最后把jdk1.8改到j(luò)dk17就好了文章來源:http://www.zghlxwxcb.cn/news/detail-721507.html
比如同步數(shù)據(jù)一直報(bào)數(shù)據(jù)轉(zhuǎn)換錯(cuò)誤,才發(fā)現(xiàn)bigdecimal類型的字段需要在建connector時(shí)顯示的去做轉(zhuǎn)換:"transforms.Cast.spec": "unit_price:float64,amount:float64"這樣就行文章來源地址http://www.zghlxwxcb.cn/news/detail-721507.html
到了這里,關(guān)于通過kafka connector實(shí)現(xiàn)mysql數(shù)據(jù)自動同步es的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!