// After the consumer has processed a message successfully, notify the broker so it removes the message from the queue.
// Setting multiple=true enables batch acknowledgement, which reduces network traffic.
channel.basicAck(deliveryTag, multiple);
// Reject the message identified by deliveryTag. The second argument is requeue: true puts the message back on the
// queue, otherwise it is discarded or routed to the dead-letter queue. Author's note: after rejecting, this same
// consumer may still receive the rejected message again.
channel.basicReject(deliveryTag, requeue);
// Negatively acknowledge the message identified by deliveryTag. The second argument says whether this applies to
// multiple messages, the third is requeue. The difference from basic.reject is that it supports multiple messages:
// it can nack every message this consumer has received and not yet acked. Author's note: nacked messages may also
// be consumed again by this same consumer.
channel.basicNack(deliveryTag, multiple, requeue);
// Recover (redeliver) messages. The argument is requeue: true requeues them and tries to deliver the recovered
// messages to other consumers rather than this one; false redelivers them to this same consumer.
channel.basicRecover(false);
搭建項目
# RabbitMQ connection settings and a "simple" listener container with manual acknowledgement.
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      type: simple
      simple:
        # Rejected deliveries are not requeued by default.
        default-requeue-rejected: false
        # Listener methods must ack/nack each delivery themselves via the Channel.
        acknowledge-mode: manual
/**
 * Declares the business exchange and queue, the dead-letter exchange and queue,
 * and the bindings that connect each queue to its exchange.
 */
@Configuration
public class RabbitMQConfig {

    // Names used by the normal (business) flow.
    public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";
    public static final String NORMAL_QUEUE_A = "normal-queue-a";
    public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";

    // Names used by the dead-letter flow.
    public static final String DEAD_EXCHANGE_A = "dead-exchange-a";
    public static final String DEAD_QUEUE_A = "dead-queue-a";
    public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";

    /** Topic exchange that business messages are published to. */
    @Bean("businessExchange")
    public TopicExchange normalExchangeA() {
        return new TopicExchange(NORMAL_EXCHANGE_A);
    }

    /** Direct exchange that receives dead-lettered messages. */
    @Bean("deadExchange")
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE_A);
    }

    /**
     * Durable business queue whose arguments name the dead-letter exchange and
     * the routing key to use when a message is dead-lettered.
     */
    @Bean("businessQueueA")
    public Queue businessQueueA() {
        return QueueBuilder.durable(NORMAL_QUEUE_A)
                .withArgument("x-dead-letter-exchange", DEAD_EXCHANGE_A)
                .withArgument("x-dead-letter-routing-key", DEAD_ROUTING_KEY_A)
                .build();
    }

    /** Durable queue that holds dead-lettered messages. */
    @Bean("deadQueueA")
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE_A).build();
    }

    /** Binds the business queue to the business exchange under the normal routing key. */
    @Bean
    public Binding bindingA(@Qualifier("businessQueueA") Queue queue,
                            @Qualifier("businessExchange") TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(NORMAL_ROUTING_KEY_A);
    }

    /** Binds the dead-letter queue to the dead-letter exchange under the dead routing key. */
    @Bean
    public Binding bindingDead(@Qualifier("deadQueueA") Queue queue,
                               @Qualifier("deadExchange") DirectExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(DEAD_ROUTING_KEY_A);
    }
}
/**
 * Consumes messages from the business queue with manual acknowledgement.
 */
@Component
public class SmsListener {

    /**
     * Prints the message body, then settles the delivery exactly once: bodies
     * containing "dead" are nacked without requeue, everything else is acked.
     *
     * @param message the received message
     * @param channel the channel the delivery arrived on, used to ack/nack
     * @throws IOException if the ack/nack call on the channel fails
     */
    @RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE_A)
    public void smsListener(Message message, Channel channel) throws IOException {
        String body = new String(message.getBody());
        System.out.println("收到消息:" + body);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (body.contains("dead")) {
            // multiple=false, requeue=false: reject only this delivery and do not put it back on the queue.
            channel.basicNack(deliveryTag, false, false);
            // Stop here: acking a delivery tag that was already nacked makes the broker close the
            // channel with 406 PRECONDITION_FAILED (unknown delivery tag).
            return;
        }
        channel.basicAck(deliveryTag, false);
    }
}
/**
 * Consumes messages from the dead-letter queue.
 */
