?
?
1.消息可靠性
三種丟失的情形:
1.1? 生產(chǎn)者確認(rèn)機(jī)制?
?啟動MQ
創(chuàng)建Queues:?
兩種Callback:
1.ReturnCallback:全局callback?
?2.ComfirmCallback:?發(fā)送信息時候設(shè)置
?
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.準(zhǔn)備消息
String message = "hello, spring amqp!";
// 2.準(zhǔn)備CorrelationData
// 2.1.消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.2.準(zhǔn)備ConfirmCallback
correlationData.getFuture().addCallback(result -> {
// 判斷結(jié)果
if (result.isAck()) {
// ACK
log.debug("消息成功投遞到交換機(jī)!消息ID: {}", correlationData.getId());
} else {
// NACK
log.error("消息投遞到交換機(jī)失??!消息ID:{}", correlationData.getId());
// 重發(fā)消息
}
}, ex -> {
// 記錄日志
log.error("消息發(fā)送失??!", ex);
// 重發(fā)消息
});
// 3.發(fā)送消息
rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
}
?執(zhí)行成功:
?監(jiān)控頁面:
模擬失?。?/p>
?1.投遞到交互機(jī)失敗
2.投遞到交換機(jī)了,但是沒有進(jìn)入隊列?
?
1.2 消息持久化?
注意: 生產(chǎn)者確認(rèn)只能保證數(shù)據(jù)放到隊列當(dāng)中,但是無法保證數(shù)據(jù)不丟失(比如所在的機(jī)器宕機(jī)了),
所以還需要保證數(shù)據(jù)的持久化
@Configuration
public class CommonConfig {
@Bean
public DirectExchange simpleDirect(){
return new DirectExchange("simple.direct");
}
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue").build();
}
}
@Test
public void testDurableMessage() {
// 1.準(zhǔn)備消息 消息持久化
Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 2.發(fā)送消息
rabbitTemplate.convertAndSend("simple.queue", message);
}
?注意:
//交換機(jī)不傳值默認(rèn)就是持久化
//交換機(jī)、隊列、消息默認(rèn)都是持久化
@Bean
public DirectExchange simpleDirect(){
return new DirectExchange("simple.direct");
}
public AbstractExchange(String name) {
this(name, true, false);
}
? 演示數(shù)據(jù)是否默認(rèn)持久化:?
? ? ?重啟mq:
?1. 交互機(jī)、隊列、消息都做持久化
? 2.消費者端關(guān)閉防止被消費
? 3.重啟mq后看隊列中數(shù)據(jù)是否還在(是否持久化)
?1.3? 消費者消息確認(rèn)
生產(chǎn)者確認(rèn):能確定消息投遞到隊列
消息持久化:能避免MQ宕機(jī)造成的消息丟失
生產(chǎn)者確認(rèn)和消息持久化能保證消息能投遞到消費者,但是無法保證消息被消費者消費(比如投遞消費者的
同時,消費者所在機(jī)器宕機(jī)了)
1.manual:不推薦 代碼侵入
try{
//業(yè)務(wù)邏輯
ack
} catch(ex){
nack
}
2.auto:推薦 spring全權(quán)完成,不需要手動寫代碼
3.none:不推薦 投遞完成立馬刪除消息,是否成功都不管
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.debug("消費者接收到simple.queue的消息:【" + msg + "】");
//模擬出現(xiàn)異常情況
System.out.println(1 / 0);
log.info("消費者處理消息成功!");
}
}
默認(rèn)為none:拋出異常后消息立即被刪除:
?修改為auto模式:
隊列返回nack會再去發(fā)送信息:?
1.4 失敗重試機(jī)制
?演示失敗重試機(jī)制:
listener:
simple:
prefetch: 1
acknowledge-mode: auto
retry:
enabled: true
initial-interval: 1000
multiplier: 3
max-attempts: 4
?默認(rèn)重試到達(dá)最大次數(shù)后消息就丟棄:
? ? ? ?但是對于一些比較重要不能丟棄的消息需要使用以下策略:? ???
推薦使用第三種方案:將失敗的消息發(fā)送到失敗的交換機(jī)和失敗的隊列中,后面可以告知管理員然后重新
人工去處理
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorMessageBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
?演示:
發(fā)送消息:
?
?面試題:最后一分鐘的總結(jié)
?2. 死信交換機(jī)
?2.1? 初識死信交換機(jī)
1.發(fā)送信息到消費者默認(rèn)的retry重試機(jī)制,達(dá)到最大次數(shù)就會被reject
2.隊列中綁定一個死信交換機(jī),接收被reject的信息,然后發(fā)送到dl.queue
3.這樣就不擔(dān)心死信會丟失
對比消息失敗信息處理策略:
2.2? TTL?
?注意: 存活時間取消息所在隊列中存貨時間 、消息本身存活時間的以短的時間為準(zhǔn)
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue", durable = "true"),
exchange = @Exchange(name = "dl.direct"),
key = "dl"
))
public void listenDlQueue(String msg) {
log.info("消費者接收到了dl.queue的延遲消息");
}
}
@Configuration
public class TTLMessageConfig {
@Bean
public DirectExchange ttlDirectExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Queue ttlQueue(){
return QueueBuilder
.durable("ttl.queue")
.ttl(10000)
.deadLetterExchange("dl.direct")
.deadLetterRoutingKey("dl")
.build();
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
}
}
?
@Test
public void testTTLMessage() {
// 1.準(zhǔn)備消息
Message message = MessageBuilder
.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setExpiration("5000")
.build();
// 2.發(fā)送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
// 3.記錄日志
log.info("消息已經(jīng)成功發(fā)送!");
}
? 演示延時隊列:
? 1.啟動消費者?
? 2.發(fā)送消息:testTTLMessage()?
2.3 延遲隊列?
p159 27:18?
?
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayExchange(String msg) {
log.info("消費者接收到了delay.queue的延遲消息");
}
}
?
?
?文章來源地址http://www.zghlxwxcb.cn/news/detail-701346.html
@Test
public void testSendDelayMessage() throws InterruptedException {
// 1.準(zhǔn)備消息
Message message = MessageBuilder
.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay", 5000)
.build();
// 2.準(zhǔn)備CorrelationData
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.發(fā)送消息
rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
log.info("發(fā)送消息成功");
}
演示延時隊列:
1.啟動消費者
?2.運行testSendDelayMessage
報錯原因:消息沒有做路由
?如何不報錯:添加延遲的判斷:?
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate對象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 判斷是否是延遲消息
Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
// 是一個延遲消息,忽略這個錯誤提示
return;
}
// 記錄日志
log.error("消息發(fā)送到隊列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機(jī): {}, 路由key:{}, 消息: {}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有需要的話,重發(fā)消息
});
}
}
文章來源:http://www.zghlxwxcb.cn/news/detail-701346.html
?
到了這里,關(guān)于高級篇-rabbitmq的高級特性的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!