国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

.NET中使用RabbitMQ延時隊(duì)列和死信隊(duì)列

這篇具有很好參考價值的文章主要介紹了.NET中使用RabbitMQ延時隊(duì)列和死信隊(duì)列。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

RabbitMQ延時隊(duì)列和死信隊(duì)列

延時隊(duì)列和死信隊(duì)列

延時隊(duì)列是RabbitMQ中的一種特殊隊(duì)列,它可以在消息到達(dá)隊(duì)列后延遲一段時間再被消費(fèi)。

延時隊(duì)列的實(shí)現(xiàn)原理是通過使用消息的過期時間和死信隊(duì)列來實(shí)現(xiàn)。當(dāng)消息被發(fā)送到延時隊(duì)列時,可以為消息設(shè)置一個過期時間,這個過期時間決定了消息在延時隊(duì)列中等待的時間。如果消息在過期時間內(nèi)沒有被消費(fèi)者消費(fèi),則會被自動發(fā)送到一個預(yù)先指定的死信隊(duì)列中。

在RabbitMQ中,延時隊(duì)列的實(shí)現(xiàn)可以通過以下步驟來完成:

  1. 創(chuàng)建一個普通的隊(duì)列作為延時隊(duì)列,設(shè)置x-message-ttl參數(shù)為消息的過期時間。
  2. 創(chuàng)建一個死信隊(duì)列,用于接收延時隊(duì)列中過期的消息。
  3. 將延時隊(duì)列設(shè)置為普通隊(duì)列的死信交換機(jī),并指定死信路由鍵。
  4. 將消費(fèi)者綁定到死信隊(duì)列,以消費(fèi)延時隊(duì)列中過期的消息。

使用場景

  1. 訂單在十分鐘內(nèi)未支付則自動取消。
  2. 新創(chuàng)建的店鋪,如果十天內(nèi)都沒有上傳過商品,則自動發(fā)送消息提醒。
  3. 賬單在一周內(nèi)未支付,則自動結(jié)算。
  4. 用戶發(fā)起退款,如果三天內(nèi)沒有得到處理則通知相關(guān)運(yùn)營人員。
  5. 還有很多場景就不一一例舉了。

TTL設(shè)置

方式一:

創(chuàng)建隊(duì)列時設(shè)置x-message-ttl的屬性,所有被投遞到該隊(duì)列的消息最多都不會超過60s

var args = new Dictionary<string,object>();
args.Add("x-message-ttl",60000); //單位為毫秒
model.QueueDeclare("myqueue",false,false,false,args);

方式二:

為每條消息設(shè)置TTL,為每條消息設(shè)置過期時間。

IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Expiration = "60000"
model.BasicPublic(exchangeName,routingKey,props,messageBodyBytes);

代碼實(shí)踐

模擬支付業(yè)務(wù)

整個項(xiàng)目由三部分組成

  • Web API項(xiàng)目:用于發(fā)送訂單請求,生產(chǎn)者。
  • 控制臺項(xiàng)目一:用于處理訂單支付,延時隊(duì)列。
  • 控制臺項(xiàng)目二:用于處理超時未支付的訂單,死信隊(duì)列。

Web API項(xiàng)目

訂單類,就簡單的寫一個用于演示,真實(shí)業(yè)務(wù)肯定不是這樣~

public class OrderDto
{
    /// <summary>
    /// 訂單名稱
    /// </summary>
    public string Name { get; set; }
    /// <summary>
    /// 訂單狀態(tài)
    /// 0 未支付
    /// 1 已支付
    /// 2 超時
    /// </summary>
    public int Status { get; set; }
}

控制器

[ApiController]
[Route("api/orders")]
public class OrdersController : ControllerBase
{
    private readonly IOrderService _orderService;

    public OrdersController(IOrderService orderService)
    {
        _orderService = orderService;
    }

    [HttpPost]
    public IActionResult CreateOrder([FromBody] OrderDto orderDto)
    {
        // 處理訂單邏輯
        _orderService.ProcessOrder(orderDto);
        return Ok();
    }
}

訂單服務(wù)

public interface IOrderService
{
    void ProcessOrder(OrderDto orderDto);
}

public class OrderService : IOrderService
{
    private readonly RabbitMQConnectionFactory _connectionFactory;
    
