国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦

這篇具有很好參考價(jià)值的文章主要介紹了Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

前言

本篇博客是一篇elasticsearch的使用案例,包括結(jié)合MybatisPlus使用ES,如何保證MySQL和es的數(shù)據(jù)一致性,另外使用了RabbitMQ進(jìn)行解耦,自定義了發(fā)消息的方法。

其他相關(guān)的Elasticsearch的文章列表如下:

  • Elasticsearch的Docker版本的安裝和參數(shù)設(shè)置 & 端口開(kāi)放和瀏覽器訪(fǎng)問(wèn)

  • Elasticsearch的可視化Kibana工具安裝 & IK分詞器的安裝和使用

  • Elasticsearch的springboot整合 & Kibana進(jìn)行全查詢(xún)和模糊查詢(xún)

引出


1.elasticsearch的使用案例,包括結(jié)合MybatisPlus使用ES;
2.如何保證MySQL和es的數(shù)據(jù)一致性;
3.使用了RabbitMQ進(jìn)行解耦,自定義了發(fā)消息的方法。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-719303.html

結(jié)合MybatisPlus使用ES

1.引入依賴(lài)

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>

        <!--mysql驅(qū)動(dòng)-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--        druid-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
        </dependency>

        <!--  springboot 整合mybaits plus   -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2.進(jìn)行配置

package com.tianju.es.config;


import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;

/**
 * 你也可以不繼承 AbstractElasticsearchConfiguration 類(lèi),而將 ESConfig 寫(xiě)成一般的配置類(lèi)的型式。
 * 不過(guò)繼承 AbstractElasticsearchConfiguration 好處在于,它已經(jīng)幫我們配置好了elasticsearchTemplate 直接使用。
 */
@Configuration
public class ESConfig extends AbstractElasticsearchConfiguration {
    @Override
    public RestHighLevelClient elasticsearchClient() {
        ClientConfiguration clientConfiguration =
                ClientConfiguration.builder()
                        .connectedTo("192.168.111.130:9200")
                        .build();
        return RestClients.create(clientConfiguration).rest();
    }
}

3.實(shí)體類(lèi)上加入注解

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

package com.tianju.es.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.math.BigDecimal;

/**
 * 產(chǎn)品,包括庫(kù)存,價(jià)格信息
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("finance_sku")
@Document(indexName = "finance_sku")
public class FinanceSkuES {
    @TableId(value = "ID",type = IdType.AUTO)
    private Long id;
    @TableField("finance_sku_describe")
    @Field(index = true,analyzer = "ik_smart",
            searchAnalyzer = "ik_smart",type = FieldType.Text)
    private String detail; // 詳情
    @TableField("finance_sku_price")
    private BigDecimal price;
    @TableField("finance_sku_stock")
    private Long stock;
    @TableField("finance_state")
    private Integer status;
}

參數(shù)解釋

@Document(indexName = "books", shards = 1, replicas = 0)
@Data
public class Book {
    @Id
    @Field(type = FieldType.Integer)
    private Integer id;
    
    @Field(type = FieldType.Keyword)
    private String title;
    
    @Field(type = FieldType.Text)
    private String press;
    
    @Field(type = FieldType.Keyword)
    private String author;
    
    @Field(type = FieldType.Keyword,index=false)
    private BigDecimal price;
    
    @Field(type = FieldType.Text)
    private String description;
}
  • @Document :注解會(huì)對(duì)實(shí)體中的所有屬性建立索引;
    indexName = “books” :表示創(chuàng)建一個(gè)名稱(chēng)為 “books” 的索引;
    shards = 1 : 表示只使用一個(gè)分片;
    replicas = 0 : 表示不使用復(fù)制備份;
    index = false: 不能索引查詢(xún)
  • @Field(type = FieldType.Keyword) : 用以指定字段的數(shù)據(jù)類(lèi)型。

4.創(chuàng)建操作的 Repository

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

從它的祖先們那里繼承了大量的現(xiàn)成的方法,除此之外,它還可以按 spring data 的規(guī)則定義特定的方法。

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

package com.tianju.es.mapper;

import com.tianju.es.entity.FinanceSkuES;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;

/**
 * 操作es,類(lèi)似于之前的mapper
 */
