? ? ? ? 解決沒法優(yōu)先發(fā)送延時時間短的消息。文章來源地址http://www.zghlxwxcb.cn/news/detail-524957.html
- 插件安裝
下載:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
將插件放入:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins下
進入目錄:cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
安裝:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重啟:systemctl restart rabbitmq-server
- 配置類
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.CustomAutowireConfigurer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayConfig {
//死信隊列
public static final String DELAY_QUEUE = "DELAY_QUEUE";
//死信交換機
public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
//死信routingKey
public static final String DELAY_ROUNTING_KEY = "DELAY_ROUNTING_KEY";
//聲明交換機
@Bean
public CustomExchange delayEchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
//1.交換機名稱
//2.交換機類型
//3.是否需要持久化
//4.是否需要自動刪除
//5.其他參數(shù)
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, arguments);
}
//聲明隊列
@Bean
public Queue delayQueue() {
//創(chuàng)建隊列
return new Queue(DELAY_QUEUE);
}
//綁定隊列
@Bean
public Binding delayBindingQueue(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayEchange") CustomExchange delayEchange) {
return BindingBuilder.bind(delayQueue).to(delayEchange).with(DELAY_ROUNTING_KEY).noargs();
}
}
- 生產(chǎn)者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 延遲隊列----插件
*
*
**/
@Slf4j
@RestController
@RequestMapping("/delayed")
public class SendDelayMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
//延遲隊列
@GetMapping("/sendExpireMsg/{message}/{ttlTime}")
public void sendExpireMsg(@PathVariable String message, @PathVariable Integer ttlTime) {
log.info("發(fā)送定時消息");
rabbitTemplate.convertAndSend(DelayConfig.DELAY_EXCHANGE, DelayConfig.DELAY_ROUNTING_KEY, "消息來自定時消息:" + message, msg -> {
//發(fā)消息的時候,延遲時長
msg.getMessageProperties().setDelay(ttlTime);
return msg;
});
}
}
- 消費者
import cn.my.config.rabbitmq.delaybyplugin02.DelayConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DelayConsume {
@Autowired
private StringRedisTemplate stringRedisTemplate;
//接收消息
@RabbitListener(queues = DelayConfig.DELAY_QUEUE)
public void receiveDelay(Message msg) {
String message = new String(msg.getBody());
log.info("接收到了插件延遲隊列消息:" + message);
}
}
文章來源:http://www.zghlxwxcb.cn/news/detail-524957.html
到了這里,關(guān)于Rabbitmq 延遲隊列---插件的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!