第三種模型(Publish/Subscribe 發(fā)布/訂閱)
扇型(funout)交換機(jī)
扇型交換機(jī)將消息路由給綁定到它身上的所有隊(duì)列,而不會(huì)理會(huì)綁定的路由鍵。如果 N 個(gè)隊(duì)列綁定到某個(gè)扇型交換機(jī)上,當(dāng)有消息發(fā)送給此扇型交換機(jī)時(shí),交換機(jī)會(huì)將消息的拷貝分別發(fā)送給這所有的 N 個(gè)隊(duì)列。扇型用來交換機(jī)處理消息的廣播路由。
因?yàn)樯刃徒粨Q機(jī)投遞信息的拷貝到所有綁定到它的隊(duì)列,所以它可以用來在群聊的時(shí)候,分發(fā)消息給參與群聊的用戶。
扇型交換機(jī)圖例:
Public/Subscribe 模型
在這種模式下,消息發(fā)送流程是這樣的:
- 可以有多個(gè)消費(fèi)者;
- 每個(gè)消費(fèi)者有自己的 queue(隊(duì)列)
- 每個(gè)隊(duì)列都要綁定到 Exchange(交換機(jī))
- 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無法決定。
- 交換機(jī)把所有的消息發(fā)送給綁定過的所有隊(duì)列。
- 隊(duì)列的消費(fèi)者都能拿到消息。實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)。
- 開發(fā)生產(chǎn)者
// 將通道聲明指定的交換機(jī)
// 參數(shù)1:交換機(jī)的名稱
// 參數(shù)2:交換機(jī)的類型
channel.exchangeDeclare("logs","fanout");
// 發(fā)送消息
channel.basicPublish("logs","",null,"fanout type message".getBytes());
- 開發(fā)消費(fèi)者
// 通道聲明交換機(jī)
channel.exchangeDeclare("logs","fanout");
// 臨時(shí)隊(duì)列
String queueName = channel.queueDeclare().getQueue();
// 隊(duì)列綁定交換機(jī)
channel.queueBind(queueName,"logs","");
實(shí)例代碼:
Runnable myRunnable = () -> {
try {
Connection conn = RabbitMQUtils.getConnection();
Channel channel = conn.createChannel();
// 通道聲明交換機(jī)
channel.exchangeDeclare("logs", "fanout");
// 臨時(shí)隊(duì)列
String queueName = channel.queueDeclare().getQueue();
// 隊(duì)列綁定交換機(jī)
channel.queueBind(queueName, "logs", "");
// 消費(fèi)消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者" + consumerTag + ":" + new String(body));
}
});
} catch (IOException e) {
e.printStackTrace();
}
};
Thread t1 = new Thread(myRunnable);
Thread t2 = new Thread(myRunnable);
Thread t3 = new Thread(myRunnable);
t1.start();
t2.start();
t3.start();
結(jié)果(有群發(fā)那味了):
綁定
- 綁定是交換機(jī)將消息路由給隊(duì)列所需遵循的規(guī)則。如果要制定交換機(jī)“E”將消息路由給隊(duì)列“Q”,那么“Q”就需要與“E”進(jìn)行綁定。綁定操作需要定義一個(gè)可選的路由鍵(routing key)屬性給某些類型的交換機(jī)(當(dāng)然像這第三種模型這樣的,路由鍵有沒有無所謂的,則不需聲明路由鍵,而前兩種模型下,是自動(dòng)綁定的,交換機(jī)則為默認(rèn)交換機(jī),路由鍵和隊(duì)列名一致的。)。路由鍵的意義在于從發(fā)送給交換機(jī)的眾多消息中選擇出某些消息,將其路由給綁定的隊(duì)列,然后消費(fèi)者再?gòu)年?duì)列中消費(fèi)消息。
第四、第五種模型(Routing、Topics)
第四種模型(Routing)
在第三種模型中,一條消息會(huì)被所有訂閱的隊(duì)列都消費(fèi)。但是,在某些場(chǎng)景下,希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)候就需要用直連交換機(jī)構(gòu)建的路由routing模型了。
圖解:
- P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。
- X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給與routing key完全匹配的隊(duì)列。
- C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為error的消息。
- C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning的消息。
- 開發(fā)生產(chǎn)者
// 通過通道聲明交換機(jī)
channel.exchangeDeclare("logs_routing", "direct");
// 發(fā)送消息
String routingKey = "info";
channel.basicPublish("logs_routing", routingKey, null, ("這是direct模型發(fā)布的基于route key:" + routingKey).getBytes());
- 開發(fā)消費(fèi)者
// 通道聲明交換機(jī)以及交換的類型
channel.exchangeDeclare("logs_routing", "direct");
// 創(chuàng)建一個(gè)臨時(shí)隊(duì)列
String queueName = channel.queueDeclare().getQueue();
// 基于route key 綁定隊(duì)列和交換機(jī)
channel.queueBind(queueName, "logs_routing", "error");
主題交換機(jī)(Topic Exchange)
主題交換機(jī)通過對(duì)消息的路由鍵和隊(duì)列到交換機(jī)的綁定模式之間的匹配,將消息路由給一個(gè)或多個(gè)隊(duì)列(注意這是消息給一個(gè)或多個(gè)隊(duì)列,和直連交換機(jī)可不同,直連是路由鍵和隊(duì)列同名,一個(gè)消息對(duì)應(yīng)一個(gè)隊(duì)列這種才是直連)。主題交換機(jī)經(jīng)常用來實(shí)現(xiàn)各種分發(fā)/訂閱模式及其變種。主題交換機(jī)通常用來實(shí)現(xiàn)消息的多播路由(與第三種模型的廣播路由不同)。
主題交換機(jī)擁有非常廣泛的用戶案例。當(dāng)一個(gè)問題涉及到那些想要有針對(duì)性的選擇需要接收消息的多消費(fèi)/多應(yīng)用
的時(shí)候,主題交換機(jī)都可以被列入考慮范圍。
第五種模型(Topics)
主題交換機(jī)和直連交換機(jī)相比,都是可以根據(jù) routingkey 把消息路由到不同的隊(duì)列。只不過主題交換機(jī)可以讓隊(duì)列在綁定 routingkey 的時(shí)候使用通配符。這種模型 routingkey 一般都是由一個(gè)或者多個(gè)單詞組成,多個(gè)單詞之間以.
分割,例如:item.insert
通配符
* (star) can substitute for exactly one word. 匹配一個(gè)單詞
# (hash) can substitute for zero or more words. 匹配一個(gè)或多個(gè)單詞
如:
audit.# 匹配audit.irs.corporate 或者 audit.irs 等
audit.* 只能匹配audit.irs
- 生產(chǎn)者
// 聲明交換機(jī)以及交換機(jī)類型
channel.exchangeDeclare("topics", "topic");
// 發(fā)布消息
String routeKey = "user.delete";
channel.basicPublish("topics", routeKey, null, ("這里是topic動(dòng)態(tài)路由模型,route key:" + routeKey).getBytes());
- 消費(fèi)者
// 生命交換機(jī)以及交換機(jī)類型
channel.exchangeDeclare("topics", "topic");
// 創(chuàng)建一個(gè)臨時(shí)隊(duì)列
String queueName = channel.queueDeclare().getQueue();
// 綁定隊(duì)列和交換機(jī) 動(dòng)態(tài)通配符形式 route key
channel.queueBind(queueName, "topics", "use.*");
案例結(jié)果文章來源:http://www.zghlxwxcb.cn/news/detail-621813.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-621813.html
到了這里,關(guān)于【RabbitMQ(day3)】扇形交換機(jī)和主題交換機(jī)的應(yīng)用的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!