@Component
public class DeadListener {

    /**
     * Prints the dead-lettered message body and acknowledges that single delivery.
     *
     * @param message the received message
     * @param channel the channel used to acknowledge the delivery
     * @throws IOException if the acknowledgement fails
     */
    @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE_A)
    public void deadListener(Message message, Channel channel) throws IOException {
        final byte[] payload = message.getBody();
        final String text = new String(payload);
        System.out.println("dead listener: " + text);

        final long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false);
    }
}
/**
 * HTTP entry point that publishes a message to the business exchange.
 */
@RestController
public class HelloController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code msg} to the normal exchange with the normal routing key.
     *
     * @param msg the text to publish
     * @return always {@code true}
     */
    @GetMapping("/hello")
    public Boolean hello(String msg) {
        rabbitTemplate.convertAndSend(
                RabbitMQConfig.NORMAL_EXCHANGE_A,
                RabbitMQConfig.NORMAL_ROUTING_KEY_A,
                msg);
        return Boolean.TRUE;
    }
}
使用注解
/**
 * Declares only the business exchange, the business queue (with dead-letter
 * arguments) and their binding. This class declares no dead-letter exchange or queue.
 */
@Configuration
public class RabbitMQConfig {

    // Names used by the normal (business) flow.
    public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";
    public static final String NORMAL_QUEUE_A = "normal-queue-a";
    public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";

    // Names used by the dead-letter flow.
    public static final String DEAD_EXCHANGE_A = "dead-exchange-a";
    public static final String DEAD_QUEUE_A = "dead-queue-a";
    public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";

    /** Topic exchange that business messages are published to. */
    @Bean("businessExchange")
    public TopicExchange normalExchangeA() {
        return new TopicExchange(NORMAL_EXCHANGE_A);
    }

    /**
     * Durable business queue whose arguments name the dead-letter exchange and
     * the routing key to use when a message is dead-lettered.
     */
    @Bean()
    public Queue businessQueueA() {
        return QueueBuilder.durable(NORMAL_QUEUE_A)
                .withArgument("x-dead-letter-exchange", DEAD_EXCHANGE_A)
                .withArgument("x-dead-letter-routing-key", DEAD_ROUTING_KEY_A)
                .build();
    }

    /** Binds the business queue to the business exchange under the normal routing key. */
    @Bean
    public Binding bindingA(@Qualifier("businessQueueA") Queue queue,
                            @Qualifier("businessExchange") TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(NORMAL_ROUTING_KEY_A);
    }
}
死信隊列使用注解實現
/**
 * Consumes dead-lettered messages; the queue, direct exchange and binding are
 * declared through the listener annotation itself.
 */
@Component
public class DeadListener {

