目錄
一.消息可靠性傳遞概述
二.生產(chǎn)者消息確認(rèn)機(jī)制
三.publisher-comfirm
四.publisher-return
五.消息持久化
六.消費(fèi)者消息確認(rèn)機(jī)制
七.如何確保RabbitMQ消息的可靠性?
八.死信交換機(jī)
九.延遲隊(duì)列
十.惰性隊(duì)列
十一.MQ集群
一.消息可靠性傳遞概述
生產(chǎn)者發(fā)送消息到交換機(jī),交換機(jī)將消息路由到隊(duì)列,消費(fèi)者從隊(duì)列獲取消息。哪些環(huán)節(jié)會(huì)導(dǎo)致消息丟失。
1.生產(chǎn)者發(fā)送消息丟失
- 生產(chǎn)者沒有將消息發(fā)送到交換機(jī)
- 交換機(jī)沒有成功將消息路由到隊(duì)列
2.MQ宕機(jī)導(dǎo)致消息丟失
3.消費(fèi)者處理消息丟失
消費(fèi)者獲取到消息后,未來(lái)得及處理,宕機(jī)
消費(fèi)者獲取到消息后,處理消息拋異常。
二.生產(chǎn)者消息確認(rèn)機(jī)制
生產(chǎn)者消息確認(rèn)機(jī)制一共有2種方式
- publisher-comfirm
- publisher-return
在publisher這個(gè)微服務(wù)的application.yml中添加配置
spring: ? ? rabbitmq: ? ? publisher-confirm-type: correlated? ? ? publisher-returns: true? ? ? ? template: ? ? ? ? mandatory: true
1.publish-confirm-type:
開啟publisher-confirm,這里支持兩種類型:
- simple:同步等待confirm結(jié)果,直到超時(shí)
- correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
2.publish-returns:
開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallback
3.template.mandatory:
定義消息路由失敗時(shí)的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
三.publisher-comfirm
作用:開啟后,生產(chǎn)者發(fā)送消息到RabbitMQ交換機(jī),RabbitMQ會(huì)進(jìn)行結(jié)果返回。
- ack:生產(chǎn)者成功將消息發(fā)送到隊(duì)列
- nack:生產(chǎn)者發(fā)送到交換機(jī)失敗
如何使用
1.在生產(chǎn)者配置文件中開啟
spring:
? rabbitmq:
? ? publisher-confirm-type: correlated ?# 異步回調(diào)
2.如何接受RabbitMQ結(jié)果返回
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
? ? ? ? ? ? ? ??
? ? ? ? ? ? }
? ? ? ? });
correlationData:消息的唯一標(biāo)識(shí)
ack
- true:生產(chǎn)者成功將消息發(fā)送到交換機(jī)
- false:生產(chǎn)者發(fā)送到交換機(jī)失敗
cause:失敗原因
注意點(diǎn):rabbitTemplate.setConfirmCallback方法的調(diào)用,只能調(diào)用一次。
3.如何保證rabbitTemplate.setConfirmCallback方法只會(huì)被調(diào)用一次
方案1:初始化方法
@PostConstruct
public void init() {
? ? rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
? ? ? ? ?@Override
? ? ? ? ?public void confirm(CorrelationData correlationData, boolean ack, String cause) {
? ? ? ? ? ? }
? ? ? ? });
? ? }
方案2:CommandLineRunner(推薦)
@Component
public class MyComandLineRunner implements CommandLineRunner {
? ??
? ? @Autowired
? ? private RabbitTemplate rabbitTemplate;
? ? @Override
? ? public void run(String... args) throws Exception {
? ? ? ? rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void confirm(CorrelationData correlationData, boolean ack, String cause) {
? ? ? ? ? ? ? ??
? ? ? ? ? ? }
? ? ? ? });
? ? }
}
方案3:實(shí)現(xiàn)ApplicationContextAware實(shí)現(xiàn)類
@Component
public class MyApplicationContext implements ApplicationContextAware {
? ? @Override
? ? public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
? ? ? ? RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
? ? ? ? rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void confirm(CorrelationData correlationData, boolean ack, String cause) {
? ? ? ? ? ? ? ??
? ? ? ? ? ? }
? ? ? ? });
? ? }
}
4.發(fā)送消息的時(shí)候,需要給消息指定一個(gè)唯一標(biāo)識(shí)
?
rabbitTemplate.convertAndSend(exchangeName,routingKey,message,new CorrelationData(id));
5.邏輯問題:如果生產(chǎn)者發(fā)送消息到交換機(jī)失敗了?怎么重發(fā)
- 在發(fā)送消息之前,將消息先存儲(chǔ)到數(shù)據(jù)庫(kù)(MySQL,Redis)
- 如果消息發(fā)送交換機(jī)失敗,讀取Redis中信息,重新發(fā)送
6.測(cè)試
1.成功向交換機(jī)發(fā)送消息:觀察
- correlationData
- ack
- cause
2.發(fā)送消息到交換機(jī)失?。簞h除交換機(jī)
四.publisher-return
作用:開啟后,交換機(jī)將消息路由到消息隊(duì)列失敗,RabbitMQ會(huì)進(jìn)行結(jié)果返回。
如何使用
1.在生產(chǎn)者配置文件中開啟
spring:
? rabbitmq:
? ? publisher-returns: true
? ? ? template:
? ? ? ? mandatory: true
template:mandatory: true定義消息路由失敗時(shí)的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
2.如何接受RabbitMQ結(jié)果返回
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
? ? ? ? ? ? ? ??
? ? ? ? ? ? }
? ? ? ? });
- message:交換機(jī)路由隊(duì)列失敗的那個(gè)消息
- replyCode:錯(cuò)誤碼
- replyText:錯(cuò)誤信息
- exchange:交換機(jī)
- routingKey:路由key
3.注意點(diǎn)
rabbitTemplate.setReturnCallback方法的調(diào)用,只能調(diào)用一次。
方案1:初始化方法
@PostConstruct
? ? public void init() {
? ? ? ? rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
? ? ? ? ? ? ? ??
? ? ? ? ? ? }
? ? ? ? });
? ? }
方案2:
@Component
public class MyComandLineRunner implements CommandLineRunner {
? ??
? ? @Autowired
? ? private RabbitTemplate rabbitTemplate;
? ? @Override
? ? public void run(String... args) throws Exception {
? ? ? ? ? ? rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
? ? ? ? ? ? ? ??
? ? ? ? ? ? }
? ? ? ? });
? ? }
}
方案3:實(shí)現(xiàn)ApplicationContextAware實(shí)現(xiàn)類
@Component
public class MyApplicationContext implements ApplicationContextAware {
? ? @Override
? ? public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
? ? ? ? RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
? ? ? ? ?rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
? ? ? ? ? ? ? ??
? ? ? ? ? ? }
? ? ? ? });
? ? }
}
4.邏輯問題:交換機(jī)路由消息到隊(duì)列失敗了,如何重新發(fā)送
直接調(diào)用rabbitTemplate.convertAndSet發(fā)送
5.測(cè)試
將路由key故意改錯(cuò)
- message:交換機(jī)路由隊(duì)列失敗的那個(gè)消息
- replyCode:錯(cuò)誤碼
- replyText:錯(cuò)誤信息
- exchange:交換機(jī)
- routingKey:路由key
五.消息持久化
交換機(jī)、隊(duì)列、消息持久化(都默認(rèn)持久化)
交換機(jī)
ExchangeBuilder.directExchange(ITCAST_DIRECT ).durable(true).build();
new DirectExchange(ITCAST_DIRECT,true,false);?
隊(duì)列
QueueBuilder.durable(DIRECT_QUEUE1).build()
new Queue(DIRECT_QUEUE2,true);
消息持久化
1.如果發(fā)送普通字符串,默認(rèn)持久化
2.如果期望消息不持久化。
Message msg = MessageBuilder.withBody(message.getBytes("utf-8")) ? ? ? ? ? ? ? ? ? ? ? ? ? ? .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) ? ? ? ? ? ? ? ? ? ? ? ? ? ? .build();
六.消費(fèi)者消息確認(rèn)機(jī)制
解決問題:
- 消費(fèi)者處理消息丟失
- 消費(fèi)者獲取到消息后,未來(lái)得及處理,宕機(jī)
- 消費(fèi)者獲取到消息后,處理消息拋異常。
1.開啟消費(fèi)者消息確認(rèn)機(jī)制
spring:
? rabbitmq:
? ? listener:
? ? ? simple:
? ? ? ? acknowledge-mode: auto
- manual:手動(dòng)ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
- auto:自動(dòng)ack,由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack
- none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會(huì)成功處理,因此消息投遞后立即被刪除
作用:消費(fèi)者獲取到消息后,如果處理消息出現(xiàn)異常,會(huì)給MQ返回nack. MQ將消息放入隊(duì)列頭部再給消費(fèi)者。
消費(fèi)者獲取到消息后,如果處理消息正常,會(huì)給MQ返回ack. MQ將消息從消息隊(duì)列中刪除。
2.消費(fèi)者消息確認(rèn)機(jī)制-問題
如果消費(fèi)者代碼寫的有問題
無(wú)限重試,導(dǎo)致MQ壓力過大
3.開啟消費(fèi)者消息重試機(jī)制
優(yōu)勢(shì)
1.重試在消費(fèi)者本地重試。
2.重試可以有延遲時(shí)間。
3.重試有次數(shù)限制
如何使用
rabbitmq:
? ? listener:
? ? ? simple:
? ? ? ? retry:
? ? ? ? ? enabled: true ?#開啟失敗重試
? ? ? ? ? initial-interval: 100 # 初次失敗,間隔時(shí)間
? ? ? ? ? multiplier: 2 # 間隔時(shí)間倍數(shù)
? ? ? ? ? max-attempts: 3 #最大重試次數(shù)
? ? ? ? ? stateless: true #是否是無(wú)狀態(tài),true無(wú)狀態(tài),和事務(wù)相關(guān),有事務(wù)寫false
重試耗盡
觸發(fā)重試耗盡策略
MessageRecover
RejectAndDonotMessageRecover(默認(rèn))重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
ImmediaRequeueMessageRecover重試耗盡后,返回nack,消息重新入隊(duì)
RepublishMessageRecover
1.創(chuàng)建錯(cuò)誤交換機(jī)
@Bean public DirectExchange errorMessageExchange(){ ? ? return new DirectExchange("error.direct");}
2.錯(cuò)誤隊(duì)列
@Bean public Queue errorQueue(){ ? ? return new Queue("error.queue", true); }
3.綁定
@Bean public Binding errorBinding(){ ? ? return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}
4.RepublishMessageRecover交由spring管理,進(jìn)行重發(fā)。
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ ? ? return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }
5.重試耗盡后,將失敗消息投遞到指定的交換機(jī)
七.如何確保RabbitMQ消息的可靠性?
- 開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
- 開啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失
- 開啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
- 開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理
八.死信交換機(jī)
死信?
死信滿足之一
- 消息被消費(fèi)者拒絕,不讓重新入隊(duì)
- 消息隊(duì)列滿了,溢出的消息。
- 消息在消息隊(duì)列中超時(shí)過期
去哪里?
- 被丟棄
- 如果隊(duì)列指定了死信交換機(jī)。
死信交換機(jī)
普通交換機(jī)
怎么給隊(duì)列指定死信交換機(jī)
- 給隊(duì)列設(shè)置dead-letter-exchange屬性,指定一個(gè)交換機(jī)
- 給隊(duì)列設(shè)置dead-letter-routing-key屬性,設(shè)置死信交換機(jī)與死信隊(duì)列的RoutingKey
死信交換機(jī) + 消息ttl實(shí)現(xiàn)延遲消息隊(duì)列
延遲消息隊(duì)列
生產(chǎn)者----->消息,消費(fèi)者不能立即消費(fèi),需要等待一定時(shí)間才能消費(fèi)。
如何實(shí)現(xiàn)
給消息設(shè)置ttl有2種方式
1.創(chuàng)建隊(duì)列設(shè)置消息過期時(shí)間? ? ? ? ?ttl()? ? ? ? ? ?x-message-ttl
2.創(chuàng)建消息的時(shí)候可以指定過期時(shí)間
Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8)) ? ? ? ? ? ? ? ? .setContentType("text/plain") ? ? ? ? ? ? ? ? .setExpiration("5000").build();
實(shí)現(xiàn)方式
我們聲明消費(fèi)者一組死信交換機(jī)和隊(duì)列,基于注解方式:
@RabbitListener(bindings = @QueueBinding( ? ? ?? ? value = @Queue(name = "dl.queue", durable = "true"), ? ? ? ? exchange = @Exchange(name = "dl.direct"), ? ? ? ? key = "dl")) public void listenDlQueue(String msg){ log.info("接收到 dl.queue的延遲消息:{}", msg);}
消費(fèi)者config中要給隊(duì)列設(shè)置超時(shí)時(shí)間,需要在聲明隊(duì)列時(shí)配置x-message-ttl屬性:
@Bean public DirectExchange ttlExchange(){? ? return new DirectExchange("ttl.direct"); } @Bean public Queue ttlQueue(){? ? return QueueBuilder.durable("ttl.queue") // 指定隊(duì)列名稱,并持久化 ? ? ? ? ? .ttl(10000) // 設(shè)置隊(duì)列的超時(shí)時(shí)間,10秒? ? ? ? ? .deadLetterExchange("dl.direct") // 指定死信交換機(jī) .deadLetterRoutingKey("dl") // 指定死信RoutingKey? .build();} @Bean? public Binding simpleBinding(){ return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}
發(fā)送消息時(shí),給消息本身設(shè)置超時(shí)時(shí)間
@Test? public void testTTLMsg() { ? // 創(chuàng)建消息 Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)) .setExpiration("5000") .build();? // 消息ID,需要封裝到CorrelationData中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 發(fā)送消息 rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);}
如何實(shí)現(xiàn)發(fā)送一個(gè)消息20秒后消費(fèi)者才收到消息?
- 給消息的目標(biāo)隊(duì)列指定死信交換機(jī)
- 消費(fèi)者監(jiān)聽與死信交換機(jī)綁定的隊(duì)列
- 發(fā)送消息時(shí)給消息設(shè)置ttl為20秒
九.延遲隊(duì)列
使用場(chǎng)景包括:
- 延遲發(fā)送短信
- 用戶下單,如果用戶在15 分鐘內(nèi)未支付,則自動(dòng)取消
- 預(yù)約工作會(huì)議,20分鐘后自動(dòng)通知所有參會(huì)人員
延遲隊(duì)列插件的使用步驟包括哪些?
- 聲明一個(gè)交換機(jī),添加delayed屬性為true
- 發(fā)送消息時(shí),添加x-delay頭,值為超時(shí)時(shí)間
安裝DelayExchange插件
DelayExchange的本質(zhì)還是官方的三種交換機(jī),只是添加了延遲功能。因此使用時(shí)只需要聲明一個(gè)交換機(jī),交換機(jī)的類型可以是任意類型,然后設(shè)定delayed屬性為true即可。
1.基于注解方式:
2.基于java代碼的方式:
然后我們向這個(gè)delay為true的交換機(jī)中發(fā)送消息,一定要給消息添加一個(gè)header:x-delay,值為延遲的時(shí)間,單位為毫秒:
?? ??? ??? ??? ?
十.惰性隊(duì)列
消息堆積問題
- 生產(chǎn)者>消費(fèi)者消費(fèi)速度
- 如果消息堆積超過隊(duì)列容量上限,溢出的消息就會(huì)稱為死信。死信會(huì)被丟棄。
怎么解決
- 增加更多消費(fèi)者,提高消費(fèi)速度
- 在消費(fèi)者內(nèi)開啟線程池加快消息處理速度
- 擴(kuò)大隊(duì)列容積(使用惰性隊(duì)列),提高堆積上限
惰性隊(duì)列特點(diǎn)
- 將消息直接存入磁盤,不存儲(chǔ)內(nèi)存
- 支持海量消息存儲(chǔ)
- 消費(fèi)者要獲取消息,MQ將消息加載到內(nèi)容。
創(chuàng)建
- lazy()
- 注解
- 管理控制臺(tái)
優(yōu)點(diǎn)
- 基于磁盤存儲(chǔ),消息上限高
- 沒有間歇性的page-out,性能比較穩(wěn)定
缺點(diǎn)
- 基于磁盤存儲(chǔ),消息時(shí)效性會(huì)降低
- 性能受限于磁盤的IO
十一.MQ集群
普通集群(分布式)
- 會(huì)在集群的各個(gè)節(jié)點(diǎn)間共享部分?jǐn)?shù)據(jù),包括:交換機(jī)、隊(duì)列元信息。不包含隊(duì)列中的消息。
- 當(dāng)訪問集群某節(jié)點(diǎn)時(shí),如果隊(duì)列不在該節(jié)點(diǎn),會(huì)從數(shù)據(jù)所在節(jié)點(diǎn)傳遞到當(dāng)前節(jié)點(diǎn)并返回
- 隊(duì)列所在節(jié)點(diǎn)宕機(jī),隊(duì)列中的消息就會(huì)丟失
鏡像集群(主從)數(shù)據(jù)存在延遲
1.主從架構(gòu)集群,隊(duì)列可以在多個(gè)節(jié)點(diǎn)上有
2.主節(jié)點(diǎn):在那個(gè)節(jié)點(diǎn)上創(chuàng)建隊(duì)列,那個(gè)節(jié)點(diǎn)就是主節(jié)點(diǎn)
3.鏡像節(jié)點(diǎn):備份主節(jié)點(diǎn)上隊(duì)列的節(jié)點(diǎn)
4.創(chuàng)建備份策略:
?? ??? ??? ??? ?exactly
?? ??? ??? ??? ?all
?? ??? ??? ??? ?nodes5.創(chuàng)建隊(duì)列,根據(jù)隊(duì)列名稱,指定那些節(jié)點(diǎn)作為鏡像節(jié)點(diǎn)。
仲裁隊(duì)列代替鏡像集群
1.生產(chǎn)者----->主節(jié)點(diǎn)隊(duì)列------->鏡像節(jié)點(diǎn)隊(duì)列
2.與鏡像隊(duì)列一樣,都是主從模式,支持主從數(shù)據(jù)同步
3.使用非常簡(jiǎn)單,沒有復(fù)雜的配置
4.主從同步基于Raft協(xié)議,強(qiáng)一致
5.創(chuàng)建隊(duì)列
????????指定類型quorum
????????java代碼 quorum();
????????默認(rèn)5個(gè)鏡像節(jié)點(diǎn)
java代碼怎么操作集群
和單機(jī)區(qū)別;文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-828470.html
spring:
? rabbitmq:
? ? addresses: 192.168.200.128:8071,192.168.200.128:8072,192.168.200.128:8073
? ? username: itcast
? ? password: 123
? ? virtual-host: /
?? ?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-828470.html
到了這里,關(guān)于微服務(wù)RabbitMQ高級(jí)篇的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!