国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

.NetCore 使用 RabbitMQ (交換機/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)

這篇具有很好參考價值的文章主要介紹了.NetCore 使用 RabbitMQ (交換機/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

一、安裝mq

二、實操

1、簡單模式

2、工作模式

3、fanout扇形模式(發(fā)布訂閱)

4、direct路由模式也叫定向模式

5、topic主題模式也叫通配符模式(路由模式的一種)

6、header 參數(shù)匹配模式

7、延時隊列(插件方式實現(xiàn))

參考資料:


一、安裝mq

1、我的環(huán)境是使用VMware安裝的Centos7系統(tǒng)。MQ部署在docker上面

使用Docker部署RabbitMQ_KiriSoyer的博客-CSDN博客_docker 部署rabbitmq

2、創(chuàng)建公共項目Commons用于提供者和消費者引用,nuget安裝 RabbitMQ.Client,添加一個幫助類:

public class RabbitMQHelper
? ? {

? ? ? ? //連接mq
? ? ? ? public static IConnection GetMQConnection()
? ? ? ? {
? ? ? ? ? ? var factory = new ConnectionFactory
? ? ? ? ? ? {
? ? ? ? ? ? ? ? HostName = "127.0.0.1", ?//mq的ip(我自己虛擬機上的)
? ? ? ? ? ? ? ? Port = 5672, //端口
? ? ? ? ? ? ? ? UserName = "guoyingjian", ?//賬戶
? ? ? ? ? ? ? ? Password = "guoyingjian", ?//密碼
? ? ? ? ? ? ? ? VirtualHost = "/" //虛擬機?
? ? ? ? ? ? };
? ? ? ? ? ? return factory.CreateConnection(); ?//返回連接
? ? ? ? }
? ? }

二、實操

rabbitmq消息隊列有幾種模式:

1、簡單模式

一個提供者,一個消費者,是有序的,消費者只有一個,吞吐量低,工作基本不用,用來學習了解一下還是可以的

2、工作模式

根據(jù)隊列名發(fā)消息,但有多個消費者,無序的,吞吐量高,1和2工作中基本不用,因為他們沒有使用自定義交換機,練練手明白就行了。

生產(chǎn)者代碼:

using RabbitMQ.Client;

????????/// <summary>
? ? ? ? /// MQ 工作隊列模式發(fā)消息
? ? ? ? /// </summary>
? ? ? ? /// <returns></returns>
? ? ? ? public void SendWorkerMQ()
? ? ? ? {
? ? ? ? ? ? //最基礎(chǔ)的是點對點的隊列模式,他的優(yōu)勢是有序的,

? ? ? ? ? ? //下面這個工作隊列是無序的
? ? ? ? ? ? #region 工作隊列模式
? ? ? ? ? ? string queueName = "WorkQueue";
? ? ? ? ? ? using (var connection = RabbitMQHelper.GetMQConnection())
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //創(chuàng)建通信管道
? ? ? ? ? ? ? ? using (var channel = connection.CreateModel())
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? //創(chuàng)建隊列
? ? ? ? ? ? ? ? ? ? channel.QueueDeclare(queueName, false, false, false, null);
? ? ? ? ? ? ? ? ? ? for (int i = 1; i <= 30; i++)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? string message = "hello mq" + i;
? ? ? ? ? ? ? ? ? ? ? ? var body = Encoding.UTF8.GetBytes(message);
? ? ? ? ? ? ? ? ? ? ? ? //發(fā)送消息到mq,如沒綁定交換機,將使用默認交換機路由
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: "", routingKey: queueName, null, body);
? ? ? ? ? ? ? ? ? ? ? ? Console.WriteLine("send normal message" + i);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? #endregion
? ? ? ? }

消費者代碼:

//工作隊列接收消息(多個消費者,默認輪循)
? ? ? ? public static void ReceiveMessage()
? ? ? ? {
? ? ? ? ? ? string queueName = "WorkQueue";//隊列名稱與提供者一致
? ? ? ? ? ? var connection = RabbitMQHelper.GetMQConnection();

? ? ? ? ? ??//創(chuàng)建管道
? ? ? ? ? ? var channel = connection.CreateModel();
? ? ? ? ? ? channel.QueueDeclare(queueName, false, false, false, null);
? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);

? ? ? ? ? ??//prefetchCount:1意思是當前worker在當前消息未消費確認時,不會再往這個worker中再次發(fā)送(可以根據(jù)不通服務器負載能力來分配)
? ? ? ? ? ? channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

? ? ? ? ? ? //消息處理的事件
? ? ? ? ? ? consumer.Received += (model, ea) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //業(yè)務邏輯處理
? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(ea.Body.ToArray());
? ? ? ? ? ? ? ? Console.WriteLine($"normal received => {message}");
? ? ? ? ? ? };

