1、前往RabbitMQ官網(wǎng)下載往RabbitMQ添加延遲消息的插件
RabbitMQ官網(wǎng)下載插件的網(wǎng)址:https://www.rabbitmq.com/community-plugins.html
2、下載rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下載的插件就得是什么版本的,得對應(yīng)上,以下截圖為官方文檔的對插件版本的要求說明)?
?3、把這個插件傳輸?shù)椒?wù)器上
4、根據(jù)官網(wǎng)的指示把插件放到RabbitMQ指定的文件夾下
RabbitMQ官網(wǎng)指示安裝插件步驟的網(wǎng)址:https://www.rabbitmq.com/installing-plugins.html
我這里安裝RabbitMQ的系統(tǒng)是CentOS,所以放在
5、拷貝插件到指定的目錄下
例:
cp rabbitmq_delayed_message_exchange-3.10.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins/
效果圖:
?6、安裝延遲隊列插件
輸入以下命令安裝延遲隊列插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
效果圖:
7、重啟RabbitMQ
輸入以下命令重啟RabbitMQ
systemctl restart rabbitmq-server.service
效果圖:
8、查看插件是否安裝成功
?進(jìn)入RabbitMQ的管理頁面,進(jìn)入Exchange的管理頁面,新增Exchange,在Type里面可以看到x-delayed-message的選項,證明延遲隊列插件安裝成功
9、基于插件實現(xiàn)延遲隊列的原理示意圖
原先我們沒下插件之前實現(xiàn)延遲隊列是基于圖下這種方式實現(xiàn)的
但我們下載插件后就能通過交換機延遲消息的方式來實現(xiàn)消息的延遲了(由步驟8可見,我們驗證插件是否安裝成功是從Exchange進(jìn)去的,而不是從Queues進(jìn)去的)
10、基于插件延遲隊列的代碼實現(xiàn)
(1)在config包里新建一個名為DelayedQueueConfig的類用于編寫配置隊列延遲的代碼
代碼如下:
package com.ken.springbootrqbbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueueConfig {
//隊列
public static final String DELAYED_QUEUE_NAME = "delayed_queue";
//交換機
public static final String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE";
//交換機
public static final String DELAYED_ROUTING_KEY = "delayed";
//聲明延遲隊列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//聲明延遲交換機
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> arguments = new HashMap<>(3);
//設(shè)置延遲類型
arguments.put("x-delayed-type","direct");
/**
* 聲明自定義交換機
* 第一個參數(shù):交換機的名稱
* 第二個參數(shù):交換機的類型
* 第三個參數(shù):是否需要持久化
* 第四個參數(shù):是否自動刪除
* 第五個參數(shù):其他參數(shù)
*/
return new CustomExchange(DELAYED_QUEUE_NAME,"x-delayed-message",true,false,arguments);
}
//綁定隊列和延遲交換機
@Bean
public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") Exchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
?(2)在SendMsgController類里寫一個接口,讓其能往延遲隊列里發(fā)送消息
代碼如下:
package com.ken.springbootrqbbitmq.controller;
import com.ken.springbootrqbbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* 發(fā)送延遲消息
*/
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
@Autowired(required = false)
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("當(dāng)前時間:{},發(fā)送一條消息給兩個TTL隊列:{}",new Date(),message);
rabbitTemplate.convertAndSend("normal_exchange","normal01","消息來著ttl為10s的隊列:" + message);
rabbitTemplate.convertAndSend("normal_exchange","normal02","消息來著ttl為40s的隊列:" + message);
}
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
log.info("當(dāng)前時間:{},發(fā)送一條時長{}毫秒的TTL消息給normal03隊列:{}", new Date(),ttlTime,message);
rabbitTemplate.convertAndSend("normal_exchange","normal03",message,msg -> {
//發(fā)送消息的時候延遲時長
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
/**
* 給延遲隊列發(fā)送消息
* @param message
* @param delayTime
*/
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
log.info("當(dāng)前時間:{},發(fā)送一條時長{}毫秒的消息給延遲隊列:{}", new Date(),delayTime,message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_QUEUE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> {
//發(fā)送消息的時候延遲時長
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
}
(3)在consumer包里新建一個名為DelayQueueConsumer的類用于編寫消費延遲隊列的消費者代碼
效果圖:
代碼如下:
package com.ken.springbootrqbbitmq.consumer;
import com.ken.springbootrqbbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 延遲隊列消費者
*/
@Slf4j
@Component
public class DelayQueueConsumer {
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
private void receiveDelayQueue(Message message) {
String msg = new String(message.getBody());
log.info("當(dāng)前時間{},收到延遲隊列的消息",new Date(),msg);
}
}
(4)啟動項目,往瀏覽器輸入接口地址和參數(shù),從而調(diào)用接口
[1]第一條消息
http://localhost:8080/ttl/sendDelayMsg/我是第一條消息/20000
[2]第二條消息
http://localhost:8080/ttl/sendDelayMsg/我是第二條消息/2000
效果圖:
文章來源:http://www.zghlxwxcb.cn/news/detail-597116.html
結(jié)論:基于測試發(fā)現(xiàn)在使用延遲插件的情況下,延遲時間短的消息會被先消費,這證明基于插件的延遲消息達(dá)到預(yù)期效果文章來源地址http://www.zghlxwxcb.cn/news/detail-597116.html
到了這里,關(guān)于RabbitMQ系列(18)--RabbitMQ基于插件實現(xiàn)延遲隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!