@Repository
public interface SkuESMapper extends ElasticsearchRepository<FinanceSkuES, Long> {

    /**
     * 根據(jù)關(guān)鍵字進(jìn)行 分詞 分頁(yè)查詢(xún) sku數(shù)據(jù)
     * @param detail 查詢(xún)條件
     * @param pageable 分頁(yè)
     * @return
     */
    Page<FinanceSkuES> findFinanceSkuESByDetail(String detail, Pageable pageable);

    /**
     * 根據(jù)id進(jìn)行刪除
     * @param id
     */
    void removeFinanceSkuESById(Long id);

}

5.初始化es中的數(shù)據(jù)

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

運(yùn)行的后臺(tái)信息

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

查看es頁(yè)面的信息,index management

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

6.進(jìn)行全查詢(xún)以及分頁(yè)

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

進(jìn)行全查詢(xún)

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

{
  "content": [
    {
      "id": 1,
      "detail": "HUAWEI MateBook X Pro 2023 微絨典藏版 13代酷睿i7 32GB 2TB 14.2英寸3.1K原色全面屏 墨藍(lán)",
      "price": 13999.0,
      "stock": 50,
      "status": 1
    },
    {
      "id": 2,
      "detail": "HUAWEI Mate 60 Pro+ 16GB+1TB 宣白",
      "price": 9999.0,
      "stock": 60,
      "status": 1
    },
    {
      "id": 3,
      "detail": "iPhone 15 Pro Max 超視網(wǎng)膜 XDR 顯示屏",
      "price": 9299.0,
      "stock": 46,
      "status": 1
    },
    {
      "id": 4,
      "detail": "MacBook Air Apple M2 芯片 8 核中央處理器 8 核圖形處理器 8GB 統(tǒng)一內(nèi)存 256GB 固態(tài)硬盤(pán)",
      "price": 8999.0,
      "stock": 60,
      "status": 1
    }
  ],
  "pageable": {
    "sort": {
      "empty": true,
      "sorted": false,
      "unsorted": true
    },
    "offset": 0,
    "pageSize": 4,
    "pageNumber": 0,
    "paged": true,
    "unpaged": false
  },
  "totalElements": 4,
  "last": true,
  "totalPages": 1,
  "number": 0,
  "size": 4,
  "sort": {
    "empty": true,
    "sorted": false,
    "unsorted": true
  },
  "numberOfElements": 4,
  "first": true,
  "empty": false
}

帶條件分頁(yè)查詢(xún)

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

注意分頁(yè)查詢(xún)的page從0開(kāi)始,嘗試發(fā)現(xiàn)需要輸入分詞器分詞后最小單元,比如hu不是最小單元,而HUAWEI是

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

分詞器進(jìn)行分詞的結(jié)果

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

es和mysql的數(shù)據(jù)一致性

延遲雙刪

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

    @Override
    public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {
        // 把es看做是緩存,如何保證es 和 mysql的 數(shù)據(jù)一致性?
        // 延遲雙刪的模式
        // 1.先刪除緩存 es
        skuESMapper.deleteAll();
        // 2.更新數(shù)據(jù)庫(kù) mysql
        updateById(financeSkuES);
        // 3.延時(shí)操作
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // 4.再次刪除緩存 es
        skuESMapper.deleteAll();

        // 5.最后更新緩存 es
        skuESMapper.saveAll(list());
        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
        log.debug("byId: "+byId);
        return byId.get();
    }

上面代碼有不妥的地方,我這里是修改,結(jié)果一開(kāi)始直接從es中全部刪除,應(yīng)該是根據(jù)id把修改的數(shù)據(jù)刪除,然后把修改好的數(shù)據(jù)set進(jìn)es里面

加鎖的方式

感覺(jué)好像沒(méi)什么用的樣子,就是用了一下加鎖

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

用rabbitmq進(jìn)行解耦

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

配置yml文件

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

