前言
本篇博客是一篇elasticsearch的使用案例,包括結(jié)合MybatisPlus使用ES,如何保證MySQL和es的數(shù)據(jù)一致性,另外使用了RabbitMQ進(jìn)行解耦,自定義了發(fā)消息的方法。
其他相關(guān)的Elasticsearch的文章列表如下:
-
Elasticsearch的Docker版本的安裝和參數(shù)設(shè)置 & 端口開(kāi)放和瀏覽器訪(fǎng)問(wèn)
-
Elasticsearch的可視化Kibana工具安裝 & IK分詞器的安裝和使用
-
Elasticsearch的springboot整合 & Kibana進(jìn)行全查詢(xún)和模糊查詢(xún)
引出
1.elasticsearch的使用案例,包括結(jié)合MybatisPlus使用ES;
2.如何保證MySQL和es的數(shù)據(jù)一致性;
3.使用了RabbitMQ進(jìn)行解耦,自定義了發(fā)消息的方法。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-719303.html
結(jié)合MybatisPlus使用ES
1.引入依賴(lài)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!--mysql驅(qū)動(dòng)-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- druid-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
</dependency>
<!-- springboot 整合mybaits plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.進(jìn)行配置
package com.tianju.es.config;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
/**
* 你也可以不繼承 AbstractElasticsearchConfiguration 類(lèi),而將 ESConfig 寫(xiě)成一般的配置類(lèi)的型式。
* 不過(guò)繼承 AbstractElasticsearchConfiguration 好處在于,它已經(jīng)幫我們配置好了elasticsearchTemplate 直接使用。
*/
@Configuration
public class ESConfig extends AbstractElasticsearchConfiguration {
@Override
public RestHighLevelClient elasticsearchClient() {
ClientConfiguration clientConfiguration =
ClientConfiguration.builder()
.connectedTo("192.168.111.130:9200")
.build();
return RestClients.create(clientConfiguration).rest();
}
}
3.實(shí)體類(lèi)上加入注解
package com.tianju.es.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.math.BigDecimal;
/**
* 產(chǎn)品,包括庫(kù)存,價(jià)格信息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("finance_sku")
@Document(indexName = "finance_sku")
public class FinanceSkuES {
@TableId(value = "ID",type = IdType.AUTO)
private Long id;
@TableField("finance_sku_describe")
@Field(index = true,analyzer = "ik_smart",
searchAnalyzer = "ik_smart",type = FieldType.Text)
private String detail; // 詳情
@TableField("finance_sku_price")
private BigDecimal price;
@TableField("finance_sku_stock")
private Long stock;
@TableField("finance_state")
private Integer status;
}
參數(shù)解釋
@Document(indexName = "books", shards = 1, replicas = 0)
@Data
public class Book {
@Id
@Field(type = FieldType.Integer)
private Integer id;
@Field(type = FieldType.Keyword)
private String title;
@Field(type = FieldType.Text)
private String press;
@Field(type = FieldType.Keyword)
private String author;
@Field(type = FieldType.Keyword,index=false)
private BigDecimal price;
@Field(type = FieldType.Text)
private String description;
}
- @Document :注解會(huì)對(duì)實(shí)體中的所有屬性建立索引;
indexName = “books” :表示創(chuàng)建一個(gè)名稱(chēng)為 “books” 的索引;
shards = 1 : 表示只使用一個(gè)分片;
replicas = 0 : 表示不使用復(fù)制備份;
index = false: 不能索引查詢(xún) - @Field(type = FieldType.Keyword) : 用以指定字段的數(shù)據(jù)類(lèi)型。
4.創(chuàng)建操作的 Repository
從它的祖先們那里繼承了大量的現(xiàn)成的方法,除此之外,它還可以按 spring data 的規(guī)則定義特定的方法。
package com.tianju.es.mapper;
import com.tianju.es.entity.FinanceSkuES;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
/**
* 操作es,類(lèi)似于之前的mapper
*/
@Repository
public interface SkuESMapper extends ElasticsearchRepository<FinanceSkuES, Long> {
/**
* 根據(jù)關(guān)鍵字進(jìn)行 分詞 分頁(yè)查詢(xún) sku數(shù)據(jù)
* @param detail 查詢(xún)條件
* @param pageable 分頁(yè)
* @return
*/
Page<FinanceSkuES> findFinanceSkuESByDetail(String detail, Pageable pageable);
/**
* 根據(jù)id進(jìn)行刪除
* @param id
*/
void removeFinanceSkuESById(Long id);
}
5.初始化es中的數(shù)據(jù)
運(yùn)行的后臺(tái)信息
查看es頁(yè)面的信息,index management
6.進(jìn)行全查詢(xún)以及分頁(yè)
進(jìn)行全查詢(xún)
{
"content": [
{
"id": 1,
"detail": "HUAWEI MateBook X Pro 2023 微絨典藏版 13代酷睿i7 32GB 2TB 14.2英寸3.1K原色全面屏 墨藍(lán)",
"price": 13999.0,
"stock": 50,
"status": 1
},
{
"id": 2,
"detail": "HUAWEI Mate 60 Pro+ 16GB+1TB 宣白",
"price": 9999.0,
"stock": 60,
"status": 1
},
{
"id": 3,
"detail": "iPhone 15 Pro Max 超視網(wǎng)膜 XDR 顯示屏",
"price": 9299.0,
"stock": 46,
"status": 1
},
{
"id": 4,
"detail": "MacBook Air Apple M2 芯片 8 核中央處理器 8 核圖形處理器 8GB 統(tǒng)一內(nèi)存 256GB 固態(tài)硬盤(pán)",
"price": 8999.0,
"stock": 60,
"status": 1
}
],
"pageable": {
"sort": {
"empty": true,
"sorted": false,
"unsorted": true
},
"offset": 0,
"pageSize": 4,
"pageNumber": 0,
"paged": true,
"unpaged": false
},
"totalElements": 4,
"last": true,
"totalPages": 1,
"number": 0,
"size": 4,
"sort": {
"empty": true,
"sorted": false,
"unsorted": true
},
"numberOfElements": 4,
"first": true,
"empty": false
}
帶條件分頁(yè)查詢(xún)
注意分頁(yè)查詢(xún)的page從0開(kāi)始,嘗試發(fā)現(xiàn)需要輸入分詞器分詞后最小單元,比如hu不是最小單元,而HUAWEI是
分詞器進(jìn)行分詞的結(jié)果
es和mysql的數(shù)據(jù)一致性
延遲雙刪
@Override
public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {
// 把es看做是緩存,如何保證es 和 mysql的 數(shù)據(jù)一致性?
// 延遲雙刪的模式
// 1.先刪除緩存 es
skuESMapper.deleteAll();
// 2.更新數(shù)據(jù)庫(kù) mysql
updateById(financeSkuES);
// 3.延時(shí)操作
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 4.再次刪除緩存 es
skuESMapper.deleteAll();
// 5.最后更新緩存 es
skuESMapper.saveAll(list());
Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
log.debug("byId: "+byId);
return byId.get();
}
上面代碼有不妥的地方,我這里是修改,結(jié)果一開(kāi)始直接從es中全部刪除,應(yīng)該是根據(jù)id把修改的數(shù)據(jù)刪除,然后把修改好的數(shù)據(jù)set進(jìn)es里面
加鎖的方式
感覺(jué)好像沒(méi)什么用的樣子,就是用了一下加鎖
用rabbitmq進(jìn)行解耦
配置yml文件
spring:
main:
allow-circular-references: true
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
### 本地的數(shù)據(jù)庫(kù)
url: jdbc:mysql://127.0.0.1:3306/consumer_finance_product?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&allowMultiQueries=true
username: root
password: 123
# redis的相關(guān)配置
redis:
host: 119.3.162.127
port: 6379
database: 0
password: Pet3927
# rabbitmq相關(guān)
rabbitmq:
host: 192.168.111.130
port: 5672
username: admin
password: 123
virtual-host: /test
# 生產(chǎn)者保證消息可靠性
publisher-returns: true
publisher-confirm-type: correlated
# 設(shè)置手動(dòng)確認(rèn)
listener:
simple:
acknowledge-mode: manual
rabbitmq的配置類(lèi)
將Java對(duì)象轉(zhuǎn)換成json字符串傳輸
package com.tianju.es.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
public static final String ES_EXCHANGE = "es_exchange";
public static final String ES_QUEUE = "es_queue";
public static final String ES_KEY = "es_key";
@Bean
public DirectExchange directExchange(){
return new DirectExchange(ES_EXCHANGE);
}
@Bean
public Queue esQueue(){
return new Queue(ES_QUEUE);
}
@Bean
public Binding esQueueToDirectExchange(){
return BindingBuilder.bind(esQueue())
.to(directExchange())
.with(ES_KEY);
}
/**
* 將對(duì)象轉(zhuǎn)換為json字符串
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());// 修改轉(zhuǎn)換器
return rabbitTemplate;
}
}
callback回調(diào)方法
package com.tianju.es.rabbit;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* 生產(chǎn)者消息可靠性
*/
// RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback
@Configuration
@Slf4j
public class CallbackConfig
implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
// 初始化
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setMandatory(true);
}
/**
* 不管成功或者失敗都會(huì)執(zhí)行
* @param correlationData correlation對(duì)象需要在 發(fā)送消息時(shí)候 給
* @param ack true表示成功,false表示發(fā)送失敗
* @param cause 如果失敗的話(huà),會(huì)寫(xiě)失敗原因;如果成功,返回為null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.debug("ack是否成功:"+ack);
log.debug("cause信息:"+cause);
if (correlationData!=null){
JSONObject jsonObject = JSON.parseObject(correlationData.getReturnedMessage().getBody());
String exchange = correlationData.getReturnedMessage().getMessageProperties().getReceivedExchange();
String routingKey = correlationData.getReturnedMessage().getMessageProperties().getReceivedRoutingKey();
log.debug("消息體:"+jsonObject);
log.debug("交換機(jī):"+exchange);
log.debug("路由key:"+routingKey);
}
if (ack){
return;
}
// 失敗了
// 1、重試重試上限次數(shù)(默認(rèn)值5)每重試一次時(shí)間間隔會(huì)增加
// 2、把消息、交換機(jī)名稱(chēng)、路由鍵等相關(guān)的消息保存到數(shù)據(jù)庫(kù),有一個(gè)程序定時(shí)掃描相關(guān)的消息,然后重新發(fā)送消息。
// 重發(fā)上限次數(shù)(默認(rèn)值5)超過(guò)閾值會(huì)轉(zhuǎn)人工處理
// 2、把消息體、交換機(jī)名稱(chēng)、路由鍵等相關(guān)的消息保存到數(shù)據(jù)庫(kù),有一個(gè)程序定時(shí)掃描相關(guān)的消息,然后重新發(fā)送消息。
// 重發(fā)上限次數(shù)(默認(rèn)值5)超過(guò)閾值會(huì)轉(zhuǎn)人工處理
// 2.1需要把相關(guān)的信息存放到數(shù)據(jù)中,表字段:消息體、交換機(jī)名稱(chēng)、路由鍵、狀態(tài)、次數(shù)
// 2.2定時(shí)任務(wù)(單體:spring定時(shí)任務(wù) 分布式:XxL-job),發(fā)送消息
}
/**
* 只有失敗了才會(huì)執(zhí)行
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 2、把消息、交換機(jī)名稱(chēng)、路由鍵等相關(guān)的消息保存到數(shù)據(jù)庫(kù),有一個(gè)程序定時(shí)掃描相關(guān)的消息,然后重新發(fā)送消息。
}
}
自定義發(fā)消息工具類(lèi)
package com.tianju.common.util;
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
@Slf4j
public class RabbitUtil {
/**
* 延遲隊(duì)列,發(fā)送消息,到達(dá)時(shí)間后進(jìn)入死信隊(duì)列中
* @param rabbitTemplate 調(diào)用的rabbitTemplate
* @param redisTemplate 用來(lái)在redis里面存token
* @param msg 發(fā)送的消息
* @param token 發(fā)送的token,用于保證冪等性
* @param ttl 如果是延遲消費(fèi),則消息的過(guò)期時(shí)間,到達(dá)改時(shí)間后進(jìn)入死信交換機(jī),到死信隊(duì)列中
* @param exchange 交換機(jī)名字
* @param routingKey 路由鍵名字
* @param <T> 發(fā)送消息的實(shí)體類(lèi)
*/
public static <T> void sendMsg(RabbitTemplate rabbitTemplate,
StringRedisTemplate redisTemplate,
T msg,String token,Integer ttl,
String exchange,String routingKey) {
log.debug("給交換機(jī)[{}]通過(guò)路由鍵[{}]發(fā)送消息 {},token為{}",exchange,routingKey,msg,token);
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
redisTemplate.opsForValue().set(token, token,5*60000);
message.getMessageProperties().setMessageId(token);
if (ttl!=null){
message.getMessageProperties().setExpiration(ttl.toString());
}
return message;
}
};
CorrelationData correlationData = new CorrelationData();
// 消息體
Message message = new Message(JSON.toJSONBytes(msg));
// 交換機(jī)名稱(chēng)
message.getMessageProperties().setReceivedExchange(exchange);
// 路由鍵
message.getMessageProperties().setReceivedRoutingKey(routingKey);
correlationData.setReturnedMessage(message);
// 發(fā)送MQ消息
rabbitTemplate.convertAndSend(exchange, // 發(fā)給交換機(jī)
routingKey, // 根據(jù)這個(gè)routingKey就會(huì)給到TTL隊(duì)列,到時(shí)間成死信,發(fā)給死信交換機(jī),到死信隊(duì)列
msg,
messagePostProcessor,
correlationData
);
}
}
進(jìn)行消息的發(fā)送
接口
package com.tianju.es.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.tianju.es.entity.FinanceSkuES;
public interface SkuService extends IService<FinanceSkuES> {
/**
* 延遲雙刪的方式,保證es 緩存 和 mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)一致性
* @param financeSkuES 修改的數(shù)據(jù)
* @return
*/
FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES);
/**
* 加鎖的方式,不過(guò)感覺(jué)沒(méi)啥用的樣子
* @param financeSkuES
* @return
*/
FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES);
/**
* 通過(guò)rabbitmq進(jìn)行解耦
* @param financeSkuES
* @return
*/
String updateByIdRabbitMQ(FinanceSkuES financeSkuES);
}
實(shí)現(xiàn)類(lèi)
package com.tianju.es.service.impl;
import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.tianju.common.util.RabbitUtil;
import com.tianju.es.entity.FinanceSkuES;
import com.tianju.es.mapper.SkuESMapper;
import com.tianju.es.mapper.SkuMybatisPlusMapper;
import com.tianju.es.rabbit.RabbitConfig;
import com.tianju.es.service.SkuService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.Optional;
import java.util.UUID;
@Service
public class SkuServiceImpl extends ServiceImpl<SkuMybatisPlusMapper,FinanceSkuES>
implements SkuService {
@Autowired
private SkuESMapper skuESMapper;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {
// 把es看做是緩存,如何保證es 和 mysql的 數(shù)據(jù)一致性?
// 延遲雙刪的模式
// 1.先刪除緩存 es
skuESMapper.deleteAll();
// 2.更新數(shù)據(jù)庫(kù) mysql
updateById(financeSkuES);
// 3.延時(shí)操作
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 4.再次刪除緩存 es
skuESMapper.deleteAll();
// 5.最后更新緩存 es
skuESMapper.saveAll(list());
Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
log.debug("byId: "+byId);
return byId.get();
}
@Override
public FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES) {
// 第二種方式加鎖
String uuid = UUID.randomUUID().toString();
// 相當(dāng)于setnx指令
Boolean skuLock = stringRedisTemplate.opsForValue().setIfAbsent("skuLock", uuid);
try {
if (skuLock){ // 搶到了鎖
skuESMapper.deleteAll();
updateById(financeSkuES);
}
}finally {
if (uuid.equals(stringRedisTemplate.opsForValue().get("skuLock"))){
stringRedisTemplate.delete("skuLock");
}
}
skuESMapper.saveAll(list());
Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
log.debug("byId: "+byId);
return byId.get();
}
@Override
public String updateByIdRabbitMQ(FinanceSkuES financeSkuES) {
// 采用rabbitmq進(jìn)行解耦
updateById(financeSkuES);
FinanceSkuES skuES = getById(financeSkuES.getId());
String uuid = IdUtil.fastUUID();
RabbitUtil.sendMsg(
rabbitTemplate,stringRedisTemplate,skuES,uuid,null,
RabbitConfig.ES_EXCHANGE,RabbitConfig.ES_KEY
);
return "已經(jīng)發(fā)送消息:"+skuES;
}
}
接收到消息,更新es
接收到消息進(jìn)行es的更新,把原來(lái)的刪除,把最新的set進(jìn)去
package com.tianju.es.rabbit;
import com.rabbitmq.client.Channel;
import com.tianju.es.entity.FinanceSkuES;
import com.tianju.es.mapper.SkuESMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
public class ESListener {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private SkuESMapper skuESMapper;
@RabbitListener(queues = RabbitConfig.ES_QUEUE)
public void esUpdate(FinanceSkuES financeSkuES, Message message, Channel channel) {
String messageId = message.getMessageProperties().getMessageId();
log.debug("進(jìn)行業(yè)務(wù)----> 監(jiān)聽(tīng)到隊(duì)列{}的消息,messageId為{}",financeSkuES,messageId);
try {
// 冪等性
if (redisTemplate.delete(messageId)){
// 根據(jù)id刪除原有的 es 數(shù)據(jù)
// 然后把新的數(shù)據(jù)set進(jìn)來(lái)
log.debug("處理es的業(yè)務(wù),刪除原有的,替換最新的");
skuESMapper.removeFinanceSkuESById(financeSkuES.getId());
skuESMapper.save(financeSkuES);
}
// 手動(dòng)簽收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
// 冪等性
redisTemplate.opsForValue().set(messageId,messageId,5*60000);
// 1、重試重試上限次數(shù)(默認(rèn)值5) 每重試一次時(shí)間間隔會(huì)增加
// 2、把消息、交換機(jī)名稱(chēng)、路由鍵等相關(guān)的消息保存到數(shù)據(jù)庫(kù),有一個(gè)程序定時(shí)掃描相關(guān)的消息,然后重新發(fā)送消息。
// 重發(fā)上限次數(shù)(默認(rèn)值5)超過(guò)閾值會(huì)轉(zhuǎn)人工處理
// 已知的消息,交換機(jī),路由器,消息 message.getBody() 消息發(fā)送給的是監(jiān)聽(tīng)的隊(duì)列
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
}
后臺(tái)打印的日志
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-719303.html
總結(jié)
1.elasticsearch的使用案例,包括結(jié)合MybatisPlus使用ES;
2.如何保證MySQL和es的數(shù)據(jù)一致性;
3.使用了RabbitMQ進(jìn)行解耦,自定義了發(fā)消息的方法。
到了這里,關(guān)于Elasticsearch使用——結(jié)合MybatisPlus使用ES & es和MySQL數(shù)據(jù)一致性 & 結(jié)合RabbitMQ實(shí)現(xiàn)解耦的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!