Lison
<dreamlison@163.com>
, v1.0.0
, 2023.06.23
RabbitMQ高級特性(消費端限流、利用限流實現(xiàn)不公平分發(fā)、消息存活時間、優(yōu)先級隊列
消費端限流
之前我們講過MQ可以對請求進行“削峰填谷”,即通過消費端限流的方式限制消息的拉取速度,達到保護消費端的目的。
1、生產(chǎn)者批量發(fā)送消息
@Test
public void testSendBatch() {
// 發(fā)送十條消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
}
}
2、消費端配置限流機制
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: 1233456
virtual-host: /
listener:
simple:
# 限流機制必須開啟手動簽收
acknowledge-mode: manual
# 消費端最多拉取5條消息消費,簽收后不滿5條才會繼續(xù)拉取消息。
prefetch: 5
3、消費者監(jiān)聽隊列
@Component
public class QosConsumer{
@RabbitListener(queues = "my_queue")
public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
// 1.獲取消息
System.out.println(new String(message.getBody()));
// 2.模擬業(yè)務處理
Thread.sleep(3000);
// 3.簽收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
}
利用限流實現(xiàn)不公平分發(fā)
在RabbitMQ中,多個消費者監(jiān)聽同一條隊列,則隊列默認采用的輪詢分發(fā)。但是在某種場景下這種策略并不是很好,例如消費者1處 理任務的速度非???,而其他消費者處理速度卻很慢。此時如果采用公平分發(fā),則消費者1有很大一部分時間處于空閑狀態(tài)。此時可以 采用不公平分發(fā),即誰處理的快,誰處理的消息多
1、生產(chǎn)者批量發(fā)送消息
@Test
public void testSendBatch() {
// 發(fā)送十條消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
}
}
端配置不公平分發(fā)
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: 1233456
virtual-host: /
listener:
simple:
# 限流機制必須開啟手動簽收
acknowledge-mode: manual
# 消費端最多拉取1條消息消費,這樣誰處理的快誰拉取下一條消息,實現(xiàn)了不公平分發(fā)
prefetch: 1
編寫兩個消費者
@Component
public class UnfairConsumer {
// 消費者1
@RabbitListener(queues = "my_queue")
public void listenMessage1(Message message, Channel channel) throws Exception
{
//1.獲取消息
System.out.println("消費者1:"+new String(message.getBody(),"UTF-8"));
//2. 處理業(yè)務邏輯
Thread.sleep(500); // 消費者1處理快
//3. 手動簽收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
// 消費者2
@RabbitListener(queues = "my_queue")
public void listenMessage2(Message message, Channel channel) throws Exception
{
//1.獲取消息
System.out.println("消費者2:"+new String(message.getBody(),"UTF-8"));
//2. 處理業(yè)務邏輯
Thread.sleep(3000);// 消費者2處理慢
//3. 手動簽收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
消息存活時間
RabbitMQ可以設(shè)置消息的存活時間(Time To Live,簡稱TTL), 當消息到達存活時間后還沒有被消費,會被移出隊列。RabbitMQ 可以對隊列的所有消息設(shè)置存活時間,也可以對某條消息設(shè)置存活時間。
設(shè)置隊列所有消息存活時間
1、在創(chuàng)建隊列時設(shè)置其存活時間:
@Configuration
public class RabbitConfig2 {
private final String EXCHANGE_NAME="my_topic_exchange2";
private final String QUEUE_NAME="my_queue2";
// 1.創(chuàng)建交換機
@Bean("bootExchange2")
public Exchange getExchange2(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)
.durable(true).
build();
}
// 2.創(chuàng)建隊列
@Bean("bootQueue2")
public Queue getMessageQueue2(){
return QueueBuilder
.durable(QUEUE_NAME)
.ttl(10000) //隊列的每條消息存活10s
.build();
}
// 3.將隊列綁定到交換機
@Bean
public Binding bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange,@Qualifier("bootQueue2") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
}
}
2、生產(chǎn)者批量生產(chǎn)消息,測試存活時間
@Test
public void testSendBatch2() throws InterruptedException {
// 發(fā)送十條消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "send message..."+i);
}
}
設(shè)置單條消息存活時間
@Test
public void testSendMessage() {
//設(shè)置消息屬性
MessageProperties messageProperties = new MessageProperties();
//設(shè)置存活時間
messageProperties.setExpiration("10000");
// 創(chuàng)建消息對象
Message message = new Message("send message...".getBytes(StandardCharsets.UTF_8), messageProperties);
// 發(fā)送消息
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
}
注意:
1 如果設(shè)置了單條消息的存活時間,也設(shè)置了隊列的存活時間,以時間短的為準。
2 消息過期后,并不會馬上移除消息,只有消息消費到隊列頂端時,才會移除該消息。
@Test
public void testSendMessage2() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// 1.創(chuàng)建消息屬性
MessageProperties messageProperties = new MessageProperties();
// 2.設(shè)置存活時間
messageProperties.setExpiration(“10000”);
// 3.創(chuàng)建消息對象
Message message = new Message((“send message…” +i).getBytes(),messageProperties);
// 4.發(fā)送消息
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, message);
} else {
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, “sendmessage…” + i);
}
}
}
在以上案例中,i=5的消息才有過期時間,10s后消息并沒有 馬上被移除,但該消息已經(jīng)不會被消費了,當它到達隊列頂 端時會被移除。
優(yōu)先級隊列
假設(shè)在電商系統(tǒng)中有一個訂單催付的場景,即客戶在一段時間內(nèi)未付款會給用戶推送一條短信提醒,但是系統(tǒng)中分為大型商家和小型商家。比如像蘋果,小米這樣大商家一年能給我們創(chuàng)造很大的利潤,所以在訂單量大時,他們的訂單必須得到優(yōu)先處理,此時就需要為不同的消息設(shè)置不同的優(yōu)先級,此時我們要使用優(yōu)先級隊列。
1、創(chuàng)建隊列和交換機
@Configuration
public class RabbitConfig3 {
private final String EXCHANGE_NAME="priority_exchange";
private final String QUEUE_NAME="priority_queue";
// 1.創(chuàng)建交換機
@Bean(EXCHANGE_NAME)
public Exchange priorityExchange(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)
.durable(true).
build();
}
// 2.創(chuàng)建隊列
@Bean(QUEUE_NAME)
public Queue priorityQueue(){
return QueueBuilder
.durable(QUEUE_NAME)
//設(shè)置隊列的最大優(yōu)先級,最大可以設(shè)置到255,官網(wǎng)推薦不要超過10,,如果設(shè)置太高比較浪費資源
.maxPriority(10)
.build();
}
// 3.將隊列綁定到交換機
@Bean
public Binding bindPriority(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
}
}
2、編寫生產(chǎn)者文章來源:http://www.zghlxwxcb.cn/news/detail-610949.html
@Test
public void testPriority() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// i為5時消息的優(yōu)先級較高
MessageProperties messageProperties = new MessageProperties();
messageProperties.setPriority(9);
Message message = new Message(("send message..." +i).getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
} else {
rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..." + i);
}
}
}
3、編寫消費者文章來源地址http://www.zghlxwxcb.cn/news/detail-610949.html
@Component
public class PriorityConsumer {
@RabbitListener(queues = "priority_queue")
public void listenMessage(Message message, Channel channel) throws Exception
{
//獲取消息
System.out.println(new String(message.getBody()));
//手動簽收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
到了這里,關(guān)于(四)RabbitMQ高級特性(消費端限流、利用限流實現(xiàn)不公平分發(fā)、消息存活時間、優(yōu)先級隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!