spring:
  main:
    allow-circular-references: true
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    ### 本地的數(shù)據(jù)庫(kù)
    url: jdbc:mysql://127.0.0.1:3306/consumer_finance_product?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&allowMultiQueries=true
    username: root
    password: 123

  # redis的相關(guān)配置
  redis:
    host: 119.3.162.127
    port: 6379
    database: 0
    password: Pet3927

  # rabbitmq相關(guān)
  rabbitmq:
    host: 192.168.111.130
    port: 5672
    username: admin
    password: 123
    virtual-host: /test

    # 生產(chǎn)者保證消息可靠性
    publisher-returns: true
    publisher-confirm-type: correlated

    # 設(shè)置手動(dòng)確認(rèn)
    listener:
      simple:
        acknowledge-mode: manual

rabbitmq的配置類(lèi)

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

將Java對(duì)象轉(zhuǎn)換成json字符串傳輸

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

package com.tianju.es.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    public static final String ES_EXCHANGE = "es_exchange";
    public static final String ES_QUEUE = "es_queue";
    public static final String ES_KEY = "es_key";

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(ES_EXCHANGE);
    }

    @Bean
    public Queue esQueue(){
        return new Queue(ES_QUEUE);
    }

    @Bean
    public Binding esQueueToDirectExchange(){
        return BindingBuilder.bind(esQueue())
                .to(directExchange())
                .with(ES_KEY);
    }

    /**
     * 將對(duì)象轉(zhuǎn)換為json字符串
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return  new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());// 修改轉(zhuǎn)換器
        return rabbitTemplate;
    }

}

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

callback回調(diào)方法

package com.tianju.es.rabbit;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * 生產(chǎn)者消息可靠性
 */
// RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback
@Configuration
@Slf4j
public class CallbackConfig
        implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 初始化
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setMandatory(true);
    }

    /**
     * 不管成功或者失敗都會(huì)執(zhí)行
     * @param correlationData correlation對(duì)象需要在 發(fā)送消息時(shí)候 給
     * @param ack true表示成功,false表示發(fā)送失敗
     * @param cause 如果失敗的話(huà),會(huì)寫(xiě)失敗原因;如果成功,返回為null
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.debug("ack是否成功:"+ack);
        log.debug("cause信息:"+cause);

        if (correlationData!=null){
            JSONObject jsonObject = JSON.parseObject(correlationData.getReturnedMessage().getBody());
            String exchange = correlationData.getReturnedMessage().getMessageProperties().getReceivedExchange();
            String routingKey = correlationData.getReturnedMessage().getMessageProperties().getReceivedRoutingKey();
            log.debug("消息體:"+jsonObject);
            log.debug("交換機(jī):"+exchange);
            log.debug("路由key:"+routingKey);
        }


        if (ack){
            return;
        }

        // 失敗了

        // 1、重試重試上限次數(shù)(默認(rèn)值5)每重試一次時(shí)間間隔會(huì)增加
        // 2、把消息、交換機(jī)名稱(chēng)、路由鍵等相關(guān)的消息保存到數(shù)據(jù)庫(kù),有一個(gè)程序定時(shí)掃描相關(guān)的消息,然后重新發(fā)送消息。
        // 重發(fā)上限次數(shù)(默認(rèn)值5)超過(guò)閾值會(huì)轉(zhuǎn)人工處理

        // 2、把消息體、交換機(jī)名稱(chēng)、路由鍵等相關(guān)的消息保存到數(shù)據(jù)庫(kù),有一個(gè)程序定時(shí)掃描相關(guān)的消息,然后重新發(fā)送消息。
        // 重發(fā)上限次數(shù)(默認(rèn)值5)超過(guò)閾值會(huì)轉(zhuǎn)人工處理
        // 2.1需要把相關(guān)的信息存放到數(shù)據(jù)中,表字段:消息體、交換機(jī)名稱(chēng)、路由鍵、狀態(tài)、次數(shù)
        // 2.2定時(shí)任務(wù)(單體:spring定時(shí)任務(wù)  分布式:XxL-job),發(fā)送消息




    }


    /**
     * 只有失敗了才會(huì)執(zhí)行
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 2、把消息、交換機(jī)名稱(chēng)、路由鍵等相關(guān)的消息保存到數(shù)據(jù)庫(kù),有一個(gè)程序定時(shí)掃描相關(guān)的消息,然后重新發(fā)送消息。

    }
}

自定義發(fā)消息工具類(lèi)

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

package com.tianju.common.util;


import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;


@Slf4j
public class RabbitUtil {

    /**
     * 延遲隊(duì)列,發(fā)送消息,到達(dá)時(shí)間后進(jìn)入死信隊(duì)列中
     * @param rabbitTemplate 調(diào)用的rabbitTemplate
     * @param redisTemplate 用來(lái)在redis里面存token
     * @param msg 發(fā)送的消息
     * @param token 發(fā)送的token,用于保證冪等性
     * @param ttl 如果是延遲消費(fèi),則消息的過(guò)期時(shí)間,到達(dá)改時(shí)間后進(jìn)入死信交換機(jī),到死信隊(duì)列中
     * @param exchange 交換機(jī)名字
     * @param routingKey 路由鍵名字
     * @param <T> 發(fā)送消息的實(shí)體類(lèi)
     */
    public static  <T> void sendMsg(RabbitTemplate rabbitTemplate,
                                    StringRedisTemplate redisTemplate,
                                    T msg,String token,Integer ttl,
                                    String exchange,String routingKey) {

        log.debug("給交換機(jī)[{}]通過(guò)路由鍵[{}]發(fā)送消息 {},token為{}",exchange,routingKey,msg,token);

        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                redisTemplate.opsForValue().set(token, token,5*60000);
                message.getMessageProperties().setMessageId(token);
                if (ttl!=null){
                    message.getMessageProperties().setExpiration(ttl.toString());
                }
                return message;
            }
        };

        CorrelationData correlationData = new CorrelationData();
        // 消息體
        Message message = new Message(JSON.toJSONBytes(msg));
        // 交換機(jī)名稱(chēng)
        message.getMessageProperties().setReceivedExchange(exchange);
        // 路由鍵
        message.getMessageProperties().setReceivedRoutingKey(routingKey);
        correlationData.setReturnedMessage(message);

        // 發(fā)送MQ消息
        rabbitTemplate.convertAndSend(exchange, // 發(fā)給交換機(jī)
                routingKey, // 根據(jù)這個(gè)routingKey就會(huì)給到TTL隊(duì)列,到時(shí)間成死信,發(fā)給死信交換機(jī),到死信隊(duì)列
                msg,
                messagePostProcessor,
                correlationData
        );
    }
}

