原文:juejin.cn/post/6998363970037874724
前言
Rabbitmq 是使用 Erlang 語言開發(fā)的開源消息隊列系統(tǒng),基于 AMQP 實現(xiàn),是一種應(yīng)用程序?qū)?yīng)用程序的通信方法,應(yīng)用程序通過讀寫出入隊列的消息來通信,而無需專用連接來鏈接它們。消息傳遞指的是應(yīng)用程序之間通過在消息中發(fā)送數(shù)據(jù)進行通信,而不是通過直接調(diào)用彼此通信,直接調(diào)用通常是指遠程過程調(diào)用的技術(shù)。
核心組成
- Server:又稱 Broker,接收客戶端的連接,實現(xiàn) AMQP 實體服務(wù),安裝 rabbitmq-server
- Connection:連接,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接TCP/IP/三次握手和四次揮手
- Channel:網(wǎng)絡(luò)信道,幾乎所有操作都在 Channel 中進行,Channel 是進行消息讀寫的通道,客戶端可以建立多個 Channel,每個 Channel 代表一個會話任務(wù)。
- Message:消息,服務(wù)與應(yīng)用程序之間傳送的數(shù)據(jù),由 Properties 和 Body 組成,Properties 可以對消息進行修飾,比如消息的優(yōu)先級,延遲等高級特性,Body 則是消息體的內(nèi)容。
- Virtual Host:虛擬地址,用于進行邏輯隔離,最上層的消息路由,一個虛擬主機可以有若干個 exchange 和 queue,同一個虛擬主機里面不能有相同名稱的 exchange
- Exchange:交換機,接收消息,根據(jù)路由鍵發(fā)送消息到綁定的隊列(不具備消息存儲能力)
- Bindings:exchange 和 queue 之間的虛擬連接,binding 中可以保存多個 routing key
- Routing key:是一個路由規(guī)則,虛擬機可以用它來確定如何路由一個特定消息
- Queue:隊列,也稱為 Message Queue,消息隊列,保存消息并將它們轉(zhuǎn)發(fā)給消費者
Rabbitmq 消息模式
3.1 Simple 模式
Simple 模式是最簡單的一個模式,由一個生產(chǎn)者,一個隊列,一個消費者組成,生產(chǎn)者將消息通過交換機(此時,圖中并沒有交換機的概念,如不定義交換機,會使用默認的交換機)把消息存儲到隊列,消費者從隊列中取出消息進行處理。
用 Java demo 實現(xiàn)此模式,推薦一個開源免費的 Spring Boot 最全教程:
https://github.com/javastacks/spring-boot-best-practice
Productor
public class Send {
private final static String QUEUE_NAME = "queue1";
public static void main(String[] args) {
// 1、創(chuàng)建連接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、創(chuàng)建連接、通道
connection = factory.newConnection();
channel = connection.createChannel();
// 3、聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息內(nèi)容
String message = "Hello world";
// 4、發(fā)送消息到指定隊列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
} catch (TimeoutException | IOException e) {
e.printStackTrace();
} finally {
// 關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 關(guān)閉連接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
Customer
public class Recv {
private final static String QUEUE_NAME = "queue1";
public static void main(String[] args) throws IOException, TimeoutException {
// 1、創(chuàng)建連接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setVirtualHost("/");
// 2、獲取 Connection和 Channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3、聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
觀察可視化界面,會看到消息先會被寫入到隊列中,隨后又被消費者消費了。
3.2 Fanout 模式
Fanout——發(fā)布訂閱模式,是一種廣播機制。
此模式包括:一個生產(chǎn)者、一個交換機 (exchange)、多個隊列、多個消費者。生產(chǎn)者將消息發(fā)送到交換機,交換機不存儲消息,將消息存儲到隊列,消費者從隊列中取消息。如果生產(chǎn)者將消息發(fā)送到?jīng)]有綁定隊列的交換機上,消息將丟失。
用 Java demo 實現(xiàn)此模式
Productor
public class Productor {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) {
// 1、創(chuàng)建連接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、獲取連接、通道
connection = factory.newConnection();
channel = connection.createChannel();
// 消息內(nèi)容
String message = "hello fanout mode";
// 指定路由key
String routeKey = "";
String type = "fanout";
// 3、聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, type);
// 4、聲明隊列
channel.queueDeclare("queue1", true, false, false, null);
channel.queueDeclare("queue2", true, false, false, null);
channel.queueDeclare("queue3", true, false, false, null);
channel.queueDeclare("queue4", true, false, false, null);
// 5、綁定 channel 與 queue
channel.queueBind("queue1", EXCHANGE_NAME, routeKey);
channel.queueBind("queue2", EXCHANGE_NAME, routeKey);
channel.queueBind("queue3", EXCHANGE_NAME, routeKey);
channel.queueBind("queue4", EXCHANGE_NAME, routeKey);
// 6、發(fā)布消息
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
System.out.println("消息發(fā)送成功!");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
System.out.println("消息發(fā)送異常");
}finally {
// 關(guān)閉通道和連接......
}
}
}
Customer
public class Customer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
// 創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 獲取連接、通道
connection = factory.newConnection();
channel = connection.createChannel();
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(queueName + ":開始接收消息");
} catch (IOException |
TimeoutException e) {
e.printStackTrace();
} finally {
// 關(guān)閉通道和連接......
}
}
};
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建線程分別從四個隊列中獲取消息
new Thread(runnable, "queue1").start();
new Thread(runnable, "queue2").start();
new Thread(runnable, "queue3").start();
new Thread(runnable, "queue4").start();
}
}
執(zhí)行完 Productor 發(fā)現(xiàn)四個隊列中分別增加了一條消息,而執(zhí)行完 Customer 后四個隊列中的消息都被消費者消費了。
3.3 Direct 模式
Direct 模式是在 Fanout 模式基礎(chǔ)上添加了 routing key,F(xiàn)anout(發(fā)布/訂閱)模式是交換機將消息存儲到所有綁定的隊列中,而 Direct 模式是在此基礎(chǔ)上,添加了過濾條件,交換機只會將消息存儲到滿足 routing key 的隊列中。
在上圖中,我們可以看到交換機綁定了兩個隊列,其中隊列 Q1綁定的 routing key 為 “orange” ,隊列Q2綁定的routing key 為 “black” 和 “green”。在這樣的設(shè)置中,發(fā)布 routing key 為 “orange” 的消息將被路由到 Q1,routing key 為 “black” 或 “green” 的消息將被路由到 Q2
在 rabbitmq 中給隊列綁定 routing_key,routing_key 必須是單詞列表
用 Java demo 實現(xiàn)此模式
Productor
public class Productor {
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) {
// 1、創(chuàng)建連接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、獲取連接、通道
connection = factory.newConnection();
channel = connection.createChannel();
// 消息內(nèi)容
String message = "hello direct mode";
// 指定路由key
String routeKey = "email";
String type = "direct";
// 3、聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, type);
// 4、聲明隊列
channel.queueDeclare("queue1", true, false, false, null);
channel.queueDeclare("queue2", true, false, false, null);
channel.queueDeclare("queue3", true, false, false, null);
// 5、綁定 channel 與 queue
channel.queueBind("queue1", EXCHANGE_NAME, "email");
channel.queueBind("queue2", EXCHANGE_NAME, "sms");
channel.queueBind("queue3", EXCHANGE_NAME, "vx");
// 6、發(fā)布消息
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
System.out.println("消息發(fā)送成功!");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
System.out.println("消息發(fā)送異常");
} finally {
// 關(guān)閉通道和連接......
}
}
}
可以通過可視化頁面查看,各隊列綁定的 routing_key
由于設(shè)置的 routing_key為 “email”,所以,應(yīng)該只有 queue1 存儲了一條消息。
Customer 與上述 fanout 示例一致。
3.4 Topic 模式
Topic 模式是生產(chǎn)者通過交換機將消息存儲到隊列后,交換機根據(jù)綁定隊列的 routing key 的值進行通配符匹配,如果匹配通過,消息將被存儲到該隊列,如果 routing key 的值匹配到了多個隊列,消息將會被發(fā)送到多個隊列;如果一個隊列也沒匹配上,該消息將丟失。
routing_key 必須是單詞列表,用點分隔,其中 * 和 # 的含義為:
- *:1個單詞
- #:0個或多個單詞
用Java demo 實現(xiàn)此模式
Productor
public class Productor {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) {
// 1、創(chuàng)建連接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、獲取連接、通道
connection = factory.newConnection();
channel = connection.createChannel();
// 消息內(nèi)容
String message = "hello topic mode";
// 指定路由key
String routeKey = "com.order.test.xxx";
String type = "topic";
// 3、聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, type);
// 4、聲明隊列
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
// 5、綁定 channel 與 queue
channel.queueBind("queue5", EXCHANGE_NAME, "*.order.#");
channel.queueBind("queue6", EXCHANGE_NAME, "#.test.*");
// 6、發(fā)布消息
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
System.out.println("消息發(fā)送成功!");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
System.out.println("消息發(fā)送異常");
} finally {
// 關(guān)閉通道和連接......
}
}
}
執(zhí)行完 Productor 后,通過可視化頁面查看到,queue 綁定的 routing_key
由于上述例子中,routing_key為:“com.order.test.xxx”,那么 queue5 和 queue6 都將接收到消息。
Customer 與上述實例一樣,執(zhí)行完 Customer 后,再次查看隊列信息,queue5 和 queue6 的消息都被消費了。
3.5 Work 模式
當(dāng)有多個消費者時,如何均衡消息者消費消息的多少,主要有兩種模式:
- 輪詢模式分發(fā):按順序輪詢分發(fā),每個消費者獲得相同數(shù)量的消息
- 公平分發(fā):根據(jù)消費者消費能力公平分發(fā),處理快的處理的多,處理慢的處理的少,按勞分配
3.5.1 輪詢分發(fā)
在這種模式下,rabbitmq 采用輪詢的方式將任務(wù)分配給多個消費者,但可能出現(xiàn)一種情況,當(dāng)分配給某一個消費者的任務(wù)很復(fù)雜時,而有些消費者接收的任務(wù)較輕量,會出現(xiàn)有的消費者很忙,而有的消費者處于空閑的狀態(tài),而 rabbitmq 不會感知到這種情況的發(fā)生,rabbitmq 不考慮消費者未確認消息的數(shù)量,只是盲目的分配任務(wù)。
用 Java demo 實現(xiàn)此模式
Productor
public class Productor {
public static void main(String[] args) {
// 1、創(chuàng)建連接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、獲取連接、通道
connection = factory.newConnection();
channel = connection.createChannel();
// 3、向 Queue1 發(fā)布20個消息
for (int i = 0; i < 20; i++) {
String msg = "feiyangyang: " + i;
channel.basicPublish("", "queue1", null, msg.getBytes(StandardCharsets.UTF_8));
}
System.out.println("消息發(fā)送成功!");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
System.out.println("消息發(fā)送異常");
} finally {
// 關(guān)閉通道和連接......
}
}
}
Worker1
public class Worker1 {
public static void main(String[] args) {
// 1、創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 獲取連接、通道
connection = factory.newConnection();
channel = connection.createChannel();
Channel finalChannel = channel;
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
System.out.println("Worker1 開始接收消息");
System.in.read();
} catch (IOException |
TimeoutException e) {
e.printStackTrace();
} finally {
// 關(guān)閉通道和連接......
}
}
}
Worker2 與 Worker1 相同
我們看下消息分發(fā)結(jié)果:
Worker1 開始接收消息
Worker1:收到消息是:feiyangyang: 0
Worker1:收到消息是:feiyangyang: 2
Worker1:收到消息是:feiyangyang: 4
Worker1:收到消息是:feiyangyang: 6
Worker1:收到消息是:feiyangyang: 8
Worker1:收到消息是:feiyangyang: 10
Worker1:收到消息是:feiyangyang: 12
Worker1:收到消息是:feiyangyang: 14
Worker1:收到消息是:feiyangyang: 16
Worker1:收到消息是:feiyangyang: 18
Worker2 開始接收消息
Worker2:收到消息是:feiyangyang: 1
Worker2:收到消息是:feiyangyang: 3
Worker2:收到消息是:feiyangyang: 5
Worker2:收到消息是:feiyangyang: 7
Worker2:收到消息是:feiyangyang: 9
Worker2:收到消息是:feiyangyang: 11
Worker2:收到消息是:feiyangyang: 13
Worker2:收到消息是:feiyangyang: 15
Worker2:收到消息是:feiyangyang: 17
Worker2:收到消息是:feiyangyang: 19
可以看出,輪詢分發(fā)模式就是將消息均衡的分配所有消費者。
3.5.2 公平分發(fā)
為了解決 Work 輪詢分發(fā)模式 這個問題,rabbitmq 使用帶有 perfetchCount = 1 設(shè)置的 basicQos 方法。當(dāng)消費者接受處理并確認前一條消息前,不向此消費者發(fā)送新消息,會分配給其他空閑的消費者。
Productor 代碼與上述輪詢模式相同,而 Customer 中稍作修改
Worker1
// Channel 使用 Qos 機制
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
try {
Thread.sleep(1000);
// 改成手動應(yīng)答
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
上述實例相較于輪詢分發(fā)模式,添加了 Qos 機制,設(shè)置值為1,代表消費者每次從隊列中獲取幾條消息,將 Worker1 的 sleep 時間設(shè)置為 1s,將 Worker2 的 sleep 時間設(shè)置為 2s,查看消息分發(fā)結(jié)果
Worker1 開始接收消息
Worker1:收到消息是:feiyangyang: 0
Worker1:收到消息是:feiyangyang: 2
Worker1:收到消息是:feiyangyang: 4
Worker1:收到消息是:feiyangyang: 5
Worker1:收到消息是:feiyangyang: 7
Worker1:收到消息是:feiyangyang: 8
Worker1:收到消息是:feiyangyang: 10
Worker1:收到消息是:feiyangyang: 11
Worker1:收到消息是:feiyangyang: 13
Worker1:收到消息是:feiyangyang: 14
Worker1:收到消息是:feiyangyang: 16
Worker1:收到消息是:feiyangyang: 17
Worker1:收到消息是:feiyangyang: 19
Worker2 開始接收消息
Worker2:收到消息是:feiyangyang: 1
Worker2:收到消息是:feiyangyang: 3
Worker2:收到消息是:feiyangyang: 6
Worker2:收到消息是:feiyangyang: 9
Worker2:收到消息是:feiyangyang: 12
Worker2:收到消息是:feiyangyang: 15
Worker2:收到消息是:feiyangyang: 18
當(dāng)使用 Work 公平分發(fā)模式時,要設(shè)置消費者為手動應(yīng)答,并且開啟 Qos 機制。
防止消息丟失機制
4.1 消息確認
消費者完成一項任務(wù)可能需要幾秒鐘,如果其中一個消費者開始了一項長期任務(wù)并且只完成了部分任務(wù)而死亡,如果將 autoAck 設(shè)置為 true ,一旦 RabbitMQ 將消息傳遞給消費者,它會立即將其標(biāo)記為刪除,在這種情況下,我們將丟失所有已分派給該特定消費者但尚未處理的消息。
如果其中一個消費者宕了,rabbitmq 可以將其消息分配給其他消費者。為了確保消息不會丟失,rabbitmq 采用消息確認,消費者發(fā)回確認消息,告訴 rabbitmq 消息已經(jīng)被接收并處理,此時,rabbitmq 可以放心的刪除這條消息。
如果消費者在沒有發(fā)送 ack 的情況下宕了,rabbitmq 將理解為該條消息未被消費者處理完,如果有其他消費者在線,將迅速重新交付給其他消費者,這樣就可以確保不會丟失消息了。
默認情況下rabbitmq 會啟用手動消息確認,也就是 autoAck 默認為 false,一旦我們完成了一項任務(wù),需要手動的進行消息確認,所以 autoAck 需要保持為默認值 false,并使用如下方法進行手動應(yīng)答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
4.2 持久化
rabbitmq 的消息確認機制可以保證消息不會丟失,但是如果 rabbitmq 服務(wù)器停止,我們的任務(wù)仍然會丟失。
當(dāng) rabbitmq 退出或崩潰時,如果不進行持久化,隊列和消息都會消失。需要做兩件事來確保消息不會丟失,將隊列和消息都標(biāo)記為持久的。
- 設(shè)置隊列持久
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
- 設(shè)置消息持久
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
將消息標(biāo)記為持久性并不能完全保證消息不會丟失,當(dāng) rabbitmq 接收到消息并且還沒保存時,仍然有很短的時間窗口會使消息丟失,如果需要更強的保證,可以使用發(fā)布者確認機制。
使用場景
解耦、削峰、異步
解耦
在微服務(wù)架構(gòu)體系中,微服務(wù)A需要與微服務(wù)B進行通信,傳統(tǒng)的做法是A調(diào)用B的接口。但這樣做如果系統(tǒng)B無法訪問或連接超時,系統(tǒng)A需要等待,直到系統(tǒng)B做出響應(yīng),并且A與B存在嚴(yán)重的耦合現(xiàn)象。如果引入消息隊列進行系統(tǒng)AB的通信,流程是這樣的:
- 系統(tǒng)A將消息存儲到消息隊列中,返回成功信息
- 系統(tǒng)B從隊列中獲取消息,進行處理操作
系統(tǒng)A將消息放到隊列中,就不用關(guān)心系統(tǒng)B是否可以獲取等其他事情了,實現(xiàn)了兩個系統(tǒng)間的解耦。
使用場景:
- 短信、郵件通知
削峰
系統(tǒng)A每秒請求100個,系統(tǒng)可以穩(wěn)定運行,但如果在秒殺活動中,每秒并發(fā)達到1w個,但系統(tǒng)最大處理能力只能每秒處理 1000 個,所以,在秒殺活動中,系統(tǒng)服務(wù)器會出現(xiàn)宕機的現(xiàn)象。如果引入 MQ ,可以解決這個問題。每秒 1w個請求會導(dǎo)致系統(tǒng)崩潰,那我們讓用戶發(fā)送的請求都存儲到隊列中,由于系統(tǒng)最大處理能力是每秒1000個請求,讓系統(tǒng)A每秒只從隊列中拉取1000個請求,保證系統(tǒng)能穩(wěn)定運行,在秒殺期間,請求大量進入到隊列,積壓到MQ中,而系統(tǒng)每秒只從隊列中取1000個請求處理。這種短暫的高峰期積壓是沒問題的,因為高峰期一旦過去,每秒請求數(shù)迅速遞減,而系統(tǒng)每秒還是從隊列中取1000個請求進行處理,系統(tǒng)會快速將積壓的消息消費掉。
使用場景:
- 秒殺活動
- 團搶活動
異步
用戶注冊,需要發(fā)送注冊郵件和注冊短信,傳統(tǒng)的做法有兩種:串行、并行。
- 串行方式:將注冊信息寫庫后(50ms),發(fā)送郵件(50ms),再發(fā)送短信(50ms),任務(wù)完成后,返回客戶端,共耗時(150ms)
- 并行方式:將注冊信息寫庫后(50ms),開啟子線程讓發(fā)送郵件和發(fā)送短信同時進行(50ms),返回客戶端,共耗時(100ms)
- 引入MQ,將注冊信息寫庫(50ms),將發(fā)送郵件和短信的操作寫入隊列(5s),返回客戶端,而消費者什么時候從隊列中取消息進行處理,不用關(guān)心,共耗時(55ms)
使用場景:
- 將不是必須等待響應(yīng)結(jié)果的業(yè)務(wù)邏輯進行異步處理
近期熱文推薦:
1.1,000+ 道 Java面試題及答案整理(2022最新版)
2.勁爆!Java 協(xié)程要來了。。。
3.Spring Boot 2.x 教程,太全了!
4.別再寫滿屏的爆爆爆炸類了,試試裝飾器模式,這才是優(yōu)雅的方式??!
5.《Java開發(fā)手冊(嵩山版)》最新發(fā)布,速速下載!文章來源:http://www.zghlxwxcb.cn/news/detail-457242.html
覺得不錯,別忘了隨手點贊+轉(zhuǎn)發(fā)哦!文章來源地址http://www.zghlxwxcb.cn/news/detail-457242.html
到了這里,關(guān)于面試必問:RabbitMQ 有哪幾種消息模式?的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!