国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

rabbitmq+springboot實現(xiàn)冪等性操作

這篇具有很好參考價值的文章主要介紹了rabbitmq+springboot實現(xiàn)冪等性操作。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

文章目錄

  • 1.場景描述
    • 1.1 場景1
    • 1.2 場景2
  • 2.原理
  • 3.實戰(zhàn)開發(fā)
    • 3.1 建表
    • 3.2 集成mybatis-plus
    • 3.3 集成RabbitMq
      • 3.3.1 安裝mq
      • 3.3.2 springBoot集成mq
    • 3.4 具體實現(xiàn)
      • 3.4.1 mq配置類
      • 3.4.2 生產(chǎn)者
      • 3.4.3 消費者

1.場景描述

消息中間件是分布式系統(tǒng)常用的組件,無論是異步化、解耦、削峰等都有廣泛的應用價值。我們通常會認為,消息中間件是一個可靠的組件——這里所謂的可靠是指,只要我把消息成功投遞到了消息中間件,消息就不會丟失,即消息肯定會至少保證消息能被消費者成功消費一次,這是消息中間件最基本的特性之一,也就是我們常說的“AT LEAST ONCE”,即消息至少會被“成功消費一遍”。

1.1 場景1

什么意思呢?舉個例子:一個消息M發(fā)送到了消息中間件,消息投遞到了消費程序A,A接受到了消息,然后進行消費,但在消費到一半的時候程序重啟了,這時候這個消息并沒有標記為消費成功,這個消息還會繼續(xù)投遞給這個消費者,直到其消費成功了,消息中間件才會停止投遞。
這種情景就會出現(xiàn)消息可能被多次地投遞。

1.2 場景2

還有一種場景是程序A接受到這個消息M并完成消費邏輯之后,正想通知消息中間件“我已經(jīng)消費成功了”的時候,程序就重啟了,那么對于消息中間件來說,這個消息并沒有成功消費過,所以他還會繼續(xù)投遞。這時候對于應用程序A來說,看起來就是這個消息明明消費成功了,但是消息中間件還在重復投遞。

以上兩個場景對于消息隊列來說就是同一個messageId的消息重復投遞下來了。

我們利用消息id來判斷消息是否已經(jīng)消費過,如果該信息被消費過,那么消息表中已經(jīng) 會有一條數(shù)據(jù),由于消費時會先執(zhí)行插入操作,此時會因為主鍵沖突無法重復插入,我們就利用這個原理來進行冪等的控制,消息內容可以用json格式來進行傳輸?shù)摹?/p>

3.實戰(zhàn)開發(fā)

3.1 建表

