国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

消息隊列中間件,RabbitMQ的使用,死信隊列,延遲隊列,利用枚舉實現(xiàn)隊列,交換機,RountKey的聲明

這篇具有很好參考價值的文章主要介紹了消息隊列中間件,RabbitMQ的使用,死信隊列,延遲隊列,利用枚舉實現(xiàn)隊列,交換機,RountKey的聲明。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

0.交換機種類和區(qū)別

1.聲明隊列和交換機以及RountKey

2.初始化循環(huán)綁定

3.聲明交換機

4.監(jiān)聽隊列

4.1 監(jiān)聽普通隊列

4.2監(jiān)聽死信隊列

?5.削峰填谷的實現(xiàn)


0.交換機種類和區(qū)別

  1. Direct Exchange(直連交換機)

    • 直連交換機將消息發(fā)送到與消息的路由鍵完全匹配的隊列。它是最簡單的交換機類型之一。
    • 當(dāng)一個隊列使用某個直連交換機綁定時,它需要指定一個綁定鍵(binding key),當(dāng)消息的路由鍵與該綁定鍵完全匹配時,消息會被發(fā)送到該隊列。
  2. Fanout Exchange(扇出交換機)

    • 扇出交換機會將消息發(fā)送到與其綁定的所有隊列,忽略消息的路由鍵。
    • 當(dāng)一個隊列使用扇出交換機綁定時,它會接收到交換機發(fā)送的所有消息,無論消息的路由鍵是什么。
  3. Topic Exchange(主題交換機)

    • 主題交換機根據(jù)消息的路由鍵和綁定鍵之間的模式匹配來路由消息。
    • 綁定鍵可以使用通配符進行匹配,支持 '*' 匹配一個單詞,'#' 匹配零個或多個單詞,從而允許更靈活的路由規(guī)則。
  4. Headers Exchange(標(biāo)頭交換機)

    • 標(biāo)頭交換機根據(jù)消息的標(biāo)頭(headers)中的鍵值對來路由消息,而不是使用路由鍵。
    • 在將隊列綁定到標(biāo)頭交換機時,可以指定一組標(biāo)頭鍵值對,只有當(dāng)消息的標(biāo)頭中包含與綁定相匹配的所有鍵值對時,消息才會被路由到該隊列。

如果滿足key的前提下,綁定同一個交換機的隊列都會分配到相同數(shù)量的信息

比如此時交換機有20條信息,a,b隊列都會分配到20條信息

默認(rèn)情況下,會輪詢分配給消費者,也可以設(shè)置最多獲取多少條未被消費的信息,根據(jù)消費者的消費能力來設(shè)置

1.聲明隊列和交換機以及RountKey

package com.example.config;


import lombok.Getter;

@Getter
public enum RabbitmqBind {


    DATA_CLEAN_PROCESS_DEAD(
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS_DEAD,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD,
            false,
            false,
            null,
            null
    ),

    DATA_CLEAN_PROCESS(
            RabbitMqExchangeEnum.E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS,
            true,
            true,
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD),

    SMS_CLEAN_DEAD(
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD,
            RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD,
            true,
            false,
            null,
            null
    ),

    SMS_CLEAN(
            RabbitMqExchangeEnum.E_TOPIC_RCP,
            RabbitMqQueueConstants.Q_API_TO_DCN_SMS,
            RabbitmqRoutingKey.K_API_TO_DCN_SMS,
            true,
            true,
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD
    ),


    ;

    /**
     * 交換機
     */
    private RabbitMqExchangeEnum exchange;

    /**
     * 隊列名稱
     */
    private String queueName;

    /**
     * 路由Key
     */
    private RabbitmqRoutingKey routingKey;

    /**
     * 綁定標(biāo)識
     * 是否啟用
     */
    private Boolean isBind;

    /**
     * 是否綁定死信
     */
    private Boolean isDeathBelief;

