官方文檔參考:https://www.rabbitmq.com/tutorials/tutorial-four-python.html
使用direct
類型的Exchange,發(fā)N條消息并使用不同的routingKey,消費(fèi)者定義隊(duì)列并將隊(duì)列routingKey
、Exchange綁定。此時(shí)使用direct
模式Exchange必須要routingKey
完成匹配的情況下消息才會(huì)轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列中被消費(fèi)。
樣例使用日志分發(fā)為樣例。即按日志不同的級(jí)別,分發(fā)到不同的隊(duì)列。每個(gè)隊(duì)列只處理自己的對(duì)應(yīng)的級(jí)別日志。
創(chuàng)建生產(chǎn)者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
public class Product {
private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明交換機(jī),交換器和消息隊(duì)列的綁定不需要在這里處理。
channel.exchangeDeclare(
"ex.routing",
BuiltinExchangeType.DIRECT,
// 持久的標(biāo)識(shí)
false,
// 自動(dòng)刪除的標(biāo)識(shí)
false,
// 屬性
null);
for (int i = 0; i < 30; i++) {
String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];
String dataMsg = "[" + level + "] 消息發(fā)送 :" + i;
// 發(fā)送消息
channel.basicPublish("ex.routing", level, null, dataMsg.getBytes(StandardCharsets.UTF_8));
}
}
}
創(chuàng)建ERROR的消費(fèi)者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class ErrorConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明隊(duì)列并綁定
channel.exchangeDeclare(
"ex.routing",
BuiltinExchangeType.DIRECT,
// 持久的標(biāo)識(shí)
false,
// 自動(dòng)刪除的標(biāo)識(shí)
false,
// 屬性
null);
// 此也可以聲明為臨時(shí)隊(duì)列,但是如果消息很重要,不要聲明臨時(shí)隊(duì)列。
channel.queueDeclare(
"log.error",
// 永久
false,
// 排他
false,
// 自動(dòng)刪除
true,
// 屬性
null);
//消費(fèi)者享有綁定到交換器的權(quán)力。
channel.queueBind("log.error", "ex.routing", "ERROR");
// 通過(guò)chanel消費(fèi)消息
channel.basicConsume(
"log.error",
(consumerTag, message) -> {
System.out.println("ERROR收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
},
consumerTag -> {});
}
}
創(chuàng)建INFO級(jí)的消費(fèi)者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class InfoConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明隊(duì)列并綁定
channel.exchangeDeclare(
"ex.routing",
BuiltinExchangeType.DIRECT,
// 持久的標(biāo)識(shí)
false,
// 自動(dòng)刪除的標(biāo)識(shí)
true,
// 屬性
null);
// 此也可以聲明為臨時(shí)隊(duì)列,但是如果消息很重要,不要聲明臨時(shí)隊(duì)列。
channel.queueDeclare(
"log.info",
// 永久
false,
// 排他
false,
// 自動(dòng)刪除
false,
// 屬性
null);
//消費(fèi)者享有綁定到交換器的權(quán)力。
channel.queueBind("log.info", "ex.routing", "INFO");
// 通過(guò)chanel消費(fèi)消息
channel.basicConsume(
"log.info",
(consumerTag, message) -> {
System.out.println("INFO收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
},
consumerTag -> {});
}
}
創(chuàng)建WARN級(jí)別的消息者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class WarnConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明隊(duì)列并綁定
channel.exchangeDeclare(
"ex.routing",
BuiltinExchangeType.DIRECT,
// 持久的標(biāo)識(shí)
false,
// 自動(dòng)刪除的標(biāo)識(shí)
false,
// 屬性
null);
// 此也可以聲明為臨時(shí)隊(duì)列,但是如果消息很重要,不要聲明臨時(shí)隊(duì)列。
channel.queueDeclare(
"log.warn",
// 永久
false,
// 排他
false,
// 自動(dòng)刪除
true,
// 屬性
null);
//消費(fèi)者享有綁定到交換器的權(quán)力。
channel.queueBind("log.warn", "ex.routing", "WARN");
// 通過(guò)chanel消費(fèi)消息
channel.basicConsume(
"log.warn",
(consumerTag, message) -> {
System.out.println("warn收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
},
consumerTag -> {});
}
}
首先啟動(dòng)三個(gè)消費(fèi)者:
查看隊(duì)列及交換機(jī)情況
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ ex.routing │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬──────────────────┬──────────────────┬─────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ │ exchange │ log.info │ queue │ log.info │ │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ │ exchange │ log.warn │ queue │ log.warn │ │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ │ exchange │ log.error │ queue │ log.error │ │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing │ exchange │ log.error │ queue │ ERROR │ │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing │ exchange │ log.info │ queue │ INFO │ │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing │ exchange │ log.warn │ queue │ WARN │ │
└─────────────┴─────────────┴──────────────────┴──────────────────┴─────────────┴───────────┘
[root@nullnull-os ~]#
可以發(fā)現(xiàn),交換器ex.routing 綁定了三個(gè)隊(duì)列log.error
、log.info
、log.warn
并指定了路由鍵。
啟動(dòng)消費(fèi)者,查看消息通否被正常消費(fèi)。
ERROR的消費(fèi)者控制臺(tái)輸出
ERROR收到的消息:[ERROR] 消息發(fā)送 :1
ERROR收到的消息:[ERROR] 消息發(fā)送 :2
ERROR收到的消息:[ERROR] 消息發(fā)送 :6
ERROR收到的消息:[ERROR] 消息發(fā)送 :8
ERROR收到的消息:[ERROR] 消息發(fā)送 :9
ERROR收到的消息:[ERROR] 消息發(fā)送 :11
ERROR收到的消息:[ERROR] 消息發(fā)送 :15
ERROR收到的消息:[ERROR] 消息發(fā)送 :16
ERROR收到的消息:[ERROR] 消息發(fā)送 :19
ERROR收到的消息:[ERROR] 消息發(fā)送 :20
ERROR收到的消息:[ERROR] 消息發(fā)送 :21
ERROR收到的消息:[ERROR] 消息發(fā)送 :23
ERROR收到的消息:[ERROR] 消息發(fā)送 :24
ERROR收到的消息:[ERROR] 消息發(fā)送 :27
ERROR收到的消息:[ERROR] 消息發(fā)送 :28
INFO的消費(fèi)者控制臺(tái)輸出:
INFO收到的消息:[INFO] 消息發(fā)送 :0
INFO收到的消息:[INFO] 消息發(fā)送 :3
INFO收到的消息:[INFO] 消息發(fā)送 :4
INFO收到的消息:[INFO] 消息發(fā)送 :13
INFO收到的消息:[INFO] 消息發(fā)送 :14
INFO收到的消息:[INFO] 消息發(fā)送 :22
INFO收到的消息:[INFO] 消息發(fā)送 :25
WARN的消費(fèi)都控制臺(tái)輸出:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-691625.html
warn收到的消息:[WARN] 消息發(fā)送 :5
warn收到的消息:[WARN] 消息發(fā)送 :7
warn收到的消息:[WARN] 消息發(fā)送 :10
warn收到的消息:[WARN] 消息發(fā)送 :12
warn收到的消息:[WARN] 消息發(fā)送 :17
warn收到的消息:[WARN] 消息發(fā)送 :18
warn收到的消息:[WARN] 消息發(fā)送 :26
warn收到的消息:[WARN] 消息發(fā)送 :29
至此,驗(yàn)證已經(jīng)完成。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-691625.html
到了這里,關(guān)于RabbitMQ工作模式-路由模式的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!