DROP TABLE IF EXISTS `message_idempotent`;
CREATE TABLE `message_idempotent` (
  `message_id` varchar(50) NOT NULL COMMENT '消息ID',
  `message_content` varchar(2000) DEFAULT NULL COMMENT '消息內容',
  `status` int DEFAULT '0' COMMENT '消費狀態(tài)(0-未消費成功;1-消費成功)',
  `retry_times` int DEFAULT '0' COMMENT '重試次數(shù)',
  `type` int DEFAULT '0' COMMENT '消費類型',
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.2 集成mybatis-plus

《springBoot集成mybatisPlus》

3.3 集成RabbitMq

3.3.1 安裝mq

推薦使用docker安裝rabbitmq,還未安裝的可以參考以下信息:

  • docker安裝

3.3.2 springBoot集成mq

  • 1.添加依賴
 <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3.4 生產(chǎn)者具體實現(xiàn)

3.4.1 mq配置類

  • DirectRabbitConfig
    具體如何開啟可以參考《rabbitMq實現(xiàn)死信隊列》
import org.springframework.amqp.core.\*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitmqConfig {

    //正常交換機的名字
    public final static String  EXCHANGE\_NAME = "exchange\_name";
    //正常隊列的名字
    public final static String QUEUE\_NAME="queue\_name";
    //死信交換機的名字
    public final static String  EXCHANGE\_DEAD = "exchange\_dead";
    //死信隊列的名字
    public final static String QUEUE\_DEAD="queue\_dead";
    //死信路由key
    public final static String DEAD\_KEY="dead.key";




    //創(chuàng)建正常交換機
    @Bean(EXCHANGE\_NAME)
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)
                //持久化 mq重啟后數(shù)據(jù)還在
                .durable(true)
                .build();
    }



    //創(chuàng)建正常隊列
    @Bean(QUEUE\_NAME)
    public Queue queue(){
        //正常隊列和死信進行綁定 轉發(fā)到 死信隊列,配置參數(shù)
        Map<String,Object>map=getMap();
        return new Queue(QUEUE\_NAME,true,false,false,map);
    }

    //正常隊列綁定正常交換機 設置規(guī)則 執(zhí)行綁定 定義路由規(guī)則 requestmaping映射
    @Bean
    public Binding binding(@Qualifier(QUEUE\_NAME) Queue queue,
                           @Qualifier(EXCHANGE\_NAME) Exchange exchange){
        return BindingBuilder.bind(queue)
                .to(exchange)
                //路由規(guī)則
                .with("app.#")
                .noargs();
    }

    //創(chuàng)建死信隊列
    @Bean(QUEUE\_DEAD)
    public Queue queueDead(){
        return new Queue(QUEUE\_DEAD,true,false,false);
    }

    //創(chuàng)建死信交換機
    @Bean(EXCHANGE\_DEAD)
    public Exchange exchangeDead(){
        return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD)
                .durable(true) //持久化 mq重啟后數(shù)據(jù)還在
                .build();
    }


    //綁定死信隊列和死信交換機
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(queueDead())
                .to(exchangeDead())
                //路由規(guī)則 正常路由key
                .with(DEAD\_KEY)
                .noargs();
    }

    /\*\*
      獲取死信的配置信息
     \*
     \*\*/
    public Map<String,Object>getMap(){
        //3種方式 任選其一,選擇其他方式之前,先把交換機和隊列刪除了,在啟動項目,否則報錯。
        //方式一
        Map<String,Object> map=new HashMap<>(16);
        //死信交換器名稱,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發(fā)送到該交換器中;
        map.put("x-dead-letter-exchange", EXCHANGE\_DEAD);
        //死信消息路由鍵,在消息發(fā)送到死信交換器時會使用該路由鍵,如果不設置,則使用消息的原來的路由鍵值
        map.put("x-dead-letter-routing-key", DEAD\_KEY);
        //方式二
        //消息的過期時間,單位:毫秒;達到時間 放入死信隊列
        // map.put("x-message-ttl",5000);
        //方式三
        //隊列最大長度,超過該最大值,則將從隊列頭部開始刪除消息;放入死信隊列一條數(shù)據(jù)
        // map.put("x-max-length",3);
        return map;
    }


}
  • 延遲隊列配置
    具體如何開啟可以參考《rabbitMq實現(xiàn)死信隊列》

由于rabbitMq中不直接支持死信隊列,需要我們利用插件rabbitmq_delayed_messgae_exchage進行開啟

/**
 * 定義延遲交換機
 */
@Configuration
public class RabbitMQDelayedConfig {
    //隊列
    private static final String DELAYQUEUE = "delayedqueue";
    //交換機
    private static final String DELAYEXCHANGE = "delayedExchange";
    @Bean
    public Queue delayqueue(){return new Queue(DELAYQUEUE);}
    //自定義延遲交換機
    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");
        /**
         * 1、交換機名稱
         * 2、交換機類型
         * 3、是否需要持久化
         * 4、是否需要自動刪除
         * 5、其他參數(shù)
         */
        return new CustomExchange(DELAYEXCHANGE,"x-delayed-message",true,false,arguments);
    }
    //綁定隊列和延遲交換機
    @Bean
    public Binding delaybinding(){
        return BindingBuilder.bind(delayqueue()).to(delayedExchange()).with("sectest").noargs();
    }
}

3.4.2 生產(chǎn)者

  • 1.消費隊列的生產(chǎn)者
import com.example.shop.config.RabbitmqConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class Sender_Direct {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * 用于消費訂單
     *
     * @param orderId
     */
    public void send2Direct(String orderId) {
        //創(chuàng)建消費對象,并指定全局唯一ID(這里使用UUID,也可以根據(jù)業(yè)務規(guī)則生成,只要保證全局唯一即可)
        MessageProperties messageProperties = new MessageProperties();
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, "內容設置",  message -> {
            //設置消息的id為唯一
            messageProperties.setMessageId(UUID.randomUUID().toString());
            messageProperties.setContentType("text/plain");
            messageProperties.setContentEncoding("utf-8");
            message.getMessageProperties().setMessageId(orderId);
            return message;
        });
    }

}

3.4.3 消費者

1.開啟手動ack配置

spring:
  application:
    name: shop
  rabbitmq:
    host: 192.168.1.102
    port: 5673
    virtual-host: /
    username: guest
    password: guest
    listener:
      simple:
        # 表示消費者消費成功消息以后需要手工的進行簽收(ack確認),默認為 auto
        acknowledge-mode: manual