    /**
     * 綁定的死信交換機
     */
    private RabbitMqExchangeEnum boundDeadExchange;

    /**
     * 死信key
     */
    private RabbitmqRoutingKey deadRoutingKey;


    RabbitmqBind(RabbitMqExchangeEnum exchange, String queueName, RabbitmqRoutingKey routingKey, Boolean isBind,
                 Boolean isDeathBelief, RabbitMqExchangeEnum boundDeadExchange, RabbitmqRoutingKey deadRoutingKey
    ) {
        this.exchange = exchange;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.isBind = isBind;
        this.isDeathBelief = isDeathBelief;
        this.boundDeadExchange = boundDeadExchange;
        this.deadRoutingKey = deadRoutingKey;
    }

    /**
     * 交換機
     */
    @Getter
    public enum RabbitMqExchangeEnum {

        /**
         * 交換機定義,類型 - 名稱
         */
        E_DIRECT_RCP("direct", "E_DIRECT_RCP"),
        DEAD_E_DIRECT_RCP("direct", "DEAD_E_DIRECT_RCP"),

        E_TOPIC_RCP("topic", "E_TOPIC_RCP"),

        E_TOPIC_PAY("topic", "E_TOPIC_PAY");

        private String exchangeType;

        private String exchangeName;

        RabbitMqExchangeEnum(String exchangeType, String exchangeName) {
            this.exchangeType = exchangeType;
            this.exchangeName = exchangeName;
        }
    }

    /**
     * 隊列名定義
     */
    public interface RabbitMqQueueConstants {

        /**
         * 接收清洗數(shù)據(jù)
         */
        String Q_DATA_CLEAN_PROCESS = "RMPS_TO_RCP_DATA_CLEAN_PROCESS";

        /**
         * 清洗結(jié)束通知
         */
        String Q_API_TO_DCN_SMS = "Q_API_TO_DCN_SMS";

        /**
         * 死信隊列
         */
        String Q_DATA_CLEAN_PROCESS_DEAD = "Q_DATA_CLEAN_PROCESS_DEAD";

        /**
         * 清洗結(jié)束通知死信隊列
         */
        String Q_API_TO_DCN_SMS_DEAD = "Q_API_TO_DCN_SMS_DEAD";
    }

    /**
     * routingKey
     */
    @Getter
    public enum RabbitmqRoutingKey {

        /**
         * 路由
         */
        K_DATA_CLEAN_PROCESS("K_DATA_CLEAN_PROCESS"),
        K_API_TO_DCN_SMS("K_API_TO_DCN_SMS"),

        // 路由綁定死信路由
        DEAD("DEAD"),

        //死信路由
        K_DATA_CLEAN_PROCESS_DEAD("K_DATA_CLEAN_PROCESS_DEAD"),
        K_DATA_CLEAN_FINISH_DEAD("K_DATA_CLEAN_FINISH_DEAD"),
        ;

        private String keyName;

        RabbitmqRoutingKey(String keyName) {
            this.keyName = keyName;
        }
    }

}

2.初始化循環(huán)綁定

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Arrays;

@Configuration
@ConditionalOnClass(EnableRabbit.class)
public class MqConfig {
    @Resource
    protected RabbitTemplate rabbitTemplate;
    @Resource
    ConnectionFactory connectionFactory;
//
//    @Lazy
//    @Autowired
//    protected RabbitAdmin rabbitAdmin;
//
//
//    public static final int DEFAULT_CONCURRENT = 10;
//
//    @Bean("customContainerFactory")
//    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
//                                                                 ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
//        factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
//        configurer.configure(factory, connectionFactory);
//        return factory;
//    }
//
//    @Bean
//    @ConditionalOnMissingBean
//    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
//        return new RabbitTransactionManager(connectionFactory);
//    }
//
//    @Bean
//    @ConditionalOnMissingBean
//    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
//        return new RabbitAdmin(connectionFactory);
//    }