    /**
     * Prints the dead-lettered message body and acknowledges that single delivery.
     *
     * @param message the received message
     * @param channel the channel used to acknowledge the delivery
     * @throws IOException if the acknowledgement fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMQConfig.DEAD_QUEUE_A),
            exchange = @Exchange(value = RabbitMQConfig.DEAD_EXCHANGE_A, type = ExchangeTypes.DIRECT),
            key = RabbitMQConfig.DEAD_ROUTING_KEY_A
    ))
    public void deadListener(Message message, Channel channel) throws IOException {
        final byte[] payload = message.getBody();
        final String text = new String(payload);
        System.out.println("死信隊列消費消息: " + text);

        final long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false);
    }
}
// Consumer for the business queue, bound by queue name.
@Component
public class SmsListener {
@RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE_A)
// Alternative annotation-based binding, disabled in this variant:
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
// exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC),
// key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
// ))
public void smsListener(Message message, Channel channel) throws IOException {
String body = new String(message.getBody());
System.out.println("正常消費消息:" + body);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (body.contains("dead")) {
// Nack only this delivery (multiple=false) and do not requeue it (requeue=false).
channel.basicNack(deliveryTag, false, false);
// NOTE(review): the early exit below is commented out, so execution falls through to basicAck
// and the same delivery tag is settled a second time.
// return;
}
// Acknowledge this single delivery.
channel.basicAck(deliveryTag, false);
}
}
報錯:
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
// 由於程序編寫不嚴謹,在 basicNack 執行後沒有退出方法,導致最後還執行了 basicAck(同一個 deliveryTag 被確認了兩次),出現了上述錯誤
/**
 * Consumes a message from the business queue (declared together with its topic
 * exchange and binding by the annotation) and settles the delivery exactly once.
 *
 * @param message the received message
 * @param channel the channel used to ack/nack the delivery
 * @throws IOException if the ack/nack call fails
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
        exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC),
        key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
))
public void smsListener(Message message, Channel channel) throws IOException {
    final String payload = new String(message.getBody());
    System.out.println("正常消費消息:" + payload);
    final long tag = message.getMessageProperties().getDeliveryTag();
    if (payload.contains("dead")) {
        // Reject this one delivery without requeueing it.
        channel.basicNack(tag, false, false);
    } else {
        channel.basicAck(tag, false);
    }
}
// 問題二:控制臺報錯,但是也能正常消費 mq 消息。這裡與第一種寫法唯一的區別在於 @RabbitListener,我的推測是自定義 bean 和注解生成的 bean 重複聲明導致,看能不能使用注解綁定死信隊列
2023-04-23 22:03:25.630 ERROR 8580 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'normal-queue-a' in vhost '/': received none but current is the value 'dead-exchange-a' of type 'longstr', class-id=50, method-id=10)
Broker not available; cannot force queue declarations during start: java.io.IOException
/**
 * Annotation describing an exchange inside a listener binding declaration.
 * The empty {@code @Target} means it cannot be placed on a program element directly;
 * it is only usable as a member value of another annotation.
 */
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {
// String constants for the boolean-like String members below.
String TRUE = "true";
String FALSE = "false";
// Alias of name().
@AliasFor("name")
String value() default "";
// Alias of value().
@AliasFor("value")
String name() default "";
// Exchange type; defaults to "direct".
String type() default "direct";
// Defaults to "true".
String durable() default "true";
// Defaults to "false".
String autoDelete() default "false";
// Defaults to "false".
String internal() default "false";
// Defaults to "false".
String ignoreDeclarationExceptions() default "false";
// Defaults to "false".
String delayed() default "false";
// Extra arguments for the exchange declaration; empty by default.
Argument[] arguments() default {};
// Defaults to "true".
String declare() default "true";
// Empty by default.
String[] admins() default {};
}
// Consumes a message from the business queue and settles the delivery exactly once.
// NOTE(review): in this variant the x-dead-letter-exchange / x-dead-letter-routing-key arguments are
// attached to @Exchange, while @Queue carries no arguments.
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC, arguments = {
@Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
@Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
}),
key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
))
public void smsListener(Message message, Channel channel) throws IOException {
String body = new String(message.getBody());
System.out.println("正常消費消息:" + body);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (body.contains("dead")) {
// Nack only this delivery (multiple=false) without requeueing (requeue=false), then stop.
channel.basicNack(deliveryTag, false, false);
return;
}
// Acknowledge this single delivery.
channel.basicAck(deliveryTag, false);
}
可以使用注解的方式來綁定死信隊列,但是還是會報上面的錯誤,繼續修改參數試試
java - How to set x-dead-letter-exchange in Rabbit? - Stack Overflow
但是使用注解綁定的話好像又不生效了。問題原因:把死信參數綁到了交換機(@Exchange)上,而它們應該設置在隊列(@Queue)上。
修改代碼
/**
 * Consumes a message from the business queue. The annotation declares a
 * non-durable queue carrying the dead-letter arguments, a non-durable topic
 * exchange, and the binding between them.
 *
 * @param message the received message
 * @param channel the channel used to ack/nack the delivery
 * @throws IOException if the ack/nack call fails
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A, durable = "false", arguments = {
                @Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
                @Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
        }),
        exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, durable = "false", type = ExchangeTypes.TOPIC),
        key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
))
public void smsListener(Message message, Channel channel) throws IOException {
    final String payload = new String(message.getBody());
    System.out.println("正常消費消息:" + payload);
    final long tag = message.getMessageProperties().getDeliveryTag();
    final boolean deadLetter = payload.contains("dead");
    if (deadLetter) {
        // Reject this one delivery without requeueing it.
        channel.basicNack(tag, false, false);
    } else {
        channel.basicAck(tag, false);
    }
}
至於問題二,是由於隊列和交換機默認持久化,這樣就導致第二次啟動項目時重複聲明(聲明參數與 broker 上已存在的隊列不一致)。
Springboot純注解版的RabbitMq 死信隊列_注解聲明私信隊列_lopo呀的博客-CSDN博客
全注解版
# RabbitMQ connection settings and a "simple" listener container with manual acknowledgement.
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      type: simple
      simple:
        # Rejected deliveries are not requeued by default.
        default-requeue-rejected: false
        # Listener methods must ack/nack each delivery themselves via the Channel.
        acknowledge-mode: manual
// Normal (business) flow: exchange, queue and routing-key names.
public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";
public static final String NORMAL_QUEUE_A = "normal-queue-a";
public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";
// Dead-letter flow: exchange, queue and routing-key names.
public static final String DEAD_EXCHANGE_A = "dead-exchange-a";
public static final String DEAD_QUEUE_A = "dead-queue-a";
public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";
/**
 * Consumes messages from the business queue; the queue (with its dead-letter
 * arguments), the topic exchange and the binding are all declared by the
 * listener annotation.
 */
