国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

利用MQ實(shí)現(xiàn)mysql與elasticsearch數(shù)據(jù)同步

這篇具有很好參考價(jià)值的文章主要介紹了利用MQ實(shí)現(xiàn)mysql與elasticsearch數(shù)據(jù)同步。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

流程

1.聲明exchange、queue、RoutingKey
2. 在hotel-admin中進(jìn)行增刪改(SQL),完成消息發(fā)送
3. 在hotel-demo中完成消息監(jiān)聽(tīng),并更新elasticsearch數(shù)據(jù)
4. 測(cè)試同步

利用MQ實(shí)現(xiàn)mysql與elasticsearch數(shù)據(jù)同步,JAVA微服務(wù),mysql,elasticsearch,java

1.引入依賴(lài)

<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

我這里的mq是掛在了docker上,虛擬機(jī)地址是192.168.116.128。到時(shí)候這個(gè)根據(jù)自己的項(xiàng)目改就行

 spring: 
  rabbitmq:
    host: 192.168.116.128 # 主機(jī)名
    port: 5672 # 端口
    virtual-host: / # 虛擬主機(jī)
    username: itcast # 用戶(hù)名
    password: 123321 # 密碼

2.聲明交換機(jī)、隊(duì)列和綁定關(guān)系

package cn.itcast.hotel.constants;

public class MqConstants {