消費者要配置ack重試機制,具體參考前幾篇文章,使用的是mysql消息ID的唯一性,有時候可能生成一樣的訂單,具體的沒有進行實驗,內容是json生成的,可以執(zhí)行業(yè)務

import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.example.des.Bean.MessageIdempotent;
import com.example.des.Bean.Shop;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
public class Receiver_Direct {
    private static final Integer delayTimes = 30;//延時消費時間,單位:秒

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = {"smsQueue"})
    public void receiveD(Message message, Channel channel) throws IOException {
        try {
            // 獲取消息Id
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody());//獲取消息
            //向數(shù)據(jù)庫插入數(shù)據(jù)
            MessageIdempotent messageIdempotent = new MessageIdempotent();
            messageIdempotent.setMessageId(messageId);
            messageIdempotent.setMessageContent(msg);
            messageIdempotent.setRetryTimes(0);
            System.out.println(messageIdempotent.toString());
            Boolean save = true;   //設置保存成功,消息投遞失敗是在確認模式那里

            if (!save) {//說明屬于重重復請求
                //1、處理消息內容的業(yè)務,解析json數(shù)據(jù)
                //2、創(chuàng)建訂單,并保存
                Boolean flag = consumeOrder(new Shop());
                if (flag){
                    //投入延遲隊列,如果30分鐘訂單還沒有消費,就刪除訂單
                    rabbitTemplate.convertAndSend("delayedExchange","sectest",message,message1->{
                        //設置發(fā)送消息的延長時間 單位:ms,表示30分鐘
                        message1.getMessageProperties().setDelay(1000*60*30);
                        return message1;
                    });
                    //更新消息狀態(tài),消費成功,
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                }else {
                    //延遲投入死信,進行重試
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                }
            } else {
                //1、處理消息內容的業(yè)務,解析json數(shù)據(jù)
                //2、創(chuàng)建訂單,并保存
                //投入死信隊列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }
        }catch (Exception e){
            System.out.println("錯誤信息");
        }

    }

    private boolean consumeOrder(Shop shop) {
        return true;
    }

    @RabbitListener(queues = {" delay.queue.demo.delay.queue"})
    public void dead(String payload, Message message, Channel channel) throws IOException {
        System.out.println("死信隊列:"+payload);
        //刪除消息 將數(shù)據(jù)庫狀態(tài)更新為失敗,更新郵件或者消息通知,有時候可以人工消費
        long deliveryTag=message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,true);
    }

    @RabbitListener(queues = "delayedqueue")
    public void receivemsg(Message messages){
        //查詢有沒有被消費,也就是更新成功,有時候需要樂觀鎖
    }
}

至此mq的消息重復以及冪等的信息處理就很完美的解決了,當然本文以數(shù)據(jù)庫為例進行實現(xiàn),感興趣的可以嘗試使用redis來進行實現(xiàn)文章來源地址http://www.zghlxwxcb.cn/news/detail-694486.html