    public OrderService(RabbitMQConnectionFactory connectionFactory)
    {
        _connectionFactory = connectionFactory;
    }

    public void ProcessOrder(OrderDto orderDto)
    {
        using (var channel = _connectionFactory.CreateChannel())
        {
            var properties = channel.CreateBasicProperties();
            properties.Headers = new Dictionary<string, object>
            {
                { "x-delay", 1000 * 20  } // 設(shè)置20秒延時
            };

            var message = JsonConvert.SerializeObject(orderDto);
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish("delayed_exchange", "routing_key", properties, body);
        }
    }
}

支付處理項(xiàng)目

ProcessPay類,用于接收訂單消息

public class ProcessPay : IHostedService
{
    private readonly ConnectionFactory _factory;
    private IConnection _connection;
    private IModel _channel;

    public ProcessPay()
    {
        _factory = new ConnectionFactory()
        {
            HostName = "ip",
            Port = 5672,
            UserName = "用戶名",
            Password = "密碼"
        };
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        Console.WriteLine(" Press [enter] to exit.");
        _connection = _factory.CreateConnection();
        _channel = _connection.CreateModel();

        _channel.ExchangeDeclare("delayed_exchange", ExchangeType.Direct, true, false, null);
        //關(guān)鍵代碼,綁定死信交換機(jī)
        var arguments = new Dictionary<string, object>
        {
            { "x-dead-letter-exchange", "dead_letter_exchange" },
            { "x-dead-letter-routing-key", "dead_letter_routing_key" }
        };
        _channel.QueueDeclare("delayed_queue", true, false, false, arguments);
        _channel.QueueBind("delayed_queue", "delayed_exchange", "routing_key");

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);

            // 處理支付邏輯
            var orderDto = JsonConvert.DeserializeObject<OrderDto>(message);
            Console.WriteLine($"訂單信息:{orderDto.Name}");
            Console.WriteLine("請輸入價格(模擬支付):");

            // 超時未支付
            string? many = "";
            // 支付處理
            Console.WriteLine("請輸入:");
            // 超時未支付進(jìn)行處理
            Task.Factory.StartNew(() =>
            {
                many = Console.ReadLine();
                Console.WriteLine($"many:{many}");
            }).Wait(20 * 1000);
            if (string.Equals(many, "100"))
            {
                orderDto.Status = 1;
                Console.WriteLine("支付完成");
                _channel.BasicAck(ea.DeliveryTag, true);
            }
            else
            {
                //重試幾次依然失敗
                Console.WriteLine("等待一定時間內(nèi)失效超時未支付的訂單");
                _channel.BasicNack(ea.DeliveryTag, false, false);
            }
        };

        _channel.BasicConsume("delayed_queue", false, consumer);

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _channel?.Close();
        _connection?.Close();
        _channel?.Dispose();
        _connection?.Dispose();

        return Task.CompletedTask;
    }
}

在Main方法中使用單例模式注冊該服務(wù),當(dāng)然直接將代碼寫在Main方法也是沒有問題的,只不過這種方式方便管理。

static void Main(string[] args)
{
    var host = new HostBuilder()
        .ConfigureServices((hostContext, services) =>
                           {
                               services.AddSingleton<IHostedService,ProcessPay>();
                           })
        .Build();

    host.Run();
}

支付超時項(xiàng)目

創(chuàng)建一個死信隊(duì)列服務(wù),用于訂閱死信隊(duì)列中的訂單消息,這里我就直接把代碼寫在Main方法中了

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare("dead_letter_exchange", ExchangeType.Direct, true, false, null);

        channel.QueueDeclare("dead_letter_queue", true, false, false, null);

        channel.QueueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");
        
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);

            // 處理超時未支付的訂單邏輯
            var orderDto = JsonConvert.DeserializeObject<OrderDto>(message);
            orderDto.Status = 2;
            Console.WriteLine($"訂單信息:{orderDto.Name},{orderDto.Status}");
            Console.WriteLine("超時未支付");

            channel.BasicAck(ea.DeliveryTag, true);
        };

        channel.BasicConsume("dead_letter_queue", false, consumer);
        
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

效果展示

代碼看不出效果,直接上圖。

首先是3個項(xiàng)目各自運(yùn)行效果圖

.NET中使用RabbitMQ延時隊(duì)列和死信隊(duì)列

然后演示正常消費(fèi)效果

