国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

實戰(zhàn):Spring Cloud Stream消息驅動框架整合rabbitMq

這篇具有很好參考價值的文章主要介紹了實戰(zhàn):Spring Cloud Stream消息驅動框架整合rabbitMq。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

前言

相信很多同學都開發(fā)過WEB服務,在WEB服務的開發(fā)中一般是通過緩存、隊列、讀寫分離、削峰填谷、限流降級等手段來提高服務性能和保證服務的正常投用。對于削峰填谷就不得不用到我們的MQ消息中間件,比如適用于大數(shù)據(jù)的kafka,性能較高支持事務活躍度高的rabbitmq等等,MQ的選用和整合已經是JAVA WEB開發(fā)中不可或缺對的一部分。當然,作為號稱JAVA微服務框架全家桶的Spring Cloud也提供了良好的適配中間件的功能。今天我們就來整合一下微服務全家桶Spring Cloud提供的消息驅動——Spring Cloud Stream。

Spring Cloud Stream簡析

Spring Cloud Stream是用于構建微服務具有消息驅動能力的框架,應用程序通過inputs、outputs通道與binder進行交互,binder與消息中間件進行通信。

binder的作用是將消息中間件進行粘合,相當于對第三方中間件進行封裝整合,讓開發(fā)人員不用關心底層消息中間件如何運行。
實戰(zhàn):Spring Cloud Stream消息驅動框架整合rabbitMq

inputs是消息輸入通道,類似于消息中間件的consumer消費者;outputs是消息輸出通道,類似于消息中間件的producer生產者。應用程序收發(fā)消息不再直接調用消息中間件的接口或者邏輯代碼,直接使用Spring Cloud Stream 的OUTPUT與INPUT通道進行處理。

可以通過binder綁定選用各種消息中間件,用binding進行中間件的相關參數(shù)配置,讓應用程序達到靈活配置和切換消息中間件的目的。

Spring Cloud Stream與rabbitmq整合

本次整合直接與rabbitmq整合,如果是使用kafka的同學,可以直接移植配置修改對應粘接mq即可。

本次整合加入了消費重試機制、死信隊列,并提供死信隊列消費監(jiān)聽方法,可直接移植到生產環(huán)境。

1、添加pom依賴

引入spring-cloud-starter-stream-rabbit 需要從Spring Cloud中引入,注意dependencyManagement的配置。

<properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>Hoxton.SR10</spring-cloud.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

2、application.yml增加mq配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: /
  cloud:
    stream:
      binders: #stream框架粘接的mq
        myRabbit: #自定義個人mq名稱
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: #stream綁定信道
        output_channel: #自定義發(fā)送信道名稱
          destination: assExchange #目的地 交換機/主題
          content-type: application/json
          binder: myRabbit #粘接到的mq
          group: assGroup
        input_channel: #自定義接收信道
          destination: assExchange #目的地 交換機/主題
          content-type: application/json
          binder: myRabbit #粘接到的mq
          group: assGroup
          consumer:
            maxAttempts: 3 # 嘗試消費該消息的最大次數(shù)(消息消費失敗后,發(fā)布者會重新投遞)。默認3
            backOffInitialInterval: 1000 # 重試消費消息的初始化間隔時間。默認1s,即第一次重試消費會在1s后進行
            backOffMultiplier: 2 # 相鄰兩次重試之間的間隔時間的倍數(shù)。默認2
            backOffMaxInterval: 10000 # 下一次嘗試重試的最大時間間隔,默認為10000ms,即10s
      rabbit: #stream mq配置
        bindings:
          input_channel:
            consumer:
              concurrency: 1 #消費者數(shù)量
              max-concurrency: 5 #最大消費者數(shù)量
              durable-subscription: true  #持久化隊列
              recovery-interval: 3000  #3s 重連
              acknowledge-mode: MANUAL  #手動
              requeue-rejected: false #是否重新放入隊列
              auto-bind-dlq: true #開啟死信隊列
              requeueRejected: true #異常放入死信
              

3、定義輸入輸出信道

/**
 * MqChannel
 * @author senfel
 * @version 1.0
 * @date 2023/6/2 15:46
 */
public interface MqChannel {
  
    /**
     * 消息目的地 RabbitMQ中為交換機名稱
     */
    String destination = "assExchange";
    /**
     * 輸出信道
     */
    String OUTPUT_CHANNEL = "output_channel";
    /**
     * 輸入信道
     */
    String INPUT_CHANNEL = "input_channel";
    /**
     * 死信隊列
     */
    String INPUT_CHANNEL_DLQ = "assExchange.assGroup.dlq";