? ? ? ? ? ? //消費消息
? ? ? ? ? ? channel.BasicConsume(queueName, true, consumer);

? ? ? ? }

下面是工作中使用交換機的4種模式:

3、fanout扇形模式(發(fā)布訂閱)

該類型通常叫作廣播類型。fanout類型的Exchange不處理Routing key,而是會將發(fā)送給Exchange的消息,路由到所有與它綁定的Queue上。比如現(xiàn)在有一個fanout類型的Exchange,它下面綁定了三個Queue,Routing key分別是ORDER/GOODS/STOCK:

.NetCore 使用 RabbitMQ (交換機/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)

然后向該Exchange中發(fā)送一條消息,消息的Routing key隨便填一個值abc(不填也行),如果這個Exchange的路由與這三個Queue綁定,則三個Queue都應該會收到消息

生產(chǎn)者代碼:

????????/// <summary>
? ? ? ? /// MQ 扇形交換機模式發(fā)消息
? ? ? ? /// </summary>
? ? ? ? /// <returns></returns>
? ? ? ? [HttpGet("SendFanoutWorkerMQ")]
? ? ? ? public void SendFanoutWorkerMQ()
? ? ? ? {
? ? ? ? ? ? #region 使用扇形交換機模式
? ? ? ? ? ? using (var connection = RabbitMQHelper.GetMQConnection())
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //創(chuàng)建通信管道
? ? ? ? ? ? ? ? using (var channel = connection.CreateModel())
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? string exchangeName = "fanout_exchange";//fanout只提供交換機名稱即可

? ? ? ? ? ? ? ? ? ? var properties = channel.CreateBasicProperties();
? ? ? ? ? ? ? ? ? ? properties.Persistent = true; //設(shè)置數(shù)據(jù)的持久化,保證mq服務掛掉之后消息的安全性

? ? ? ? ? ? ? ? ? ? for (int i = 1; i <= 10; i++)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? var body = Encoding.UTF8.GetBytes("hello mq" + i);
? ? ? ? ? ? ? ? ? ? ? ? //這里綁定了交換機,那么就會發(fā)送到這個交換機所有綁定過的隊列中
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: "", properties, body);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? #endregion
? ? ? ? }

消費者代碼:

/// <summary>
? ? ? ? /// 扇形模式隊列消費消息
? ? ? ? /// 注意先啟動一次消費端會創(chuàng)建交換機、隊列、綁定。如果不啟動則消息丟失。也可以在生產(chǎn)端做這些創(chuàng)建和綁定
? ? ? ? /// </summary>
? ? ? ? public static void FanoutReceiveMessage()
? ? ? ? {

? ? ? ? ? ? var connection = RabbitMQHelper.GetMQConnection();
? ? ? ? ? ? //創(chuàng)建管道
? ? ? ? ? ? var channel = connection.CreateModel();

? ? ? ? ? ? //創(chuàng)建交換機
? ? ? ? ? ? channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
? ? ? ? ? ? //創(chuàng)建隊列
? ? ? ? ? ? string queueName1 = "fanoutWorkQueue1";//隊列名稱
? ? ? ? ? ? string queueName2 = "fanoutWorkQueue2";
? ? ? ? ? ? string queueName3 = "fanoutWorkQueue3";
? ? ? ? ? ? channel.QueueDeclare(queue: queueName1,//隊列名
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: false,//是否持久化
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,//排它性
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,//一旦客戶端連接斷開則自動刪除queue
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);//如果安裝了隊列優(yōu)先級插件則可以設(shè)置優(yōu)先級
? ? ? ? ? ? channel.QueueDeclare(queueName2, false, false, false, null);
? ? ? ? ? ? channel.QueueDeclare(queueName3, false, false, false, null);

? ? ? ? ? ? //多個隊列綁定到fanout_exchange交換機(似發(fā)布訂閱)
? ? ? ? ? ? channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
? ? ? ? ? ? channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
? ? ? ? ? ? channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");

? ? ? ? ? ? //聲明消費者
? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);

? ? ? ? ? ? //對消費端進行限流:
? ? ? ? ? ? //首先第一步,我們既然要使用消費端限流,我們需要關(guān)閉自動 ack,將 autoAck 設(shè)置為 channel.basicConsume(queueName, false, consumer);
? ? ? ? ? ? //第二步我們來設(shè)置具體的限流大小以及數(shù)量。channel.basicQos(0, 15, false);
? ? ? ? ? ? //第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設(shè)置批量處理 ack 回應為 channel.basicAck(envelope.getDeliveryTag(), true);
? ? ? ? ? ? //prefetchCount:1意思是當前worker在當前消息未消費確認時,不會再往這個worker中再次發(fā)送(可以根據(jù)不通服務器負載能力來分配)
? ? ? ? ? ? //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