.NET中使用RabbitMQ延時隊(duì)列和死信隊(duì)列

接下來是超時未支付效果

.NET中使用RabbitMQ延時隊(duì)列和死信隊(duì)列

結(jié)尾

這就是一個簡單的延時隊(duì)列和死信隊(duì)列的代碼,模擬了支付超時的場景,這里的數(shù)據(jù)都寫死了的,真實(shí)運(yùn)用的時候肯定是中數(shù)據(jù)庫中獲取,修改數(shù)據(jù)庫實(shí)體的值。然后死信隊(duì)列是用于處理在一定時間內(nèi)未被處理的消息,死信交換機(jī)也只是一個普通的交換機(jī),只不過他是用于處理超時的消息的交換機(jī)。

對于RabbitMQ的文章基本就結(jié)束了,可能還會有一篇RabbitMQ集群搭建的文章,但不是很想去寫,最近太懶了~

有問題歡迎指出,活到老學(xué)到老~文章來源地址http://www.zghlxwxcb.cn/news/detail-615465.html

RabbitMQ系列文章

  • ZY知識庫 · ZY - 在.NET Core中使用RabbitMQ (pljzy.top)

參考資料

  • 【【2021最新.NET/C#】RabbitMQ從零到高可用集群】 https://www.bilibili.com/video/BV1GU4y1w7Yq/?p=10&share_source=copy_web&vd_source=fce337a51d11a67781404c67ec0b5084