    @Output(MqChannel.OUTPUT_CHANNEL)
    MessageChannel output();

    @Input(MqChannel.INPUT_CHANNEL)
    SubscribableChannel input();

}

4、使用輸入輸出信道收發(fā)消息

TestMQService

/**
 * TestMQService
 * @author senfel
 * @version 1.0
 * @date 2023/6/2 15:47
 */
public interface TestMQService {

    /**
     * 發(fā)送消息
     */
    void send(String str);
}

TestMQServiceImpl

/**
 * TestMQServiceImpl
 * @author senfel
 * @version 1.0
 * @date 2023/6/2 15:49
 */
@Service
@Slf4j
@EnableBinding(MqChannel.class)
public class TestMQServiceImpl implements TestMQService {

    @Resource
    private MqChannel mqChannel;

    @Override
    public void send(String str) {
        mqChannel.output().send(MessageBuilder.withPayload("測試=========="+str).build());
    }


    /**
     * 接收消息監(jiān)聽
     * @param message 消息體
     * @param channel 信道
     * @param tag 標簽
     * @param death
     * @author senfel
     * @date 2023/6/5 9:25
     * @return void
     */
    @StreamListener(MqChannel.INPUT_CHANNEL)
    public void process(String message,
                        @Header(AmqpHeaders.CHANNEL) Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info("message : "+message);
        if(message.contains("9")){
            // 參數(shù)1為消息的tag  參數(shù)2為是否多條處理 參數(shù)3為是否重發(fā)
            //channel.basicNack(tag,false,false);
            System.err.println("--------------消費者消費異常--------------------------------------");
            System.err.println(message);
            throw new RuntimeException("拋出異常");
        }else{
            System.err.println("--------------消費者--------------------------------------");
            System.err.println(message);
            channel.basicAck(tag,false);
        }

    }



    /**
     * 死信監(jiān)聽
     * @param message 消息體
     * @param channel 信道
     * @param tag 標簽
     * @param death
     * @author senfel
     * @date 2023/6/5 14:30
     * @return void
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(MqChannel.INPUT_CHANNEL_DLQ)
                    , exchange = @Exchange(MqChannel.destination)
            ),
            concurrency = "1-5"
    )
    public void processByDlq(String message,
                             @Header(AmqpHeaders.CHANNEL) Channel channel,
                             @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {

        log.info("message : "+message);
        System.err.println("---------------死信消費者------------------------------------");
        System.err.println(message);
    }
}

controller

/**
 * @author senfel
 * @version 1.0
 * @date 2023/6/2 17:27
 */
@RestController
public class TestController{
    @Resource
    private TestMQService testMQService;

    @GetMapping("/test")
    public String testMq(String str){
        testMQService.send(str);
        return str;
    }
}

5、模擬正常消息消費

實戰(zhàn):Spring Cloud Stream消息驅動框架整合rabbitMq實戰(zhàn):Spring Cloud Stream消息驅動框架整合rabbitMq

6、模擬異常消息

異常消息重試滿足3次投遞后進入死信消費
實戰(zhàn):Spring Cloud Stream消息驅動框架整合rabbitMq
實戰(zhàn):Spring Cloud Stream消息驅動框架整合rabbitMq
實戰(zhàn):Spring Cloud Stream消息驅動框架整合rabbitMq文章來源地址http://www.zghlxwxcb.cn/news/detail-475447.html

到了這里,關于實戰(zhàn):Spring Cloud Stream消息驅動框架整合rabbitMq的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯(lián)網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 在Spring Cloud中使用RabbitMQ完成一個消息驅動的微服務

    Spring Cloud系列目前已經有了Spring Cloud五大核心組件:分別是,Eureka注冊中心,Zuul網關,Hystrix熔斷降級,openFeign聲明式遠程調用,ribbon負載均衡。這五個模塊,對了,有沒有發(fā)現(xiàn),其實我這五個模塊中ribbon好像還沒有案例例舉,目前只有一個Ribbon模塊的搭建,后邊我會完善的

    2024年02月04日
    瀏覽(87)
  • Sprint Cloud Stream整合RocketMq和websocket實現(xiàn)消息發(fā)布訂閱

    Sprint Cloud Stream整合RocketMq和websocket實現(xiàn)消息發(fā)布訂閱

    1. 引入RocketMQ依賴 :首先,在 pom.xml 文件中添加RocketMQ的依賴: 2. 配置RocketMQ連接信息 :在 application.properties 或 application.yml 中配置RocketMQ的連接信息,包括Name Server地址等: 3.消息發(fā)布組件 4.消息發(fā)布控制器 項目結構: 接下來是websocket模塊的搭建 1. 依賴添加 2.application.yml配