? ? ? ? ? ? //消費者處理的事件
? ? ? ? ? ? consumer.Received += (model, ea) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //業(yè)務邏輯處理
? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(ea.Body.ToArray());
? ? ? ? ? ? ? ? Console.WriteLine($"normal received => {message}");
? ? ? ? ? ? };

? ? ? ? ? ? //消費消息
? ? ? ? ? ? channel.BasicConsume(queueName2, //隊列名
? ? ? ? ? ? ? ? autoAck: true, //確認消費(刪除)
? ? ? ? ? ? ? ? consumer: consumer);

? ? ? ? }

4、direct路由模式也叫定向模式

direct類型的Exchange會將消息轉(zhuǎn)發(fā)到指定Routing key的Queue上,Routing key的解析規(guī)則為精確匹配。也就是只有當producer發(fā)送的消息的Routing key與消費端的某個Routing key相等時,消息才會被分發(fā)到對應的Queue上。比如現(xiàn)在有一個direct類型的Exchange,它下面綁定了三個Queue,Routing key分別是ORDER/GOODS/STOCK:

.NetCore 使用 RabbitMQ (交換機/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)

然后向該Exchange中發(fā)送一條消息,消息的Routing key是ORDER,那只有Routing key是ORDER的隊列有一條消息。(與fanout區(qū)別:fanout根據(jù)已綁定的交換機的隊列發(fā)送消息。direct當然也得綁定交換機,只不過再精確匹配到routingkey相等的隊列發(fā)送消息)

生產(chǎn)者代碼:

?????????/// <summary>
? ? ? ? /// MQ 直接交換機模式發(fā)消息(指定routingKey發(fā)送)
? ? ? ? /// </summary>
? ? ? ? /// <returns></returns>
? ? ? ? [HttpGet("SendDirectWorkerMQ")]

????????public void SendDirectWorkerMQ()
? ? ? ? {
? ? ? ? ? ? #region 使用直接交換機模式
? ? ? ? ? ? using (var connection = RabbitMQHelper.GetMQConnection())
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //創(chuàng)建通信管道
? ? ? ? ? ? ? ? using (var channel = connection.CreateModel())
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? //direct只提供交換機名稱和routingkey即可,消費端只消費routingkey相匹配的
? ? ? ? ? ? ? ? ? ? string exchangeName = "direct_exchange";

? ? ? ? ? ? ? ? ? ??var properties = channel.CreateBasicProperties();
? ? ? ? ? ? ? ? ? ? properties.Persistent = true; //設(shè)置數(shù)據(jù)的持久化,保證mq服務掛掉之后消息的安全性

? ? ? ? ? ? ? ? ? ? for (int i = 1; i <= 10; i++)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? var body = Encoding.UTF8.GetBytes("hello mq" + i + "yellow");
? ? ? ? ? ? ? ? ? ? ? ? //這里綁定了交換機,同時綁定了routekey,就會發(fā)送到routekey是yellow的隊列中
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: "yellow", properties, body);

? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? #endregion
? ? ? ? }

消費者代碼:

/// <summary>
? ? ? ? /// 直接模式隊列消費消息
? ? ? ? /// 注意先啟動一次消費端會創(chuàng)建交換機、隊列、綁定。如果不啟動則消息丟失。也可以在生產(chǎn)端做這些創(chuàng)建和綁定
? ? ? ? /// </summary>
? ? ? ? public static void DirectReceiveMessage()
? ? ? ? {
? ? ? ? ? ? var connection = RabbitMQHelper.GetMQConnection();
? ? ? ? ? ? //創(chuàng)建管道
? ? ? ? ? ? var channel = connection.CreateModel();

? ? ? ? ? ? //創(chuàng)建交換機
? ? ? ? ? ? channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");
? ? ? ? ? ? //創(chuàng)建隊列
? ? ? ? ? ? string queueName1 = "directWorkQueue1";//隊列名稱
? ? ? ? ? ? string queueName2 = "directWorkQueue2";
? ? ? ? ? ? string queueName3 = "directWorkQueue3";
? ? ? ? ? ? channel.QueueDeclare(queue: queueName1,//隊列名
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: false,//是否持久化
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,//排它性
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,//一旦客戶端連接斷開則自動刪除queue
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);//如果安裝了隊列優(yōu)先級插件則可以設(shè)置優(yōu)先級
? ? ? ? ? ? channel.QueueDeclare(queueName2, false, false, false, null);
? ? ? ? ? ? channel.QueueDeclare(queueName3, false, false, false, null);

? ? ? ? ? ? //多個隊列綁定到fanout_exchange交換機(似發(fā)布訂閱)
? ? ? ? ? ? channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");
? ? ? ? ? ? channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");
? ? ? ? ? ? channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");

? ? ? ? ? ? //聲明消費者
? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);

