0、引言
先決條件
本教程假設 RabbitMQ 已安裝并且正在
本地主機
的標準端口(5672
)上運行。如果您使用了不同的主機、端口或憑證,則要求調(diào)整連接設置。獲取幫助
如果您在閱讀本教程時遇到問題,可以通過郵件列表或者 RabbitMQ 社區(qū) Slack 與 RabbitMQ 官方取得聯(lián)系。
在上一篇教程中我們構(gòu)建了一個簡單的日志系統(tǒng),得以向許多接收者(receiver)廣播日志消息。
在本教程中我們將會為該系統(tǒng)添加一個特性 —— 我們將使“僅訂閱消息的一個子集”成為可能。例如,我們將能夠只將關(guān)鍵錯誤消息定向到日志文件(以節(jié)省磁盤空間),與此同時仍然能夠在控制臺上打印所有的日志消息。
原文鏈接:https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
1、綁定
在上一篇例程中我們已經(jīng)建立過綁定了。您也許能夠回想起,代碼類似于:
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: string.Empty);
綁定 即表征 交換機 與 隊列 之間的關(guān)系。這可以簡單地理解為:隊列對來自此交換機的消息感興趣。
綁定可以接收一個額外的 routingKey
(路由鍵)參數(shù)。為了避免與 BasicPublish
方法的一個參數(shù)混淆,我們現(xiàn)在將其稱之為 binding key
(綁定鍵)。下面演示了我們?nèi)绾蝿?chuàng)建一個帶有鍵的綁定:
channel.QueueBind(queue: queueName,
exchange:"direct_logs",
routingKey: "black");
binding key
的含義取決于交換機的類型。比如我們之前使用的 fanout
交換機會簡單地忽略它的值。
2、直連交換機(Direct exchange)
在上一篇教程中,我們的日志系統(tǒng)向所有的消費者廣播所有消息。我們希望對其進行拓展,以允許根據(jù)消息的嚴重程度過濾消息。比如,我們也許想要向磁盤寫入日志消息的腳本文件只接收關(guān)鍵錯誤,而不是在警告或者信息日志消息上浪費磁盤空間。
但我們在上一篇教程使用的 fanout
扇出交換機并沒有給我們?nèi)绱舜蟮撵`活性 —— 它只能夠進行無意識的廣播。
相反,我們將會使用一個 direct
直連交換機。其背后的路由算法很簡單 —— 將消息路由到其 routing key
與隊列的 binding key
完全匹配的隊列。
為了說明這一點,請考慮如下配置:
在這個配置中,我們可以看到綁定了兩個隊列的 direct
直連交換機 X
。第一個隊列使用 orange
綁定鍵綁定;而第二個隊列擁有兩個綁定,一個綁定鍵為 black
,另一個綁定鍵為 green
。
在這樣的配置下,被發(fā)布到交換機中帶有 orange
路由鍵的消息將會被路由到 Q1
隊列。而帶有 black
或者 green
路由鍵的消息將會去往 Q2
隊列。其他所有消息將會被丟棄。
3、多個綁定
使用相同的綁定鍵綁定多個隊列是完全合法的。在我們的示例中,我們可以使用 black
綁定鍵在 X
與 Q1
之間添加綁定。在那樣的情況下,direct
直連交換機表現(xiàn)得像是 fanout
扇出交換機:向所有匹配的隊列廣播消息。帶有 black
路由鍵的消息將同時傳遞給 Q1
和 Q2
雙方。
4、發(fā)送日志
我們將在日志系統(tǒng)中使用這個模型,我們將消息發(fā)送至 direct
直連交換機而不是 fanout
扇出交換機。我們提供日志嚴重程序作為 routing key
路由鍵。這樣接收腳本就能夠選擇其想要接收的嚴重程度。讓我們首先關(guān)注發(fā)送日志:
如常,首先我們需要創(chuàng)建一個交換機:
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
我們已經(jīng)準備好發(fā)送一條消息:
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
為了簡化問題我們假定 ‘嚴重程度’ 可以是 info
、warning
、error
中的一個。
5、訂閱
接收消息的方式工作與上一篇教程類似,但有一點例外 —— 我們將會為我們感興趣的每個 ‘嚴重程度’ 創(chuàng)建新的綁定。
var queueName = channel.QueueDeclare().QueueName;
foreach(var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
6、將所有的東西放到一起
EmitLogDirect.cs
類的代碼:
using System.Text;
using RabbitMQ.Client;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
var severity = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1)
? string.Join(" ", args.Skip(1).ToArray())
: "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
Console.WriteLine($" [x] Sent '{severity}':'{message}'");
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
ReceiveLogsDirect.cs
的代碼:
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
// declare a server-named queue
var queueName = channel.QueueDeclare().QueueName;
if (args.Length < 1)
{
Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
foreach (var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine($" [x] Received '{routingKey}':'{message}'");
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
像往常一樣創(chuàng)建項目(建議參考教程一)
如果您只想要保存 warning
和 error
(沒有 info
)日志消息到文件中,只需要打開一個控制臺并輸入:
cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log
如果您想要在屏幕上看見所有的日志消息,打開一個新的終端并嘗試:
cd ReceiveLogsDirect
dotnet run info warning error
# => [*] Waiting for logs. To exit press CTRL+C
例如,要發(fā)送一個 error
日志消息,只需要輸入:
cd EmitLogDirect
dotnet run error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
(EmitLogDirect.cs 和 ReceiveLogsDirect.cs 的完整源碼)
運行效果:
要了解如何基于特定模式偵聽消息,請移步至教程五。
5、生產(chǎn)[非]適用性免責聲明
請記住,本教程和其他教程都是教程。他們一次展示一個新概念,可能會有意地過度簡化一些東西,而忽略其他東西。例如,為了簡潔起見,連接管理、錯誤處理、連接恢復、并發(fā)性和指標收集等主題在很大程度上被省略了。這種簡化的代碼不應該被認為可以用于生產(chǎn)。文章來源:http://www.zghlxwxcb.cn/news/detail-581010.html
在發(fā)布您的應用之前,請先查看其他文檔。我們特別推薦以下指南:發(fā)布者確認和消費者確認,生產(chǎn)清單和監(jiān)控。文章來源地址http://www.zghlxwxcb.cn/news/detail-581010.html
到了這里,關(guān)于(四)「消息隊列」之 RabbitMQ 路由(使用 .NET 客戶端)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!