接上一篇:RabbitMQ(三) | 死信交換機(jī)、死信隊(duì)列、TTL、延遲隊(duì)列(安裝DelayExchange插件)
1.消息堆積問題
當(dāng)生產(chǎn)者發(fā)送消息的速度超過了消費(fèi)者處理消息的速度,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。之后發(fā)送的消息就會(huì)成為死信,可能會(huì)被丟棄,這就是消息堆積問題。
解決消息堆積有兩種思路:
- 增加更多消費(fèi)者,提高消費(fèi)速度。也就是我們之前說的work queue模式
- 給消費(fèi)者開啟多線程,提高消費(fèi)速度
- 擴(kuò)大隊(duì)列容積,提高堆積上限
要提升隊(duì)列容積,把消息保存在內(nèi)存中顯然是不行的。
2.惰性隊(duì)列
從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:
- 接收到消息后直接存入磁盤而非內(nèi)存
- 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存
- 支持?jǐn)?shù)百萬條的消息存儲(chǔ)
2.1.基于命令行設(shè)置lazy-queue
而要設(shè)置一個(gè)隊(duì)列為惰性隊(duì)列,只需要在聲明隊(duì)列時(shí),指定x-queue-mode屬性為lazy即可??梢酝ㄟ^命令行將一個(gè)運(yùn)行中的隊(duì)列修改為惰性隊(duì)列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
-
rabbitmqctl
:RabbitMQ的命令行工具 -
set_policy
:添加一個(gè)策略 -
Lazy
:策略名稱,可以自定義 -
"^lazy-queue$"
:用正則表達(dá)式匹配隊(duì)列的名字 -
'{"queue-mode":"lazy"}'
:設(shè)置隊(duì)列模式為lazy模式 -
--apply-to queues
:策略的作用對(duì)象,是所有的隊(duì)列
2.2.基于@Bean聲明lazy-queue
在consumer服務(wù)端的新增惰性隊(duì)列配置類LazyConfig:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** 惰性隊(duì)列配置 */
@Configuration
public class LazyConfig {
/** 惰性隊(duì)列 - 增加了.lazy()屬性 */
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue")
.lazy() // 開啟x-queue-mode為lazy
.build();
}
/** 正常隊(duì)列 */
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("normal.queue")
.build();
}
}
2.3.基于@RabbitListener聲明LazyQueue
在consumer服務(wù)端的消息監(jiān)聽類中新增方法
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg) {
log.info("接收到 lazy.queue 的消息:{}", msg);
}
2.4.發(fā)送消息
這里對(duì)正常隊(duì)列和惰性隊(duì)列進(jìn)行十萬條數(shù)據(jù)的測(cè)試,啟動(dòng)consumer服務(wù),在publisher服務(wù)的SpringAmqpTest類中新增下列兩個(gè)方法:
/** 測(cè)試惰性隊(duì)列 */
@Test
public void testLazyQueue() throws InterruptedException {
// 模擬發(fā)送十萬條數(shù)據(jù)
long b = System.nanoTime();
for (int i = 0; i < 1000000; i++) {
// 1.準(zhǔn)備消息
Message message = MessageBuilder
.withBody("hello, LazyQueue".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) // 改成非持久化,可以看一下LazyQueue的效果
.build();
// 2.發(fā)送消息
rabbitTemplate.convertAndSend("lazy.queue", message);
}
long e = System.nanoTime();
System.out.println(e - b);
}
/** 測(cè)試正常隊(duì)列 */
@Test
public void testNormalQueue() throws InterruptedException {
// 模擬發(fā)送十萬條數(shù)據(jù)
long b = System.nanoTime();
for (int i = 0; i < 1000000; i++) {
// 1.準(zhǔn)備消息
Message message = MessageBuilder
.withBody("hello, Spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) // 改成非持久化,可以看一下正常隊(duì)列的效果
.build();
// 2.發(fā)送消息
rabbitTemplate.convertAndSend("normal.queue", message);
}
long e = System.nanoTime();
System.out.println(e - b);
}
RabbitMQ控制臺(tái)隊(duì)列的數(shù)據(jù)變化:
初始:
運(yùn)行兩個(gè)生產(chǎn)者方法后,看一下兩個(gè)隊(duì)列總覽
惰性隊(duì)列數(shù)據(jù)變化
全部一進(jìn)來就會(huì)存儲(chǔ)到磁盤,內(nèi)存中只有需要消費(fèi)的:
正常隊(duì)列數(shù)據(jù)變化
而正常隊(duì)列都在內(nèi)存中(這里是沒有開啟持久化的情況):
3.總結(jié)
消息堆積問題的解決方案?
- 隊(duì)列上綁定多個(gè)消費(fèi)者,提高消費(fèi)速度
- 給消費(fèi)者開啟多線程,提高消費(fèi)速度
- 使用惰性隊(duì)列,可以在mq中保存更多消息
惰性隊(duì)列的優(yōu)點(diǎn)有哪些?
- 基于磁盤存儲(chǔ),消息上限高
- 沒有間歇性的page-out,性能比較穩(wěn)定
惰性隊(duì)列的缺點(diǎn)有哪些?文章來源:http://www.zghlxwxcb.cn/news/detail-428106.html
- 基于磁盤存儲(chǔ),消息時(shí)效性會(huì)降低
- 性能受限于磁盤的IO
下一篇:RabbitMQ(五) | MQ集群搭建、部署、仲裁隊(duì)列、集群擴(kuò)容文章來源地址http://www.zghlxwxcb.cn/news/detail-428106.html
到了這里,關(guān)于RabbitMQ(四) | 惰性隊(duì)列 - 解決消息堆積問題的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!