? ? ? ? ? ? //對消費端進行限流:
? ? ? ? ? ? //首先第一步,我們既然要使用消費端限流,我們需要關(guān)閉自動 ack,將 autoAck 設(shè)置為 falsechannel.basicConsume(queueName, false, consumer);
? ? ? ? ? ? //第二步我們來設(shè)置具體的限流大小以及數(shù)量。channel.basicQos(0, 15, false);
? ? ? ? ? ? //第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設(shè)置批量處理 ack 回應為 truechannel.basicAck(envelope.getDeliveryTag(), true);
? ? ? ? ? ? //prefetchCount:1意思是當前worker在當前消息未消費確認時,不會再往這個worker中再次發(fā)送(可以根據(jù)不通服務器負載能力來分配)
? ? ? ? ? ? //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

? ? ? ? ? ? //消費者處理的事件
? ? ? ? ? ? consumer.Received += (model, ea) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //業(yè)務邏輯處理
? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(ea.Body.ToArray());
? ? ? ? ? ? ? ? Console.WriteLine($"normal received => {message},routingkey:{ea.RoutingKey}");

? ? ? ? ? ? ? ? //消費完成后需要手動手動簽收消息,如果不寫該代碼就容易導致重復消費問題
? ? ? ? ? ? ? ? //可以降低每次簽收性能損耗。參數(shù)multiple:false就是單個手動簽收,true就是批量簽收,比如消費30條消息后再確認簽收
? ? ? ? ? ? ? ? channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
? ? ? ? ? ? };

? ? ? ? ? ? //消息的簽收模式
? ? ? ? ? ? //手動簽收:保證正確消費,不會丟消息(基于客戶端而已)
? ? ? ? ? ? //自動簽收:容易丟失消息
? ? ? ? ? ? channel.BasicConsume(queueName1, //消費隊列2的消息
? ? ? ? ? ? ? ? autoAck: false, //代表要手動簽收,因可能會出現(xiàn)確認簽收了然后宕機了導致沒有執(zhí)行事件,造成消息丟失。解決方案:手動簽收操作寫在了隊列事件完成后。
? ? ? ? ? ? ? ? consumer: consumer);

? ? ? ? }

5、topic主題模式也叫通配符模式(路由模式的一種)

根據(jù)通配符模糊匹配,將消息交給符合routing pattern(路由模式)的隊列。

它與direct相比,都是可以根據(jù)routingkey把消息路由到不同的隊列。只不過topic類型exchange可以讓隊列在綁定routingkey的時候使用通配符。

routingkey一般都是有一個或多個單詞組成,多個單詞以“.”分割,例如:“item.insert”。通配符匹配規(guī)則“#”可以匹配一個或多個單詞,“*”只能匹配1個單詞,例如:“item.#”可以匹配“item.insert.asd”或者“item.insert”,“item.*”只能匹配到“item.insert”。

.NetCore 使用 RabbitMQ (交換機/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)

生產(chǎn)者代碼:

public void SendTopicWorkerMQ()
? ? ? ? {
? ? ? ? ? ? #region 使用topic交換機模式
? ? ? ? ? ? using (var connection = RabbitMQHelper.GetMQConnection())
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //創(chuàng)建通信管道
? ? ? ? ? ? ? ? using (var channel = connection.CreateModel())
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? //topic只提供交換機名稱和routingkey即可,消費端只消費與routingkey通配符匹配的
? ? ? ? ? ? ? ? ? ? string exchangeName = "topic_exchange";

? ? ? ? ? ? ? ? ? ??

? ? ? ? ? ? ? ? ? ? string routingKey1 = "user.america";
? ? ? ? ? ? ? ? ? ? string routingKey2 = "user.china";
? ? ? ? ? ? ? ? ? ? string routingKey3 = "user.china.beijing";
? ? ? ? ? ? ? ? ? ? string routingKey4 = "user.china.beijing.changping";

? ? ? ? ? ? ? ? ? ? var properties = channel.CreateBasicProperties();
? ? ? ? ? ? ? ? ? ? properties.Persistent = true; //設(shè)置數(shù)據(jù)的持久化,保證mq服務掛掉之后消息的安全性
? ? ? ? ? ? ? ? ? ? for (int i = 1; i <= 10; i++)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? var body = Encoding.UTF8.GetBytes("hello mq" + i + "topic");
? ? ? ? ? ? ? ? ? ? ? ? //傳4個不同的routingkey過去,消費者會根據(jù)通配符匹配并消費(好像不能在生產(chǎn)者寫通配符)
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: routingKey1, properties , body);
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: routingKey2, properties , body);
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: routingKey3, properties , body);
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: routingKey4, properties , body);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? #endregion
? ? ? ? }