    2024年02月08日
    瀏覽(15)
  • Spring cloud stream 結合 rabbitMq使用

    之前的開發(fā)主要是底層開發(fā),沒有深入涉及到消息方面?,F(xiàn)在面對的是一個這樣的場景: 假設公司項目A用了RabbitMQ,而項目B用了Kafka。這時候就會出現(xiàn)有兩個消息框架,這兩個消息框架可能編碼有所不同,且結構也有所不同,而且之前甚至可能使用的是別的框架,造成了一個

    2024年02月04日
    瀏覽(24)
  • spring Cloud Stream 實戰(zhàn)應用深度講解

    spring Cloud Stream 實戰(zhàn)應用深度講解

    Spring Cloud Stream 是一個框架,用于構建與共享消息傳遞系統(tǒng)連接的高度可擴展的事件驅動微服務。 該框架提供了一個靈活的編程模型,該模型建立在已經建立和熟悉的 Spring 習慣用語和最佳實踐之上,包括對持久發(fā)布/訂閱語義、消費者組和有狀態(tài)分區(qū)的支持。 核心模塊 Dest

    2024年01月24日
    瀏覽(31)
  • Spring Cloud 項目中實現(xiàn)推送消息到 RabbitMQ 消息中間件

    Spring Cloud 項目中實現(xiàn)推送消息到 RabbitMQ 消息中間件

    (注:安裝在虛擬機則填虛擬機地址,否則則為本機地址) 用戶名和密碼都為guest 看到如下頁面則為RabbitMQ安裝登錄成功。 三、依賴注入 導入依賴坐標 四、配置yaml文件 配置yaml配置文件 (注:host為地址,如果安裝在虛擬機則為虛擬機地址,安裝在本機則本機地址。port為端

    2024年04月13日
    瀏覽(25)
  • spring cloud steam 整合kafka 進行消息發(fā)送與接收

    spring cloud steam : Binder和Binding Binder是SpringCloud Stream的一個抽象概念,是應用與消息中間件之間的粘合劑,目前SpringCloud Stream實現(xiàn)了Kafka和RabbitMQ的binder Binder可以生成Binding,Binding用來綁定消息容器的生產者和消費者,它有兩種類型,INPUT和OUTPUT,INPUT對應于消費者,OUTPUT對應于

    2024年02月10日
    瀏覽(23)
  • Spring Boot 整合 RabbitMQ 實現(xiàn)延遲消息

    Spring Boot 整合 RabbitMQ 實現(xiàn)延遲消息

    消息隊列(Message Queuing,簡寫為 MQ)最初是為了解決金融行業(yè)的特定業(yè)務需求而產生的。慢慢的,MQ 被應用到了更多的領域,然而商業(yè) MQ 高昂的價格讓很多初創(chuàng)公司望而卻步,于是 AMQP(Advanced Message Queuing Protocol,高級消息隊列協(xié)議)應運而生。 隨著 AMQP 草案的發(fā)布,兩個月

    2024年04月08日
    瀏覽(29)
  • 消息隊列——spring和springboot整合rabbitmq

    消息隊列——spring和springboot整合rabbitmq

    目錄 spring整合rabbitmq——生產者 rabbitmq配置文件信息 倒入生產者工程的相關代碼 簡單工作模式 spring整合rabbitmq——消費者 spring整合rabbitmq——配置詳解 SpringBoot整合RabbitMQ——生產者 ?SpringBoot整合RabbitMQ——消費者 ? 使用原生amqp來寫應該已經沒有這樣的公司了 創(chuàng)建兩個工程

    2024年02月16日
    瀏覽(28)
  • Spring整合RabbitMQ-配制文件方式-3-消息拉模式

    拉消息的消費者 spring-rabbit.xml 當啟動消費者后,便可獲取到發(fā)送至隊列的消息 檢查隊列的消息的情況: 經過檢查確認,發(fā)現(xiàn)消息已經被消費了。 至此拉模式的消費者完成。

    2024年02月09日
    瀏覽(19)
  • Spring整合RabbitMQ-配制文件方式-1-消息生產者

    Spring-amqp是對AMQP的一些概念的一些抽象,Spring-rabbit是對RabbitMQ操作的封裝實現(xiàn)。 主要有幾個核心類 RabbitAdmin 、 RabbitTemplate 、 SimpleMessageListenerContainer 等 RabbitAdmin 類完成對Exchange、Queue、Binding的操作,在容器中管理 了 RabbitAdmin 類的時候,可以對Exchange、Queue、Binding進行自動聲

    2024年02月09日
    瀏覽(34)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包