目錄
一、安裝mq
二、實操
1、簡單模式
2、工作模式
3、fanout扇形模式(發(fā)布訂閱)
4、direct路由模式也叫定向模式
5、topic主題模式也叫通配符模式(路由模式的一種)
6、header 參數(shù)匹配模式
7、延時隊列(插件方式實現(xiàn))
參考資料:
一、安裝mq
1、我的環(huán)境是使用VMware安裝的Centos7系統(tǒng)。MQ部署在docker上面
使用Docker部署RabbitMQ_KiriSoyer的博客-CSDN博客_docker 部署rabbitmq
2、創(chuàng)建公共項目Commons用于提供者和消費者引用,nuget安裝 RabbitMQ.Client,添加一個幫助類:
public class RabbitMQHelper
? ? {? ? ? ? //連接mq
? ? ? ? public static IConnection GetMQConnection()
? ? ? ? {
? ? ? ? ? ? var factory = new ConnectionFactory
? ? ? ? ? ? {
? ? ? ? ? ? ? ? HostName = "127.0.0.1", ?//mq的ip(我自己虛擬機上的)
? ? ? ? ? ? ? ? Port = 5672, //端口
? ? ? ? ? ? ? ? UserName = "guoyingjian", ?//賬戶
? ? ? ? ? ? ? ? Password = "guoyingjian", ?//密碼
? ? ? ? ? ? ? ? VirtualHost = "/" //虛擬機?
? ? ? ? ? ? };
? ? ? ? ? ? return factory.CreateConnection(); ?//返回連接
? ? ? ? }
? ? }
二、實操
rabbitmq消息隊列有幾種模式:
1、簡單模式
一個提供者,一個消費者,是有序的,消費者只有一個,吞吐量低,工作基本不用,用來學習了解一下還是可以的
2、工作模式
根據(jù)隊列名發(fā)消息,但有多個消費者,無序的,吞吐量高,1和2工作中基本不用,因為他們沒有使用自定義交換機,練練手明白就行了。
生產(chǎn)者代碼:
using RabbitMQ.Client;
????????/// <summary>
? ? ? ? /// MQ 工作隊列模式發(fā)消息
? ? ? ? /// </summary>
? ? ? ? /// <returns></returns>
? ? ? ? public void SendWorkerMQ()
? ? ? ? {
? ? ? ? ? ? //最基礎(chǔ)的是點對點的隊列模式,他的優(yōu)勢是有序的,? ? ? ? ? ? //下面這個工作隊列是無序的
? ? ? ? ? ? #region 工作隊列模式
? ? ? ? ? ? string queueName = "WorkQueue";
? ? ? ? ? ? using (var connection = RabbitMQHelper.GetMQConnection())
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //創(chuàng)建通信管道
? ? ? ? ? ? ? ? using (var channel = connection.CreateModel())
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? //創(chuàng)建隊列
? ? ? ? ? ? ? ? ? ? channel.QueueDeclare(queueName, false, false, false, null);
? ? ? ? ? ? ? ? ? ? for (int i = 1; i <= 30; i++)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? string message = "hello mq" + i;
? ? ? ? ? ? ? ? ? ? ? ? var body = Encoding.UTF8.GetBytes(message);
? ? ? ? ? ? ? ? ? ? ? ? //發(fā)送消息到mq,如沒綁定交換機,將使用默認交換機路由
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: "", routingKey: queueName, null, body);
? ? ? ? ? ? ? ? ? ? ? ? Console.WriteLine("send normal message" + i);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? #endregion
? ? ? ? }
消費者代碼:
//工作隊列接收消息(多個消費者,默認輪循)
? ? ? ? public static void ReceiveMessage()
? ? ? ? {
? ? ? ? ? ? string queueName = "WorkQueue";//隊列名稱與提供者一致
? ? ? ? ? ? var connection = RabbitMQHelper.GetMQConnection();? ? ? ? ? ??//創(chuàng)建管道
? ? ? ? ? ? var channel = connection.CreateModel();
? ? ? ? ? ? channel.QueueDeclare(queueName, false, false, false, null);
? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);? ? ? ? ? ??//prefetchCount:1意思是當前worker在當前消息未消費確認時,不會再往這個worker中再次發(fā)送(可以根據(jù)不通服務器負載能力來分配)
? ? ? ? ? ? channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);? ? ? ? ? ? //消息處理的事件
? ? ? ? ? ? consumer.Received += (model, ea) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //業(yè)務邏輯處理
? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(ea.Body.ToArray());
? ? ? ? ? ? ? ? Console.WriteLine($"normal received => {message}");
? ? ? ? ? ? };? ? ? ? ? ? //消費消息
? ? ? ? ? ? channel.BasicConsume(queueName, true, consumer);? ? ? ? }
下面是工作中使用交換機的4種模式:
3、fanout扇形模式(發(fā)布訂閱)
該類型通常叫作廣播類型。fanout類型的Exchange不處理Routing key,而是會將發(fā)送給Exchange的消息,路由到所有與它綁定的Queue上。比如現(xiàn)在有一個fanout類型的Exchange,它下面綁定了三個Queue,Routing key分別是ORDER/GOODS/STOCK:
然后向該Exchange中發(fā)送一條消息,消息的Routing key隨便填一個值abc(不填也行),如果這個Exchange的路由與這三個Queue綁定,則三個Queue都應該會收到消息
生產(chǎn)者代碼:
????????/// <summary>
? ? ? ? /// MQ 扇形交換機模式發(fā)消息
? ? ? ? /// </summary>
? ? ? ? /// <returns></returns>
? ? ? ? [HttpGet("SendFanoutWorkerMQ")]
? ? ? ? public void SendFanoutWorkerMQ()
? ? ? ? {
? ? ? ? ? ? #region 使用扇形交換機模式
? ? ? ? ? ? using (var connection = RabbitMQHelper.GetMQConnection())
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //創(chuàng)建通信管道
? ? ? ? ? ? ? ? using (var channel = connection.CreateModel())
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? string exchangeName = "fanout_exchange";//fanout只提供交換機名稱即可? ? ? ? ? ? ? ? ? ? var properties = channel.CreateBasicProperties();
? ? ? ? ? ? ? ? ? ? properties.Persistent = true; //設(shè)置數(shù)據(jù)的持久化,保證mq服務掛掉之后消息的安全性? ? ? ? ? ? ? ? ? ? for (int i = 1; i <= 10; i++)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? var body = Encoding.UTF8.GetBytes("hello mq" + i);
? ? ? ? ? ? ? ? ? ? ? ? //這里綁定了交換機,那么就會發(fā)送到這個交換機所有綁定過的隊列中
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: "", properties, body);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? #endregion
? ? ? ? }
消費者代碼:
/// <summary>
? ? ? ? /// 扇形模式隊列消費消息
? ? ? ? /// 注意先啟動一次消費端會創(chuàng)建交換機、隊列、綁定。如果不啟動則消息丟失。也可以在生產(chǎn)端做這些創(chuàng)建和綁定
? ? ? ? /// </summary>
? ? ? ? public static void FanoutReceiveMessage()
? ? ? ? {? ? ? ? ? ? var connection = RabbitMQHelper.GetMQConnection();
? ? ? ? ? ? //創(chuàng)建管道
? ? ? ? ? ? var channel = connection.CreateModel();? ? ? ? ? ? //創(chuàng)建交換機
? ? ? ? ? ? channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
? ? ? ? ? ? //創(chuàng)建隊列
? ? ? ? ? ? string queueName1 = "fanoutWorkQueue1";//隊列名稱
? ? ? ? ? ? string queueName2 = "fanoutWorkQueue2";
? ? ? ? ? ? string queueName3 = "fanoutWorkQueue3";
? ? ? ? ? ? channel.QueueDeclare(queue: queueName1,//隊列名
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: false,//是否持久化
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,//排它性
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,//一旦客戶端連接斷開則自動刪除queue
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);//如果安裝了隊列優(yōu)先級插件則可以設(shè)置優(yōu)先級
? ? ? ? ? ? channel.QueueDeclare(queueName2, false, false, false, null);
? ? ? ? ? ? channel.QueueDeclare(queueName3, false, false, false, null);? ? ? ? ? ? //多個隊列綁定到fanout_exchange交換機(似發(fā)布訂閱)
? ? ? ? ? ? channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
? ? ? ? ? ? channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
? ? ? ? ? ? channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");? ? ? ? ? ? //聲明消費者
? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);? ? ? ? ? ? //對消費端進行限流:
? ? ? ? ? ? //首先第一步,我們既然要使用消費端限流,我們需要關(guān)閉自動 ack,將 autoAck 設(shè)置為 channel.basicConsume(queueName, false, consumer);
? ? ? ? ? ? //第二步我們來設(shè)置具體的限流大小以及數(shù)量。channel.basicQos(0, 15, false);
? ? ? ? ? ? //第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設(shè)置批量處理 ack 回應為 channel.basicAck(envelope.getDeliveryTag(), true);
? ? ? ? ? ? //prefetchCount:1意思是當前worker在當前消息未消費確認時,不會再往這個worker中再次發(fā)送(可以根據(jù)不通服務器負載能力來分配)
? ? ? ? ? ? //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);? ? ? ? ? ? //消費者處理的事件
? ? ? ? ? ? consumer.Received += (model, ea) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //業(yè)務邏輯處理
? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(ea.Body.ToArray());
? ? ? ? ? ? ? ? Console.WriteLine($"normal received => {message}");
? ? ? ? ? ? };? ? ? ? ? ? //消費消息
? ? ? ? ? ? channel.BasicConsume(queueName2, //隊列名
? ? ? ? ? ? ? ? autoAck: true, //確認消費(刪除)
? ? ? ? ? ? ? ? consumer: consumer);? ? ? ? }
4、direct路由模式也叫定向模式
direct類型的Exchange會將消息轉(zhuǎn)發(fā)到指定Routing key的Queue上,Routing key的解析規(guī)則為精確匹配。也就是只有當producer發(fā)送的消息的Routing key與消費端的某個Routing key相等時,消息才會被分發(fā)到對應的Queue上。比如現(xiàn)在有一個direct類型的Exchange,它下面綁定了三個Queue,Routing key分別是ORDER/GOODS/STOCK:
然后向該Exchange中發(fā)送一條消息,消息的Routing key是ORDER,那只有Routing key是ORDER的隊列有一條消息。(與fanout區(qū)別:fanout根據(jù)已綁定的交換機的隊列發(fā)送消息。direct當然也得綁定交換機,只不過再精確匹配到routingkey相等的隊列發(fā)送消息)
生產(chǎn)者代碼:
?????????/// <summary>
? ? ? ? /// MQ 直接交換機模式發(fā)消息(指定routingKey發(fā)送)
? ? ? ? /// </summary>
? ? ? ? /// <returns></returns>
? ? ? ? [HttpGet("SendDirectWorkerMQ")]????????public void SendDirectWorkerMQ()
? ? ? ? {
? ? ? ? ? ? #region 使用直接交換機模式
? ? ? ? ? ? using (var connection = RabbitMQHelper.GetMQConnection())
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //創(chuàng)建通信管道
? ? ? ? ? ? ? ? using (var channel = connection.CreateModel())
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? //direct只提供交換機名稱和routingkey即可,消費端只消費routingkey相匹配的
? ? ? ? ? ? ? ? ? ? string exchangeName = "direct_exchange";? ? ? ? ? ? ? ? ? ??var properties = channel.CreateBasicProperties();
? ? ? ? ? ? ? ? ? ? properties.Persistent = true; //設(shè)置數(shù)據(jù)的持久化,保證mq服務掛掉之后消息的安全性? ? ? ? ? ? ? ? ? ? for (int i = 1; i <= 10; i++)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? var body = Encoding.UTF8.GetBytes("hello mq" + i + "yellow");
? ? ? ? ? ? ? ? ? ? ? ? //這里綁定了交換機,同時綁定了routekey,就會發(fā)送到routekey是yellow的隊列中
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: "yellow", properties, body);? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? #endregion
? ? ? ? }
消費者代碼:
/// <summary>
? ? ? ? /// 直接模式隊列消費消息
? ? ? ? /// 注意先啟動一次消費端會創(chuàng)建交換機、隊列、綁定。如果不啟動則消息丟失。也可以在生產(chǎn)端做這些創(chuàng)建和綁定
? ? ? ? /// </summary>
? ? ? ? public static void DirectReceiveMessage()
? ? ? ? {
? ? ? ? ? ? var connection = RabbitMQHelper.GetMQConnection();
? ? ? ? ? ? //創(chuàng)建管道
? ? ? ? ? ? var channel = connection.CreateModel();? ? ? ? ? ? //創(chuàng)建交換機
? ? ? ? ? ? channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");
? ? ? ? ? ? //創(chuàng)建隊列
? ? ? ? ? ? string queueName1 = "directWorkQueue1";//隊列名稱
? ? ? ? ? ? string queueName2 = "directWorkQueue2";
? ? ? ? ? ? string queueName3 = "directWorkQueue3";
? ? ? ? ? ? channel.QueueDeclare(queue: queueName1,//隊列名
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: false,//是否持久化
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,//排它性
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,//一旦客戶端連接斷開則自動刪除queue
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);//如果安裝了隊列優(yōu)先級插件則可以設(shè)置優(yōu)先級
? ? ? ? ? ? channel.QueueDeclare(queueName2, false, false, false, null);
? ? ? ? ? ? channel.QueueDeclare(queueName3, false, false, false, null);? ? ? ? ? ? //多個隊列綁定到fanout_exchange交換機(似發(fā)布訂閱)
? ? ? ? ? ? channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");
? ? ? ? ? ? channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");
? ? ? ? ? ? channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");? ? ? ? ? ? //聲明消費者
? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);? ? ? ? ? ? //對消費端進行限流:
? ? ? ? ? ? //首先第一步,我們既然要使用消費端限流,我們需要關(guān)閉自動 ack,將 autoAck 設(shè)置為 falsechannel.basicConsume(queueName, false, consumer);
? ? ? ? ? ? //第二步我們來設(shè)置具體的限流大小以及數(shù)量。channel.basicQos(0, 15, false);
? ? ? ? ? ? //第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設(shè)置批量處理 ack 回應為 truechannel.basicAck(envelope.getDeliveryTag(), true);
? ? ? ? ? ? //prefetchCount:1意思是當前worker在當前消息未消費確認時,不會再往這個worker中再次發(fā)送(可以根據(jù)不通服務器負載能力來分配)
? ? ? ? ? ? //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);? ? ? ? ? ? //消費者處理的事件
? ? ? ? ? ? consumer.Received += (model, ea) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //業(yè)務邏輯處理
? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(ea.Body.ToArray());
? ? ? ? ? ? ? ? Console.WriteLine($"normal received => {message},routingkey:{ea.RoutingKey}");? ? ? ? ? ? ? ? //消費完成后需要手動手動簽收消息,如果不寫該代碼就容易導致重復消費問題
? ? ? ? ? ? ? ? //可以降低每次簽收性能損耗。參數(shù)multiple:false就是單個手動簽收,true就是批量簽收,比如消費30條消息后再確認簽收
? ? ? ? ? ? ? ? channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
? ? ? ? ? ? };? ? ? ? ? ? //消息的簽收模式
? ? ? ? ? ? //手動簽收:保證正確消費,不會丟消息(基于客戶端而已)
? ? ? ? ? ? //自動簽收:容易丟失消息
? ? ? ? ? ? channel.BasicConsume(queueName1, //消費隊列2的消息
? ? ? ? ? ? ? ? autoAck: false, //代表要手動簽收,因可能會出現(xiàn)確認簽收了然后宕機了導致沒有執(zhí)行事件,造成消息丟失。解決方案:手動簽收操作寫在了隊列事件完成后。
? ? ? ? ? ? ? ? consumer: consumer);? ? ? ? }
5、topic主題模式也叫通配符模式(路由模式的一種)
根據(jù)通配符模糊匹配,將消息交給符合routing pattern(路由模式)的隊列。
它與direct相比,都是可以根據(jù)routingkey把消息路由到不同的隊列。只不過topic類型exchange可以讓隊列在綁定routingkey的時候使用通配符。
routingkey一般都是有一個或多個單詞組成,多個單詞以“.”分割,例如:“item.insert”。通配符匹配規(guī)則“#”可以匹配一個或多個單詞,“*”只能匹配1個單詞,例如:“item.#”可以匹配“item.insert.asd”或者“item.insert”,“item.*”只能匹配到“item.insert”。
生產(chǎn)者代碼:
public void SendTopicWorkerMQ()
? ? ? ? {
? ? ? ? ? ? #region 使用topic交換機模式
? ? ? ? ? ? using (var connection = RabbitMQHelper.GetMQConnection())
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //創(chuàng)建通信管道
? ? ? ? ? ? ? ? using (var channel = connection.CreateModel())
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? //topic只提供交換機名稱和routingkey即可,消費端只消費與routingkey通配符匹配的
? ? ? ? ? ? ? ? ? ? string exchangeName = "topic_exchange";? ? ? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? ? ? string routingKey1 = "user.america";
? ? ? ? ? ? ? ? ? ? string routingKey2 = "user.china";
? ? ? ? ? ? ? ? ? ? string routingKey3 = "user.china.beijing";
? ? ? ? ? ? ? ? ? ? string routingKey4 = "user.china.beijing.changping";? ? ? ? ? ? ? ? ? ? var properties = channel.CreateBasicProperties();
? ? ? ? ? ? ? ? ? ? properties.Persistent = true; //設(shè)置數(shù)據(jù)的持久化,保證mq服務掛掉之后消息的安全性
? ? ? ? ? ? ? ? ? ? for (int i = 1; i <= 10; i++)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? var body = Encoding.UTF8.GetBytes("hello mq" + i + "topic");
? ? ? ? ? ? ? ? ? ? ? ? //傳4個不同的routingkey過去,消費者會根據(jù)通配符匹配并消費(好像不能在生產(chǎn)者寫通配符)
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: routingKey1, properties , body);
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: routingKey2, properties , body);
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: routingKey3, properties , body);
? ? ? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: routingKey4, properties , body);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? #endregion
? ? ? ? }
消費者代碼:
/// <summary>
? ? ? ? /// 主題模式隊列消費消息
? ? ? ? /// 注意先啟動一次消費端會創(chuàng)建交換機、隊列、綁定。如果不啟動則消息丟失。也可以在生產(chǎn)端做這些創(chuàng)建和綁定
? ? ? ? /// </summary>
? ? ? ? public static void TopicReceiveMessage()
? ? ? ? {
? ? ? ? ? ? var connection = RabbitMQHelper.GetMQConnection();
? ? ? ? ? ? //創(chuàng)建管道
? ? ? ? ? ? var channel = connection.CreateModel();? ? ? ? ? ? //創(chuàng)建交換機
? ? ? ? ? ? channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");
? ? ? ? ? ? string exchangeName = "topic_exchange";
? ? ? ? ? ? //創(chuàng)建隊列
? ? ? ? ? ? string queueName1 = "topicWorkQueue1";//隊列名稱
? ? ? ? ? ? string queueName2 = "topicWorkQueue2";
? ? ? ? ? ? string queueName3 = "topicWorkQueue3";
? ? ? ? ? ? channel.QueueDeclare(queue: queueName1,//隊列名
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: false,//是否持久化
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,//排它性
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,//一旦客戶端連接斷開則自動刪除queue
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);//如果安裝了隊列優(yōu)先級插件則可以設(shè)置優(yōu)先級
? ? ? ? ? ? channel.QueueDeclare(queueName2, false, false, false, null);
? ? ? ? ? ? channel.QueueDeclare(queueName3, false, false, false, null);? ? ? ? ? ? //多個隊列綁定到fanout_exchange交換機
? ? ? ? ? ? channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "user.*.*");//匹配例如:user.a.b
? ? ? ? ? ? channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "user.*"); ?//匹配例如:user.a
? ? ? ? ? ? channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "user.#"); ?//匹配例如:user...... (user. 后面是啥都行)? ? ? ? ? ? //聲明消費者
? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);? ? ? ? ? ? //對消費端進行限流:
? ? ? ? ? ? //首先第一步,我們既然要使用消費端限流,我們需要關(guān)閉自動 ack,將 autoAck 設(shè)置為 channel.basicConsume(queueName, false, consumer);
? ? ? ? ? ? //第二步我們來設(shè)置具體的限流大小以及數(shù)量。channel.basicQos(0, 15, false);
? ? ? ? ? ? //第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設(shè)置批量處理 ack 回應為 channel.basicAck(envelope.getDeliveryTag(), true);
? ? ? ? ? ? //prefetchCount:1意思是當前worker在當前消息未消費確認時,不會再往這個worker中再次發(fā)送(可以根據(jù)不通服務器負載能力來分配)
? ? ? ? ? ? //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);? ? ? ? ? ? //消費者處理的事件
? ? ? ? ? ? consumer.Received += (model, ea) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //業(yè)務邏輯處理
? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(ea.Body.ToArray());
? ? ? ? ? ? ? ? Console.WriteLine($"normal received => {message},routingkey:{ea.RoutingKey}");? ? ? ? ? ? ? ? //消費完成后需要手動手動簽收消息,如果不寫該代碼就容易導致重復消費問題
? ? ? ? ? ? ? ? //可以降低每次簽收性能損耗。參數(shù)multiple:false就是單個手動簽收,true就是批量簽收,比如消費30條消息后再確認簽收
? ? ? ? ? ? ? ? channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
? ? ? ? ? ? };? ? ? ? ? ? //消息的簽收模式
? ? ? ? ? ? //手動簽收:保證正確消費,不會丟消息(基于客戶端而已)
? ? ? ? ? ? //自動簽收:容易丟失消息
? ? ? ? ? ? channel.BasicConsume(queueName2, //消費隊列2的消息(可以手動替換其他隊列消費)
? ? ? ? ? ? ? ? autoAck: false, //代表要手動簽收,因可能會出現(xiàn)確認簽收了然后宕機了導致沒有執(zhí)行事件,造成消息丟失。解決方案:手動簽收操作寫在了隊列事件完成后。
? ? ? ? ? ? ? ? consumer: consumer);? ? ? ? }
6、header 參數(shù)匹配模式
header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配隊列,Headers?類型的交換器性能會很差,所以這種類型不常用。
以上注意:Exchange(交換機):只負責轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息將會丟失??!
7、延時隊列(插件方式實現(xiàn))
實現(xiàn)延遲隊列有兩種方法
1、TTL+DLX需要創(chuàng)建死信交換機綁定隊列,需創(chuàng)建多個交換機多個隊列,復雜麻煩所以不推薦。
2、推薦使用rabbitmq_delayed_message_exchange 插件實現(xiàn),下面來實現(xiàn)一下:
①插件下載網(wǎng)址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
注意:一定要下載與自己mq版本同一個版本號,不然下面開啟插件會報錯
比如我的mq版本是 3.9.11的,那我就下載rabbitmq_delayed_message_exchange-3.9.0.ez 即可
下載之后想辦法上傳到linux上。
或者直接在linux上面下載:
#linux下載插件的命令:(注:選擇屬于自己的版本號,f12指針查看按鈕鏈接獲取下載地址)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
②下載完畢后,使用命令復制到docker內(nèi)rabbitmq容器的plugins文件夾下
查看mq的位置: whereis rabbitmq
查看運行中的容器命令:
docker ps
docker的復制命令:
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez 0e6e229cc6f2:/plugins(0e6e229cc6f2是容器id,plugins是mq容器內(nèi)的文件夾)
③然后進入容器
docker exec -it?0e6e229cc6f2 bash
退出容器:
exit
或者按Ctrl+P+Q進行退出容器
④進入之后直接進入plugins文件夾下看看復制進去沒有?
cd /plugins
ls
記住只能有一個?rabbitmq_delayed_message_exchange-3.9.0.ez文件,不能放多個版本否則報錯
⑤啟用rabbitmq_delayed_message_exchange插件
開啟插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
啟用之后可以看到mq可視化頁面,交換機類型多了一個 x-delayed-message
⑥然后就是代碼實現(xiàn):
生產(chǎn)者代碼:
public void SendDelayedWorkerMQ()
? ? ? ? {
? ? ? ? ? ? #region 使用延時交換機模式
? ? ? ? ? ? using (var connection = RabbitMQHelper.GetMQConnection())
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //創(chuàng)建通信管道
? ? ? ? ? ? ? ? using (var channel = connection.CreateModel())
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? string exchangeName = "delayed_exchange";//delayed需提供交換機名稱
? ? ? ? ? ? ? ? ? ? string queueName = "delay_WorkQueue";? ? ? ? ? ? ? ? ? ? #region 消費端做交換機和隊列的創(chuàng)建和綁定
? ? ? ? ? ? ? ? ? ? #endregion
? ? ? ? ? ? ? ? ? ? var properties = channel.CreateBasicProperties();
? ? ? ? ? ? ? ? ? ? properties.Persistent = true; //設(shè)置數(shù)據(jù)的持久化,保證mq服務掛掉之后消息的安全性
? ? ? ? ? ? ? ? ? ? //properties.Priority = 9;//消息的優(yōu)先級 ?值越大 優(yōu)先級越高 0~9
? ? ? ? ? ? ? ? ? ? //延時時間從header賦值
? ? ? ? ? ? ? ? ? ? Dictionary<string, object> headers = new Dictionary<string, object>();
? ? ? ? ? ? ? ? ? ? headers.Add("x-delay", 10000);
? ? ? ? ? ? ? ? ? ? properties.Headers = headers;? ? ? ? ? ? ? ? ? ? var body = Encoding.UTF8.GetBytes("生產(chǎn)者發(fā)送時間:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
? ? ? ? ? ? ? ? ? ? //發(fā)送延時消息
? ? ? ? ? ? ? ? ? ? channel.BasicPublish(exchange: exchangeName, routingKey: queueName, properties, body);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? #endregion
? ? ? ? }
消費者代碼:
/// <summary>
? ? ? ? /// 延遲交換機模式隊列消費消息
? ? ? ? /// 注意先啟動一次消費端會創(chuàng)建交換機、隊列、綁定。如果不啟動則消息丟失。也可以在生產(chǎn)端做這些創(chuàng)建和綁定
? ? ? ? /// </summary>
? ? ? ? public static void DelayedReceiveMessage()
? ? ? ? {? ? ? ? ? ? var connection = RabbitMQHelper.GetMQConnection();
? ? ? ? ? ? //創(chuàng)建管道
? ? ? ? ? ? var channel = connection.CreateModel();
? ? ? ? ? ? string queueName = "delay_WorkQueue";//隊列名稱
? ? ? ? ? ? string exchangeName = "delayed_exchange";//隊列名稱? ? ? ? ? ? Dictionary<string, object> args = new Dictionary<string, object>();
? ? ? ? ? ? args.Add("x-delayed-type", "direct"); //x-delayed-type必須加(這個創(chuàng)建的是交換機類型)? ? ? ? ? ? //創(chuàng)建交換機
? ? ? ? ? ? channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", durable: true, autoDelete: false, arguments: args);
? ? ? ? ? ? //創(chuàng)建隊列? ? ? ? ? ? channel.QueueDeclare(queue: queueName,//隊列名
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: true,//隊列是否持久化
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,//是否為單消費者隊列,為True時,只能由單一消費者消費
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,//是否自動刪除隊列,當消費者全部斷開時,隊列自動刪除
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);//高級特性? ? ? ? ? ? //多個隊列綁定到delayed_exchange交換機(似發(fā)布訂閱)
? ? ? ? ? ? channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: queueName);? ? ? ? ? ? //聲明消費者
? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);? ? ? ? ? ? //對消費端進行限流:
? ? ? ? ? ? //首先第一步,我們既然要使用消費端限流,我們需要關(guān)閉自動 ack,將 autoAck 設(shè)置為 channel.basicConsume(queueName, false, consumer);
? ? ? ? ? ? //第二步我們來設(shè)置具體的限流大小以及數(shù)量。channel.basicQos(0, 15, false);
? ? ? ? ? ? //第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設(shè)置批量處理 ack 回應為 channel.basicAck(envelope.getDeliveryTag(), true);
? ? ? ? ? ? //prefetchCount:1意思是當前worker在當前消息未消費確認時,不會再往這個worker中再次發(fā)送(可以根據(jù)不通服務器負載能力來分配)
? ? ? ? ? ? //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);? ? ? ? ? ? //消費者處理的事件
? ? ? ? ? ? consumer.Received += (model, ea) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //業(yè)務邏輯處理
? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(ea.Body.ToArray());
? ? ? ? ? ? ? ? Console.WriteLine(message);
? ? ? ? ? ? ? ? Console.WriteLine($"消費者消費時間:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));? ? ? ? ? ? ? ? //消費完成后需要手動手動簽收消息,如果不寫該代碼就容易導致重復消費問題
? ? ? ? ? ? ? ? //可以降低每次簽收性能損耗。參數(shù)multiple:false就是單個手動簽收,true就是批量簽收,比如消費30條消息后再確認簽收
? ? ? ? ? ? ? ? channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
? ? ? ? ? ? };? ? ? ? ? ? //消費消息
? ? ? ? ? ? channel.BasicConsume(queueName, //隊列名
? ? ? ? ? ? ? ? autoAck: false, //確認消費
? ? ? ? ? ? ? ? consumer: consumer);
? ? ? ? }
最終啟動程序可以看到消費者端輸出的結(jié)果計算精準的相差10秒!
參考資料:
持久化、優(yōu)先級、高級特性:
NetCore RabbitMQ高級特性 持久化 及 消息優(yōu)先級 - 天才臥龍 - 博客園
延遲和死信隊列:https://www.jb51.net/article/221796.htm文章來源:http://www.zghlxwxcb.cn/news/detail-406116.html
NetCore RabbitMQ 高級特性 消息存活周期TTL、死信交換機/死信對列DLX,延遲隊列,及冪等性的保障 - 天才臥龍 - 博客園文章來源地址http://www.zghlxwxcb.cn/news/detail-406116.html
到了這里,關(guān)于.NetCore 使用 RabbitMQ (交換機/隊列/消息持久化+mq高級特性+死信隊列+延遲隊列)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!