RabbitMQ延時隊(duì)列和死信隊(duì)列
延時隊(duì)列和死信隊(duì)列
延時隊(duì)列是RabbitMQ中的一種特殊隊(duì)列,它可以在消息到達(dá)隊(duì)列后延遲一段時間再被消費(fèi)。
延時隊(duì)列的實(shí)現(xiàn)原理是通過使用消息的過期時間和死信隊(duì)列來實(shí)現(xiàn)。當(dāng)消息被發(fā)送到延時隊(duì)列時,可以為消息設(shè)置一個過期時間,這個過期時間決定了消息在延時隊(duì)列中等待的時間。如果消息在過期時間內(nèi)沒有被消費(fèi)者消費(fèi),則會被自動發(fā)送到一個預(yù)先指定的死信隊(duì)列中。
在RabbitMQ中,延時隊(duì)列的實(shí)現(xiàn)可以通過以下步驟來完成:
- 創(chuàng)建一個普通的隊(duì)列作為延時隊(duì)列,設(shè)置x-message-ttl參數(shù)為消息的過期時間。
- 創(chuàng)建一個死信隊(duì)列,用于接收延時隊(duì)列中過期的消息。
- 將延時隊(duì)列設(shè)置為普通隊(duì)列的死信交換機(jī),并指定死信路由鍵。
- 將消費(fèi)者綁定到死信隊(duì)列,以消費(fèi)延時隊(duì)列中過期的消息。
使用場景
- 訂單在十分鐘內(nèi)未支付則自動取消。
- 新創(chuàng)建的店鋪,如果十天內(nèi)都沒有上傳過商品,則自動發(fā)送消息提醒。
- 賬單在一周內(nèi)未支付,則自動結(jié)算。
- 用戶發(fā)起退款,如果三天內(nèi)沒有得到處理則通知相關(guān)運(yùn)營人員。
- 還有很多場景就不一一例舉了。
TTL設(shè)置
方式一:
創(chuàng)建隊(duì)列時設(shè)置x-message-ttl的屬性,所有被投遞到該隊(duì)列的消息最多都不會超過60s
var args = new Dictionary<string,object>();
args.Add("x-message-ttl",60000); //單位為毫秒
model.QueueDeclare("myqueue",false,false,false,args);
方式二:
為每條消息設(shè)置TTL,為每條消息設(shè)置過期時間。
IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Expiration = "60000"
model.BasicPublic(exchangeName,routingKey,props,messageBodyBytes);
代碼實(shí)踐
模擬支付業(yè)務(wù)
整個項(xiàng)目由三部分組成
- Web API項(xiàng)目:用于發(fā)送訂單請求,生產(chǎn)者。
- 控制臺項(xiàng)目一:用于處理訂單支付,延時隊(duì)列。
- 控制臺項(xiàng)目二:用于處理超時未支付的訂單,死信隊(duì)列。
Web API項(xiàng)目
訂單類,就簡單的寫一個用于演示,真實(shí)業(yè)務(wù)肯定不是這樣~
public class OrderDto
{
/// <summary>
/// 訂單名稱
/// </summary>
public string Name { get; set; }
/// <summary>
/// 訂單狀態(tài)
/// 0 未支付
/// 1 已支付
/// 2 超時
/// </summary>
public int Status { get; set; }
}
控制器
[ApiController]
[Route("api/orders")]
public class OrdersController : ControllerBase
{
private readonly IOrderService _orderService;
public OrdersController(IOrderService orderService)
{
_orderService = orderService;
}
[HttpPost]
public IActionResult CreateOrder([FromBody] OrderDto orderDto)
{
// 處理訂單邏輯
_orderService.ProcessOrder(orderDto);
return Ok();
}
}
訂單服務(wù)
public interface IOrderService
{
void ProcessOrder(OrderDto orderDto);
}
public class OrderService : IOrderService
{
private readonly RabbitMQConnectionFactory _connectionFactory;
public OrderService(RabbitMQConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
}
public void ProcessOrder(OrderDto orderDto)
{
using (var channel = _connectionFactory.CreateChannel())
{
var properties = channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object>
{
{ "x-delay", 1000 * 20 } // 設(shè)置20秒延時
};
var message = JsonConvert.SerializeObject(orderDto);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("delayed_exchange", "routing_key", properties, body);
}
}
}
支付處理項(xiàng)目
ProcessPay類,用于接收訂單消息
public class ProcessPay : IHostedService
{
private readonly ConnectionFactory _factory;
private IConnection _connection;
private IModel _channel;
public ProcessPay()
{
_factory = new ConnectionFactory()
{
HostName = "ip",
Port = 5672,
UserName = "用戶名",
Password = "密碼"
};
}
public Task StartAsync(CancellationToken cancellationToken)
{
Console.WriteLine(" Press [enter] to exit.");
_connection = _factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare("delayed_exchange", ExchangeType.Direct, true, false, null);
//關(guān)鍵代碼,綁定死信交換機(jī)
var arguments = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "dead_letter_exchange" },
{ "x-dead-letter-routing-key", "dead_letter_routing_key" }
};
_channel.QueueDeclare("delayed_queue", true, false, false, arguments);
_channel.QueueBind("delayed_queue", "delayed_exchange", "routing_key");
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// 處理支付邏輯
var orderDto = JsonConvert.DeserializeObject<OrderDto>(message);
Console.WriteLine($"訂單信息:{orderDto.Name}");
Console.WriteLine("請輸入價格(模擬支付):");
// 超時未支付
string? many = "";
// 支付處理
Console.WriteLine("請輸入:");
// 超時未支付進(jìn)行處理
Task.Factory.StartNew(() =>
{
many = Console.ReadLine();
Console.WriteLine($"many:{many}");
}).Wait(20 * 1000);
if (string.Equals(many, "100"))
{
orderDto.Status = 1;
Console.WriteLine("支付完成");
_channel.BasicAck(ea.DeliveryTag, true);
}
else
{
//重試幾次依然失敗
Console.WriteLine("等待一定時間內(nèi)失效超時未支付的訂單");
_channel.BasicNack(ea.DeliveryTag, false, false);
}
};
_channel.BasicConsume("delayed_queue", false, consumer);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_channel?.Close();
_connection?.Close();
_channel?.Dispose();
_connection?.Dispose();
return Task.CompletedTask;
}
}
在Main方法中使用單例模式注冊該服務(wù),當(dāng)然直接將代碼寫在Main方法也是沒有問題的,只不過這種方式方便管理。
static void Main(string[] args)
{
var host = new HostBuilder()
.ConfigureServices((hostContext, services) =>
{
services.AddSingleton<IHostedService,ProcessPay>();
})
.Build();
host.Run();
}
支付超時項(xiàng)目
創(chuàng)建一個死信隊(duì)列服務(wù),用于訂閱死信隊(duì)列中的訂單消息,這里我就直接把代碼寫在Main方法中了
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("dead_letter_exchange", ExchangeType.Direct, true, false, null);
channel.QueueDeclare("dead_letter_queue", true, false, false, null);
channel.QueueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// 處理超時未支付的訂單邏輯
var orderDto = JsonConvert.DeserializeObject<OrderDto>(message);
orderDto.Status = 2;
Console.WriteLine($"訂單信息:{orderDto.Name},{orderDto.Status}");
Console.WriteLine("超時未支付");
channel.BasicAck(ea.DeliveryTag, true);
};
channel.BasicConsume("dead_letter_queue", false, consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
效果展示
代碼看不出效果,直接上圖。
首先是3個項(xiàng)目各自運(yùn)行效果圖
然后演示正常消費(fèi)效果
接下來是超時未支付效果
結(jié)尾
這就是一個簡單的延時隊(duì)列和死信隊(duì)列的代碼,模擬了支付超時的場景,這里的數(shù)據(jù)都寫死了的,真實(shí)運(yùn)用的時候肯定是中數(shù)據(jù)庫中獲取,修改數(shù)據(jù)庫實(shí)體的值。然后死信隊(duì)列是用于處理在一定時間內(nèi)未被處理的消息,死信交換機(jī)也只是一個普通的交換機(jī),只不過他是用于處理超時的消息的交換機(jī)。
對于RabbitMQ的文章基本就結(jié)束了,可能還會有一篇RabbitMQ集群搭建的文章,但不是很想去寫,最近太懶了~文章來源:http://www.zghlxwxcb.cn/news/detail-615465.html
有問題歡迎指出,活到老學(xué)到老~文章來源地址http://www.zghlxwxcb.cn/news/detail-615465.html
RabbitMQ系列文章
- ZY知識庫 · ZY - 在.NET Core中使用RabbitMQ (pljzy.top)
參考資料
- 【【2021最新.NET/C#】RabbitMQ從零到高可用集群】 https://www.bilibili.com/video/BV1GU4y1w7Yq/?p=10&share_source=copy_web&vd_source=fce337a51d11a67781404c67ec0b5084
到了這里,關(guān)于.NET中使用RabbitMQ延時隊(duì)列和死信隊(duì)列的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!