消費者代碼:

/// <summary>
? ? ? ? /// 主題模式隊列消費消息
? ? ? ? /// 注意先啟動一次消費端會創(chuàng)建交換機、隊列、綁定。如果不啟動則消息丟失。也可以在生產(chǎn)端做這些創(chuàng)建和綁定
? ? ? ? /// </summary>
? ? ? ? public static void TopicReceiveMessage()
? ? ? ? {
? ? ? ? ? ? var connection = RabbitMQHelper.GetMQConnection();
? ? ? ? ? ? //創(chuàng)建管道
? ? ? ? ? ? var channel = connection.CreateModel();

? ? ? ? ? ? //創(chuàng)建交換機
? ? ? ? ? ? channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");
? ? ? ? ? ? string exchangeName = "topic_exchange";
? ? ? ? ? ? //創(chuàng)建隊列
? ? ? ? ? ? string queueName1 = "topicWorkQueue1";//隊列名稱
? ? ? ? ? ? string queueName2 = "topicWorkQueue2";
? ? ? ? ? ? string queueName3 = "topicWorkQueue3";
? ? ? ? ? ? channel.QueueDeclare(queue: queueName1,//隊列名
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: false,//是否持久化
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,//排它性
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,//一旦客戶端連接斷開則自動刪除queue
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);//如果安裝了隊列優(yōu)先級插件則可以設(shè)置優(yōu)先級
? ? ? ? ? ? channel.QueueDeclare(queueName2, false, false, false, null);
? ? ? ? ? ? channel.QueueDeclare(queueName3, false, false, false, null);

? ? ? ? ? ? //多個隊列綁定到fanout_exchange交換機
? ? ? ? ? ? channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "user.*.*");//匹配例如:user.a.b
? ? ? ? ? ? channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "user.*"); ?//匹配例如:user.a
? ? ? ? ? ? channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "user.#"); ?//匹配例如:user...... (user. 后面是啥都行)

? ? ? ? ? ? //聲明消費者
? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);

? ? ? ? ? ? //對消費端進行限流:
? ? ? ? ? ? //首先第一步,我們既然要使用消費端限流,我們需要關(guān)閉自動 ack,將 autoAck 設(shè)置為 channel.basicConsume(queueName, false, consumer);
? ? ? ? ? ? //第二步我們來設(shè)置具體的限流大小以及數(shù)量。channel.basicQos(0, 15, false);
? ? ? ? ? ? //第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設(shè)置批量處理 ack 回應為 channel.basicAck(envelope.getDeliveryTag(), true);
? ? ? ? ? ? //prefetchCount:1意思是當前worker在當前消息未消費確認時,不會再往這個worker中再次發(fā)送(可以根據(jù)不通服務器負載能力來分配)
? ? ? ? ? ? //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

? ? ? ? ? ? //消費者處理的事件
? ? ? ? ? ? consumer.Received += (model, ea) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //業(yè)務邏輯處理
? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(ea.Body.ToArray());
? ? ? ? ? ? ? ? Console.WriteLine($"normal received => {message},routingkey:{ea.RoutingKey}");

? ? ? ? ? ? ? ? //消費完成后需要手動手動簽收消息,如果不寫該代碼就容易導致重復消費問題
? ? ? ? ? ? ? ? //可以降低每次簽收性能損耗。參數(shù)multiple:false就是單個手動簽收,true就是批量簽收,比如消費30條消息后再確認簽收
? ? ? ? ? ? ? ? channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
? ? ? ? ? ? };

? ? ? ? ? ? //消息的簽收模式
? ? ? ? ? ? //手動簽收:保證正確消費,不會丟消息(基于客戶端而已)
? ? ? ? ? ? //自動簽收:容易丟失消息
? ? ? ? ? ? channel.BasicConsume(queueName2, //消費隊列2的消息(可以手動替換其他隊列消費)
? ? ? ? ? ? ? ? autoAck: false, //代表要手動簽收,因可能會出現(xiàn)確認簽收了然后宕機了導致沒有執(zhí)行事件,造成消息丟失。解決方案:手動簽收操作寫在了隊列事件完成后。
? ? ? ? ? ? ? ? consumer: consumer);

? ? ? ? }

6、header 參數(shù)匹配模式

header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配隊列,Headers?類型的交換器性能會很差,所以這種類型不常用。

以上注意:Exchange(交換機):只負責轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息將會丟失??!

7、延時隊列(插件方式實現(xiàn))

實現(xiàn)延遲隊列有兩種方法

