??個(gè)人主頁(yè):沫洺的主頁(yè)
????系列專欄:????JavaWeb專欄???JavaSE專欄 ???Java基礎(chǔ)專欄??vue3專欄?
? ? ? ? ? ? ? ? ? ? ? ? ? ???MyBatis專欄??Spring專欄??SpringMVC專欄??SpringBoot專欄
?????????????????????????????Docker專欄??Reids專欄??MQ專欄??SpringCloud專欄? ? ?
????如果文章對(duì)你有所幫助請(qǐng)留下三連??
??延遲隊(duì)列
使用rabbitmq的延時(shí)隊(duì)列插件,實(shí)現(xiàn)同一個(gè)隊(duì)列中有多個(gè)不同超時(shí)時(shí)間的消息,并按時(shí)間超時(shí)順序出隊(duì)
??下載延遲插件
在 RabbitMQ 的 3.5.7 版本之后,提供了一個(gè)插件(rabbitmq-delayed-message-exchange)來實(shí)現(xiàn)延遲隊(duì)列 ,同時(shí)需保證 Erlang/OPT 版本為 18.0 之后。
我這里 MQ 的版本是 3.10.5,現(xiàn)在去 GitHub 上根據(jù)版本號(hào)下載插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases?
?安裝插件并啟用
下載完成后直接把插件放在 /root/211 目錄,然后拷貝到容器內(nèi)plugins目錄下(rabbitmq是容器的name,也可以使用容器id)
?
docker cp /home/211/rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq:/plugins
進(jìn)入 Docker 容器
docker exec -it rabbitmq /bin/bash
在plugins內(nèi)啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
退出容器
exit
重啟 RabbitMQ
docker restart rabbitmq
安裝成功
?通過UI查看
??SpringBoot使用延遲隊(duì)列
消費(fèi)者
自定義交換機(jī)CustomExchange
@Component public class DelayConsumer { private static final String ENAME = "211-DelayExchage-01"; private static final String QNAME1 = "211-DelayQueue-01"; //自定義交換機(jī) @Bean public CustomExchange customExchange() { HashMap<String, Object> args = new HashMap<>(); args.put("x-delayed-type","direct"); //延遲交換機(jī) return new CustomExchange(ENAME, "x-delayed-message", true, false, args); } //定義一個(gè)隊(duì)列 @Bean public Queue queue() { return QueueBuilder.durable(QNAME1).build(); } //創(chuàng)建隊(duì)列和交換機(jī)的綁定關(guān)系 @Bean public Binding binding1() { return BindingBuilder.bind(queue()).to(customExchange()).with("diancan").noargs(); } //消費(fèi)者 @RabbitHandler @RabbitListener(queues = QNAME1) public void process1(UserRegisterOk userRegisterOk) { System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"消費(fèi)者收到:" + userRegisterOk.getName() + "," + userRegisterOk.getHeight()); } }
messages delayed: 0 默認(rèn)延遲時(shí)間0s
生產(chǎn)者
設(shè)置延遲時(shí)間
????????message -> { ? ? ? ? ? ??//設(shè)置消息延遲時(shí)間5秒,5秒之后投遞給隊(duì)列 針對(duì)的是交換機(jī) ? ? ? ? ? ? message.getMessageProperties().setDelay(5*1000); ? ? ? ? ? ? return message; ? ? ? ? }
@Component public class DelayProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(){ //延遲5秒 UserRegisterOk userRegisterOk1 = UserRegisterOk.builder().name("張一").phone("123456").height("1.8.5").build(); //要將對(duì)象序列化,轉(zhuǎn)成字符串,使用消息轉(zhuǎn)換器MessageConverter rabbitTemplate.convertAndSend("211-DelayExchage-01","diancan",userRegisterOk1,message -> { message.getMessageProperties().setDelay(5*1000); return message; }); System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"生產(chǎn)者1生產(chǎn)-->張一發(fā)送成功"); //延遲8秒 UserRegisterOk userRegisterOk2 = UserRegisterOk.builder().name("張二").phone("123456").height("1.8.5").build(); //要將對(duì)象序列化,轉(zhuǎn)成字符串,使用消息轉(zhuǎn)換器MessageConverter rabbitTemplate.convertAndSend("211-DelayExchage-01","diancan",userRegisterOk2,message -> { message.getMessageProperties().setDelay(8*1000); return message; }); System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"生產(chǎn)者2生產(chǎn)-->張二發(fā)送成功"); } }
文章來源:http://www.zghlxwxcb.cn/news/detail-428283.html
整個(gè)的流程就是生產(chǎn)者生產(chǎn)消息后,在交換機(jī)中停留指定的延遲時(shí)間,后發(fā)送到隊(duì)列,消費(fèi)者獲取隊(duì)列中的消息?文章來源地址http://www.zghlxwxcb.cn/news/detail-428283.html
補(bǔ)充延遲隊(duì)列不常用的兩種方式
創(chuàng)建具有超時(shí)功能且綁定死信交換機(jī)的消息隊(duì)列
創(chuàng)建通用延時(shí)消息
到了這里,關(guān)于[MQ] 延遲隊(duì)列/延遲插件下載的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!