主題模式
官方文檔參考:https://www.rabbitmq.com/tutorials/tutorial-five-python.html
使用topic類型的交換器,隊(duì)列綁定到交換器、bingingKey時(shí)使用通配符,交換器將消息路由轉(zhuǎn)發(fā)到具體隊(duì)列時(shí),會(huì)根據(jù)消息routingKey模糊匹配,比較靈活。
在Direct類型的交換器做到了根據(jù)日志級(jí)別的不同,將消息發(fā)送給了不同隊(duì)列的。
這里再加入一個(gè)需求,不僅想根據(jù)日志級(jí)別進(jìn)行劃分,還想根據(jù)日志的來源分日志,如何來做呢?
使用topic類型的交換器, routingKey就不能隨便寫了,它必須是點(diǎn)分單詞,單詞可以隨便寫,一般按消息的特征,該點(diǎn)分單詞字符串最長(zhǎng)255字節(jié)。
bindingKey也必須是這種形式。top類型的交換器背后原理跟direct類型類似只要隊(duì)列的bingingkey的值與消息的routingKey的匹配,隊(duì)列就可以收到該消息。有兩個(gè)不同
- * (star)匹配一個(gè)單詞。
- # 匹配0到多個(gè)單詞。
上報(bào)的數(shù)據(jù)的RoutingKey,格式如下
地區(qū).業(yè)務(wù).日志級(jí)別 如shanghai.busi.INFO 、 hangzhou.line.ERROR
生產(chǎn)者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
public class Product {
private static final String[] ADDRESS_ARRAYS = {"shanghai", "suzhou", "hangzhou"};
private static final String[] BUSI_NAMES = {"product", "user", "schedule"};
private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定義交換機(jī)
channel.exchangeDeclare(
"ex.busi.topic",
BuiltinExchangeType.TOPIC,
// 持久化標(biāo)識(shí)
false,
// 是否自動(dòng)刪除
false,
// 屬性信息
null);
for (int i = 0; i < 50; i++) {
String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];
String busiName = BUSI_NAMES[ThreadLocalRandom.current().nextInt(0, BUSI_NAMES.length)];
String address =
ADDRESS_ARRAYS[ThreadLocalRandom.current().nextInt(0, ADDRESS_ARRAYS.length)];
String routingKey = address + "." + busiName + "." + level;
String pushMsg = "地址[" + address + "],業(yè)務(wù)[" + busiName + "],級(jí)別[" + level + "],消息:" + i;
channel.basicPublish(
"ex.busi.topic", routingKey, null, pushMsg.getBytes(StandardCharsets.UTF_8));
}
}
}
上海的消費(fèi)者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
/**
* 上海地區(qū)的消費(fèi)都,獲取所有的上海信息
*/
public class ShangHaiConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定義交換機(jī)
channel.exchangeDeclare(
"ex.busi.topic",
BuiltinExchangeType.TOPIC,
// 持久化標(biāo)識(shí)
false,
// 是否自動(dòng)刪除
false,
// 屬性信息
null);
// 定義隊(duì)列
channel.queueDeclare(
"shanghai.all.log",
// 持久化存儲(chǔ)
true,
// 排他
false,
// 自動(dòng)刪除
true,
// 屬性
null);
// 將隊(duì)列與交換機(jī)進(jìn)行綁定
channel.queueBind("shanghai.all.log", "ex.busi.topic", "shanghai.#", null);
channel.basicConsume(
"shanghai.all.log",
(consumerTag, message) -> {
String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("shanghai consumer 收到數(shù)據(jù):" + dataMsg);
},
consumerTag -> {});
}
}
所有錯(cuò)誤日志的消費(fèi)者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class ErrorLogConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定義交換機(jī)
channel.exchangeDeclare(
"ex.busi.topic",
BuiltinExchangeType.TOPIC,
// 持久化標(biāo)識(shí)
false,
// 是否自動(dòng)刪除
false,
// 屬性信息
null);
// 定義隊(duì)列
channel.queueDeclare(
"log.all.error",
// 持久化存儲(chǔ)
true,
// 排他
false,
// 自動(dòng)刪除
true,
// 屬性
null);
// 將隊(duì)列與交換機(jī)進(jìn)行綁定
channel.queueBind("log.all.error", "ex.busi.topic", "#.ERROR", null);
channel.basicConsume(
"log.all.error",
(consumerTag, message) -> {
String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("錯(cuò)誤日志 consumer 收到數(shù)據(jù):" + dataMsg);
},
consumerTag -> {});
}
}
蘇州用戶的消費(fèi)者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class SuZhouUserConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定義交換機(jī)
channel.exchangeDeclare(
"ex.busi.topic",
BuiltinExchangeType.TOPIC,
// 持久化標(biāo)識(shí)
false,
// 是否自動(dòng)刪除
false,
// 屬性信息
null);
// 定義隊(duì)列
channel.queueDeclare(
"suzhou.user.consumer",
// 持久化存儲(chǔ)
true,
// 排他
false,
// 自動(dòng)刪除
true,
// 屬性
null);
// 將隊(duì)列與交換機(jī)進(jìn)行綁定
channel.queueBind("suzhou.user.consumer", "ex.busi.topic", "suzhou.user.*", null);
channel.basicConsume(
"suzhou.user.consumer",
(consumerTag, message) -> {
String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("suzhou consumer 收到數(shù)據(jù):" + dataMsg);
},
consumerTag -> {});
}
}
首先啟動(dòng)三個(gè)消費(fèi)者,查看隊(duì)列和交換器的信息
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ ex.busi.topic │ topic │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ ex.routing │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌───────────────┬─────────────┬──────────────────────┬──────────────────┬──────────────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ │ exchange │ suzhou.user.consumer │ queue │ suzhou.user.consumer │ │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ │ exchange │ shanghai.all.log │ queue │ shanghai.all.log │ │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ │ exchange │ log.all.error │ queue │ log.all.error │ │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange │ log.all.error │ queue │ #.ERROR │ │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange │ shanghai.all.log │ queue │ shanghai.# │ │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange │ suzhou.user.consumer │ queue │ suzhou.user.* │ │
└───────────────┴─────────────┴──────────────────────┴──────────────────┴──────────────────────┴───────────┘
[root@nullnull-os ~]#
觀察可以發(fā)現(xiàn),此隊(duì)列與消息的綁定已經(jīng)成功。接下使用生產(chǎn)者發(fā)送消息。觀察控制臺(tái)輸出:
錯(cuò)誤日志消費(fèi)者
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[suzhou],業(yè)務(wù)[schedule],級(jí)別[ERROR],消息:6
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[suzhou],業(yè)務(wù)[product],級(jí)別[ERROR],消息:8
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[product],級(jí)別[ERROR],消息:10
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[schedule],級(jí)別[ERROR],消息:12
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[suzhou],業(yè)務(wù)[schedule],級(jí)別[ERROR],消息:15
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[hangzhou],業(yè)務(wù)[user],級(jí)別[ERROR],消息:16
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[schedule],級(jí)別[ERROR],消息:17
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[product],級(jí)別[ERROR],消息:18
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[hangzhou],業(yè)務(wù)[user],級(jí)別[ERROR],消息:21
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[product],級(jí)別[ERROR],消息:22
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[product],級(jí)別[ERROR],消息:24
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[hangzhou],業(yè)務(wù)[product],級(jí)別[ERROR],消息:28
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[suzhou],業(yè)務(wù)[schedule],級(jí)別[ERROR],消息:33
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[hangzhou],業(yè)務(wù)[schedule],級(jí)別[ERROR],消息:39
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[suzhou],業(yè)務(wù)[user],級(jí)別[ERROR],消息:40
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[suzhou],業(yè)務(wù)[user],級(jí)別[ERROR],消息:43
錯(cuò)誤日志 consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[schedule],級(jí)別[ERROR],消息:46
上海地區(qū)的消費(fèi)者
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[schedule],級(jí)別[WARN],消息:0
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[user],級(jí)別[INFO],消息:1
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[schedule],級(jí)別[INFO],消息:2
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[schedule],級(jí)別[WARN],消息:5
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[product],級(jí)別[ERROR],消息:10
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[schedule],級(jí)別[ERROR],消息:12
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[schedule],級(jí)別[ERROR],消息:17
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[product],級(jí)別[ERROR],消息:18
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[product],級(jí)別[ERROR],消息:22
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[product],級(jí)別[ERROR],消息:24
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[user],級(jí)別[INFO],消息:32
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[schedule],級(jí)別[INFO],消息:35
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[product],級(jí)別[INFO],消息:38
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[schedule],級(jí)別[WARN],消息:41
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[schedule],級(jí)別[ERROR],消息:46
shanghai consumer 收到數(shù)據(jù):地址[shanghai],業(yè)務(wù)[user],級(jí)別[INFO],消息:48
蘇州用戶的消費(fèi)者文章來源:http://www.zghlxwxcb.cn/news/detail-685134.html
suzhou consumer 收到數(shù)據(jù):地址[suzhou],業(yè)務(wù)[user],級(jí)別[WARN],消息:37
suzhou consumer 收到數(shù)據(jù):地址[suzhou],業(yè)務(wù)[user],級(jí)別[ERROR],消息:40
suzhou consumer 收到數(shù)據(jù):地址[suzhou],業(yè)務(wù)[user],級(jí)別[ERROR],消息:43
suzhou consumer 收到數(shù)據(jù):地址[suzhou],業(yè)務(wù)[user],級(jí)別[WARN],消息:45
至此topic模式操作成功。文章來源地址http://www.zghlxwxcb.cn/news/detail-685134.html
到了這里,關(guān)于RabbitMQ工作模式-主題模式的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!