進(jìn)行消息的發(fā)送

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

接口

package com.tianju.es.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.tianju.es.entity.FinanceSkuES;

public interface SkuService extends IService<FinanceSkuES> {

    /**
     * 延遲雙刪的方式,保證es 緩存 和 mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)一致性
     * @param financeSkuES 修改的數(shù)據(jù)
     * @return
     */
    FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES);

    /**
     * 加鎖的方式,不過(guò)感覺(jué)沒(méi)啥用的樣子
     * @param financeSkuES
     * @return
     */
    FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES);

    /**
     * 通過(guò)rabbitmq進(jìn)行解耦
     * @param financeSkuES
     * @return
     */
    String updateByIdRabbitMQ(FinanceSkuES financeSkuES);
}

實(shí)現(xiàn)類(lèi)

package com.tianju.es.service.impl;

import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.tianju.common.util.RabbitUtil;
import com.tianju.es.entity.FinanceSkuES;
import com.tianju.es.mapper.SkuESMapper;
import com.tianju.es.mapper.SkuMybatisPlusMapper;
import com.tianju.es.rabbit.RabbitConfig;
import com.tianju.es.service.SkuService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Collection;
import java.util.Optional;
import java.util.UUID;

@Service
public class SkuServiceImpl extends ServiceImpl<SkuMybatisPlusMapper,FinanceSkuES>
        implements SkuService {

    @Autowired
    private SkuESMapper skuESMapper;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {
        // 把es看做是緩存,如何保證es 和 mysql的 數(shù)據(jù)一致性?
        // 延遲雙刪的模式
        // 1.先刪除緩存 es
        skuESMapper.deleteAll();
        // 2.更新數(shù)據(jù)庫(kù) mysql
        updateById(financeSkuES);
        // 3.延時(shí)操作
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // 4.再次刪除緩存 es
        skuESMapper.deleteAll();

        // 5.最后更新緩存 es
        skuESMapper.saveAll(list());
        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
        log.debug("byId: "+byId);
        return byId.get();
    }

    @Override
    public FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES) {
        // 第二種方式加鎖
        String uuid = UUID.randomUUID().toString();
        // 相當(dāng)于setnx指令
        Boolean skuLock = stringRedisTemplate.opsForValue().setIfAbsent("skuLock", uuid);
        try {
            if (skuLock){ // 搶到了鎖
                skuESMapper.deleteAll();
                updateById(financeSkuES);
            }
        }finally {
            if (uuid.equals(stringRedisTemplate.opsForValue().get("skuLock"))){
                stringRedisTemplate.delete("skuLock");
            }
        }
        skuESMapper.saveAll(list());
        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
        log.debug("byId: "+byId);
        return byId.get();
    }

    @Override
    public String updateByIdRabbitMQ(FinanceSkuES financeSkuES) {
        // 采用rabbitmq進(jìn)行解耦

        updateById(financeSkuES);
        FinanceSkuES skuES = getById(financeSkuES.getId());
        String uuid = IdUtil.fastUUID();

        RabbitUtil.sendMsg(
                rabbitTemplate,stringRedisTemplate,skuES,uuid,null,
                RabbitConfig.ES_EXCHANGE,RabbitConfig.ES_KEY
        );

        return "已經(jīng)發(fā)送消息:"+skuES;
    }
}

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

