MQ高級特性
1.削峰
設(shè)置 消費者
測試 添加多條消息
拉取消息 每隔20秒拉取一次 一次拉取五條 然后在20秒內(nèi)一條一條消費
TTL
Time To Live(存活時間/過期時間)。
當(dāng)消息到達存活時間后,還沒有被消費,會被自動清除。
RabbitMQ可以對消息設(shè)置過期時間,也可以對整個隊列(Queue)設(shè)置過期時間。
可以在管理臺新建隊列、交換機,綁定
1.圖形化操作
添加隊列
添加交換機
將交換機和對應(yīng)的隊列進行綁定
時間結(jié)束 , 消息失效
2.代碼實現(xiàn)
配置 生產(chǎn)者
@Configuration public class TopicMqTtlConfig { @Value("${mq.exchange.name}") private String EXCHANGENAME; @Value("${mq.queue.name1}") private String QUEUENAME1; @Value("${mq.queue.name2}") private String QUEUENAME2; // 1 // . 交換機 @Bean("ex1") public Exchange getExchange(){ Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build(); return exchange; } // 2。 隊列 @Bean("queue1") public Queue getQueue1(){ Queue queue = QueueBuilder.nonDurable(QUEUENAME1) .withArgument("x-message-ttl",30000)//過期時間30秒 .withArgument("x-max-length",10)//隊列中最多接收10條消息超過10條的部分廢棄 .build(); return queue; } @Bean("queue2") public Queue getQueue2(){ Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2) .withArgument("x-message-ttl",300000000)//過期時間30秒 .build(); return queue2; } // 3. 交換機和隊列進行綁定 @Bean("binding1") public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){ Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("ttl1.*").noargs(); return binding1; } @Bean("binding2") public Binding bindQueue2ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){ Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("ttl2.#").noargs(); return binding2; } }
測試
添加成功 ttl1只接收10條
時間過期
死信隊列
死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機,因為其他MQ產(chǎn)品中沒有交換機的概念),當(dāng)消息成為Dead message后,可以被重新發(fā)送到另一個交換機,這個交換機就是DLX。
比如消息隊列的消息過期,如果綁定了死信交換器,那么該消息將發(fā)送給死信交換機
消息在什么情況下會成為死信?(面試會問)
1.隊列消息長度到最大的限制
最大的長度設(shè)置為10當(dāng)?shù)?1條消息進來的時候就會成為死信
2. 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標(biāo)隊列,requeue=false(不重新回到隊列中)
設(shè)置消費者為手動簽收的狀態(tài)
3. 原隊列存在消息過期設(shè)置,消息到達超時時間未被消費;
隊列綁定交換機的方式是什么?
給隊列設(shè)置參數(shù): x-dead-letter-exchange 和 x-dead-letter-routing-key
//?1.??交換機??:正常的交換機???死信交換機
//?2.隊列??:正常的??死信
//3.綁定???正常ex?-?正常的que
正常的que和死信交換機
死信ex-死信queue
2.代碼實現(xiàn)
@Configuration public class TopicMqDeadConfig { @Value("${mq1.exchange.name1}") private String EXCHANGENAME; @Value("${mq1.exchange.name2}") private String DEADEXCHANGE; @Value("${mq1.queue.name1}") private String QUEUENAME1; @Value("${mq1.queue.name2}") private String QUEUENAME2; // 聲明正常交換機 @Bean("ex1") public Exchange getExchange(){ Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build(); return exchange; } // 正常隊列 @Bean("queue1") public Queue getQueue1(){ Queue queue = QueueBuilder.nonDurable(QUEUENAME1) .withArgument("x-message-ttl",30000)//過期時間30秒 .withArgument("x-dead-letter-exchange",DEADEXCHANGE) .withArgument("x-dead-letter-routing-key","dead.test")//將正常隊列與死信交換機,死信隊列綁定 //.withArgument("x-max-length",10)//隊列中最多接收10條消息超過10條的部分廢棄 .build(); return queue; } // 交換機和隊列進行綁定 @Bean("binding1") public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){ Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("normal.*").noargs(); return binding1; } // 聲明死信交換機 @Bean("ex2") public Exchange getDeadExchange(){ Exchange exchange = ExchangeBuilder.topicExchange(DEADEXCHANGE).durable(false).build(); return exchange; } //死信隊列 @Bean("queue2") public Queue getQueue2(){ Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2) .build(); return queue2; } // 死信交換機和死信隊列進行綁定 @Bean("binding2") public Binding bindQueue2ToExchange(@Qualifier("ex2") Exchange exchange,@Qualifier("queue2") Queue queue){ Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("dead.*").noargs(); return binding2; } }
測試
如果程序出現(xiàn)錯誤 拒絕簽收
監(jiān)聽正常隊列
發(fā)送消息 啟動測試
總結(jié):
1. 死信交換機和死信隊列和普通的沒有區(qū)別
2. 當(dāng)消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列
3. 消息成為死信的三種情況:
????????1. 隊列消息長度到達限制;
????????2. 消費者拒接消費消息,并且不重回隊列;
????????3. 原隊列存在消息過期設(shè)置,消息到達超時時間未被消費;
?延遲隊列
延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。
需求:
- 1. 下單后,30分鐘未支付,取消訂單,回滾庫存
- 2. 新用戶注冊成功7天后,發(fā)送短信問候。
實現(xiàn)方式:
1. 定時器
2. 死信隊列
在RabbitMQ中并未提供延遲隊列功能。但是可以使用:TTL+死信隊列
組合實現(xiàn)延遲隊列的效果。
1.配置
添加依賴
<!--2. rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--nacos 配置中心--> <!--配置中心--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency> <!-- application bootstrap --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bootstrap</artifactId> </dependency> <!-- nacos--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <dependency> <groupId>com.example</groupId> <artifactId>sys-comm</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
?
修改配置
2.代碼實現(xiàn)
創(chuàng)建實體類
發(fā)送消息 測試
過期后放入死信隊列
添加依賴
<dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.16</version> </dependency>
將json數(shù)據(jù)轉(zhuǎn)化為對象
獲取成功
3.連接數(shù)據(jù)庫
創(chuàng)建表
創(chuàng)建測試類
@RestController @RequestMapping("order") public class OrderController { @Value("${mq1.exchange.name1}") private String EXCHANGENAME; // @Resource private RabbitTemplate rabbitTemplate; @GetMapping public Result aaa(TabOrder order){ //1. 消息 存放到mq里面 String s = JSONUtil.toJsonStr(order); // openfeign -- 數(shù)據(jù)添加到數(shù)據(jù)庫里面 rabbitTemplate.convertAndSend(EXCHANGENAME, "normal.test", s); return Result.success(s); } }
監(jiān)聽normal
import javax.annotation.Resource; @Component public class XiaoFeng implements ChannelAwareMessageListener { @Resource private TabOrderMapper orderMapper; @Override @RabbitListener(queues = "test_queue_normal") public void onMessage(Message message, Channel channel) throws Exception { //Thread.sleep(2000);// 20s byte[] body = message.getBody(); String s = new String(body); System.out.println(s); // 將字符串轉(zhuǎn)化為 對象 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ TabOrder order = JSONUtil.toBean(s, TabOrder.class); // 將訂單的信息 報訊到數(shù)據(jù)庫里面 int insert = orderMapper.insert(order); channel.basicAck(deliveryTag,true); // }catch(Exception e){ //long deliveryTag, boolean multiple, boolean requeue System.out.println("拒絕簽收消息"); channel.basicNack(deliveryTag,true,false);// 死信消息 } } }
監(jiān)聽dead
@Component public class YanChi implements ChannelAwareMessageListener { @Resource private TabOrderMapper orderMapper; @Override @RabbitListener(queues = "test_queue_dead") public void onMessage(Message message, Channel channel) throws Exception { //Thread.sleep(2000);// 20s byte[] body = message.getBody(); String s = new String(body); System.out.println(s); // 將字符串轉(zhuǎn)化為 對象 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ TabOrder order = JSONUtil.toBean(s, TabOrder.class); // order 的狀態(tài) TabOrder tabOrder = orderMapper.selectById(order.getId()); if(tabOrder.getStatus()==1){ // 取消 tabOrder.setStatus(3); } orderMapper.updateById(tabOrder); channel.basicAck(deliveryTag,true); // }catch(Exception e){ //long deliveryTag, boolean multiple, boolean requeue System.out.println("拒絕簽收消息"); channel.basicNack(deliveryTag,true,false);// 死信消息 } } }
測試
成功文章來源:http://www.zghlxwxcb.cn/news/detail-793272.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-793272.html
到了這里,關(guān)于RabbitMQ高級特性2 、TTL、死信隊列和延遲隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!