    @PostConstruct
    protected void init() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);


        rabbitTemplate.setChannelTransacted(true);
        //創(chuàng)建exchange
        Arrays.stream(RabbitmqBind.RabbitMqExchangeEnum.values())
                .forEach(rabbitMqExchangeEnum -> {
                            Exchange exchange = RabbitmqExchange
                                    .getInstanceByType(rabbitMqExchangeEnum.getExchangeType())
                                    .createExchange(rabbitMqExchangeEnum.getExchangeName());
                            rabbitAdmin.declareExchange(exchange);
                        }
                );

        //創(chuàng)建隊列并綁定exchange
        Arrays.stream(RabbitmqBind.values()).forEach(RabbitmqBind -> {
            if (RabbitmqBind.getIsBind()) {
                if (RabbitmqBind.getIsDeathBelief()) {
                    //需要綁定死信交換機的隊列
                    rabbitAdmin.declareQueue(QueueBuilder.durable(RabbitmqBind.getQueueName())
                            .ttl(60000).deadLetterExchange(RabbitmqBind.getBoundDeadExchange().getExchangeName())
                            .deadLetterRoutingKey(RabbitmqBind.getDeadRoutingKey().getKeyName()).build());
                    rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),
                            Binding.DestinationType.QUEUE,
                            RabbitmqBind.getExchange().getExchangeName(),
                            RabbitmqBind.getRoutingKey().getKeyName(), null));
                } else {
                    //不需要綁定死信交換機的隊列
                    rabbitAdmin.declareQueue(new Queue(RabbitmqBind.getQueueName(),
                            true, false, false, null));
                    rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),
                            Binding.DestinationType.QUEUE,
                            RabbitmqBind.getExchange().getExchangeName(),
                            RabbitmqBind.getRoutingKey().getKeyName(), null));
                }
            }
        });
    }

}

?綁定的形式由枚舉類中定義

3.聲明交換機

package com.example.config;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.TopicExchange;

import java.util.Arrays;


@Getter
@Slf4j
public enum RabbitmqExchange {

    DIRECT("direct"){
        @Override
        public Exchange createExchange(String exchangeName) {
            return new DirectExchange(exchangeName, true, false);
        }
    },

    TOPIC("topic"){
        @Override
        public Exchange createExchange(String exchangeName) {
            return new TopicExchange(exchangeName, true, false);
        }
    };

    public static RabbitmqExchange getInstanceByType(String type){

        return Arrays.stream(RabbitmqExchange.values()).filter(e -> e.getType().equals(type))
                .findAny()
                .orElseThrow(() ->
//                        new ProcessException("無效的exchange type")

                        new RuntimeException("無效的exchange type")
                );
    }

    private String type;


    RabbitmqExchange(String type) {
        this.type = type;
    }

    public abstract Exchange createExchange(String exchangeName);

}

4.監(jiān)聽隊列

4.1 監(jiān)聽普通隊列

package com.example.listener;

import com.example.config.RabbitmqBind;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RabbitListener(queues = {
        RabbitmqBind.RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS }, concurrency = "1-5")
//, containerFactory = "customContainerFactory"
public class MqListener {



    @RabbitHandler
    public void processMessage(String message) {
        log.info("DataClean recive message :{} ", message);
        process(message);
    }

    @RabbitHandler
    public void processMessage(byte[] message) {
        String msg = new String(message);
        log.info("DataClean recive message :{} ", msg);
        process(msg);
    }

    /**
     * 處理推送消息
     * @param message
     */
    private void process(String message) {
        log.info("process message :{}" , message);
        if(StringUtils.isBlank(message)) {
            log.error("process message is blank , message:{}" , message);
            return;
        }
    }

}

?監(jiān)聽并處理任務(wù)

4.2監(jiān)聽死信隊列

package com.example.listener;

import com.example.config.RabbitmqBind;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(queues = {
        RabbitmqBind.RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD }, concurrency = "1-5")