接收到消息,更新es

接收到消息進(jìn)行es的更新,把原來(lái)的刪除,把最新的set進(jìn)去

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

package com.tianju.es.rabbit;

import com.rabbitmq.client.Channel;
import com.tianju.es.entity.FinanceSkuES;
import com.tianju.es.mapper.SkuESMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class ESListener {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private SkuESMapper skuESMapper;

    @RabbitListener(queues = RabbitConfig.ES_QUEUE)
    public void esUpdate(FinanceSkuES financeSkuES, Message message, Channel channel) {
        String messageId = message.getMessageProperties().getMessageId();
        log.debug("進(jìn)行業(yè)務(wù)----> 監(jiān)聽(tīng)到隊(duì)列{}的消息,messageId為{}",financeSkuES,messageId);

        try {
            // 冪等性
            if (redisTemplate.delete(messageId)){
                // 根據(jù)id刪除原有的 es 數(shù)據(jù)
                // 然后把新的數(shù)據(jù)set進(jìn)來(lái)
                log.debug("處理es的業(yè)務(wù),刪除原有的,替換最新的");
                skuESMapper.removeFinanceSkuESById(financeSkuES.getId());
                skuESMapper.save(financeSkuES);
            }

            // 手動(dòng)簽收消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }catch (Exception e){
            // 冪等性
            redisTemplate.opsForValue().set(messageId,messageId,5*60000);

            // 1、重試重試上限次數(shù)(默認(rèn)值5) 每重試一次時(shí)間間隔會(huì)增加
            // 2、把消息、交換機(jī)名稱(chēng)、路由鍵等相關(guān)的消息保存到數(shù)據(jù)庫(kù),有一個(gè)程序定時(shí)掃描相關(guān)的消息,然后重新發(fā)送消息。
            // 重發(fā)上限次數(shù)(默認(rèn)值5)超過(guò)閾值會(huì)轉(zhuǎn)人工處理

            // 已知的消息,交換機(jī),路由器,消息 message.getBody()  消息發(fā)送給的是監(jiān)聽(tīng)的隊(duì)列

            try {
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }
    }
}

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

后臺(tái)打印的日志

Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq


總結(jié)

1.elasticsearch的使用案例,包括結(jié)合MybatisPlus使用ES;
2.如何保證MySQL和es的數(shù)據(jù)一致性;
3.使用了RabbitMQ進(jìn)行解耦,自定義了發(fā)消息的方法。

到了這里,關(guān)于Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 使用Logstash同步mysql數(shù)據(jù)到Elasticsearch(親自踩坑)_將mysql中的數(shù)據(jù)導(dǎo)入es搜索引擎利用logstash(1)

    使用Logstash同步mysql數(shù)據(jù)到Elasticsearch(親自踩坑)_將mysql中的數(shù)據(jù)導(dǎo)入es搜索引擎利用logstash(1)

    先自我介紹一下,小編浙江大學(xué)畢業(yè),去過(guò)華為、字節(jié)跳動(dòng)等大廠,目前阿里P7 深知大多數(shù)程序員,想要提升技能,往往是自己摸索成長(zhǎng),但自己不成體系的自學(xué)效果低效又漫長(zhǎng),而且極易碰到天花板技術(shù)停滯不前! 因此收集整理了一份《2024年最新大數(shù)據(jù)全套學(xué)習(xí)資料》,

    2024年04月28日
    瀏覽(27)
  • 淘寶太細(xì)了:mysql 和 es 的5個(gè)一致性方案,你知道嗎?

    淘寶太細(xì)了:mysql 和 es 的5個(gè)一致性方案,你知道嗎?

    在40歲老架構(gòu)師 尼恩的 讀者交流群 (50+)中,最近有小伙伴拿到了一線(xiàn)互聯(lián)網(wǎng)企業(yè)如拼多多、極兔、有贊、希音的面試資格,遇到一幾個(gè)很重要的面試題: 說(shuō)5種mysql 和 elasticsearch 數(shù)據(jù)一致性方案 與之類(lèi)似的、其他小伙伴遇到過(guò)的問(wèn)題還有: Mysql 和 ES 數(shù)據(jù)一致性問(wèn)題及方案?

    2024年02月13日
    瀏覽(16)
  • DataX實(shí)現(xiàn)Mysql與ElasticSearch(ES)數(shù)據(jù)同步

    DataX實(shí)現(xiàn)Mysql與ElasticSearch(ES)數(shù)據(jù)同步

    jdk1.8及以上 python2 查看是否安裝成功 查看python版本號(hào),判斷是否安裝成功 在datax/job下,json格式,具體內(nèi)容及主要配置含義如下 mysqlreader為讀取mysql數(shù)據(jù)部分,配置mysql相關(guān)信息 username,password為數(shù)據(jù)庫(kù)賬號(hào)密碼 querySql:需要查詢(xún)數(shù)據(jù)的sql,也可通過(guò)colums指定需要查找的字段(

    2024年02月05日
    瀏覽(24)
  • 如何保證ES和數(shù)據(jù)庫(kù)的數(shù)據(jù)一致性?

    如何保證ES和數(shù)據(jù)庫(kù)的數(shù)據(jù)一致性?

    在業(yè)務(wù)中,我們通常需要把數(shù)據(jù)庫(kù)中的數(shù)據(jù)變更同步到ES中,那么如何保證數(shù)據(jù)庫(kù)和ES的一致性呢?通常有以下幾種做法: 雙寫(xiě) 在代碼中,對(duì)數(shù)據(jù)庫(kù)和ES進(jìn)行雙寫(xiě),并且先操作本地?cái)?shù)據(jù)庫(kù),后操作ES,而且還需要把兩個(gè)操作放到一個(gè)事務(wù)中: ?在以上邏輯中,如果寫(xiě)數(shù)據(jù)庫(kù)成功

    2024年04月28日
    瀏覽(23)
  • 【ElasticSearch】ES與MySQL數(shù)據(jù)同步方案及Java實(shí)現(xiàn)

    【ElasticSearch】ES與MySQL數(shù)據(jù)同步方案及Java實(shí)現(xiàn)

    elasticsearch中的酒店數(shù)據(jù)來(lái)自于mysql數(shù)據(jù)庫(kù),當(dāng)mysql中的數(shù)據(jù)發(fā)生改變時(shí),es中的數(shù)據(jù)也要跟著改變,即es與mysql之間的數(shù)據(jù)同步。 操作mysql的微服務(wù)hotel-admin不能直接更新es的索引庫(kù),那就由操作es索引庫(kù)的微服務(wù)hotel-demo來(lái)暴露一個(gè)更新索引庫(kù)的接口給hotel-admin調(diào)用 同步調(diào)用方式

    2024年02月15日
    瀏覽(26)
  • SpringCloud分布式搜索引擎、數(shù)據(jù)聚合、ES和MQ的結(jié)合使用、ES集群的問(wèn)題

    SpringCloud分布式搜索引擎、數(shù)據(jù)聚合、ES和MQ的結(jié)合使用、ES集群的問(wèn)題

    目錄 數(shù)據(jù)聚合 聚合的分類(lèi) ?編輯?DSL實(shí)現(xiàn)Bucket聚合 ?編輯 ?DSL實(shí)現(xiàn)Metrics聚合?編輯 RestAPI實(shí)現(xiàn)聚合 ?對(duì)接前端接口?編輯 ?自定義分詞器?編輯 Completion suggester查詢(xún) Completion suggester查詢(xún) 酒店數(shù)據(jù)自動(dòng)補(bǔ)全 實(shí)現(xiàn)酒店搜索框界面輸入框的自動(dòng)補(bǔ)全 ?數(shù)據(jù)同步問(wèn)題分析?編輯 同

    2024年02月16日
    瀏覽(50)
  • JAVA面試題分享二百五十五:mysql 和 es 的5個(gè)一致性方案,你知道嗎?

    JAVA面試題分享二百五十五:mysql 和 es 的5個(gè)一致性方案,你知道嗎?

    目錄 問(wèn)題場(chǎng)景分析 方案一:同步雙寫(xiě) 方案二:異步雙寫(xiě) 方案2.1 使用內(nèi)存隊(duì)列(如阻塞隊(duì)列)異步 方案2.2 使用消息隊(duì)列(如阻塞隊(duì)列)異步 方案三:定期同步 方案四:數(shù)據(jù)訂閱 方案五:etl 工具 咱們的生產(chǎn)需求上,為了便于商品的聚合搜索,高速搜索,采用兩大優(yōu)化方案

    2024年02月04日
    瀏覽(17)
  • Elasticsearch實(shí)戰(zhàn)(二十三)---ES數(shù)據(jù)建模與Mysql對(duì)比 一對(duì)多模型

    Elasticsearch實(shí)戰(zhàn)(二十三)---ES數(shù)據(jù)建模與Mysql對(duì)比 一對(duì)多模型

    我們?nèi)绾伟袽ysql的模型合理的在ES中去實(shí)現(xiàn)? 就需要你對(duì)要存儲(chǔ)的數(shù)據(jù)足夠的了解,及對(duì)應(yīng)用場(chǎng)景足夠的深入分析,才能建立一個(gè)合適的模型,便于你后期擴(kuò)展 一對(duì)一 模型 一對(duì)多 模型 多對(duì)多 模型 1.一對(duì)多 模型 我們現(xiàn)在有兩個(gè)模型, 一個(gè)商品Product, 一個(gè)分類(lèi)Category , 我們對(duì)比下一

    2024年02月08日
    瀏覽(25)
  • Elasticsearch實(shí)戰(zhàn)(二十二)---ES數(shù)據(jù)建模與Mysql對(duì)比 一對(duì)一模型

    我們?nèi)绾伟袽ysql的模型合理的在ES中去實(shí)現(xiàn)? 就需要你對(duì)要存儲(chǔ)的數(shù)據(jù)足夠的了解,及對(duì)應(yīng)用場(chǎng)景足夠的深入分析,才能建立一個(gè)合適的模型,便于你后期擴(kuò)展 實(shí)體之間的關(guān)系: 一對(duì)一 模型 一對(duì)一(1:1):一個(gè)實(shí)體最多只能能另一個(gè)實(shí)體相關(guān)聯(lián),另一個(gè)實(shí)體如是。 例:一個(gè)只能

    2024年02月10日
    瀏覽(18)
  • 【ES數(shù)據(jù)庫(kù)】Elasticsearch安裝使用

    【ES數(shù)據(jù)庫(kù)】Elasticsearch安裝使用

    Elasticsearch 和 MongoDB/Redis 類(lèi)似,是非關(guān)系型數(shù)據(jù)庫(kù),從索引文檔到文檔能被搜索到只有一個(gè)輕微的延遲,是采用Restful API標(biāo)準(zhǔn)的可擴(kuò)展和高可用的實(shí)時(shí)數(shù)據(jù)分析的全文搜索工具 Elastic Search 的實(shí)現(xiàn)原理是,利用內(nèi)置分詞器(Analyzer)對(duì)數(shù)據(jù)庫(kù)文本進(jìn)行分詞,將解析出的和數(shù)據(jù)

    2024年02月04日
    瀏覽(26)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包