    /**
     * 交換機(jī)
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";
    /**
     * 監(jiān)聽(tīng)新增和修改的隊(duì)列
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    /**
     * 監(jiān)聽(tīng)刪除的隊(duì)列
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    /**
     * 新增或修改的RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";
    /**
     * 刪除的RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}

在hotel-demo中,定義配置類(lèi),聲明隊(duì)列、交換機(jī):

package cn.itcast.hotel.config;


import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    // 定義交換機(jī)
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
    }
    // 定義隊(duì)列
    @Bean
    public Queue insertQueue()
    {
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
    }
    @Bean
    public Queue deleteQueue()
    {
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
    }

    // 定義綁定關(guān)系
    @Bean
    public Binding insertQueueBinding()
    {
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }
    @Bean
    public Binding deleteQueueBinding()
    {
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}

3.發(fā)送MQ消息

在hotel-admin中的增、刪、改業(yè)務(wù)中分別發(fā)送MQ消息,具體怎么添加根據(jù):

    // 新增
    @PostMapping
    public void saveHotel(){
        //數(shù)據(jù)庫(kù)新增操作
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
    }

    // 更新
    @PutMapping()
    public void updateById(){
            //數(shù)據(jù)庫(kù)修改操作
            rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());

    }
    // 刪除
    @DeleteMapping("/{id}")
    public void deleteById() {
		// 數(shù)據(jù)庫(kù)刪除操作
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);

    }

4.監(jiān)聽(tīng)MQ消息

接收MQ消息

hotel-demo接收到MQ消息要做的事情包括:

  • 新增消息:根據(jù)傳遞的hotel的id查詢(xún)hotel信息,然后新增一條數(shù)據(jù)到索引庫(kù)
  • 刪除消息:根據(jù)傳遞的hotel的id刪除索引庫(kù)中的一條數(shù)據(jù)

1)首先在hotel-demo的cn.itcast.hotel.service包下的IHotelService中新增新增、刪除業(yè)務(wù)

void deleteById(Long id);

void insertById(Long id);

2)給hotel-demo中的cn.itcast.hotel.service.impl包下的HotelService中實(shí)現(xiàn)業(yè)務(wù):文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-707020.html

@Override
public void deleteById(Long id) {
    try {
        // 1.準(zhǔn)備Request
        DeleteRequest request = new DeleteRequest("hotel", id.toString());
        // 2.發(fā)送請(qǐng)求
        client.delete(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

@Override
public void insertById(Long id) {
    try {
        // 0.根據(jù)id查詢(xún)酒店數(shù)據(jù)
        Hotel hotel = getById(id);
        // 轉(zhuǎn)換為文檔類(lèi)型
        HotelDoc hotelDoc = new HotelDoc(hotel);

        // 1.準(zhǔn)備Request對(duì)象
        IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
        // 2.準(zhǔn)備Json文檔
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
        // 3.發(fā)送請(qǐng)求
        client.index(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

編寫(xiě)監(jiān)聽(tīng)類(lèi)

package cn.itcast.hotel.mq;

import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HotelListener {
    // 專(zhuān)門(mén)用于消息監(jiān)聽(tīng)的類(lèi)

    @Autowired
    private IHotelService hotelService;

    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id)
    {
        hotelService.insertById(id);
    }

    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id)
    {
        hotelService.deleteById(id);
    }
}

到了這里,關(guān)于利用MQ實(shí)現(xiàn)mysql與elasticsearch數(shù)據(jù)同步的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀(guān)點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • elasticsearch實(shí)現(xiàn)mysql數(shù)據(jù)同步

    當(dāng)酒店數(shù)據(jù)發(fā)生增、刪、改時(shí),要求對(duì)elasticsearch中數(shù)據(jù)也要完成相同操作。 常見(jiàn)的數(shù)據(jù)同步方案有三種: 同步調(diào)用 異步通知 監(jiān)聽(tīng)binlog 以下使用異步通知同步elasticsearch的數(shù)據(jù)? constatnts 包下新建一個(gè)類(lèi) MqConstants,存儲(chǔ)交換機(jī)和隊(duì)列的名稱(chēng) 發(fā)送MQ消息 ? 在增、刪、改業(yè)務(wù)中分

    2024年02月09日
    瀏覽(12)
  • elasticsearch 實(shí)現(xiàn)與mysql 數(shù)據(jù)同步

    elasticsearch 實(shí)現(xiàn)與mysql 數(shù)據(jù)同步

    mysql8相關(guān)的安裝可以看下另一篇博客 https://editor.csdn.net/md/?articleId=135905811 1.下載安裝logstash 2.logstash 配置 logstash.yml 3.pipelines.yml 配置 同步方式: 1.logstash 2.go-mysql-elasticsearch 3.canal(阿里云) 一.logstash 1.安裝mysql-connector-java 插件(需與mysql 版本一致) 2.配置logstash.conf 3.啟動(dòng)logsta

    2024年04月12日
    瀏覽(19)
  • DataX實(shí)現(xiàn)Mysql與ElasticSearch(ES)數(shù)據(jù)同步

    DataX實(shí)現(xiàn)Mysql與ElasticSearch(ES)數(shù)據(jù)同步

    jdk1.8及以上 python2 查看是否安裝成功 查看python版本號(hào),判斷是否安裝成功 在datax/job下,json格式,具體內(nèi)容及主要配置含義如下 mysqlreader為讀取mysql數(shù)據(jù)部分,配置mysql相關(guān)信息 username,password為數(shù)據(jù)庫(kù)賬號(hào)密碼 querySql:需要查詢(xún)數(shù)據(jù)的sql,也可通過(guò)colums指定需要查找的字段(

    2024年02月05日
    瀏覽(22)
  • 本地部署Canal筆記-實(shí)現(xiàn)MySQL與ElasticSearch7數(shù)據(jù)同步

    本地部署Canal筆記-實(shí)現(xiàn)MySQL與ElasticSearch7數(shù)據(jù)同步

    本地搭建canal實(shí)現(xiàn)mysql數(shù)據(jù)到es的簡(jiǎn)單的數(shù)據(jù)同步,僅供學(xué)習(xí)參考 建議首先熟悉一下canal同步方式:https://github.com/alibaba/canal/wiki 本地搭建MySQL數(shù)據(jù)庫(kù) 本地搭建ElasticSearch 本地搭建canal-server 本地搭建canal-adapter 本地環(huán)境為window11,大部分組件采用docker進(jìn)行部署,MySQL采用8.0.27, 推薦

    2024年02月02日
    瀏覽(96)
  • 基于Canal實(shí)現(xiàn)Mysql數(shù)據(jù)實(shí)時(shí)同步到Elasticsearch(Docker版)

    基于Canal實(shí)現(xiàn)Mysql數(shù)據(jù)實(shí)時(shí)同步到Elasticsearch(Docker版)

    1、Canal簡(jiǎn)介 ??Canal主要用途是對(duì)MySQL數(shù)據(jù)庫(kù)增量日志進(jìn)行解析,提供增量數(shù)據(jù)的訂閱和消費(fèi),簡(jiǎn)單說(shuō)就是可以對(duì)MySQL的增量數(shù)據(jù)進(jìn)行實(shí)時(shí)同步,支持同步到MySQL、Elasticsearch、HBase等數(shù)據(jù)存儲(chǔ)中去。 ??Canal會(huì)模擬MySQL主庫(kù)和從庫(kù)的交互協(xié)議,從而偽裝成MySQL的從庫(kù),然后向My

    2024年02月10日
    瀏覽(92)
  • 一種Mysql和Mongodb數(shù)據(jù)同步到Elasticsearch的實(shí)現(xiàn)辦法和系統(tǒng)

    一種Mysql和Mongodb數(shù)據(jù)同步到Elasticsearch的實(shí)現(xiàn)辦法和系統(tǒng)

    本文分享自天翼云開(kāi)發(fā)者社區(qū)《一種Mysql和Mongodb數(shù)據(jù)同步到Elasticsearch的實(shí)現(xiàn)辦法和系統(tǒng)》,作者:l****n 核心流程如下: ? 核心邏輯說(shuō)明: MySQL Binlog解析 : 首先,從MySQL的二進(jìn)制日志(Binlog)中解析出表名。這一步驟非常關(guān)鍵,因?yàn)槲覀冎魂P(guān)注特定表的數(shù)據(jù)變更。 進(jìn)一步,我

    2024年02月05日
    瀏覽(16)
  • DolphinScheduler 調(diào)度 DataX 實(shí)現(xiàn) MySQL To ElasticSearch 增量數(shù)據(jù)同步實(shí)踐

    DolphinScheduler 調(diào)度 DataX 實(shí)現(xiàn) MySQL To ElasticSearch 增量數(shù)據(jù)同步實(shí)踐

    基于SQL查詢(xún)的 CDC(Change Data Capture): 離線(xiàn)調(diào)度查詢(xún)作業(yè),批處理。把一張表同步到其他系統(tǒng),每次通過(guò)查詢(xún)?nèi)カ@取表中最新的數(shù)據(jù)。也就是我們說(shuō)的基于SQL查詢(xún)抽取; 無(wú)法保障數(shù)據(jù)一致性,查的過(guò)程中有可能數(shù)據(jù)已經(jīng)發(fā)生了多次變更; 不保障實(shí)時(shí)性,基于離線(xiàn)調(diào)度存在天然的

    2024年02月03日
    瀏覽(24)
  • mysql數(shù)據(jù)利用pipe同步至redis

    所有數(shù)據(jù)表,一條記錄一個(gè)空間 ps:*4 #表示有4個(gè)參數(shù)、$4 #表示“參數(shù)”有4個(gè)字節(jié)、 所有數(shù)據(jù)表,一個(gè)表一個(gè)空間 ps:這里發(fā)現(xiàn)當(dāng)存在空值時(shí),CONCAT結(jié)果為空,所以對(duì)于空數(shù)據(jù)需要做處理 相關(guān)邏輯優(yōu)化 數(shù)據(jù)入庫(kù),如果作為鍵的字段非主鍵,即作為鍵的字段,可能存在相同的

    2024年02月15日
    瀏覽(15)
  • 數(shù)據(jù)同步MySQL -> Elasticsearch

    數(shù)據(jù)同步MySQL -> Elasticsearch

    大家好我是蘇麟,今天聊聊數(shù)據(jù)同步 . 一般情況下,如果做查詢(xún)搜索功能,使用 ES 來(lái)模糊搜索,但是數(shù)據(jù)是存放在數(shù)據(jù)庫(kù) MySQL 里的,所以說(shuō)我們需要把 MySQL 中的數(shù)據(jù)和 ES 進(jìn)行同步,保證數(shù)據(jù)一致(以 MySQL 為主)。 MySQL =ES(單向) 同步方式 首次安裝完 ES,把 MySQL 數(shù)據(jù)全量同步到

    2024年03月21日
    瀏覽(16)
  • 利用Canal把MySQL數(shù)據(jù)同步到ES

    Canal是阿里巴巴開(kāi)源的一個(gè)數(shù)據(jù)庫(kù)變更數(shù)據(jù)同步工具,主要用于 MySQL 數(shù)據(jù)庫(kù)的增量數(shù)據(jù)到下游的同步,例如同步到 Elasticsearch、HBase、Hive 等。下面是一個(gè)基本的步驟來(lái)導(dǎo)入 MySQL 數(shù)據(jù)庫(kù)到 Elasticsearch。 安裝和配置 Canal 首先,需要在你的機(jī)器上安裝并配置Canal。具體步驟可在 C

    2024年02月16日
    瀏覽(28)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包