public class DeadListener {

    @RabbitHandler
    public void processMessage(String message) {
        log.info("DataClean recive message :{} ", message);
        process(message);
    }

    @RabbitHandler
    public void processMessage(byte[] message) {
        String msg = new String(message);
        log.info("DataClean recive message :{} ", msg);
        process(msg);
    }


    /**
     * 處理推送消息
     * @param message
     */
    private void process(String message) {
        log.info("Dead process message :{}" , message);
        if(StringUtils.isBlank(message)) {
            log.error("Dead process message is blank , message:{}" , message);
            return;
        }
    }

}

?5.削峰填谷的實現(xiàn)

把高峰期的消息填進低峰期

消息隊列中間件,RabbitMQ的使用,死信隊列,延遲隊列,利用枚舉實現(xiàn)隊列,交換機,RountKey的聲明,中間件,消息隊列,開發(fā)小技巧,中間件,rabbitmq,分布式

可以用拉取的方式來實現(xiàn)

或者用消費者的最大數(shù)量和最小數(shù)量來實現(xiàn)文章來源地址http://www.zghlxwxcb.cn/news/detail-856444.html

channel.basicQos();//設(shè)置最大獲取未確認(rèn)消息的數(shù)量,實現(xiàn)權(quán)重

到了這里,關(guān)于消息隊列中間件,RabbitMQ的使用,死信隊列,延遲隊列,利用枚舉實現(xiàn)隊列,交換機,RountKey的聲明的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 深入詳解高性能消息隊列中間件 RabbitMQ

    深入詳解高性能消息隊列中間件 RabbitMQ

    ? 目錄 1、引言 2、什么是 RabbitMQ ? 3、RabbitMQ 優(yōu)勢 4、RabbitMQ 整體架構(gòu)剖析 4.1、發(fā)送消息流程 4.2、消費消息流程 5、RabbitMQ 應(yīng)用 5.1、廣播 5.2、RPC VC++常用功能開發(fā)匯總(專欄文章列表,歡迎訂閱,持續(xù)更新...) https://blog.csdn.net/chenlycly/article/details/124272585 C++軟件異常排查從入

    2024年02月05日
    瀏覽(97)
  • 消息隊列中間件 - Docker安裝RabbitMQ、AMQP協(xié)議、和主要角色

    消息隊列中間件 - Docker安裝RabbitMQ、AMQP協(xié)議、和主要角色

    不管是微服務(wù)還是分布式的系統(tǒng)架構(gòu)中,消息隊列中間件都是不可缺少的一個重要環(huán)節(jié),主流的消息隊列中間件有RabbitMQ、RocketMQ等等,從這篇開始詳細(xì)介紹以RabbitMQ為代表的消息隊列中間件。 AMQP協(xié)議 AMQP協(xié)議是一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)協(xié)議,基于此協(xié)議的客戶端與

    2024年02月03日
    瀏覽(92)
  • 基于golang多消息隊列中間件的封裝nsq,rabbitmq,kafka

    場景 在創(chuàng)建個人的公共方法庫中有這樣一個需求,就是不同的項目會用到不同的消息隊列中間件,我的思路把所有的消息隊列中間件進行封裝一個消息隊列接口(MQer)有兩個方法一個生產(chǎn)一個消費,那么在實例化對象的時候根據(jù)配置文件指定當(dāng)前項目使用的那個消息隊列中

    2024年02月14日
    瀏覽(93)
  • 利用消息中間件RabbitMQ創(chuàng)建隊列以及扇出(Fanout)、訂閱(Direct)、主題(Topic)交換機來完成消息的發(fā)送和監(jiān)聽接收(完整版)

    利用消息中間件RabbitMQ創(chuàng)建隊列以及扇出(Fanout)、訂閱(Direct)、主題(Topic)交換機來完成消息的發(fā)送和監(jiān)聽接收(完整版)

    目錄 一、前期項目環(huán)境準(zhǔn)備 1.1父項目以及子項目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout)?交換機實現(xiàn)消息的發(fā)送和接收 2.1編寫子項目consumer(消費者,接收消息)的代碼實現(xiàn)扇出(Fanout)交換機接收消息 2.1.1consumer子項目結(jié)構(gòu) 2.1.2FanoutConfig類的實現(xiàn)扇出(Fanout)交

    2024年02月05日
    瀏覽(95)
  • 消息隊列中間件介紹

    消息隊列中間件介紹

    消息隊列介紹 ? 消息隊列中間件是大型系統(tǒng)中的重要組件,已經(jīng)逐漸成為企業(yè)系統(tǒng)內(nèi)部通信的核心手段。它具有松耦合、異步消息、流量削峰、可靠投遞、廣播、流量控制、最終一致性等一系列功能,已經(jīng)成為異步RPC的主要手段之一。 目前常見的消息中間件有ActiveMQ、Rabbi

    2024年02月04日
    瀏覽(98)
  • 消息隊列(中間件)

    消息隊列(中間件)

    通信協(xié)議: 為了實現(xiàn)客戶端和服務(wù)器之間的通信來完成的邏輯,基于TCP實現(xiàn)的自定義應(yīng)用層協(xié)議。通過這個協(xié)議,完成客戶端–服務(wù)器遠(yuǎn)程方法調(diào)用。 序列化/反序列化: 通過網(wǎng)絡(luò)傳輸對象把對象存儲到硬盤上。 序列化:把對象轉(zhuǎn)化為二進制的數(shù)據(jù)序列,反序列化:把二進制數(shù)

    2024年02月07日
    瀏覽(103)
  • 消息隊列中間件(一)

    消息隊列中間件(一)

    流量削峰 應(yīng)用解耦 異步處理 ActiveMQ 優(yōu):單機吞吐萬級,時效性ms級,可用性高(主從架構(gòu)),可靠性高(丟失率低) 缺:官方維護少,高吞吐場景較少使用 Kafka 大數(shù)據(jù) - 數(shù)據(jù)采集,傳輸,存儲 優(yōu):高吞吐量(百萬級),時效性ms級,可用性高,日志成熟 缺:短輪詢,失敗

    2024年02月11日
    瀏覽(86)
  • 消息中間件RabbitMQ

    消息中間件RabbitMQ

    1.1.1. 什么是MQ MQ(message queue) ,從字面意思上看,本質(zhì)是個隊列,F(xiàn)IFO 先入先出,只不過隊列中存放的內(nèi)容是message 而已,還是一種跨進程的通信機制,用于上下游傳遞消息。在互聯(lián)網(wǎng)架構(gòu)中,MQ 是一種非常常見的上下游“邏輯解耦+物理解耦”的消息通信服務(wù)。使用了 MQ 之后,

    2024年01月17日
    瀏覽(104)
  • RabbitMQ消息中間件

    RabbitMQ消息中間件 RabbitMQ簡介 windows下安裝RabbitMQ RabbitMQ基本概念 RabbitMQ簡單模式 RabbitMQ工作隊列模式 RabbitMQ發(fā)布訂閱模式 RabbitMQ路由模式 RabbitMQ主題模式 RabbitMQ RPC模式 RabbitMQ發(fā)布確認(rèn)模式

    2024年02月10日
    瀏覽(104)
  • 消息中間件之RabbitMQ

    消息中間件之RabbitMQ

    1.基于AMQP協(xié)議Erlang語言開發(fā)的一款消息中間件,客戶端語言支持比較多, 比如Python,Java,Ruby,PHP,JS,Swift.運維簡單,靈活路由,但是性能不高, 可以滿足一般場景下的業(yè)務(wù)需要,三高場景下吞吐量不高,消息持久化沒有采取 零拷貝技術(shù),消息堆積時,性能會下降 2.消息吞吐量在

    2024年01月19日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包