一. 消息隊(duì)列
1. 定義
2. 作用
2.1 流量消峰
2.2 應(yīng)用解耦
2.3 異步處理
3. 分類
4. MQ的選擇
大量數(shù)據(jù):Kafaka;高并發(fā):RocketMQ; 中小型數(shù)據(jù)量少:RabbitMQ
5. RabbitMQ
5.1 概念
快遞站
5.2 四大概念
生產(chǎn)者,消費(fèi)者,交換機(jī),隊(duì)列
交換機(jī)可以對(duì)應(yīng)多個(gè)隊(duì)列
5.3 六大模式
簡(jiǎn)單模式
工作模式
發(fā)布/訂閱模式
路由模式
主題模式
發(fā)布確定模式
5.4 RabbitMQ 工作原理
5.5 安裝
密碼:123456
用戶名:admin
6. 代碼實(shí)現(xiàn)
有基于SpringBoot的代碼 實(shí)現(xiàn)起來比較簡(jiǎn)單,參考資源下載。
二. Hello World (簡(jiǎn)單模式)
1. 生產(chǎn)者代碼
-
導(dǎo)入依賴
-
代碼
密碼改成自己的
2. 消費(fèi)者代碼
package com.chent;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static final String QUEUE_NAME = "HELLO";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.86.130");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待消息被消費(fèi)..");
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody());
System.out.println(message);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println("消費(fèi)被中斷");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
三. Work Queues (工作隊(duì)列模式)
1. 輪詢分發(fā)消息
-
直接用idea啟動(dòng)兩個(gè)線程的技巧
右上角run旁邊edict configuration 然后Build and Run最后 modify option選項(xiàng) 選擇 allow mutiple instances
2. 消息應(yīng)答
2.1 概念
2.2 自動(dòng)應(yīng)答
不靠譜
2.3 手動(dòng)應(yīng)答
-
參數(shù)Multiple的解釋
-
代碼實(shí)現(xiàn)
2.3 消息應(yīng)答重新入隊(duì)
如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn), RabbitMQ 將了解到消息未完全處理,并將對(duì)其重新排隊(duì)。如果此時(shí)其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個(gè)消費(fèi)者。這樣,即使某個(gè)消費(fèi)者偶爾死亡,也可以確保不會(huì)丟失任何消息。
默認(rèn)的自動(dòng)應(yīng)答會(huì)使消息丟失(自動(dòng)應(yīng)答就是發(fā)送消息后即認(rèn)為成功發(fā)射),要想實(shí)現(xiàn)消息不丟失,必須采用手動(dòng)應(yīng)答
3. RabbitMQ持久化
3.1 隊(duì)列持久化
-
注意事項(xiàng)
已有消息隊(duì)列改成持久化,必須先刪除
3.2 消息持久化
3.3 不公平分發(fā)
輪訓(xùn)分發(fā)-公平分發(fā),但是不合理
3.4 預(yù)取值
限制緩沖區(qū)的大小,以避免緩沖區(qū)里面無限制的未確認(rèn)消息問題。
通過使用 basic.qos 方法設(shè)置“預(yù)取計(jì)數(shù)” 值來完成的。 該值定義通道上允許的未確認(rèn)消息的最大數(shù)量
四. 發(fā)布確認(rèn)
1. 原理
生產(chǎn)者將信道設(shè)置成 confirm 模式,一旦信道進(jìn)入 confirm 模式,所有在該信道上面發(fā)布的消息都將會(huì)被指派一個(gè)唯一的 ID( 從 1 開始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker 就會(huì)發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息的唯一 ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了**,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在將消息寫入磁盤之后發(fā)出,broker 回傳給生產(chǎn)者的確認(rèn)消息中 delivery-tag 域包含了確認(rèn)消息的序列號(hào),此外 broker 也可以設(shè)置basic.ack 的multiple 域,表示到這個(gè)序列號(hào)之前的所有消息都已經(jīng)得到了處理。**
confirm 模式最大的好處在于他是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時(shí)繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過回調(diào)方法來處理該確認(rèn)消息,如果 RabbitMQ 因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失,就會(huì)發(fā)送一條 nack 消息,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該 nack 消息。
2. 發(fā)布確認(rèn)的策略
2.1 開啟發(fā)布確認(rèn)的方法
**confirmSelect()**開啟發(fā)布
2.2 單個(gè)確認(rèn)發(fā)布
發(fā)布速度特別的慢
channel.waitForConfirms(): 確認(rèn)訂閱成功
2.3 批量確認(rèn)發(fā)布
先發(fā)布一批消息然后一起確認(rèn)可以極大地提高吞吐量,當(dāng)然這種方式的缺點(diǎn)就是:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問題時(shí),不知道是哪個(gè)消息出現(xiàn)問題了,我們必須將整個(gè)批處理保存在內(nèi)存中,以記錄重要的信息而后重新發(fā)布消息。
2.4 異步確認(rèn)發(fā)布 (內(nèi)置了一個(gè)監(jiān)聽器)
一個(gè)監(jiān)聽器+兩個(gè)回調(diào)函數(shù)(一個(gè)成功,一個(gè)失敗返回)
完整代碼:channel.addConfirmListener(成功返回函數(shù),失敗返回函數(shù))
2.5 確認(rèn)未處理的消息
最好的解決的解決方案就是把未確認(rèn)的消息放到一個(gè)基于內(nèi)存的能被發(fā)布線程訪問的隊(duì)列,比如說用 ConcurrentLinkedQueue 這個(gè)隊(duì)列在 confirm callbacks 與發(fā)布線程之間進(jìn)行消息的傳遞。
package com.chent;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* 發(fā)布訂閱模式:異步發(fā)布確認(rèn)
*/
public class Publish {
public static void main(String[] args) throws Exception {
Publish.publishAsync();
}
public static void publishAsync() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.86.130");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "publishTest";
channel.queueDeclare(queueName,true,false,false,null);//申明隊(duì)列
channel.confirmSelect();//開啟發(fā)布確認(rèn)
//異步發(fā)布的邏輯
//線程安全有序hashmap 適用于高并發(fā)情況
ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();
//消息成功確認(rèn)回調(diào)函數(shù)
ConfirmCallback ackCallback = (var1,var3)->{
//2.刪除確認(rèn)的消息
if(var3){//VAR3表示處理批量情況
ConcurrentNavigableMap<Long, String> confirmMessage = map.headMap(var1);
confirmMessage.clear();}
else{
map.remove(var1);
}
System.out.println("成功確認(rèn):"+ var1);
};
//消息失敗確認(rèn)回調(diào)函數(shù)
ConfirmCallback nackCallback = (var1,var3)->{
//3.打印未確認(rèn)的消息
String message = map.get(var1);
System.out.println("未確認(rèn)的消息是:"+message);
};
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
for(int i = 0;i<1000;i++){
String message = "消息" + i;
channel.basicPublish("",queueName,null,message.getBytes());
//1.記錄所有要發(fā)送的消息
map.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("===========================================異步耗時(shí):" + (end-begin)+"============================");
}
}
2.6 對(duì)比
五. 交換機(jī)
在上一節(jié)中,我們創(chuàng)建了一個(gè)工作隊(duì)列。我們假設(shè)的是工作隊(duì)列背后,每個(gè)任務(wù)都恰好交付給一個(gè)消費(fèi)者(工作進(jìn)程)。在這一部分中,我們將做一些完全不同的事情-我們將消息傳達(dá)給多個(gè)消費(fèi)者。這種模式稱為 ”發(fā)布/訂閱
1. 概述
-
作用
RabbitMQ 消息傳遞模型的核心思想是**: 生產(chǎn)者生產(chǎn)的消息從不會(huì)直接發(fā)送到隊(duì)列**。實(shí)際上,通常生產(chǎn)者甚至都不知道這些消息傳遞傳遞到了哪些隊(duì)列中。
相反, 生產(chǎn)者只能將消息發(fā)送到交換機(jī)(exchange),**交換機(jī)工作的內(nèi)容非常簡(jiǎn)單,一方面它接收來自生產(chǎn)者的消息,另一方面將它們推入隊(duì)列。**交換機(jī)必須確切知道如何處理收到的消息。是應(yīng)該把這些消息放到特定隊(duì)列還是說把他們到許多隊(duì)列中還是說應(yīng)該丟棄它們。這就的由交換機(jī)的類型來決定 -
交換機(jī)類型
直接(direct), 主題(topic) ,標(biāo)題(headers) , 扇出(fanout) -
無名Exchange
-
臨時(shí)隊(duì)列
-
綁定Bind
2. fanout扇形交換機(jī)
將接收到的所有消息廣播到它知道的所有隊(duì)列中
- 消費(fèi)者代碼
public class ReceiveLogs01 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
// 1. 聲明fanout交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 生成一個(gè)臨時(shí)的隊(duì)列 隊(duì)列的名稱是隨機(jī)的
* 當(dāng)消費(fèi)者斷開和該隊(duì)列的連接時(shí) 隊(duì)列自動(dòng)刪除
*/
//2. 聲明一個(gè)臨時(shí)隊(duì)列
String queueName = channel.queueDeclare().getQueue();
//把該臨時(shí)隊(duì)列綁定我們的 exchange 其中 routingkey(也稱之為 binding key)為空字符串
//3. 綁定交換機(jī)和隊(duì)列的關(guān)系
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收消息,把接收到的消息打印在屏幕........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{String message = new String(delivery.getBody(), "UTF-8");
System.out.println("控制臺(tái)打印接收到的消息"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
- 生產(chǎn)者代碼
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitUtils.getChannel()) {
/**
* 聲明一個(gè) exchange
* 1.exchange 的名稱
* 2.exchange 的類型
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner sc = new Scanner(System.in);
System.out.println("請(qǐng)輸入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生產(chǎn)者發(fā)出消息" + message);
}
}
}
}
3. direct直接交換機(jī)
在上一節(jié)中,我們構(gòu)建了一個(gè)簡(jiǎn)單的日志記錄系統(tǒng)。我們能夠向許多接收者廣播日志消息。在本節(jié)我們將向其中添加一些特別的功能-比方說我們只讓某個(gè)消費(fèi)者訂閱發(fā)布的部分消息。例如我們只把嚴(yán)重錯(cuò)誤消息定向存儲(chǔ)到日志文件(以節(jié)省磁盤空間),同時(shí)仍然能夠在控制臺(tái)上打印所有日志消息。
隊(duì)列只對(duì)它綁定的交換機(jī)的消息感興趣。綁定用參數(shù): routingKey 來表示也可稱該參數(shù)為 binding key,創(chuàng)建綁定我們用代碼:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);綁定之后的意義由其交換類型決定。
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
4. Topic主題交換機(jī)
-
引入
在上一個(gè)小節(jié)中,我們改進(jìn)了日志記錄系統(tǒng)。我們沒有使用只能進(jìn)行隨意廣播的 fanout 交換機(jī),而是使用了 direct 交換機(jī),從而有能實(shí)現(xiàn)有選擇性地接收日志。
盡管使用direct 交換機(jī)改進(jìn)了我們的系統(tǒng),但是它仍然存在局限性-比方說我們想接收的日志類型有info.base 和 info.advantage,某個(gè)隊(duì)列只想 info.base 的消息,那這個(gè)時(shí)候direct 就辦不到了。這個(gè)時(shí)候就只能使用 topic 類型 -
命名規(guī)范
5. 自我總結(jié)
交換機(jī)和隊(duì)列綁定后,可以指定消息發(fā)給對(duì)應(yīng)的隊(duì)列
扇形交換機(jī)會(huì)給全部綁定的隊(duì)列發(fā)消息,routingKey為空
直接交換機(jī)會(huì)給對(duì)應(yīng)RoutingKey的隊(duì)列發(fā)消息
主題交換機(jī)會(huì)給對(duì)應(yīng)類型的routingKey(在直接交換機(jī)基礎(chǔ)上改進(jìn))的隊(duì)列發(fā)消息
說白了三種交換機(jī)其實(shí)就是在改變r(jià)outingKey參數(shù)
channel.exchangeDeclare(交換機(jī)名稱,類型);
channel.queueBind(隊(duì)列名稱,交換機(jī)名稱, routingKey);
六. 死信隊(duì)列
1. 死信的概念
-
概念
死信,顧名思義就是無法被消費(fèi)的消息,字面意思可以這樣理解,一般來說,producer 將消息投遞到 broker 或者直接到queue 里了, consumer 從 queue 取出消息進(jìn)行消費(fèi),但某些時(shí)候由于特定的原因導(dǎo)致 queue 中的某些消息無法被消費(fèi),這樣的消息如果沒有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊(duì)列。 -
應(yīng)用場(chǎng)景:
為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到 RabbitMQ 的死信隊(duì)列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時(shí),將消息投入死信隊(duì)列中.還有比如說: 用戶在商城下單成功并點(diǎn)擊去支付后在指定時(shí)間未支付時(shí)自動(dòng)失效
2. 死信的來源
- 消息 TTL 過期
- 隊(duì)列達(dá)到最大長(zhǎng)度(隊(duì)列滿了,無法再添加數(shù)據(jù)到 mq 中)
- 消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false.
3. 實(shí)戰(zhàn)
- 代碼架構(gòu)圖
- 模擬TTL過期
-
生產(chǎn)者代碼
在生產(chǎn)者中設(shè)置過期時(shí)間
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel())
{ channel.exchangeDeclare(NORMAL_EXCHANGE,
BuiltinExchangeType.DIRECT);
//設(shè)置消息的 TTL 時(shí)間
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//該信息是用作演示隊(duì)列個(gè)數(shù)限制
for (int i = 1; i <11 ; i++) {
String message="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,
message.getBytes());
System.out.println("生產(chǎn)者發(fā)送消息:"+message);
}
}
}
}
-
消費(fèi)者C1代碼
聲明普通/死信隊(duì)列和交換機(jī),綁定routingKey關(guān)系;
核心部分:普通隊(duì)列通過聲明隊(duì)列的參數(shù) param綁定死信交換機(jī)的關(guān)系
public class Consumer01 {
//普通交換機(jī)名稱
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機(jī)名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
//聲明死信和普通交換機(jī) 類型為 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//聲明死信隊(duì)列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信隊(duì)列綁定死信交換機(jī)與 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常隊(duì)列綁定死信隊(duì)列信息
Map<String, Object> params = new HashMap<>();
//正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù) key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常隊(duì)列設(shè)置死信 routing-key 參數(shù) key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
}
}
-
消費(fèi)者C2代碼
只是簡(jiǎn)單的讓C2消費(fèi)死信
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信隊(duì)列消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信隊(duì)列的消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
-
隊(duì)列達(dá)到最長(zhǎng)長(zhǎng)度
如何設(shè)置隊(duì)列長(zhǎng)度:在隊(duì)列聲明中添加參數(shù)
七. 延遲隊(duì)列
1. 概念及應(yīng)用 ***
2. RabbitMQ中的兩種TTL
3. 延時(shí)隊(duì)列基礎(chǔ)模型(基于死信隊(duì)列)
-
定義
-
代碼見資源下載(基于SpringBoot來實(shí)現(xiàn))
-
代碼架構(gòu)
-
效果展示
-
存在的問題
如果這樣使用的話,豈不是每增加一個(gè)新的時(shí)間需求,就要新增一個(gè)隊(duì)列,這里只有 10S 和 40S兩個(gè)時(shí)間選項(xiàng),如果需要一個(gè)小時(shí)后處理,那么就需要增加TTL 為一個(gè)小時(shí)的隊(duì)列,如果是預(yù)定會(huì)議室然后提前通知這樣的場(chǎng)景,豈不是要增加無數(shù)個(gè)隊(duì)列才能滿足需求?
4. 優(yōu)化(基于死信隊(duì)列)
-
架構(gòu)
新建一個(gè)隊(duì)列,但是不設(shè)置TTL;在生產(chǎn)者端設(shè)置TTL即可 -
效果展示
-
缺點(diǎn)
看起來似乎沒什么問題,但是在最開始的時(shí)候,就介紹過如果使用在消息屬性上設(shè)置 TTL 的方式,消息可能并不會(huì)按時(shí)“死亡“,因?yàn)?RabbitMQ 只會(huì)檢查第一個(gè)消息是否過期,如果過期則丟到死信隊(duì)列,如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng),而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短,第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行。
5. 插件實(shí)現(xiàn)延遲隊(duì)列
-
效果展示
-
實(shí)現(xiàn)步驟
-
安裝插件
-
架構(gòu)圖及代碼
-
延時(shí)隊(duì)列的其他選擇
當(dāng)然,延時(shí)隊(duì)列還有很多其它選擇,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的時(shí)間輪,這些方式各有特點(diǎn),看需要適用的場(chǎng)景
八. 發(fā)布確認(rèn)高級(jí)
1. 基于SpringBoot的基本代碼及存在問題
交換機(jī)發(fā)出了確認(rèn)回調(diào),但實(shí)際上隊(duì)列沒有收到消息
2. 回退消息
-
概念:
在僅開啟了生產(chǎn)者確認(rèn)機(jī)制的情況下,交換機(jī)接收到消息后,會(huì)直接給消息生產(chǎn)者發(fā)送確認(rèn)消息,如果發(fā)現(xiàn)該消息不可路由,那么消息會(huì)被直接丟棄,此時(shí)生產(chǎn)者是不知道消息被丟棄這個(gè)事件的。那么如何讓無法被路由的消息幫我想辦法處理一下?最起碼通知我一聲,我好自己處理啊。通過設(shè)置 mandatory 參數(shù)可以在當(dāng)消息傳遞過程中不可達(dá)目的地時(shí)將消息返回給生產(chǎn)者。 -
核心代碼
-
效果展示
3. 備份交換機(jī)
-
概念
備份交換機(jī)可以理解為 RabbitMQ 中交換機(jī)的“備胎” ,當(dāng)我們?yōu)槟骋粋€(gè)交換機(jī)聲明一個(gè)對(duì)應(yīng)的備份交換機(jī)時(shí), 就是為它創(chuàng)建一個(gè)備胎,當(dāng)交換機(jī)接收到一條不可路由消息時(shí),將會(huì)把這條消息轉(zhuǎn)發(fā)到備份交換機(jī)中,由備份交換機(jī)來進(jìn)行轉(zhuǎn)發(fā)和處理,通常備份交換機(jī)的類型為 Fanout ,這樣就能把所有消息都投遞到與其綁定的隊(duì)列中,然后我們在備份交換機(jī)下綁定一個(gè)隊(duì)列,這樣所有那些原交換機(jī)無法被路由的消息,就會(huì)都進(jìn)入這個(gè)隊(duì)列了。當(dāng)然,我們還可以建立一個(gè)報(bào)警隊(duì)列,用獨(dú)立的消費(fèi)者來進(jìn)行監(jiān)測(cè)和報(bào)警。 -
代碼框架
- 代碼實(shí)現(xiàn)
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
public static final String BACKUP_QUEUE_NAME = "backup.queue";
public static final String WARNING_QUEUE_NAME = "warning.queue";
// 聲明確認(rèn)隊(duì)列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
//聲明確認(rèn)隊(duì)列綁定關(guān)系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
//聲明備份 Exchange
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
//聲明確認(rèn) Exchange 交換機(jī)的備份交換機(jī)
@Bean("confirmExchange")
public DirectExchange
confirmExchange(){ExchangeBuilder
exchangeBuilder =
ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
//設(shè)置該交換機(jī)的備份交換機(jī)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
return (DirectExchange)exchangeBuilder.build();
}
// 聲明警告隊(duì)列
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
// 聲明報(bào)警隊(duì)列綁定關(guān)系
@Bean
public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange
backupExchange){
return BindingBuilder.bind(queue).to(backupExchange);
}
// 聲明備份隊(duì)列
@Bean("backQueue")
public Queue backQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
// 聲明備份隊(duì)列綁定關(guān)系
@Bean
public Binding backupBinding(@Qualifier("backQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(queue).to(backupExchange);
}
}
九. RabbitMQ補(bǔ)充知識(shí) *****
1. 冪等性
1.1 概念
用戶對(duì)于同一操作發(fā)起的一次請(qǐng)求或者多次請(qǐng)求的結(jié)果是一致的,不會(huì)因?yàn)槎啻吸c(diǎn)擊而產(chǎn)生了副作用。
1.2 消息重復(fù)消費(fèi)
- 問題描述
消費(fèi)者在消費(fèi) MQ 中的消息時(shí), MQ 已把消息發(fā)送給消費(fèi)者,消費(fèi)者在給MQ 返回 ack 時(shí)網(wǎng)絡(luò)中斷,故 MQ 未收到確認(rèn)信息,該條消息會(huì)重新發(fā)給其他的消費(fèi)者,或者在網(wǎng)絡(luò)重連后再次發(fā)送給該消費(fèi)者,但實(shí)際上該消費(fèi)者已成功消費(fèi)了該條消息,造成消費(fèi)者消費(fèi)了重復(fù)的消息。 - 解決方案
MQ 消費(fèi)者的冪等性的解決一般使用全局 ID 或者寫個(gè)唯一標(biāo)識(shí)比如時(shí)間戳 或者 UUID 或者訂單消費(fèi)者消費(fèi) MQ 中的消息也可利用 MQ 的該 id 來判斷,或者可按自己的規(guī)則生成一個(gè)全局唯一 id,每次消費(fèi)消息時(shí)用該 id 先判斷該消息是否已消費(fèi)過。
1.3 消費(fèi)端的冪等性保障
在海量訂單生成的業(yè)務(wù)高峰期,生產(chǎn)端有可能就會(huì)重復(fù)發(fā)生了消息,這時(shí)候消費(fèi)端就要實(shí)現(xiàn)冪等性。
-
方案一:唯一ID+指紋碼機(jī)制
指紋碼:我們的一些規(guī)則或者時(shí)間戳加別的服務(wù)給到的唯一信息碼,它并不一定是我們系統(tǒng)生成的,基本都是由我們的業(yè)務(wù)規(guī)則拼接而來,但是一定要保證唯一性,然后就利用查詢語句進(jìn)行判斷這個(gè) id 是否存在數(shù)據(jù)庫中,優(yōu)勢(shì)就是實(shí)現(xiàn)簡(jiǎn)單就一個(gè)拼接,然后查詢判斷是否重復(fù);劣勢(shì)就是在高并發(fā)時(shí),如果是單個(gè)數(shù)據(jù)庫就會(huì)有寫入性能瓶頸當(dāng)然也可以采用分庫分表提升性能,但也不是我們最推薦的方式。 -
方案二:Redis原子性
利用 redis 執(zhí)行 setnx 命令,天然具有冪等性。從而實(shí)現(xiàn)不重復(fù)消費(fèi)
2. 優(yōu)先隊(duì)列
-
應(yīng)用場(chǎng)景
-
實(shí)現(xiàn)原理
- 代碼實(shí)現(xiàn)
public class Producer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel();) {
//給消息賦予一個(gè) priority 屬性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
for (int i = 1; i <11; i++){
String message = "info"+i;
if(i==5){
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
}else{
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("發(fā)送消息完成:" + message);
}
}
}
}
public class Consumer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception
{Channel channel = RabbitMqUtils.getChannel();
//設(shè)置隊(duì)列的最大優(yōu)先級(jí) 最大可以設(shè)置到 255 官網(wǎng)推薦 1-10 如果設(shè)置太高比較吃內(nèi)存和 CPU
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
System.out.println("消費(fèi)者啟動(dòng)等待消費(fèi)..............");
DeliverCallback deliverCallback=(consumerTag, delivery)-
>{ String receivedMessage = new
String(delivery.getBody());System.out.println("接收到消
息:"+receivedMessage);
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)-
>{System.out.println("消費(fèi)者無法消費(fèi)消息時(shí)調(diào)用,如隊(duì)列被刪除");
});
}
3.惰性隊(duì)列
-
定義
惰性隊(duì)列會(huì)盡可能的將消息存入磁盤中,而在消費(fèi)者消費(fèi)到相應(yīng)的消息時(shí)才會(huì)被加載到內(nèi)存中,它的一個(gè)重要的設(shè)計(jì)目標(biāo)是能夠支持更長(zhǎng)的隊(duì)列,即支持更多的消息存儲(chǔ)。 -
應(yīng)用場(chǎng)景
當(dāng)消費(fèi)者由于各種各樣的原因(比如消費(fèi)者下線、宕機(jī)亦或者是由于維護(hù)而關(guān)閉等)而致使長(zhǎng)時(shí)間內(nèi)不能消費(fèi)消息造成堆積時(shí),惰性隊(duì)列就很有必要了。 - 兩種設(shè)置模式
-
內(nèi)存開銷對(duì)比
十. RabbitMQ集群
1. 搭建
2. 鏡像隊(duì)列
引入鏡像隊(duì)列(Mirror Queue)的機(jī)制,可以將隊(duì)列鏡像到集群中的其他 Broker 節(jié)點(diǎn)之上,如果集群中的一個(gè)節(jié)點(diǎn)失效了,隊(duì)列能自動(dòng)地切換到鏡像中的另一個(gè)節(jié)點(diǎn)上以保證服務(wù)的可用性。
3. Haproxy + Keepalive實(shí)現(xiàn)高可用負(fù)載均衡
Haproxy 實(shí)現(xiàn)負(fù)載均衡
Keepalived 實(shí)現(xiàn)雙機(jī)(主備)熱備
4. Federation Exchange/Queue
文章來源:http://www.zghlxwxcb.cn/news/detail-500664.html
5. Shovel
文章來源地址http://www.zghlxwxcb.cn/news/detail-500664.html
到了這里,關(guān)于RabbitMQ學(xué)習(xí)筆記(尚硅谷)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!