1、TTL+DLX需要創(chuàng)建死信交換機綁定隊列,需創(chuàng)建多個交換機多個隊列,復雜麻煩所以不推薦。

2、推薦使用rabbitmq_delayed_message_exchange 插件實現(xiàn),下面來實現(xiàn)一下:

①插件下載網(wǎng)址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

注意:一定要下載與自己mq版本同一個版本號,不然下面開啟插件會報錯

比如我的mq版本是 3.9.11的,那我就下載rabbitmq_delayed_message_exchange-3.9.0.ez 即可

下載之后想辦法上傳到linux上。

或者直接在linux上面下載:

#linux下載插件的命令:(注:選擇屬于自己的版本號,f12指針查看按鈕鏈接獲取下載地址)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez

②下載完畢后,使用命令復制到docker內(nèi)rabbitmq容器的plugins文件夾下

查看mq的位置: whereis rabbitmq

查看運行中的容器命令:

docker ps

docker的復制命令:

docker cp rabbitmq_delayed_message_exchange-3.9.0.ez 0e6e229cc6f2:/plugins(0e6e229cc6f2是容器id,plugins是mq容器內(nèi)的文件夾)

③然后進入容器

docker exec -it?0e6e229cc6f2 bash

退出容器:

exit

或者按Ctrl+P+Q進行退出容器

④進入之后直接進入plugins文件夾下看看復制進去沒有?

cd /plugins

ls

.NetCore 使用 RabbitMQ (交換機/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)

記住只能有一個?rabbitmq_delayed_message_exchange-3.9.0.ez文件,不能放多個版本否則報錯

⑤啟用rabbitmq_delayed_message_exchange插件

開啟插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

.NetCore 使用 RabbitMQ (交換機/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)

啟用之后可以看到mq可視化頁面,交換機類型多了一個 x-delayed-message

.NetCore 使用 RabbitMQ (交換機/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)

⑥然后就是代碼實現(xiàn):

生產(chǎn)者代碼:

public void SendDelayedWorkerMQ()
? ? ? ? {
? ? ? ? ? ? #region 使用延時交換機模式
? ? ? ? ? ? using (var connection = RabbitMQHelper.GetMQConnection())
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //創(chuàng)建通信管道
? ? ? ? ? ? ? ? using (var channel = connection.CreateModel())
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? string exchangeName = "delayed_exchange";//delayed需提供交換機名稱
? ? ? ? ? ? ? ? ? ? string queueName = "delay_WorkQueue";

? ? ? ? ? ? ? ? ? ? #region 消費端做交換機和隊列的創(chuàng)建和綁定

? ? ? ? ? ? ? ? ? ? #endregion
? ? ? ? ? ? ? ? ? ? var properties = channel.CreateBasicProperties();
? ? ? ? ? ? ? ? ? ? properties.Persistent = true; //設(shè)置數(shù)據(jù)的持久化,保證mq服務掛掉之后消息的安全性
? ? ? ? ? ? ? ? ? ? //properties.Priority = 9;//消息的優(yōu)先級 ?值越大 優(yōu)先級越高 0~9
? ? ? ? ? ? ? ? ? ? //延時時間從header賦值
? ? ? ? ? ? ? ? ? ? Dictionary<string, object> headers = new Dictionary<string, object>();
? ? ? ? ? ? ? ? ? ? headers.Add("x-delay", 10000);
? ? ? ? ? ? ? ? ? ? properties.Headers = headers;

? ? ? ? ? ? ? ? ? ? var body = Encoding.UTF8.GetBytes("生產(chǎn)者發(fā)送時間:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
? ? ? ? ? ? ? ? ? ? //發(fā)送延時消息
? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: queueName, properties, body);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? #endregion
? ? ? ? }

消費者代碼:

/// <summary>
? ? ? ? /// 延遲交換機模式隊列消費消息
? ? ? ? /// 注意先啟動一次消費端會創(chuàng)建交換機、隊列、綁定。如果不啟動則消息丟失。也可以在生產(chǎn)端做這些創(chuàng)建和綁定
? ? ? ? /// </summary>
? ? ? ? public static void DelayedReceiveMessage()
? ? ? ? {

? ? ? ? ? ? var connection = RabbitMQHelper.GetMQConnection();
? ? ? ? ? ? //創(chuàng)建管道
? ? ? ? ? ? var channel = connection.CreateModel();
? ? ? ? ? ? string queueName = "delay_WorkQueue";//隊列名稱
? ? ? ? ? ? string exchangeName = "delayed_exchange";//隊列名稱

? ? ? ? ? ? Dictionary<string, object> args = new Dictionary<string, object>();
? ? ? ? ? ? args.Add("x-delayed-type", "direct"); //x-delayed-type必須加(這個創(chuàng)建的是交換機類型)

? ? ? ? ? ? //創(chuàng)建交換機
? ? ? ? ? ? channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", durable: true, autoDelete: false, arguments: args);
? ? ? ? ? ? //創(chuàng)建隊列

? ? ? ? ? ? channel.QueueDeclare(queue: queueName,//隊列名
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: true,//隊列是否持久化
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,//是否為單消費者隊列,為True時,只能由單一消費者消費
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,//是否自動刪除隊列,當消費者全部斷開時,隊列自動刪除
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);//高級特性

? ? ? ? ? ? //多個隊列綁定到delayed_exchange交換機(似發(fā)布訂閱)
? ? ? ? ? ? channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: queueName);