@Component
public class SmsListener {

    /**
     * Prints the message body, then nacks (without requeue) bodies containing
     * "dead" and acks everything else.
     *
     * @param message the received message
     * @param channel the channel used to ack/nack the delivery
     * @throws IOException if the ack/nack call fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A, durable = "false", arguments = {
                    @Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
                    @Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
            }),
            exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, durable = "false", type = ExchangeTypes.TOPIC),
            key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
    ))
    public void smsListener(Message message, Channel channel) throws IOException {
        final String payload = new String(message.getBody());
        System.out.println("正常消費消息:" + payload);
        final long tag = message.getMessageProperties().getDeliveryTag();
        if (!payload.contains("dead")) {
            channel.basicAck(tag, false);
        } else {
            // Reject this one delivery without requeueing it.
            channel.basicNack(tag, false, false);
        }
    }
}
/**
 * Consumes dead-lettered messages; the non-durable queue, exchange and binding
 * are declared by the listener annotation.
 */
@Component
public class DeadListener {

    /**
     * Prints the dead-lettered message body and acknowledges that single delivery.
     *
     * @param message the received message
     * @param channel the channel used to acknowledge the delivery
     * @throws IOException if the acknowledgement fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMQConfig.DEAD_QUEUE_A, durable = "false"),
            exchange = @Exchange(value = RabbitMQConfig.DEAD_EXCHANGE_A, durable = "false"),
            key = RabbitMQConfig.DEAD_ROUTING_KEY_A
    ))
    public void deadListener(Message message, Channel channel) throws IOException {
        final byte[] payload = message.getBody();
        final String text = new String(payload);
        System.out.println("死信隊列消費消息: " + text);

        final long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false);
    }
}
/**
 * Logs the outgoing text and publishes it to the normal exchange with the normal routing key.
 *
 * @param msg the text to publish
 * @return always {@code true}
 */
@GetMapping("/hello")
public Boolean hello(String msg) {
    System.out.println("發(fā)送消息:" + msg);
    rabbitTemplate.convertAndSend(
            RabbitMQConfig.NORMAL_EXCHANGE_A,
            RabbitMQConfig.NORMAL_ROUTING_KEY_A,
            msg);
    return Boolean.TRUE;
}
// conslog
發(fā)送消息:dead
正常消費消息:dead
死信隊列消費消息: dead
明天再研究下回調啥的
springboot整合rabbitMQ confirm 確認模式 return 退回模式_weixin_44318244的博客-CSDN博客
回調
在使用 RabbitMQ 的時候,作為消息發(fā)送方希望杜絕任何消息丟失或者投遞失敗場景。 RabbitMQ 為我們提供了兩種方式用來控制消息的投遞可靠性模式。
-
confirm 確認模式
-
return 退回模式
rabbitmq 整個消息投遞的路徑為:producer—>rabbitmq broker—>exchange—>queue—>consumer
-
消息從 producer 到 exchange 則會返回一個 confirmCallback 。
-
消息從 exchange–>queue 投遞失敗則會返回一個 returnCallback
我們將利用這兩個 callback 控制消息的可靠性投遞
消息的可靠投遞小結 • 設置 ConnectionFactory 的 publisher-confirms="true" 開啟確認模式。 • 使用 rabbitTemplate.setConfirmCallback 設置回調函數。當消息發送到 exchange 後回調 confirm 方法。在方法中判斷 ack,如果為 true,則發送成功,如果為 false,則發送失敗,需要處理。
• 設置 ConnectionFactory 的 publisher-returns="true" 開啟退回模式。 • 使用 rabbitTemplate.setReturnCallback(下文代碼中使用的是 setReturnsCallback)設置退回函數,當消息從 exchange 路由到 queue 失敗後,如果設置了 rabbitTemplate.setMandatory(true) 參數,則會將消息退回給 producer,並執行回調函數 returnedMessage。
確認模式
# RabbitMQ connection settings, manual-ack listener container, and publisher confirm/return support.
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      type: simple
      simple:
        # Rejected deliveries are not requeued by default.
        default-requeue-rejected: false
        # Listener methods must ack/nack each delivery themselves via the Channel.
        acknowledge-mode: manual
    publisher-confirm-type: correlated # publisher confirm setting
    publisher-returns: true # enable return mode
/**
 * Publisher-confirm modes.
 */
public enum ConfirmType {
/**
 * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()})
 * within scoped operations.
 * <p>
 * Article author's note (from their own testing): SIMPLE has two effects. First, like
 * CORRELATED, it triggers the confirm callback. Second, after publishing, the
 * RabbitTemplate's {@code waitForConfirms} or {@code waitForConfirmsOrDie} can be called to
 * wait for the broker's result and decide the next step from it. Caveat from the author:
 * if {@code waitForConfirmsOrDie} reports a negative result the channel is closed, and no
 * further messages can be sent to the broker on it.
 */
SIMPLE,
/**
 * Use with {@code CorrelationData} to correlate confirmations with sent
 * messages. Article author's note: the confirm callback is triggered once a message has
 * been published to the exchange.
 */
CORRELATED,
/**
 * Publisher confirms are disabled (default).
 */
NONE
}
/**
 * Publisher-confirm callback that logs whether a published message reached the exchange.
 */
@Configuration
public class PublisherConfirmHandler implements RabbitTemplate.ConfirmCallback {

