簡(jiǎn)介
發(fā)布訂閱模式允許一個(gè)生產(chǎn)者向多個(gè)消費(fèi)者發(fā)送消息。在RabbitMQ中實(shí)現(xiàn)發(fā)布訂閱模式通常涉及以下幾個(gè)關(guān)鍵組件:
- 生產(chǎn)者:負(fù)責(zé)生產(chǎn)并發(fā)送消息到RabbitMQ的Exchange(路由器)。
- Exchange:作為消息的中轉(zhuǎn)站,根據(jù)不同的規(guī)則將消息路由到一個(gè)或多個(gè)隊(duì)列。
- 隊(duì)列:存儲(chǔ)消息的緩沖區(qū),每個(gè)消費(fèi)者有自己的獨(dú)立隊(duì)列。
- 消費(fèi)者:從自己的隊(duì)列中接收并消費(fèi)消息。
在這種模式下,生產(chǎn)者發(fā)送的消息不是直接發(fā)送到特定的隊(duì)列,而是發(fā)送給Exchange。Exchange根據(jù)配置的規(guī)則決定如何處理這些消息。例如,它可以將消息路由到一個(gè)特定的隊(duì)列,也可以將消息路由到多個(gè)隊(duì)列,或者在某些情況下廢棄消息。
在實(shí)際應(yīng)用中,發(fā)布訂閱模式常用于構(gòu)建實(shí)時(shí)通信系統(tǒng)、通知服務(wù)、日志系統(tǒng)等場(chǎng)景,其中多個(gè)消費(fèi)者需要接收來自同一生產(chǎn)者的消息。這種模式的優(yōu)勢(shì)在于能夠?qū)崿F(xiàn)一對(duì)多的通信,使得消息的分發(fā)更加靈活和高效。
?生產(chǎn)者代碼
在前面的模式中,我們使用了channel.QueueDeclare()來聲明隊(duì)列。這里不需要了
channel.QueueDeclare("hello", true, false, false, null);
在發(fā)布訂閱模式中,生產(chǎn)者只需要將消息發(fā)送到交換機(jī)上,然后由交換機(jī)根據(jù)綁定規(guī)則將消息路由到一個(gè)或多個(gè)隊(duì)列中。消費(fèi)者則可以從自己的隊(duì)列中獲取并處理這些消息。
因此,我們這里只聲明一個(gè)扇形交換機(jī),并將消息發(fā)布到該交換機(jī)上即可。而具體的隊(duì)列聲明和綁定操作可以在消費(fèi)者端進(jìn)行。
在前面的第一章和第二章中,我們都沒有聲明交換機(jī),這是因?yàn)镽abbitMQ中有一個(gè)默認(rèn)的交換機(jī),稱為空字符串名稱的默認(rèn)交換機(jī)。當(dāng)生產(chǎn)者發(fā)送消息但沒有指定交換機(jī)時(shí),消息會(huì)被發(fā)送到這個(gè)默認(rèn)交換機(jī)。同樣,當(dāng)創(chuàng)建隊(duì)列但沒有指定隊(duì)列與交換機(jī)的綁定關(guān)系時(shí),隊(duì)列會(huì)自動(dòng)綁定到默認(rèn)交換機(jī)上。RabbitMQ提供了幾種內(nèi)置的交換機(jī)類型,如直接交換(direct)、扇形交換(fanout)、主題交換(topic)和頭交換(headers)等,以滿足不同的業(yè)務(wù)場(chǎng)景需求。
class MyClass
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost"; //RabbitMQ服務(wù)在本地運(yùn)行
factory.UserName = "guest"; //用戶名
factory.Password = "guest"; //密碼
//創(chuàng)建連接
using (var connection = factory.CreateConnection())
{
//創(chuàng)建通道
using (var channel = connection.CreateModel())
{
//聲明了一個(gè)扇形交換機(jī)(fanout exchange),命名為"hello"
channel.ExchangeDeclare("hello", "fanout");
string msg;
Console.WriteLine("請(qǐng)輸入要發(fā)送的消息內(nèi)容:");
while (!string.IsNullOrEmpty(msg = Console.ReadLine()))
{
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish("hello", "", null, body); //開始傳遞
Console.WriteLine("已發(fā)送: {0}", msg);
}
}
}
}
}
消費(fèi)者代碼
我們通過channel.QueueDeclare().QueueName;聲明一個(gè)新的隊(duì)列,如果這個(gè)方法聲明隊(duì)列,RabbitMQ會(huì)自動(dòng)為你生成一個(gè)獨(dú)一無二的隊(duì)列名稱
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queueName, "hello", "");
然后將隊(duì)列的名稱賦值給變量queueName
。將該隊(duì)列綁定到之前聲明的扇形交換機(jī)hello上,使用空字符串作為路由鍵。每次運(yùn)行這個(gè)項(xiàng)目時(shí),都會(huì)創(chuàng)建一個(gè)新的隊(duì)列并將其綁定到交換機(jī)上。這樣,多個(gè)消費(fèi)者可以同時(shí)連接到同一個(gè)交換機(jī),并從不同的隊(duì)列中接收消息。
class MyClass
{
static void Main(string[] args)
{
//創(chuàng)建連接工廠
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "guest";
factory.Password = "guest";
//創(chuàng)建連接
using (var connection = factory.CreateConnection())
{
//創(chuàng)建通道
using (var channel = connection.CreateModel())
{
//聲明了一個(gè)扇形交換機(jī)(fanout exchange),命名為"hello"
channel.ExchangeDeclare("hello", "fanout");
//聲明一個(gè)新的隊(duì)列,并將這個(gè)隊(duì)列的名稱賦值給變量 queueName
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queueName, "hello", "");
//事件的基本消費(fèi)者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("已接收: {0}", message);
};
channel.BasicConsume(queueName, true, consumer);
Console.ReadKey();
}
}
}
}
代碼演示
和我前面文章的步驟一樣,將消費(fèi)者先進(jìn)行發(fā)布打包,雙擊.exe文件運(yùn)行多次項(xiàng)目。
我們前面提到的使用channel.QueueDeclare().QueueName;聲明一個(gè)新的隊(duì)列,在RabbitMQ管理界面可以看到有三個(gè)自動(dòng)生成名稱的隊(duì)列。
然后我們啟動(dòng)生產(chǎn)者,并隨機(jī)發(fā)送幾條消息
再回到消費(fèi)者,我們運(yùn)行的三個(gè)消費(fèi)端都同時(shí)的收到了消息!文章來源:http://www.zghlxwxcb.cn/news/detail-833252.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-833252.html
到了這里,關(guān)于C#使用RabbitMQ-3_發(fā)布訂閱模式(扇形交換機(jī))的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!