? ? ? ? ? ? //聲明消費者
? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);

? ? ? ? ? ? //對消費端進行限流:
? ? ? ? ? ? //首先第一步,我們既然要使用消費端限流,我們需要關(guān)閉自動 ack,將 autoAck 設(shè)置為 channel.basicConsume(queueName, false, consumer);
? ? ? ? ? ? //第二步我們來設(shè)置具體的限流大小以及數(shù)量。channel.basicQos(0, 15, false);
? ? ? ? ? ? //第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設(shè)置批量處理 ack 回應為 channel.basicAck(envelope.getDeliveryTag(), true);
? ? ? ? ? ? //prefetchCount:1意思是當前worker在當前消息未消費確認時,不會再往這個worker中再次發(fā)送(可以根據(jù)不通服務器負載能力來分配)
? ? ? ? ? ? //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

? ? ? ? ? ? //消費者處理的事件
? ? ? ? ? ? consumer.Received += (model, ea) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //業(yè)務邏輯處理
? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(ea.Body.ToArray());
? ? ? ? ? ? ? ? Console.WriteLine(message);
? ? ? ? ? ? ? ? Console.WriteLine($"消費者消費時間:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));

? ? ? ? ? ? ? ? //消費完成后需要手動手動簽收消息,如果不寫該代碼就容易導致重復消費問題
? ? ? ? ? ? ? ? //可以降低每次簽收性能損耗。參數(shù)multiple:false就是單個手動簽收,true就是批量簽收,比如消費30條消息后再確認簽收
? ? ? ? ? ? ? ? channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
? ? ? ? ? ? };

? ? ? ? ? ? //消費消息
? ? ? ? ? ? channel.BasicConsume(queueName, //隊列名
? ? ? ? ? ? ? ? autoAck: false, //確認消費
? ? ? ? ? ? ? ? consumer: consumer);
? ? ? ? }

最終啟動程序可以看到消費者端輸出的結(jié)果計算精準的相差10秒!

.NetCore 使用 RabbitMQ (交換機/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)

參考資料:

持久化、優(yōu)先級、高級特性:

NetCore RabbitMQ高級特性 持久化 及 消息優(yōu)先級 - 天才臥龍 - 博客園

延遲和死信隊列:https://www.jb51.net/article/221796.htm

NetCore RabbitMQ 高級特性 消息存活周期TTL、死信交換機/死信對列DLX,延遲隊列,及冪等性的保障 - 天才臥龍 - 博客園文章來源地址http://www.zghlxwxcb.cn/news/detail-406116.html