    /**
     * Logs the outcome of a publisher confirm.
     *
     * @param correlationData correlation data supplied when the message was sent; {@code null}
     *                        when the message was published without any
     * @param ack             {@code true} if the broker confirmed the message, {@code false} otherwise
     * @param cause           failure reason reported for a negative confirm, may be {@code null}
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // Messages sent without CorrelationData arrive here with null; dereferencing it
        // unconditionally would throw NullPointerException inside the callback.
        String messageId = (correlationData == null) ? null : correlationData.getId();
        if (ack) {
            System.out.println("發送消息到交換機成功!MessageId: " + messageId);
        } else {
            System.out.println("發送消息到交換機失敗!MessageId: " + messageId + ", 退回原因:" + cause);
        }
    }
}
// Injected confirm-callback handler.
@Resource
private PublisherConfirmHandler publisherConfirmHandler;
// Register the handler on the template as its publisher-confirm callback.
// NOTE(review): this is a bare statement in the snippet; the method it belongs to is not shown.
rabbitTemplate.setConfirmCallback(publisherConfirmHandler);
文章來源:http://www.zghlxwxcb.cn/news/detail-434000.html
回退模式
/**
 * Returns callback that logs every message handed back to the publisher.
 */
@Configuration
public class ReturnsCallbackHandler implements RabbitTemplate.ReturnsCallback {

    /**
     * Prints the returned message.
     *
     * @param returned the returned message and its return details
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        final String line = "return 執(zhí)行了!" + returned;
        System.out.println(line);
    }
}
?文章來源地址http://www.zghlxwxcb.cn/news/detail-434000.html
// Enable the mandatory flag and register the returns callback on the template.
// NOTE(review): the declaration of returnsCallbackHandler and the enclosing method are not shown in this snippet.
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnsCallbackHandler);
到了這里,關(guān)于RabbitMQ 死信隊列實現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!