一、什么是延遲消息
假設(shè)有一個(gè)業(yè)務(wù)場景:超過30分鐘未付款的訂單自動(dòng)關(guān)閉,這個(gè)功能應(yīng)該怎么實(shí)現(xiàn)?
RabbitMQ使用死信隊(duì)列,可以實(shí)現(xiàn)消息的延遲接收。
1、隊(duì)列的屬性
隊(duì)列有一個(gè)消息過期屬性。就像豐巢超過24小時(shí)就收費(fèi)一樣,通過設(shè)置這個(gè)屬性,超過了指定事件的消息將會(huì)被丟棄。
這個(gè)屬性交:x-message-ttl
所有隊(duì)列中的消息超過時(shí)間未被消費(fèi)時(shí),都會(huì)過期。不管是誰發(fā)送的消息都一視同仁。
@Bean("ttlQueue")
public Queue queue() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-message-ttl", 11000); // 隊(duì)列中的消息未被消費(fèi)11秒后過期
// map.put("x-expire", 30000); // 隊(duì)列30秒沒有使用以后會(huì)被刪除
return new Queue("TTL_QUEUE", true, false, false, map);
}
但是這種方式似乎并不是那么的靈活。所以RabbitMQ的消息也有單獨(dú)的過期時(shí)間屬性。
2、消息的屬性
在生產(chǎn)者發(fā)送消息時(shí),可以通過MessageProperties指定消息屬性。
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("4000"); // 消息的過期屬性,單位ms
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("這條消息4秒后過期".getBytes(), messageProperties);
rabbitTemplate.send("TTL_EXCHANGE", "test.ttl", message);
那么問題來了:如果隊(duì)列的TTL是6秒過期,消息的TTL是10秒過期,這個(gè)消息會(huì)在什么時(shí)候被丟棄?
答:如果同時(shí)指定了Message TTL和Queue TTL,那么小的那個(gè)會(huì)生效。
3、什么是死信
上面我們了解到,rabbitMQ的消息可以設(shè)置過期時(shí)間,消息過期后會(huì)被直接丟棄,我們可以通過配置死信隊(duì)列,將這種消息變成死信(Dead Letter),然后將這種過期的消息丟入死信隊(duì)列。
隊(duì)列在創(chuàng)建的時(shí)候可以指定一個(gè)死信交換機(jī)DLX(Dead Letter Exchange)。死信交換機(jī)綁定的隊(duì)列被稱為死信隊(duì)列DLQ(Dead Letter Queue),DLX實(shí)際上也是普通的交換機(jī),DLQ也是普通的隊(duì)列。
也就是說,如果消息過期了,隊(duì)列指定了DLX,就會(huì)發(fā)送到DLX。如果DLX綁定了DLQ,就會(huì)路由到DLQ。路由到DLQ之后,我們就可以消費(fèi)死信隊(duì)列了。
4、使用死信隊(duì)列的缺點(diǎn)
(1)如果統(tǒng)一用隊(duì)列來設(shè)置消息的TTL,當(dāng)梯度非常多的情況下,比如1分鐘、2分鐘、5分鐘、10分鐘……需要?jiǎng)?chuàng)建很多交換機(jī)和隊(duì)列來路由消息,這時(shí)可以考慮使用消息的TTL。
(2)如果單獨(dú)設(shè)置消息的TTL,則可能會(huì)造成隊(duì)列中的消息阻塞
——前一條消息沒有出隊(duì)(沒有被消費(fèi)),后面的消息無法投遞(比如第一條消息的過期時(shí)間是30分鐘,第二條消息的過期時(shí)間是10分鐘。10分鐘后,即使第二條消息應(yīng)該投遞了,但是由于第一條消息還未出隊(duì),所以無法投遞)。
(3)可能存在一定的時(shí)間誤差。
5、延時(shí)消息插件
在RabbitMQ 3.5.7及以后的版本提供了一個(gè)插件(rabbitmq-delayed-message-exchange)來實(shí)現(xiàn)延時(shí)隊(duì)列功能(Linux和Windows都可以用)。同時(shí)插件依賴Erlang/OPT 18.0及以上。
插件源碼地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下載地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
找到對應(yīng)版本的插件,然后下載。
# 下載到plugins目錄
cd rabbitmq_server-3.7.7/plugins
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
# 啟用插件
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 停用插件
./rabbitmq-plugins disable rabbitmq_delayed_message_exchange
此時(shí),在管理界面的創(chuàng)建交換機(jī)頁面,會(huì)出現(xiàn)一個(gè)x-delayed-message類型的交換機(jī):
二、JavaAPI利用死信隊(duì)列實(shí)現(xiàn)RabbitMQ延遲消息
1、代碼實(shí)現(xiàn)
引包:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.AMQBasicProperties;
import java.util.HashMap;
import java.util.Map;
/**
* 消息生產(chǎn)者,通過TTL測試死信隊(duì)列
*/
public class DlxProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));// rabbitmq.uri=amqp://admin:admin@192.168.56.10:5672
// 建立連接
Connection conn = factory.newConnection();
// 創(chuàng)建消息通道
Channel channel = conn.createChannel();
String msg = "Hello world, Rabbit MQ, DLX MSG";
// 設(shè)置屬性,消息10秒鐘過期
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化消息
.contentEncoding("UTF-8")
.expiration("5000") // TTL
.build();
// 發(fā)送消息,普通隊(duì)列
channel.basicPublish("ORI_USE_EXCHANGE", "ORI_USE_QUEUE", properties, msg.getBytes());
channel.close();
conn.close();
}
}
import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* 5秒鐘后,消息會(huì)從正常隊(duì)列 ORI_USE_QUEUE 到達(dá)死信交換機(jī) DEAD_LETTER_EXCHANGE ,然后路由到死信隊(duì)列 DEAD_LETTER_QUEUE
*
*/
public class DlxConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // rabbitmq.uri=amqp://admin:admin@192.168.56.10:5672
// 建立連接
Connection conn = factory.newConnection();
// 創(chuàng)建消息通道
Channel channel = conn.createChannel();
// 指定隊(duì)列的死信交換機(jī)
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("x-dead-letter-exchange","DEAD_LETTER_EXCHANGE");
// arguments.put("x-expires",9000L); // 設(shè)置隊(duì)列的TTL
// arguments.put("x-max-length", 4); // 如果設(shè)置了隊(duì)列的最大長度,超過長度時(shí),先入隊(duì)的消息會(huì)被發(fā)送到DLX
// 聲明交換機(jī)
// String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare("ORI_USE_EXCHANGE","direct",false, false, null);
// 聲明隊(duì)列(默認(rèn)交換機(jī)AMQP Direct)
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare("ORI_USE_QUEUE", false, false, false, arguments);
// 綁定隊(duì)列和交換機(jī),以及routingKey
channel.queueBind("ORI_USE_QUEUE","ORI_USE_EXCHANGE","ORI_USE_QUEUE");
// 聲明死信交換機(jī)
channel.exchangeDeclare("DEAD_LETTER_EXCHANGE","topic", false, false, false, null);
// 聲明死信隊(duì)列
channel.queueDeclare("DEAD_LETTER_QUEUE", false, false, false, null);
// 綁定,此處 Dead letter routing key 設(shè)置為 #
channel.queueBind("DEAD_LETTER_QUEUE","DEAD_LETTER_EXCHANGE","#");
System.out.println(" Waiting for message....");
// 創(chuàng)建消費(fèi)者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("Received message : '" + msg + "'");
}
};
// 開始獲取消息,消費(fèi)死信隊(duì)列
// String queue, boolean autoAck, Consumer callback
channel.basicConsume("DEAD_LETTER_QUEUE", true, consumer);
}
}
2、基本流程
利用消息的過期時(shí)間,過期之后投遞到死信交換機(jī)(DLX),路由到死信隊(duì)列(DLQ),我們消費(fèi)者監(jiān)聽死信隊(duì)列(DLQ),實(shí)現(xiàn)延遲消息。
消息的流轉(zhuǎn)流程:生產(chǎn)者- 原交換機(jī) - 原隊(duì)列(超過TTL之后) - 死信交換機(jī) - 死信隊(duì)列 - 最終消費(fèi)者。
三、JavaAPI利用插件實(shí)現(xiàn)RabbitMQ延遲消息
1、代碼實(shí)現(xiàn)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 使用延時(shí)插件實(shí)現(xiàn)的消息投遞-消費(fèi)者
* 必須要在服務(wù)端安裝rabbitmq-delayed-message-exchange插件
* 先啟動(dòng)消費(fèi)者
*/
public class DelayPluginConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://admin:admin@192.168.56.10:5672");
// 建立連接
Connection conn = factory.newConnection();
// 創(chuàng)建消息通道
Channel channel = conn.createChannel();
// 聲明x-delayed-message類型的exchange
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-delayed-type", "direct");
channel.exchangeDeclare("DELAY_EXCHANGE", "x-delayed-message", false,
false, argss);
// 聲明隊(duì)列
channel.queueDeclare("DELAY_QUEUE", false,false,false,null);
// 綁定交換機(jī)與隊(duì)列
channel.queueBind("DELAY_QUEUE", "DELAY_EXCHANGE", "DELAY_KEY");
// 創(chuàng)建消費(fèi)者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println("收到消息:[" + msg + "]\n接收時(shí)間:" +sf.format(new Date()));
}
};
// 開始獲取消息
// String queue, boolean autoAck, Consumer callback
channel.basicConsume("DELAY_QUEUE", true, consumer);
}
}
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 使用延時(shí)插件實(shí)現(xiàn)的消息投遞-生產(chǎn)者
* 必須要在服務(wù)端安裝rabbitmq-delayed-message-exchange插件
* 先啟動(dòng)消費(fèi)者
*/
public class DelayPluginProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://admin:admin@192.168.56.10:5672");
// 建立連接
Connection conn = factory.newConnection();
// 創(chuàng)建消息通道
Channel channel = conn.createChannel();
// 延時(shí)投遞,比如延時(shí)10秒
Date now = new Date();
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND, +10);// 10秒
Date delayTime = calendar.getTime();
// 定時(shí)投遞,把這個(gè)值替換delayTime即可
// Date exactDealyTime = new Date("2019/01/14,22:30:00");
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String msg = "發(fā)送時(shí)間:" + sf.format(now) + ",投遞時(shí)間:" + sf.format(delayTime);
// 延遲的間隔時(shí)間,目標(biāo)時(shí)刻減去當(dāng)前時(shí)刻
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", delayTime.getTime() - now.getTime());
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder()
.headers(headers);
channel.basicPublish("DELAY_EXCHANGE", "DELAY_KEY", props.build(),
msg.getBytes());
channel.close();
conn.close();
}
}
2、基本原理
rabbitMQ的延遲消息插件,可以有效的避免消息堵塞問題。文章來源:http://www.zghlxwxcb.cn/news/detail-549091.html
相當(dāng)于投遞給一個(gè)延遲消息的交換機(jī),并指定延遲時(shí)間,大大簡化了開發(fā)。文章來源地址http://www.zghlxwxcb.cn/news/detail-549091.html
四、Springboot利用死信隊(duì)列實(shí)現(xiàn)延遲消息
1、配置實(shí)現(xiàn)
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 死信隊(duì)列 DLX DLQ
*/
@Configuration
public class DlxConfig {
@Bean
public ConnectionFactory connectionFactory() throws Exception {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean("oriUseExchange")
public DirectExchange exchange() {
return new DirectExchange("ORI_USE_EXCHANGE", true, false, new HashMap<>());
}
@Bean("oriUseQueue")
public Queue queue() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-message-ttl", 10000); // 10秒鐘后成為死信
map.put("x-dead-letter-exchange", "DEAD_LETTER_EXCHANGE"); // 隊(duì)列中的消息變成死信后,進(jìn)入死信交換機(jī)
return new Queue("ORI_USE_QUEUE", true, false, false, map);
}
@Bean
public Binding binding(@Qualifier("oriUseQueue") Queue queue,@Qualifier("oriUseExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("ori.use");
}
/**
* 隊(duì)列的死信交換機(jī)
* @return
*/
@Bean("deatLetterExchange")
public TopicExchange deadLetterExchange() {
return new TopicExchange("DEAD_LETTER_EXCHANGE", true, false, new HashMap<>());
}
@Bean("deatLetterQueue")
public Queue deadLetterQueue() { // 消費(fèi)者只監(jiān)聽該隊(duì)列即可
return new Queue("DEAD_LETTER_QUEUE", true, false, false, new HashMap<>());
}
@Bean
public Binding bindingDead(@Qualifier("deatLetterQueue") Queue queue,@Qualifier("deatLetterExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("#"); // 無條件路由
}
}
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan(basePackages = "com.dlx.ttl")
public class DlxSender {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(DlxSender.class);
RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
// 隨隊(duì)列的過期屬性過期,單位ms
rabbitTemplate.convertAndSend("ORI_USE_EXCHANGE", "ori.use", "測試死信消息");
}
}
五、Springboot利用插件實(shí)現(xiàn)延遲消息
1、配置實(shí)現(xiàn)
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
// 配置類
@Configuration
public class DelayPluginConfig {
@Bean
public ConnectionFactory connectionFactory() throws Exception {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setUri("amqp://admin:admin@192.168.56.10:5672");
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean("delayExchange")
public TopicExchange exchange() {
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-delayed-type", "direct");
return new TopicExchange("DELAY_EXCHANGE", true, false, argss);
}
@Bean("delayQueue")
public Queue deadLetterQueue() {
return new Queue("DELAY_QUEUE", true, false, false, new HashMap<>());
}
@Bean
public Binding bindingDead(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("#"); // 無條件路由
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 消費(fèi)者
*/
public class DelayPluginConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://admin:admin@192.168.56.10:5672");
// 建立連接
Connection conn = factory.newConnection();
// 創(chuàng)建消息通道
Channel channel = conn.createChannel();
// 創(chuàng)建消費(fèi)者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println("收到消息:[" + msg + "]\n接收時(shí)間:" +sf.format(new Date()));
}
};
// 開始獲取消息
// String queue, boolean autoAck, Consumer callback
channel.basicConsume("DELAY_QUEUE", true, consumer);
}
}
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
/**
* 生產(chǎn)者
* 延時(shí)消息插件,去管控臺隊(duì)列看有無收到消息
*/
@ComponentScan(basePackages = "com.dlx.delayplugin")
public class DelayPluginProducer {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(DelayPluginProducer.class);
RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
// 延時(shí)投遞,比如延時(shí)4秒
Date now = new Date();
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND, +4);
Date delayTime = calendar.getTime();
// 定時(shí)投遞,把這個(gè)值替換delayTime即可
// Date exactDealyTime = new Date("2019/06/24,22:30:00");
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String msg = "延時(shí)插件測試消息,發(fā)送時(shí)間:" + sf.format(now) + ",理論路由時(shí)間:" + sf.format(delayTime);
MessageProperties messageProperties = new MessageProperties();
// 延遲的間隔時(shí)間,目標(biāo)時(shí)刻減去當(dāng)前時(shí)刻
messageProperties.setHeader("x-delay", delayTime.getTime() - now.getTime());
Message message = new Message(msg.getBytes(), messageProperties);
// 不能在本地測試,必須發(fā)送消息到安裝了插件的服務(wù)端
rabbitTemplate.send("DELAY_EXCHANGE", "#", message);
}
}
到了這里,關(guān)于RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!