到了這里,關(guān)于.NET中使用RabbitMQ延時隊(duì)列和死信隊(duì)列的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 使用RabbitMQ死信隊(duì)列關(guān)閉未支付的訂單

    使用RabbitMQ死信隊(duì)列關(guān)閉未支付的訂單

    RabbitMQ死信隊(duì)列(Dead-Letter Exchange,簡稱DLX)是一種特殊類型的交換機(jī),用于處理在隊(duì)列中無法被消費(fèi)的消息。當(dāng)消息無法被消費(fèi)時,它會被轉(zhuǎn)發(fā)到死信隊(duì)列中,以便進(jìn)一步處理。 在RabbitMQ中,死信隊(duì)列通常用于處理以下情況: 消息無法被消費(fèi)者處理:例如,如果消費(fèi)者崩潰

    2024年02月11日
    瀏覽(21)
  • .NetCore 使用 RabbitMQ (交換機(jī)/隊(duì)列/消息持久化+mq高級特性+死信隊(duì)列+延遲隊(duì)列)

    .NetCore 使用 RabbitMQ (交換機(jī)/隊(duì)列/消息持久化+mq高級特性+死信隊(duì)列+延遲隊(duì)列)

    目錄 一、安裝mq 二、實(shí)操 1、簡單模式 2、工作模式 3、fanout扇形模式(發(fā)布訂閱) 4、direct路由模式也叫定向模式 5、topic主題模式也叫通配符模式(路由模式的一種) 6、header 參數(shù)匹配模式 7、延時隊(duì)列(插件方式實(shí)現(xiàn)) 參考資料: 1、我的環(huán)境是使用VMware安裝的Centos7系統(tǒng)。MQ部署

    2023年04月09日
    瀏覽(111)
  • 消息隊(duì)列中間件,RabbitMQ的使用,死信隊(duì)列,延遲隊(duì)列,利用枚舉實(shí)現(xiàn)隊(duì)列,交換機(jī),RountKey的聲明

    消息隊(duì)列中間件,RabbitMQ的使用,死信隊(duì)列,延遲隊(duì)列,利用枚舉實(shí)現(xiàn)隊(duì)列,交換機(jī),RountKey的聲明

    目錄 0.交換機(jī)種類和區(qū)別 1.聲明隊(duì)列和交換機(jī)以及RountKey 2.初始化循環(huán)綁定 3.聲明交換機(jī) 4.監(jiān)聽隊(duì)列 4.1 監(jiān)聽普通隊(duì)列 4.2監(jiān)聽死信隊(duì)列 ?5.削峰填谷的實(shí)現(xiàn) Direct Exchange(直連交換機(jī)) : 直連交換機(jī)將消息發(fā)送到與消息的路由鍵完全匹配的隊(duì)列。它是最簡單的交換機(jī)類型之一。

    2024年04月23日
    瀏覽(587)
  • 【RabbitMQ】RabbitMQ高級:死信隊(duì)列和延遲隊(duì)列

    【RabbitMQ】RabbitMQ高級:死信隊(duì)列和延遲隊(duì)列

    在電商平臺下單,訂單創(chuàng)建成功,等待支付,一般會給30分鐘的時間,開始倒計(jì)時。如果在這段時間內(nèi)用戶沒有支付,則默認(rèn)訂單取消。 該如何實(shí)現(xiàn)? 定期輪詢(數(shù)據(jù)庫等) 用戶下單成功,將訂單信息放入數(shù)據(jù)庫,同時將支付狀態(tài)放入數(shù)據(jù)庫,用戶付款更改數(shù)據(jù)庫狀態(tài)。定

    2024年01月17日
    瀏覽(18)
  • RabbitMQ-死信交換機(jī)和死信隊(duì)列

    RabbitMQ-死信交換機(jī)和死信隊(duì)列

    DLX: Dead-Letter-Exchange 死信交換器,死信郵箱 當(dāng)消息成為Dead message后,可以被重新發(fā)送到另一個交換機(jī),這個交換機(jī)就是DLX。 如下圖所示: 其實(shí)死信隊(duì)列就是一個普通的交換機(jī),有些隊(duì)列的消息成為死信后,(比如過期了或者隊(duì)列滿了)這些死信一般情況下是會被 RabbitMQ 清理

    2024年02月08日
    瀏覽(26)
  • RabbitMQ延遲隊(duì)列,死信隊(duì)列配置

    延遲和死信隊(duì)列的配置 延遲隊(duì)列有效期一分鐘,后進(jìn)入死信隊(duì)列,如果異常就進(jìn)入異常隊(duì)列 異常隊(duì)列配置類

    2024年02月14日
    瀏覽(28)
  • 【RabbitMQ筆記10】消息隊(duì)列RabbitMQ之死信隊(duì)列的介紹

    【RabbitMQ筆記10】消息隊(duì)列RabbitMQ之死信隊(duì)列的介紹

    這篇文章,主要介紹消息隊(duì)列RabbitMQ之死信隊(duì)列。 目錄 一、RabbitMQ死信隊(duì)列 1.1、什么是死信隊(duì)列 1.2、設(shè)置過期時間TTL 1.3、配置死信交換機(jī)和死信隊(duì)列(代碼配置) (1)設(shè)置隊(duì)列過期時間 (2)設(shè)置單條消息過期時間 (3)隊(duì)列設(shè)置死信交換機(jī) (4)配置的基本思路 1.4、配置

    2024年02月16日
    瀏覽(95)
  • 【RabbitMQ學(xué)習(xí)日記】——死信隊(duì)列與延遲隊(duì)列

    【RabbitMQ學(xué)習(xí)日記】——死信隊(duì)列與延遲隊(duì)列

    死信,顧名思義就是無法被消費(fèi)的消息,字面意思可以這樣理解,一般來說, producer 將消息投遞到 broker 或者直接到 queue 里了, consumer 從 queue 取出消息進(jìn)行消費(fèi),但某些時候 由于特定的原因?qū)е?queue 中的某些消息無法被消費(fèi) ,這樣的消息如果沒有后續(xù)的處理,就變成了死

    2024年02月06日
    瀏覽(22)
  • 【RabbitMQ教程】第五章 —— RabbitMQ - 死信隊(duì)列

    【RabbitMQ教程】第五章 —— RabbitMQ - 死信隊(duì)列

    ?????????????????????????????????????????????????????????????????? ?? 【 R a b b i t M Q 教 程 】 第 五 章 — — R a b b i t M Q ? 死 信 隊(duì) 列 color{#FF1493}{【RabbitMQ教程】第五章 —— RabbitMQ - 死信隊(duì)列} 【 R a b b i t M Q 教 程 】 第 五 章 — — R a

    2024年02月09日
    瀏覽(19)
  • RabbitMQ——死信隊(duì)列

    死信隊(duì)列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一種重要特性,用于處理無法被消費(fèi)的消息,防止消息丟失。 死信的來源 在消息隊(duì)列中,當(dāng)消息滿足一定條件而無法被正常消費(fèi)時,這些消息會被發(fā)送到死信隊(duì)列。滿足條件的情況包括但不限于: 消息被拒絕( basic.reject 或 bas

    2024年03月14日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包