到了這里,關于rabbitmq+springboot實現(xiàn)冪等性操作的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot

    消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot

    1、延遲隊列概念 延時隊列內部是有序的 , 最重要的特性 就體現(xiàn)在它的 延時屬性 上,延時隊列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說, 延時隊列就是用來存放需要在指定時間被處理的元素的隊列。 延遲隊列使用場景: 訂單在十分鐘之內未支付則

    2024年02月22日
    瀏覽(19)
  • SpringBoot Redis 注解 攔截器來實現(xiàn)接口冪等性校驗

    冪等性,?通俗的說就是一個接口,?多次發(fā)起同一個請求,?必須保證操作只能執(zhí)行一次 比如:訂單接口,?不能多次創(chuàng)建訂單 支付接口,?重復支付同一筆訂單只能扣一次錢 支付寶回調接口,?可能會多次回調,?必須處理重復回調 普通表單提交接口,?因為網(wǎng)絡超時等原因多次點擊提

    2024年01月19日
    瀏覽(31)
  • SpringBoot RabbitMQ 實現(xiàn)消息隊列功能

    作者:禪與計算機程序設計藝術 在企業(yè)級應用中,為了提升系統(tǒng)性能、降低響應延遲、改善用戶體驗、增加系統(tǒng)的穩(wěn)定性、提高資源利用率等方面所需的功能之一就是使用消息隊列。RabbitMQ是一個開源的AMQP(Advanced Message Queuing Protocol)的實現(xiàn)消息隊列,它是用Erlang語言開發(fā)的。

    2024年02月09日
    瀏覽(22)
  • [AIGC] 用冪等性解決重復消息問題

    [AIGC] 用冪等性解決重復消息問題

    在構建分布式系統(tǒng)時,開發(fā)人員經(jīng)常會遇到重復消息問題。這可能是由于網(wǎng)絡延遲、系統(tǒng)故障或其他原因導致的。無論如何,重復消息會導致系統(tǒng)出現(xiàn)錯誤和不一致狀態(tài)。為了解決這個問題,我們可以使用冪等性來確保系統(tǒng)的可靠性和一致性。 在數(shù)學中,冪等性是指一個函數(shù)

    2024年02月19日
    瀏覽(16)
  • 【RabbitMQ教程】第八章 —— RabbitMQ - 冪等性、優(yōu)先級、惰性

    【RabbitMQ教程】第八章 —— RabbitMQ - 冪等性、優(yōu)先級、惰性

    ?????????????????????????????????????????????????????????????????? ?? 【 R a b b i t M Q 教程】第八章—— R a b b i t M Q ? 冪等性、優(yōu)先級、惰性 color{#FF1493}{【RabbitMQ教程】第八章 —— RabbitMQ - 冪等性、優(yōu)先級、惰性} 【 R abbi tMQ 教程】第八章

    2024年02月09日
    瀏覽(18)
  • SpringBoot中接口冪等性實現(xiàn)方案-自定義注解+Redis+攔截器實現(xiàn)防止訂單重復提交

    SpringBoot中接口冪等性實現(xiàn)方案-自定義注解+Redis+攔截器實現(xiàn)防止訂單重復提交

    SpringBoot+Redis+自定義注解實現(xiàn)接口防刷(限制不同接口單位時間內最大請求次數(shù)): SpringBoot+Redis+自定義注解實現(xiàn)接口防刷(限制不同接口單位時間內最大請求次數(shù))_redis防刷_霸道流氓氣質的博客-CSDN博客 以下接口冪等性的實現(xiàn)方式與上面博客類似,可參考。 什么是冪等性? 冪等

    2024年02月15日
    瀏覽(23)
  • SpringBoot自定義注解+AOP+redis實現(xiàn)防接口冪等性重復提交,從概念到實戰(zhàn)

    本文為千鋒教育技術團獨家創(chuàng)作,更多技術類知識干貨,點個關注持續(xù)追更~ 接口冪等性是Web開發(fā)中非常重要的一個概念,它可以保證多次調用同一個接口不會對結果產(chǎn)生影響。如果你想了解更多關于接口冪等性的知識,那么本文就是一個不錯的起點。 在Web開發(fā)中,我們經(jīng)常

    2024年02月03日
    瀏覽(26)
  • Spark操作Hive表冪等性探索

    旁邊的實習生一邊敲著鍵盤一邊很不開心的說:做數(shù)據(jù)開發(fā)真麻煩,數(shù)據(jù)bug排查太繁瑣了,我今天數(shù)據(jù)跑的有問題,等我處理完問題重新跑了代碼,發(fā)現(xiàn)報表的數(shù)據(jù)很多重復,準備全部刪了重新跑。 我:你的數(shù)據(jù)操作具備冪等性嗎? 實習生:啥是冪等性?數(shù)倉中的表還要考

    2024年02月13日
    瀏覽(20)
  • 如何保證用戶重試操作的冪等性

    如何保證用戶重試操作的冪等性

    服務不穩(wěn)定是一類常態(tài),面對此類場景恰當?shù)膽獙Σ呗詰撌鞘裁??退一步說,即使我們能夠確保第一方服務的穩(wěn)定性,我們又應該如何面對網(wǎng)絡延遲以及掌控以外的不確定性?這都是本篇文章會談到的內容 本文是團隊內部分享的文字版,敏感信息已經(jīng)抹去或者重寫。我們通

    2024年02月06日
    瀏覽(23)
  • Springboot 定時任務,分布式下冪等性如何解決

    Springboot 定時任務,分布式下冪等性如何解決

    在分布式環(huán)境下,定時任務的冪等性問題需要考慮多個節(jié)點之間的數(shù)據(jù)一致性和事務處理。 一種解決方法是使用分布式鎖來保證同一時間只有一個節(jié)點能夠執(zhí)行該任務。具體實現(xiàn)可以使用Redis或Zookeeper等分布式協(xié)調工具提供的分布式鎖功能。 另一種解決方法是使用消息隊列來

    2024年02月11日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包