在生產(chǎn)環(huán)境中由于一些不明原因,導(dǎo)致 rabbitmq 重啟,在 RabbitMQ 重啟期間生產(chǎn)者消息投遞失敗,導(dǎo)致消息丟失,需要手動(dòng)處理和恢復(fù)。于是,我們?nèi)绾尾拍苓M(jìn)行 RabbitMQ 的消息可靠投遞。
發(fā)布確認(rèn) ?
發(fā)布確認(rèn)方案
?架構(gòu)
?
配置文件?
在配置文件當(dāng)中添加?spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.host=43.139.59.23
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated
配置類(lèi)
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
//聲明業(yè)務(wù) Exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
// 聲明確認(rèn)隊(duì)列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
// 聲明確認(rèn)隊(duì)列綁定關(guān)系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}
?生產(chǎn)者
import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private MyCallBack myCallBack;
//依賴(lài)注入 rabbitTemplate 之后再設(shè)置它的回調(diào)對(duì)象
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(myCallBack);
}
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message){
//指定消息 id 為 1
CorrelationData correlationData1=new CorrelationData("1");
String routingKey="key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);
CorrelationData correlationData2=new CorrelationData("2");
routingKey="key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);
log.info("發(fā)送消息內(nèi)容:{}",message);
}
}
?回調(diào)接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 交換機(jī)不管是否收到消息的一個(gè)回調(diào)方法
* CorrelationData
* 消息相關(guān)數(shù)據(jù)
* ack
* 交換機(jī)是否收到消息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("交換機(jī)已經(jīng)收到 id 為:{}的消息",id);
}else{
log.info("交換機(jī)還未收到 id 為:{}消息,由于原因:{}",id,cause);
}
}
}
?消費(fèi)者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues =CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg=new String(message.getBody());
log.info("接受到隊(duì)列 confirm.queue 消息:{}",msg);
}
}
?結(jié)果
?回退消息
Mandatory 參數(shù)?
?生產(chǎn)者
import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;
@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private MyCallBack myCallBack;
//依賴(lài)注入 rabbitTemplate 之后再設(shè)置它的回調(diào)對(duì)象
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(myCallBack);
/**
* true:
* 交換機(jī)無(wú)法將消息進(jìn)行路由時(shí),會(huì)將該消息返回給生產(chǎn)者
* false:
* 如果發(fā)現(xiàn)消息無(wú)法進(jìn)行路由,則直接丟棄
*/
rabbitTemplate.setMandatory(true);
//設(shè)置回退消息交給誰(shuí)處理
rabbitTemplate.setReturnCallback(myCallBack);
}
@GetMapping("sendMessage")
public void sendMessage(String message){
//讓消息綁定一個(gè) id 值
CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",message+"key1",correlationData1)
;
log.info("發(fā)送消息 id 為:{}內(nèi)容為{}",correlationData1.getId(),message+"key1");
CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key2",message+"key2",correlationData2)
;
log.info("發(fā)送消息 id 為:{}內(nèi)容為{}",correlationData2.getId(),message+"key2");
}
}
?回調(diào)接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
/**
* 交換機(jī)不管是否收到消息的一個(gè)回調(diào)方法
* CorrelationData
* 消息相關(guān)數(shù)據(jù)
* ack
* 交換機(jī)是否收到消息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("交換機(jī)已經(jīng)收到 id 為:{}的消息",id);
}else{
log.info("交換機(jī)還未收到 id 為:{}消息,由于原因:{}",id,cause);
}
}
//當(dāng)消息無(wú)法路由的時(shí)候的回調(diào)方法
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange, String routingKey) {
log.error(" 消 息 {}, 被交換機(jī) {} 退回,退回原因 :{}, 路 由 key:{}",new
String(message.getBody()),exchange,replyText,routingKey);
}
}
?結(jié)果
?接收到被退回的消息
備份交換機(jī)?
架構(gòu)
?配置類(lèi)
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
public static final String BACKUP_QUEUE_NAME = "backup.queue";
public static final String WARNING_QUEUE_NAME = "warning.queue";
// 聲明確認(rèn)隊(duì)列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
//聲明確認(rèn)隊(duì)列綁定關(guān)系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
//聲明備份 Exchange
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
//聲明確認(rèn) Exchange 交換機(jī)的備份交換機(jī)
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
ExchangeBuilder exchangeBuilder =
ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
//設(shè)置該交換機(jī)的備份交換機(jī)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
return (DirectExchange)exchangeBuilder.build();
}
// 聲明警告隊(duì)列
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
// 聲明報(bào)警隊(duì)列綁定關(guān)系
@Bean
public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(queue).to(backupExchange);
}
// 聲明備份隊(duì)列
@Bean("backQueue")
public Queue backQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
// 聲明備份隊(duì)列綁定關(guān)系
@Bean
public Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(queue).to(backupExchange);
}
}
?報(bào)警消費(fèi)者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues =CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg=new String(message.getBody());
log.info("接受到隊(duì)列 confirm.queue 消息:{}",msg);
}
}
?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-670579.html
結(jié)果?
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-670579.html
?
到了這里,關(guān)于springboot整合rabbitmq發(fā)布確認(rèn)高級(jí)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!