RabbitMQ隊列類型
Classic經(jīng)典隊列
-
這是RabbitMQ最為經(jīng)典的隊列類型。在單機環(huán)境中,擁有比較高的消息可靠性。
-
經(jīng)典隊列可以選擇是否持久化(Durability)以及是否自動刪除(Auto delete)兩個屬性。
-
Durability有兩個選項,Durable和Transient。 Durable表示隊列會將消息保存到硬盤,這樣消息的安全性更高。但是同時,由于需要有更多的IO操作,所以生產(chǎn)和消費消息的性能,相比Transient會比較低。
-
Auto delete屬性如果選擇為是,那隊列將在至少一個消費者已經(jīng)連接,然后所有的消費者都斷開連接后刪除自己。
-
經(jīng)典隊列不適合積累太多的消息。如果隊列中積累的消息太多了,會嚴(yán)重影響客戶端生產(chǎn)消息以及消費消息的性能。因此,經(jīng)典隊列主要用在數(shù)據(jù)量比較小,并且生產(chǎn)消息和消費消息的速度比較穩(wěn)定的業(yè)務(wù)場景。比如內(nèi)部系統(tǒng)之間的服務(wù)調(diào)用。
Quorum仲裁隊列
-
仲裁隊列,是3.8引入的一個新隊列類型;仲裁隊列相比Classic經(jīng)典隊列,在分布式環(huán)境下對消息的可靠性保障更高。
-
Quorum是基于Raft一致性協(xié)議實現(xiàn)的一種新型的分布式消息隊列,他實現(xiàn)了持久化,多備份的FIFO隊列,主要就是針對RabbitMQ的鏡像模式設(shè)計的。簡單理解就是quorum隊列中的消息需要有集群中多半節(jié)點同意確認(rèn)后,才會寫入到隊列中。
-
Classic與Quorum對比、少了一些高級特性:
-
Quorum隊列更適合于 隊列長期存在,并且對容錯、數(shù)據(jù)安全方面的要求比低延遲、不持久等高級隊列更能要求更嚴(yán)格的場景。例如 電商系統(tǒng)的訂單,引入MQ后,處理速度可以慢一點,但是訂單不能丟失。
-
Quorum不適合的場景如下:
- 隊列的臨時性:暫時性或獨占隊列、高隊列變動率(聲明和刪除率)
- 盡可能低的延遲:由于其數(shù)據(jù)安全功能,底層共識算法固有的延遲更高
- 當(dāng)數(shù)據(jù)安全不是優(yōu)先事項時(例如,應(yīng)用程序不使用手動確認(rèn),不使用發(fā)布者確認(rèn))
- 很長的隊列積壓(流可能更適合)
創(chuàng)建Quorum隊列
Spring創(chuàng)建仲裁隊列需要設(shè)置參數(shù)“-x-queue-type”為“quorum”
@Configuration
public class QuorumConfig {
public final static String QUEUE_TYPE = "x-queue-type";
public final static String QUEUE_TYPE_VAL = "quorum";
public final static String QUEUE_NAME = "quorumQueue";
@Bean
public Queue quorumQueue() {
HashMap<String, Object> params = new HashMap<>();
params.put(QUEUE_TYPE,QUEUE_TYPE_VAL);
Queue queue = new Queue(QUEUE_NAME, true, false, false, params);
return queue;
}
}
Rabbit Client創(chuàng)建Quorum隊列:
Map<String,Object> params = new HashMap<>();
params.put("x-queue-type","quorum");
//聲明Quorum隊列的方式就是添加一個x-queue-type參數(shù),指定為quorum。默認(rèn)是classic
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
? Quorum隊列的消息是必須持久化的,所以durable參數(shù)必須設(shè)定為true,如果聲明為false,就會報錯。同樣,exclusive參數(shù)必須設(shè)置為false。這些聲明,在Producer和Consumer中是要保持一致的。
Stream隊列
- Stream隊列是3.9.0版本引入新隊列類型。
- 持久化到磁盤并且具備分布式備份的,更適合于消費者多,讀消息非常頻繁的場景。
- Stream隊列的核心是以append-only只添加的日志來記錄消息,整體來說,就是消息將以append-only的方式持久化到日志文件中,然后通過調(diào)整每個消費者的消費進(jìn)度offset,來實現(xiàn)消息的多次分發(fā)。類似kafka;
創(chuàng)建Stream隊列
Spring AMQP目前還不支持創(chuàng)建Stream隊列;只能使用原生API創(chuàng)建
Map<String,Object> params = new HashMap<>();
params.put("x-queue-type","stream");
params.put("x-max-length-bytes", 20_000_000_000L); // maximum stream size: 20 GB
params.put("x-stream-max-segment-size-bytes", 100_000_000); // size of segment files: 100 MB
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
Stream隊列的durable參數(shù)必須聲明為true,exclusive參數(shù)必須聲明為false。
x-max-length-bytes 表示日志文件的最大字節(jié)數(shù), x-stream-max-segment-size-bytes 每一個日志文件的最大大小。這兩個是可選參數(shù),通常為了防止stream日志無限制累計,都會配合stream隊列一起聲明。
消費者:
Map<String,Object> consumeParam = new HashMap<>();
consumeParam.put("x-stream-offset","last");
channel.basicConsume(QUEUE_NAME, false,consumeParam, myconsumer);
x-stream-offset的類型:
- first: 從日志隊列中第一個可消費的消息開始消費
- last: 消費消息日志中最后一個消息
- next: 相當(dāng)于不指定offset,消費不到消息。
- Offset: 一個數(shù)字型的偏移量
- Timestamp:一個代表時間的Data類型變量,表示從這個時間點開始消費。例如 一個小時前Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
Stream隊列產(chǎn)品目前不夠成熟,目前用的最多的還是Classic經(jīng)典隊列。RabbitMQ目前主推的是Quorum隊列;
死信消息
有以下三種情況,RabbitMQ會將一個正常消息轉(zhuǎn)成死信
-
消息被消費者確認(rèn)拒絕。消費者把requeue參數(shù)設(shè)置為true(false),并且在消費后,向 RabbitMQ返回拒絕。channel.basicReject或者channel.basicNack。
-
消息達(dá)到預(yù)設(shè)的TTL時限還一直沒有被消費。
-
消息由于隊列已經(jīng)達(dá)到最長長度限制而被丟掉
-
TTL即最長存活時間 Time-To-Live 。消息在隊列中保存時間超過這個TTL,即會被認(rèn)為死亡。死亡的消息會被丟入死信隊列,如果沒有配置死信隊列的話,RabbitMQ會保證死了的消息不會再次被投遞,并且在未來版本中,會主動刪除掉這些死掉的消息。
-
聲明隊列時、設(shè)置"x-message-ttl"值;
-
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);
如何判斷消息是否為死信
消息被作為死信轉(zhuǎn)移到死信隊列后,header中還會加上第一次成為死信的三個屬性,并且這三個屬性在以后的傳遞過程中都不會更改。具體可以調(diào)試去看看;
- x-first-death-reason :原因
- x-first-death-queue : 隊列
- x-first-death-exchange : 交換機
死信隊列
- 存在死信消息的隊列;
- RabbitMQ中有兩種方式可以聲明死信隊列,一種是針對某個單獨隊列指定對應(yīng)的死信隊列。另一種就是以策略的方式進(jìn)行批量死信隊列的配置。
流程圖如下:
代碼:
死信交換機、隊列:
@Configuration
public class DeadConfig {
public final static String DEAD_EXCHANGE = "deadExchange";
public final static String DEAD_QUEUE_NAME = "deadQueue";
@Bean
public FanoutExchange deadExchange() {
FanoutExchange directExchange = new FanoutExchange(DEAD_EXCHANGE);
return directExchange;
}
@Bean
public Queue deadQueue() {
Queue queue = new Queue(DEAD_QUEUE_NAME);
return queue;
}
@Bean
public Binding deadBinding(FanoutExchange deadExchange, Queue deadQueue) {
return BindingBuilder.bind(deadQueue).to(deadExchange);
}
}
發(fā)送者:
@Controller
public class MessageTx {
@Autowired
private MessageService messageService;
@GetMapping("/sendDeadMsg")
@ResponseBody
public String sendMoreMsgTx(){
//發(fā)送10條消息
for (int i = 0; i < 10; i++) {
String msg = "msg"+i;
System.out.println("發(fā)送消息 msg:"+msg);
// xiangjiao.exchange 交換機
// xiangjiao.routingKey 隊列
messageService.sendMessage(MessageConfig.EXCHANGE_NAME, "", msg);
//每兩秒發(fā)送一次
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return "send ok";
}
}
@Slf4j
@Component
public class MessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange,String routingKey,Object msg) {
// 暫時關(guān)閉 return 配置
//rabbitTemplate.setReturnCallback(this);
//發(fā)送消息
rabbitTemplate.convertAndSend(exchange,routingKey,msg);
}
}
消費者:
public class MessageConsumer {
// @RabbitHandler : 標(biāo)記的方法只能有一個參數(shù),類型為String ,若是傳Map參數(shù)、則需要傳入map參數(shù)
// @RabbitListener:標(biāo)記的方法可以傳入Channel, Message參數(shù)
@RabbitListener(queues = MessageConfig.MESSAGE_QUEUE_NAME)
public void listenObjectQueue(Channel channel, Message message, String msg) throws IOException {
System.out.println("接收到object.queue的消息" + msg);
System.out.println("消息ID : " + message.getMessageProperties().getDeliveryTag());
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("拒絕消息 , tag = " + message.getMessageProperties().getDeliveryTag());
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (IOException exception) {
//拒絕確認(rèn)消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒絕消息
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
注入容器:
@Configuration
public class MessageConfig {
public final static String EXCHANGE_NAME = "deadMessageTestExchange";
public final static String MESSAGE_QUEUE_NAME = "deadMessageTestQueue";
public final static String MESSAGE_ROUTE_KEY = "deadMessageTestRoutingKey";
public final static String DEAD_EXCHANGE_KEY = "x-dead-letter-exchange";
@Bean
public FanoutExchange deadMessageTestExchange() {
return new FanoutExchange(EXCHANGE_NAME);
}
@Bean
public Queue deadMessageTestQueue() {
HashMap<String, Object> params = new HashMap<>();
params.put(DEAD_EXCHANGE_KEY, DeadConfig.DEAD_EXCHANGE);
return new Queue(MESSAGE_QUEUE_NAME, true, false, false, params);
}
@Bean
public MessageConsumer deadMessageTestConsumer() {
return new MessageConsumer();
}
@Bean
public Binding messageBinding(Queue deadMessageTestQueue, FanoutExchange deadMessageTestExchange) {
return BindingBuilder.bind(deadMessageTestQueue).to(deadMessageTestExchange);
}
}
延遲隊列
RabbitMQ有提供插件使用延遲隊列, 另外可借助 死信隊列 實現(xiàn)延遲隊列;
實現(xiàn)思路:
- 給普通隊列設(shè)置消息過期時間(延遲時間), 不設(shè)置消費者;
- 當(dāng)消息過期后,將消息放入死信隊列, 給死信隊列設(shè)置消費者;
懶隊列
懶隊列會盡可能早的將消息內(nèi)容保存到硬盤當(dāng)中,并且只有在用戶請求到時,才臨時從硬盤加載到RAM內(nèi)存當(dāng)中。 可解決部分消息積壓問題、(海量消息積壓,RabbitMQ存不下就得使用分布式存儲消息)
適用的一些場景:
- 消費者服務(wù)宕機了
- 有一個突然的消息高峰,生產(chǎn)者生產(chǎn)消息超過消費者
- 消費者消費太慢了
? 默認(rèn)情況下,RabbitMQ接收到消息時,會保存到內(nèi)存以便使用,同時把消息寫到硬盤。但是,
消息寫入硬盤的過程是會阻塞隊列的。RabbitMQ雖然做了優(yōu)化,但是在長隊列中表現(xiàn)不是很理想,所以有了懶隊列、 以磁盤IO為代價解決消息積壓問題;
SpringBoot懶隊列聲明方式:
@Configuration
public class LazyQueueConfig {
@Bean
public Queue lazyQueue() {
HashMap<String, Object> params = new HashMap<>();
params.put("x-queue-mode", "lazy");
return new Queue("lazyQueue", true, false, false, params);
}
}
原生API方式:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
懶隊列適合消息量大且長期有堆積的隊列,可以減少內(nèi)存使用,加快消費速度。但是這是以大量消耗集群的網(wǎng)絡(luò)及磁盤IO為代價的。
集群模式
分布式環(huán)境下,是不允許單點故障存在,需要保證高可用, 因此需要集群環(huán)境保證高可用,另外若存在海量消息,還需要保證存放得下、即分布式存儲;
普通集群模式
- 集群的各個節(jié)點之間只會有相同的元數(shù)據(jù),即隊列結(jié)構(gòu),而消息不會進(jìn)行冗余,只存在一個節(jié)點中。
- 消費時,如果消費的不是存有數(shù)據(jù)的節(jié)點, RabbitMQ會臨時在節(jié)點之間進(jìn)行數(shù)據(jù)傳輸,將消息從存有數(shù)據(jù)的節(jié)點傳輸?shù)较M的節(jié)點。
- 此模式解決分布式存儲問題、但可靠性不高,相當(dāng)于多個單機服務(wù),每個都是獨立的,一個都不可以宕機。某臺機器宕機、則存儲的消息無法消費、若未開啟持久化、則丟失消息, 若消費者正在處理消息,則機器無法收到確認(rèn)信息,該消息重新入隊,則重復(fù)消費;
- 普通集群模式不支持高可用,即當(dāng)某一個節(jié)點服務(wù)掛了后,需要手動重啟服務(wù),才能保證這一部分消息能正常消費。
鏡像集群模式
- 在普通集群的基礎(chǔ)上,每次保存消息后,機器主動同步到多臺機器上, 而不是消費者獲取消息時,再去其他節(jié)點上獲??;
- 集群會選舉主節(jié)點master, 當(dāng)主節(jié)點掛了,則會重新選舉;
- 此方式實現(xiàn)了集群高可用,但是集群之間同步消息頻繁,海量數(shù)據(jù)時、同步頻率更大,導(dǎo)致占滿帶寬;
消息常見問題
RabbitMQ如何保證消息不丟失
先看看哪些情況下,會存在丟失消息?
1,2,4步驟是可能丟消息的,因為三個步驟都是跨網(wǎng)絡(luò)的;
生產(chǎn)者保證消息正確發(fā)送到RibbitMQ
- 對于單個數(shù)據(jù),可以使用生產(chǎn)者確認(rèn)機制。通過多次確認(rèn)的方式,保證生產(chǎn)者的消息能夠正確的發(fā)送到RabbitMQ中。
- ?RabbitMQ的生產(chǎn)者確認(rèn)機制分為同步確認(rèn)和異步確認(rèn)。同步確認(rèn)主要是通過在生產(chǎn)者端使用Channel.waitForConfirmsOrDie()指定一個等待確認(rèn)的完成時間。異步確認(rèn)機制則是通過channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)在生產(chǎn)者端注入兩個回調(diào)確認(rèn)函數(shù)。第一個函數(shù)是在生產(chǎn)者消息發(fā)送成功時調(diào)用,第二個函數(shù)則是生產(chǎn)者消息發(fā)送失敗時調(diào)用。兩個函數(shù)需要通過sequenceNumber自行完成消息的前后對應(yīng)。sequenceNumber的生成方式需要通過channel的序列獲取。int sequenceNumber = channel.getNextPublishSeqNo();
- ? 如果發(fā)送批量消息,在RabbitMQ中,另外還有一種手動事務(wù)的方式,可以保證消息正確發(fā)送
- 手動事務(wù)機制主要有幾個關(guān)鍵的方法: channel.txSelect() 開啟事務(wù); channel.txCommit() 提交事務(wù); channel.txRollback() 回滾事務(wù); 用這幾個方法來進(jìn)行事務(wù)管理。但是這種方式需要手動控制事務(wù)邏輯,并且手動事務(wù)會對channel產(chǎn)生阻塞,造成吞吐量下降
RabbitMQ消息存盤不丟消息
消息若是只存內(nèi)存中,則宕機會丟失消息, 因此隊列需要開啟持久化,durable參數(shù)、默認(rèn)創(chuàng)建隊列,durable都會為true; 而Quorum和Stream隊列默認(rèn)都是開啟持久化;
RabbitMQ 主從消息同步時不丟消息
普通集群模式,消息是分散存儲的,不會主動進(jìn)行消息同步了,是有可能丟失消息的。而鏡像模式集群,數(shù)據(jù)會主動在集群各個節(jié)點當(dāng)中同步,這時丟失消息的概率不會太高。
RabbitMQ消費者不丟失消息
消費者確認(rèn),分為自動確認(rèn),手動確認(rèn);若是自動確認(rèn),則消息處理完,會返回確認(rèn)ack;若是處理出現(xiàn)異常, 則會重新入隊,再次處理, 因此存在重復(fù)消費問題;
若是手動確認(rèn),消息處理過程中使用channel#basicAck, basicNack, basicReject返回確認(rèn)或拒絕;SpringBoot配置文件中通過屬性spring.rabbitmq.listener.simple.acknowledge-mode需要設(shè)置mutual手動確認(rèn);
SpringBoot配置文件中通過屬性spring.rabbitmq.listener.simple.acknowledge-mode 進(jìn)行指定??梢栽O(shè)定為 AUTO 自動應(yīng)答; MANUAL 手動應(yīng)答;NONE 不應(yīng)答;
如何保證消息冪等?
當(dāng)消費者消費消息處理業(yè)務(wù)邏輯時,如果拋出異常,或者不向RabbitMQ返回響應(yīng),默認(rèn)情況下,RabbitMQ會無限次數(shù)的重復(fù)進(jìn)行消息消費。
處理冪等問題,要設(shè)定RabbitMQ的重試次數(shù)。在SpringBoot集成RabbitMQ時,可以在配置文件
中指定spring.rabbitmq.listener.simple.retry開頭的一系列屬性,來制定重試策略。
需要在業(yè)務(wù)上處理冪等問題, 處理冪等問題的關(guān)鍵是要給每個消息一個唯一的標(biāo)識;雖然RabbitMQ會給每條消息帶上MessageId (處理冪等問題的關(guān)鍵是要給每個消息一個唯一的標(biāo)識);
SpringBoot框架集成RabbitMQ后,可以給每個消息指定一個全局唯一的MessageID,在消費者端針對MessageID做冪等性判斷。
//發(fā)送者
Message message2 = MessageBuilder.withBody(message.getBytes()).setMessageId(UUID.randomUUID().toString()).build();
rabbitTemplate.send(message2);
//消費者獲取MessageID,自己做冪等性判斷
@RabbitListener(queues = "fanout_email_queue")
public void process(Message message) throws Exception {
// 獲取消息Id
String messageId = message.getMessageProperties().getMessageId();
...
}
可為了業(yè)務(wù)上的方便,再封裝一層, 專門用來放入消息ID, 否則設(shè)置ID的代碼隨處可見;
如何保證消息的順序?
RabbitMQ中保證順序的方法是 單隊列+單消息推送; 若是多隊列的情況下,RabbitMQ沒有很好的解決方案;
個人思考:如果RabbitMQ架構(gòu)上很難處理,可以通過業(yè)務(wù)設(shè)置保證順序, 即給每條消息設(shè)置序號, 消費時、查詢數(shù)據(jù)庫之前的消息是否處理完,若沒有查到,則等待一會, 若查得到,則處理消息,處理完后,把消息id + 序號 放入數(shù)據(jù)庫代表已經(jīng)處理完;文章來源:http://www.zghlxwxcb.cn/news/detail-631204.html
RabbitMQ的數(shù)據(jù)堆積問題
bbitMQ一直以來都有一個缺點,就是對于消息堆積問題的處理不好。當(dāng)RabbitMQ中有大量消息堆積時,整體性能會嚴(yán)重下降。而目前新推出的Quorum隊列以及Stream隊列,目的就在于解決這個核心問題。目前大部分企業(yè)還是圍繞Classic經(jīng)典隊列構(gòu)建應(yīng)用。因此,在使用RabbitMQ時,還是要非常注意消息堆積的問題。盡量讓消息的消費速度和生產(chǎn)速度保持一致。文章來源地址http://www.zghlxwxcb.cn/news/detail-631204.html
- 對于生產(chǎn)者:
最明顯的方式自然是降低消息生產(chǎn)的速度。但是,生產(chǎn)者端產(chǎn)生消息的速度通常是跟業(yè)務(wù)息息相關(guān)的,一般情況下不太好直接優(yōu)化。但是可以選擇盡量多采用批量消息的方式,降低IO頻率。 - 對于服務(wù)器端
- 可使用懶隊列方式存儲 部分消息積壓(單機的磁盤容量還是有限)
- 可使用Sharding分片隊列(分布式存儲)
- 對于消費者
- 檢查業(yè)務(wù)代碼是不是太挫了, 優(yōu)化代碼
- 代碼性能沒問題、則要增加消費者數(shù)量,提升消費速度;
- 若是經(jīng)常存在海量消息,則可以放入數(shù)據(jù)庫、慢慢消費;
到了這里,關(guān)于RaabitMQ(三) - RabbitMQ隊列類型、死信消息與死信隊列、懶隊列、集群模式、MQ常見消息問題的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!