到了這里,關(guān)于.NetCore 使用 RabbitMQ (交換機/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務器費用

相關(guān)文章

  • RabbitMQ學習筆記(消息發(fā)布確認,死信隊列,集群,交換機,持久化,生產(chǎn)者、消費者)

    RabbitMQ學習筆記(消息發(fā)布確認,死信隊列,集群,交換機,持久化,生產(chǎn)者、消費者)

    MQ(message queue):本質(zhì)上是個隊列,遵循FIFO原則,隊列中存放的是message,是一種跨進程的通信機制,用于上下游傳遞消息。MQ提供“邏輯解耦+物理解耦”的消息通信服務。使用了MQ之后消息發(fā)送上游只需要依賴MQ,不需要依賴其它服務。 功能1:流量消峰 功能2:應用解耦 功

    2024年02月07日
    瀏覽(118)
  • RabbitMQ隊列及交換機的使用

    RabbitMQ隊列及交換機的使用

    目錄 一、簡單模型 1、首先控制臺創(chuàng)建一個隊列 2、父工程導入依賴? 3、生產(chǎn)者配置文件 ?4、寫測試類 5、消費者配置文件 6、消費者接收消息 二、WorkQueues模型 1、在控制臺創(chuàng)建一個新的隊列 2、生產(chǎn)者生產(chǎn)消息 3、創(chuàng)建兩個消費者接收消息 4、能者多勞充分利用每一個消費者

    2024年02月04日
    瀏覽(22)
  • 消息隊列-RabbitMQ:Exchanges、綁定 bindings以及3大常用交換機(Fanout exchange、Direct exchange、Topics exchange)

    消息隊列-RabbitMQ:Exchanges、綁定 bindings以及3大常用交換機(Fanout exchange、Direct exchange、Topics exchange)

    RabbitMQ 消息傳遞模型的核心思想是: 生產(chǎn)者生產(chǎn)的消息從不會直接發(fā)送到隊列 。實際上, 通常生產(chǎn)者甚至都不知道這些消息傳遞傳遞到了哪些隊列中 。 相反, 生產(chǎn)者只能將消息發(fā)送到交換機 (exchange) , 交換機工作 的內(nèi)容非常簡單, 一方面它接收來自生產(chǎn)者的消息 , 另一

    2024年04月08日
    瀏覽(21)
  • 【學習日記2023.6.19】 之 RabbitMQ服務異步通信_消息可靠性_死信交換機_惰性隊列_MQ集群

    【學習日記2023.6.19】 之 RabbitMQ服務異步通信_消息可靠性_死信交換機_惰性隊列_MQ集群

    消息隊列在使用過程中,面臨著很多實際問題需要思考: 消息從發(fā)送,到消費者接收,會經(jīng)歷多個過程: 其中的每一步都可能導致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 consumer接收

    2024年02月11日
    瀏覽(98)
  • 利用消息中間件RabbitMQ創(chuàng)建隊列以及扇出(Fanout)、訂閱(Direct)、主題(Topic)交換機來完成消息的發(fā)送和監(jiān)聽接收(完整版)

    利用消息中間件RabbitMQ創(chuàng)建隊列以及扇出(Fanout)、訂閱(Direct)、主題(Topic)交換機來完成消息的發(fā)送和監(jiān)聽接收(完整版)

    目錄 一、前期項目環(huán)境準備 1.1父項目以及子項目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout)?交換機實現(xiàn)消息的發(fā)送和接收 2.1編寫子項目consumer(消費者,接收消息)的代碼實現(xiàn)扇出(Fanout)交換機接收消息 2.1.1consumer子項目結(jié)構(gòu) 2.1.2FanoutConfig類的實現(xiàn)扇出(Fanout)交

    2024年02月05日
    瀏覽(95)
  • RabbitMQ交換機與隊列

    RabbitMQ交換機與隊列

    RabbitMQ 消息傳遞模型的核心思想是: 生產(chǎn)者生產(chǎn)的消息從不會直接發(fā)送到隊列 。實際上,通常生產(chǎn)者甚至都不知道這些消息傳遞傳遞到了哪些隊列中。 相反, 生產(chǎn)者只能將消息發(fā)送到交換機(exchange) ,交換機工作的內(nèi)容非常簡單, 一方面它接收來自生產(chǎn)者的消息,另一方面

    2024年01月24日
    瀏覽(24)
  • RabbitMq創(chuàng)建交換機和隊列

    RabbitMq創(chuàng)建交換機和隊列

    1. 網(wǎng)頁登錄 IP:1572 ? 2. 輸入登錄賬號密碼 admin admin 3. 點擊Exchanges 添加交換機Platform_AlarmEngineInterface 和Rg_Platform_AlarmEngineInterface ,Type選擇topic 4. 添加隊列 VIDEO_Alarm_platform 、 watch_ftp、 RG_VIDEO_Alarm_platform 、 RG_VIDEO_Alarm_platform_jiance 5. 綁定交換機和隊列 (1) 點擊Exchanges界面,選擇其

    2024年02月16日
    瀏覽(14)
  • RabbitMQ-死信交換機和死信隊列

    RabbitMQ-死信交換機和死信隊列

    DLX: Dead-Letter-Exchange 死信交換器,死信郵箱 當消息成為Dead message后,可以被重新發(fā)送到另一個交換機,這個交換機就是DLX。 如下圖所示: 其實死信隊列就是一個普通的交換機,有些隊列的消息成為死信后,(比如過期了或者隊列滿了)這些死信一般情況下是會被 RabbitMQ 清理

    2024年02月08日
    瀏覽(26)
  • SpringBoot整合RabbitMQ系列--綁定交換機與隊列的方法

    SpringBoot整合RabbitMQ系列--綁定交換機與隊列的方法

    原文網(wǎng)址:SpringBoot整合RabbitMQ系列--綁定交換機與隊列的方法_IT利刃出鞘的博客-CSDN博客 ? ? ? ? 本文用實例介紹SpringBoot中RabbitMQ如何綁定交換機(交換器)與隊列。 交換機 下邊兩種方式等價。 隊列 下邊兩種方式等價 綁定 下邊兩種方式等價 注意:第一種的參數(shù)并不是字符

    2023年04月09日
    瀏覽(34)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包