文章目錄
- 1.場景描述
-
- 1.1 場景1
- 1.2 場景2
- 2.原理
- 3.實戰(zhàn)開發(fā)
-
- 3.1 建表
- 3.2 集成mybatis-plus
- 3.3 集成RabbitMq
-
- 3.3.1 安裝mq
- 3.3.2 springBoot集成mq
- 3.4 具體實現(xiàn)
-
- 3.4.1 mq配置類
- 3.4.2 生產(chǎn)者
- 3.4.3 消費者
1.場景描述
消息中間件是分布式系統(tǒng)常用的組件,無論是異步化、解耦、削峰等都有廣泛的應用價值。我們通常會認為,消息中間件是一個可靠的組件——這里所謂的可靠是指,只要我把消息成功投遞到了消息中間件,消息就不會丟失,即消息肯定會至少保證消息能被消費者成功消費一次,這是消息中間件最基本的特性之一,也就是我們常說的“AT LEAST ONCE”,即消息至少會被“成功消費一遍”。
1.1 場景1
什么意思呢?舉個例子:一個消息M發(fā)送到了消息中間件,消息投遞到了消費程序A,A接受到了消息,然后進行消費,但在消費到一半的時候程序重啟了,這時候這個消息并沒有標記為消費成功,這個消息還會繼續(xù)投遞給這個消費者,直到其消費成功了,消息中間件才會停止投遞。
這種情景就會出現(xiàn)消息可能被多次地投遞。
1.2 場景2
還有一種場景是程序A接受到這個消息M并完成消費邏輯之后,正想通知消息中間件“我已經(jīng)消費成功了”的時候,程序就重啟了,那么對于消息中間件來說,這個消息并沒有成功消費過,所以他還會繼續(xù)投遞。這時候對于應用程序A來說,看起來就是這個消息明明消費成功了,但是消息中間件還在重復投遞。
以上兩個場景對于消息隊列來說就是同一個messageId的消息重復投遞下來了。
我們利用消息id來判斷消息是否已經(jīng)消費過,如果該信息被消費過,那么消息表中已經(jīng) 會有一條數(shù)據(jù),由于消費時會先執(zhí)行插入操作,此時會因為主鍵沖突無法重復插入,我們就利用這個原理來進行冪等的控制,消息內容可以用json格式來進行傳輸?shù)摹?/p>
3.實戰(zhàn)開發(fā)
3.1 建表
DROP TABLE IF EXISTS `message_idempotent`;
CREATE TABLE `message_idempotent` (
`message_id` varchar(50) NOT NULL COMMENT '消息ID',
`message_content` varchar(2000) DEFAULT NULL COMMENT '消息內容',
`status` int DEFAULT '0' COMMENT '消費狀態(tài)(0-未消費成功;1-消費成功)',
`retry_times` int DEFAULT '0' COMMENT '重試次數(shù)',
`type` int DEFAULT '0' COMMENT '消費類型',
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3.2 集成mybatis-plus
《springBoot集成mybatisPlus》
3.3 集成RabbitMq
3.3.1 安裝mq
推薦使用docker安裝rabbitmq,還未安裝的可以參考以下信息:
- docker安裝
3.3.2 springBoot集成mq
- 1.添加依賴
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.4 生產(chǎn)者具體實現(xiàn)
3.4.1 mq配置類
- DirectRabbitConfig
具體如何開啟可以參考《rabbitMq實現(xiàn)死信隊列》
import org.springframework.amqp.core.\*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitmqConfig {
//正常交換機的名字
public final static String EXCHANGE\_NAME = "exchange\_name";
//正常隊列的名字
public final static String QUEUE\_NAME="queue\_name";
//死信交換機的名字
public final static String EXCHANGE\_DEAD = "exchange\_dead";
//死信隊列的名字
public final static String QUEUE\_DEAD="queue\_dead";
//死信路由key
public final static String DEAD\_KEY="dead.key";
//創(chuàng)建正常交換機
@Bean(EXCHANGE\_NAME)
public Exchange exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)
//持久化 mq重啟后數(shù)據(jù)還在
.durable(true)
.build();
}
//創(chuàng)建正常隊列
@Bean(QUEUE\_NAME)
public Queue queue(){
//正常隊列和死信進行綁定 轉發(fā)到 死信隊列,配置參數(shù)
Map<String,Object>map=getMap();
return new Queue(QUEUE\_NAME,true,false,false,map);
}
//正常隊列綁定正常交換機 設置規(guī)則 執(zhí)行綁定 定義路由規(guī)則 requestmaping映射
@Bean
public Binding binding(@Qualifier(QUEUE\_NAME) Queue queue,
@Qualifier(EXCHANGE\_NAME) Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
//路由規(guī)則
.with("app.#")
.noargs();
}
//創(chuàng)建死信隊列
@Bean(QUEUE\_DEAD)
public Queue queueDead(){
return new Queue(QUEUE\_DEAD,true,false,false);
}
//創(chuàng)建死信交換機
@Bean(EXCHANGE\_DEAD)
public Exchange exchangeDead(){
return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD)
.durable(true) //持久化 mq重啟后數(shù)據(jù)還在
.build();
}
//綁定死信隊列和死信交換機
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(queueDead())
.to(exchangeDead())
//路由規(guī)則 正常路由key
.with(DEAD\_KEY)
.noargs();
}
/\*\*
獲取死信的配置信息
\*
\*\*/
public Map<String,Object>getMap(){
//3種方式 任選其一,選擇其他方式之前,先把交換機和隊列刪除了,在啟動項目,否則報錯。
//方式一
Map<String,Object> map=new HashMap<>(16);
//死信交換器名稱,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發(fā)送到該交換器中;
map.put("x-dead-letter-exchange", EXCHANGE\_DEAD);
//死信消息路由鍵,在消息發(fā)送到死信交換器時會使用該路由鍵,如果不設置,則使用消息的原來的路由鍵值
map.put("x-dead-letter-routing-key", DEAD\_KEY);
//方式二
//消息的過期時間,單位:毫秒;達到時間 放入死信隊列
// map.put("x-message-ttl",5000);
//方式三
//隊列最大長度,超過該最大值,則將從隊列頭部開始刪除消息;放入死信隊列一條數(shù)據(jù)
// map.put("x-max-length",3);
return map;
}
}
- 延遲隊列配置
具體如何開啟可以參考《rabbitMq實現(xiàn)死信隊列》
由于rabbitMq中不直接支持死信隊列,需要我們利用插件rabbitmq_delayed_messgae_exchage進行開啟
/**
* 定義延遲交換機
*/
@Configuration
public class RabbitMQDelayedConfig {
//隊列
private static final String DELAYQUEUE = "delayedqueue";
//交換機
private static final String DELAYEXCHANGE = "delayedExchange";
@Bean
public Queue delayqueue(){return new Queue(DELAYQUEUE);}
//自定義延遲交換機
@Bean
public CustomExchange delayedExchange(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type","direct");
/**
* 1、交換機名稱
* 2、交換機類型
* 3、是否需要持久化
* 4、是否需要自動刪除
* 5、其他參數(shù)
*/
return new CustomExchange(DELAYEXCHANGE,"x-delayed-message",true,false,arguments);
}
//綁定隊列和延遲交換機
@Bean
public Binding delaybinding(){
return BindingBuilder.bind(delayqueue()).to(delayedExchange()).with("sectest").noargs();
}
}
3.4.2 生產(chǎn)者
- 1.消費隊列的生產(chǎn)者
import com.example.shop.config.RabbitmqConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class Sender_Direct {
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* 用于消費訂單
*
* @param orderId
*/
public void send2Direct(String orderId) {
//創(chuàng)建消費對象,并指定全局唯一ID(這里使用UUID,也可以根據(jù)業(yè)務規(guī)則生成,只要保證全局唯一即可)
MessageProperties messageProperties = new MessageProperties();
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, "內容設置", message -> {
//設置消息的id為唯一
messageProperties.setMessageId(UUID.randomUUID().toString());
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("utf-8");
message.getMessageProperties().setMessageId(orderId);
return message;
});
}
}
3.4.3 消費者
1.開啟手動ack配置
spring:
application:
name: shop
rabbitmq:
host: 192.168.1.102
port: 5673
virtual-host: /
username: guest
password: guest
listener:
simple:
# 表示消費者消費成功消息以后需要手工的進行簽收(ack確認),默認為 auto
acknowledge-mode: manual
消費者要配置ack重試機制,具體參考前幾篇文章,使用的是mysql消息ID的唯一性,有時候可能生成一樣的訂單,具體的沒有進行實驗,內容是json生成的,可以執(zhí)行業(yè)務文章來源:http://www.zghlxwxcb.cn/news/detail-694486.html
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.example.des.Bean.MessageIdempotent;
import com.example.des.Bean.Shop;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class Receiver_Direct {
private static final Integer delayTimes = 30;//延時消費時間,單位:秒
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = {"smsQueue"})
public void receiveD(Message message, Channel channel) throws IOException {
try {
// 獲取消息Id
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody());//獲取消息
//向數(shù)據(jù)庫插入數(shù)據(jù)
MessageIdempotent messageIdempotent = new MessageIdempotent();
messageIdempotent.setMessageId(messageId);
messageIdempotent.setMessageContent(msg);
messageIdempotent.setRetryTimes(0);
System.out.println(messageIdempotent.toString());
Boolean save = true; //設置保存成功,消息投遞失敗是在確認模式那里
if (!save) {//說明屬于重重復請求
//1、處理消息內容的業(yè)務,解析json數(shù)據(jù)
//2、創(chuàng)建訂單,并保存
Boolean flag = consumeOrder(new Shop());
if (flag){
//投入延遲隊列,如果30分鐘訂單還沒有消費,就刪除訂單
rabbitTemplate.convertAndSend("delayedExchange","sectest",message,message1->{
//設置發(fā)送消息的延長時間 單位:ms,表示30分鐘
message1.getMessageProperties().setDelay(1000*60*30);
return message1;
});
//更新消息狀態(tài),消費成功,
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}else {
//延遲投入死信,進行重試
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
} else {
//1、處理消息內容的業(yè)務,解析json數(shù)據(jù)
//2、創(chuàng)建訂單,并保存
//投入死信隊列
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
}catch (Exception e){
System.out.println("錯誤信息");
}
}
private boolean consumeOrder(Shop shop) {
return true;
}
@RabbitListener(queues = {" delay.queue.demo.delay.queue"})
public void dead(String payload, Message message, Channel channel) throws IOException {
System.out.println("死信隊列:"+payload);
//刪除消息 將數(shù)據(jù)庫狀態(tài)更新為失敗,更新郵件或者消息通知,有時候可以人工消費
long deliveryTag=message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag,true);
}
@RabbitListener(queues = "delayedqueue")
public void receivemsg(Message messages){
//查詢有沒有被消費,也就是更新成功,有時候需要樂觀鎖
}
}
至此mq的消息重復以及冪等的信息處理就很完美的解決了,當然本文以數(shù)據(jù)庫為例進行實現(xiàn),感興趣的可以嘗試使用redis來進行實現(xiàn)文章來源地址http://www.zghlxwxcb.cn/news/detail-694486.html
到了這里,關于rabbitmq+springboot實現(xiàn)冪等性操作的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!