前言
通過本篇博客能夠簡單使用RabbitMQ的主題模式。
本篇博客主要是博主通過官網(wǎng)總結(jié)出的RabbitMQ主題模式。其中如果有誤歡迎大家及時(shí)指正。
什么是Topic模式
Topic模式與Direct模式相比,他們都可以根據(jù)Routing key把消息路由到對(duì)應(yīng)的隊(duì)列上,但是Topic模式相較于Direct來說,它可以基于多個(gè)標(biāo)準(zhǔn)進(jìn)行路由。也就是在隊(duì)列綁定Routing key的時(shí)候使用通配符。這使我們相較于Direct模式靈活性更大。
使用Topic模式的要點(diǎn)
routing key必須是由"."進(jìn)行分隔的單詞列表,最大限制為255字節(jié)
通配符規(guī)則
- "*"可以代替一個(gè)單詞。
- "#"可以代替零個(gè)或多個(gè)單詞。
示例
創(chuàng)建了三個(gè)綁定:Q1綁定了綁定鍵“.orange”。和Q2的".*.rabbit"和“l(fā)azy.#”。
1.一個(gè)消息的路由鍵為"quick.orange.rabbit" 時(shí),它將會(huì)被送到隊(duì)列Q1和Q2。
2.一個(gè)消息的路由鍵為"quick.orange.fox"時(shí),它將會(huì)背誦到隊(duì)列Q1
3.一個(gè)消息的路由鍵為"lazy.brown.fox"時(shí),它將被送到隊(duì)列Q2
4.一個(gè)消息的路由鍵為"quick.brown.fox",沒有匹配任何隊(duì)列,消息將會(huì)丟失。
5.一個(gè)消息的路由鍵為"lazy.orange.new.rabbit",它將被送到隊(duì)列Q2.
6.一個(gè)消息的路由鍵為"orang"或者"quick.orange.new.rabbit"沒有匹配到任何隊(duì)列消息將丟失。
代碼示例
Pom文件引入RabbtiMQ依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
RabbitMQ工具類
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : RabbitMQUtils
* @description : [rabbitmq工具類]
* @createTime : [2023/1/17 8:49]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 8:49]
* @updateRemark : [描述說明本次修改內(nèi)容]
*/
public class RabbitMQUtils {
/*
* @version V1.0
* Title: getConnection
* @author Wangwei
* @description 創(chuàng)建rabbitmq連接
* @createTime 2023/1/17 8:52
* @param []
* @return com.rabbitmq.client.Connection
*/
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setPort(5672);
factory.setVirtualHost("虛擬主機(jī)");
factory.setUsername("用戶名");
factory.setPassword("密碼");
//創(chuàng)建連接
Connection connection=factory.newConnection();
return connection;
}
/*
* @version V1.0
* Title: getChannel
* @author Wangwei
* @description 創(chuàng)建信道
* @createTime 2023/1/17 8:55
* @param []
* @return com.rabbitmq.client.Channel
*/
public static Channel getChannel() throws IOException, TimeoutException {
Connection connection=getConnection();
Channel channel=connection.createChannel();
return channel;
}
}
生產(chǎn)者
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : Producer
* @description : [生產(chǎn)者]
* @createTime : [2023/2/1 9:38]
* @updateUser : [WangWei]
* @updateTime : [2023/2/1 9:38]
* @updateRemark : [描述說明本次修改內(nèi)容]
*/
public class Producer {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
//建立連接
RabbitMQUtils.getConnection();
//聲明通道
Channel channel = RabbitMQUtils.getChannel();
//創(chuàng)建topic類型交換機(jī)并命名為logs
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//聲明routingKey
String severityInfo="info.log.test";
String severityError="error.test";
String severityError2="log.error.test";
//循環(huán)發(fā)送2條消息
for (int i = 0; i <2 ; i++) {
String msg="info.log.test:"+i;
/*推送消息
*交換機(jī)命名,不填寫使用默認(rèn)的交換機(jī)
* routingKey -路由鍵-
* props:消息的其他屬性-路由頭等正文
* msg消息正文
*/
channel.basicPublish(EXCHANGE_NAME,severityInfo,null,msg.getBytes(StandardCharsets.UTF_8));
System.out.println(msg);
}
//循環(huán)發(fā)送2條消息
for (int i = 0; i <2 ; i++) {
String msg="主題模式error.test:"+i;
/*推送消息
*交換機(jī)命名,不填寫使用默認(rèn)的交換機(jī)
* routingKey -路由鍵-
* props:消息的其他屬性-路由頭等正文
* msg消息正文
*/
channel.basicPublish(EXCHANGE_NAME,severityError,null,msg.getBytes(StandardCharsets.UTF_8));
System.out.println(msg);
}
//循環(huán)發(fā)送2條消息
for (int i = 0; i <2 ; i++) {
String msg="log.error.test:"+i;
/*推送消息
*交換機(jī)命名,不填寫使用默認(rèn)的交換機(jī)
* routingKey -路由鍵-
* props:消息的其他屬性-路由頭等正文
* msg消息正文
*/
channel.basicPublish(EXCHANGE_NAME,severityError2,null,msg.getBytes(StandardCharsets.UTF_8));
System.out.println(msg);
}
}
}
消費(fèi)者1
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : ConsumerOne
* @description : [消費(fèi)者1]
* @createTime : [2023/2/1 9:39]
* @updateUser : [WangWei]
* @updateTime : [2023/2/1 9:39]
* @updateRemark : [描述說明本次修改內(nèi)容]
*/
public class ConsumerOne {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queueName = channel.queueDeclare().getQueue();
//聲明routingKey (error)
String severityError="error.*";
//交換機(jī)與隊(duì)列進(jìn)行綁定-如果沒有隊(duì)列與交換機(jī)進(jìn)行綁定,那么消費(fèi)者接受不到生產(chǎn)者的消息,消息會(huì)丟失
//queueName綁定了direct_logs交換機(jī)并且綁定了routingKey
channel.queueBind(queueName, EXCHANGE_NAME,severityError );
//因?yàn)镽abbitmq服務(wù)器將異步地向我們推送消息,所以我們以對(duì)象的形式提供了一個(gè)回調(diào),該回調(diào)將緩沖消息,直到我們準(zhǔn)備好使用它們。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
消費(fèi)者2
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : ConsumerTwo
* @description : [消費(fèi)者2]
* @createTime : [2023/2/1 9:38]
* @updateUser : [WangWei]
* @updateTime : [2023/2/1 9:38]
* @updateRemark : [描述說明本次修改內(nèi)容]
*/
public class ConsumerTwo {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.getChannel();
//創(chuàng)建fanout類型交換機(jī)并命名為logs
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//創(chuàng)建了一個(gè)非持久的、排他的、自動(dòng)刪除的隊(duì)列,并生成了一個(gè)名稱
String queueName = channel.queueDeclare().getQueue();
//聲明routingKey (info,error,warning)
String severityInfo="info.#";
String severityError="*.error.*";
//交換機(jī)與隊(duì)列進(jìn)行綁定-如果沒有隊(duì)列與交換機(jī)進(jìn)行綁定,那么消費(fèi)者接受不到生產(chǎn)者的消息,消息會(huì)丟失
//queueName綁定了direct_logs交換機(jī)并且綁定了3個(gè)routingKey
channel.queueBind(queueName, EXCHANGE_NAME,severityInfo );
channel.queueBind(queueName, EXCHANGE_NAME,severityError );
//因?yàn)镽abbitmq服務(wù)器將異步地向我們推送消息,所以我們以對(duì)象的形式提供了一個(gè)回調(diào),該回調(diào)將緩沖消息,直到我們準(zhǔn)備好使用它們。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
效果
文章來源:http://www.zghlxwxcb.cn/news/detail-696423.html
總結(jié)
通過使用通配符實(shí)現(xiàn)靈活性的應(yīng)用有很多,例如nginx的請(qǐng)求轉(zhuǎn)發(fā),gateway為請(qǐng)求過濾等等都是使用了統(tǒng)配符的技術(shù)。通過這種聯(lián)想來對(duì)知識(shí)進(jìn)行結(jié)構(gòu)化,找相同和不同,思考能力和學(xué)習(xí)力也會(huì)有很大的提高。文章來源地址http://www.zghlxwxcb.cn/news/detail-696423.html
到了這里,關(guān)于【RabbitMQ六】——RabbitMQ主題模式(Topic)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!