国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用StreamBridge實(shí)現(xiàn)RabbitMq 消息收發(fā) && ack確認(rèn) && 延時(shí)消息

這篇具有很好參考價(jià)值的文章主要介紹了使用StreamBridge實(shí)現(xiàn)RabbitMq 消息收發(fā) && ack確認(rèn) && 延時(shí)消息。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

使用StreamBridge實(shí)現(xiàn)RabbitMq && 延時(shí)消息

Maven依賴

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

延時(shí)消息需要安裝插件

下載地址:link
1.下載完成放到rabbitmq安裝目錄plugins下
2.執(zhí)行命令啟用插件
3.重啟mq

rabbitmq-plugins enable rabbitmq_delayed_message_exchange  // 啟用插件
//重啟mq
rabbitmq-server stop
rabbitmq-server start

Exchanges -> add a new exchange -> type 出現(xiàn)x-delayed-message即安裝成功

使用StreamBridge實(shí)現(xiàn)RabbitMq 消息收發(fā) && ack確認(rèn) && 延時(shí)消息文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-512066.html

yml配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: xxxx
    password: xxxx

  function:
      # 與消費(fèi)者對(duì)應(yīng)(消費(fèi)者方法名稱)
      definition: ackMessage;normal;delay
    stream:
      rabbit:
        bindings:
          ackMessage-in-0:
            consumer:
              acknowledge-mode: manual  # manual手動(dòng)確認(rèn) ,auto 自動(dòng)確認(rèn)
          delay-in-0:
            consumer:
              delayedExchange: true # 開(kāi)啟延時(shí)
          delay-out-0:
            producer:
              delayedExchange: true  # 開(kāi)啟延時(shí)

      bindings:
        delay-in-0:
          destination: delay.exchange.cloud  # mq對(duì)應(yīng)交換機(jī)
          content-type: application/json
          consumer:
            acknowledge-mode: auto # manual手動(dòng)確認(rèn) ,auto 自動(dòng)確認(rèn)
          group: delay-group	# 消息組
          binder: rabbit
        delay-out-0:
          destination: delay.exchange.cloud
          content-type: application/json
          group: delay-group
          binder: rabbit

        ackMessage-in-0:
          destination: ackMessage.exchange.cloud
          content-type: application/json
          consumer:
            acknowledge-mode: manual # manual手動(dòng)確認(rèn) ,auto 自動(dòng)確認(rèn)
          group: ackMessage-group
          binder: rabbit
        ackMessage-out-0:
          destination: ackMessage.exchange.cloud
          content-type: application/json
          group: ackMessage-group
          binder: rabbit
        normal-in-0:
          destination: normal.exchange.cloud
          content-type: application/json
          consumer:
            acknowledge-mode: auto # manual手動(dòng)確認(rèn) ,auto 自動(dòng)確認(rèn)
          group: normal-group
          binder: rabbit
        normal-out-0:
          destination: normal.exchange.cloud
          content-type: application/json
          group: normal-group
          binder: rabbit

接口controller


import com.alibaba.fastjson2.JSON;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;

/**
 * @Description: RabbitmqController
 */
@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping("/mq")
public class MqController {

	//消息發(fā)送者
    private final RabbitMqProducer rabbitMqProducer;

    /**
     * 發(fā)送普通消息Rabbitmq
     * bindingName 綁定隊(duì)列名稱
     * @param msg 消息內(nèi)容
     */
    @GetMapping("/sendMessage/{msg}/{bindingName}")
    public R<Void> sendMessage(@PathVariable("msg") String msg, @PathVariable("bindingName") String bindingName) {
        log.info(bindingName + "發(fā)送消息: " + msg);
        rabbitMqProducer.sendMsg(msg, bindingName);
        return R.ok();
    }

    /**
     * 發(fā)送延遲消息
     *
     * @param message  消息實(shí)體
     * @return
     */
    @PostMapping("/sendDelayedMessage")
    public R<Void> sendDelayedMessage(@RequestBody Message message) {
        log.info(MqTExchangesEnum.delay + "發(fā)送延時(shí)消息: " + LocalDateTime.now() + "  " + message);
        rabbitMqProducer.sendDelayMsg(JSON.toJSONString(message), message.getBindingName(), message.getSeconds());// 延遲時(shí)間(秒)
        return R.ok();
    }
}

發(fā)送者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * RabbitMq消息生產(chǎn)者
 */
@Component
public class RabbitMqProducer {

    @Autowired
    private StreamBridge streamBridge;

    /**
     * @Description RabbitMq消息生產(chǎn)者
     * @Param msg 消息內(nèi)容
     * @Param bindingName  exchange綁定queue名稱
     **/
    public void sendMsg(String msg, String bindingName) {
        // 構(gòu)建消息對(duì)象
        Messaging messaging = new Messaging().setMsgId(UUID.randomUUID().toString()).setMsgText(msg);
        Message<Messaging> message = MessageBuilder.withPayload(messaging).build();
        streamBridge.send(bindingName, message);
    }

    /**
     * 發(fā)送延遲消息
     *
     * @param msg
     * @param bindingName
     * @param seconds
     */
    public void sendDelayMsg(String msg, String bindingName, Integer seconds) {
        // 構(gòu)建消息對(duì)象
        Messaging messaging = new Messaging().setMsgId(UUID.randomUUID().toString()).setMsgText(msg);
        Message<Messaging> message = MessageBuilder.withPayload(messaging).setHeader("x-delay", seconds * 1000).build();
        streamBridge.send(bindingName, message);
    }
}

消費(fèi)者

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;

import java.time.LocalDateTime;
import java.util.function.Consumer;


/**
 * RabbitMq消息消費(fèi)者
 */
@Component
@Slf4j
public class RabbitMqConsumer {

    /**
     * mq接收ackMessage消息/手動(dòng)ack確認(rèn)
     * @methodName 配置文件對(duì)應(yīng)
     **/
    @Bean
    Consumer<Message<Messaging>> ackMessage() {
        log.info("ackMessage-初始化訂閱");
        return obj -> {
            Channel channel = obj.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
            Long deliveryTag = obj.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
            try {
                log.info("ackMessage-消息接收成功:" + obj.getPayload());
                //業(yè)務(wù)邏輯處理
                //ack確認(rèn)
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                //重新回隊(duì)列-true則重新入隊(duì)列,否則丟棄或者進(jìn)入死信隊(duì)列。
//                    channel.basicReject(deliveryTag, true);
                log.error(e.getMessage());
            }

        };
    }

    /**
     * mq接收normal消息
     **/
    @Bean
    Consumer<Messaging> normal() {
        log.info("normal-初始化訂閱");
        return obj -> {
            log.info("normal-消息接收成功:" + obj);
            //業(yè)務(wù)邏輯處理
        };
    }


    /**
     * mq接收延時(shí)消息
     * Messaging 發(fā)送實(shí)體消息接收實(shí)體消息
     **/
    @Bean
    Consumer<Message<Messaging>> delay() {
        log.info("delay-初始化訂閱");
        return obj -> {
            Messaging payload = obj.getPayload();
            log.info("delay-消息接收成功:" + LocalDateTime.now() + "  " + payload);
            //業(yè)務(wù)邏輯處理
        };
    }
}

到了這里,關(guān)于使用StreamBridge實(shí)現(xiàn)RabbitMq 消息收發(fā) && ack確認(rèn) && 延時(shí)消息的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • SpringBoot集成RabbitMQ之ACK確認(rèn)機(jī)制(第三節(jié))

    SpringBoot集成RabbitMQ之ACK確認(rèn)機(jī)制(第三節(jié))

    目錄 開(kāi)始語(yǔ) ??簡(jiǎn)述 ???模式NONE application配置 生產(chǎn)者 消費(fèi)者 結(jié)果驗(yàn)證 ???模式AUTO application配置 生產(chǎn)者 消費(fèi)者 結(jié)果驗(yàn)證 ???模式ACK(重點(diǎn)) application配置 生產(chǎn)者 消費(fèi)者 結(jié)果驗(yàn)證 ???生產(chǎn)者確認(rèn)機(jī)制 yml添加配置 修改生產(chǎn)者代碼 結(jié)果驗(yàn)證 結(jié)束語(yǔ) 一位普通的程序員,

    2024年02月04日
    瀏覽(21)
  • 搭建RabbitMQ消息服務(wù),整合SpringBoot實(shí)現(xiàn)收發(fā)消息

    搭建RabbitMQ消息服務(wù),整合SpringBoot實(shí)現(xiàn)收發(fā)消息

    作者主頁(yè) :Designer 小鄭 作者簡(jiǎn)介 :3年JAVA全棧開(kāi)發(fā)經(jīng)驗(yàn),專(zhuān)注JAVA技術(shù)、系統(tǒng)定制、遠(yuǎn)程指導(dǎo),致力于企業(yè)數(shù)字化轉(zhuǎn)型,CSDN博客專(zhuān)家,藍(lán)橋云課認(rèn)證講師。 消息隊(duì)列是一種在應(yīng)用程序之間傳遞數(shù)據(jù)的通信機(jī)制 ,它基于 發(fā)布-訂閱 模式,將消息發(fā)送者(發(fā)布者)和消息接收者

    2024年02月09日
    瀏覽(19)
  • 基于springboot實(shí)現(xiàn)的rabbitmq消息確認(rèn)

    基于springboot實(shí)現(xiàn)的rabbitmq消息確認(rèn)

    RabbitMQ的消息確認(rèn)有兩種。 一種是消息發(fā)送確認(rèn)。這種是用來(lái)確認(rèn)生產(chǎn)者將消息發(fā)送給交換器,交換器傳遞給隊(duì)列的過(guò)程中,消息是否成功投遞。發(fā)送確認(rèn)分為兩步,一是確認(rèn)是否到達(dá)交換器,二是確認(rèn)是否到達(dá)隊(duì)列。 第二種是消費(fèi)接收確認(rèn)。這種是確認(rèn)消費(fèi)者是否成功消費(fèi)

    2024年02月06日
    瀏覽(19)
  • SpringCloudStream集成RabbitMQ實(shí)現(xiàn)消息收發(fā)

    SpringCloudStream集成RabbitMQ實(shí)現(xiàn)消息收發(fā)

    ? SpringCloudStream 是一個(gè)構(gòu)建高擴(kuò)展和事件驅(qū)動(dòng)的微服務(wù)系統(tǒng)的框架,用于連接共有消息系統(tǒng),官網(wǎng)地址:?spring.io/projects/sp…?。整體上是把各種花里胡哨的MQ產(chǎn)品抽象成了一套非常簡(jiǎn)單的統(tǒng)一的編程框架,以實(shí)現(xiàn)事件驅(qū)動(dòng)的編程模型。社區(qū)官方實(shí)現(xiàn)了RabbitMQ,Apache Kafka,Kaf

    2024年02月03日
    瀏覽(18)
  • SpringBoot-RabbitMQ06-持久化和ACK確認(rèn)機(jī)制

    SpringBoot-RabbitMQ06-持久化和ACK確認(rèn)機(jī)制

    1.什么是消息確認(rèn)ACK? 如果在處理消息的過(guò)程中,消費(fèi)者的服務(wù)器在處理消息時(shí)出現(xiàn)異常,那么可能這條正在處理的消息劉沒(méi)有完成消息消費(fèi),數(shù)據(jù)就會(huì)丟失,為了確保數(shù)據(jù)不會(huì)丟失RabbitMQ支持消息確認(rèn)-ACK 2.ACK的消息確認(rèn)機(jī)制 ACK機(jī)制是消費(fèi)者從RabbitMQ收到消息并處理完成后,反

    2024年04月15日
    瀏覽(20)
  • 「RabbitMQ」實(shí)現(xiàn)消息確認(rèn)機(jī)制以確保消息的可靠發(fā)送、接收和拒收

    「RabbitMQ」實(shí)現(xiàn)消息確認(rèn)機(jī)制以確保消息的可靠發(fā)送、接收和拒收

    目錄 介紹 方案 配置手動(dòng)確認(rèn) 使用 「Bean?」 配置RabbitMQ的屬性 確定消費(fèi)、拒絕消費(fèi)、拒絕消費(fèi)進(jìn)入死信隊(duì)列 模擬生產(chǎn)者發(fā)送消息① ????????RabbitMQ 的消息確認(rèn)機(jī)制應(yīng)用場(chǎng)景非常廣泛,尤其是在需要確保消息可靠性和避免消息丟失的場(chǎng)合下更為重要,例如:金融系統(tǒng)、電

    2024年02月08日
    瀏覽(51)
  • 【MQ 系列】SpringBoot + RabbitMq 消息確認(rèn)/事務(wù)機(jī)制的使用姿勢(shì)

    【MQ 系列】SpringBoot + RabbitMq 消息確認(rèn)/事務(wù)機(jī)制的使用姿勢(shì)

    我們知道 RabbitMq 提供了兩種機(jī)制,來(lái)確保發(fā)送端的消息被 brocke 正確接收,本文將主要介紹,在消息確認(rèn)和事物兩種機(jī)制的場(chǎng)景下,發(fā)送消息的使用姿勢(shì) 首先創(chuàng)建一個(gè) SpringBoot 項(xiàng)目,用于后續(xù)的演示 springboot 版本為 2.2.1.RELEASE rabbitmq 版本為? 3.7.5 ? 依賴配置文件 pom.xml 在 a

    2024年01月18日
    瀏覽(24)
  • (七)「消息隊(duì)列」之 RabbitMQ 發(fā)布者確認(rèn)(使用 .NET 客戶端)

    (七)「消息隊(duì)列」之 RabbitMQ 發(fā)布者確認(rèn)(使用 .NET 客戶端)

    發(fā)布者確認(rèn) 是一個(gè) RabbitMQ 擴(kuò)展,用于實(shí)現(xiàn)可靠的發(fā)布。當(dāng)在通道上啟用發(fā)布者確認(rèn)時(shí),客戶端發(fā)布的消息將由代理 異步確認(rèn) ,這意味著它們已在服務(wù)器端得到處理。 先決條件 本教程假設(shè) RabbitMQ 已安裝并且正在 本地主機(jī) 的標(biāo)準(zhǔn)端口( 5672 )上運(yùn)行。如果您使用了不同的主

    2024年02月16日
    瀏覽(19)
  • 如何在Window系統(tǒng)中安裝RabbitMQ以及在.NET平臺(tái)上實(shí)現(xiàn)收發(fā)消息功能

    以下是接收客戶端代碼: #region RabbitMQ接收客戶端 private ConnectionFactory factory; private IConnection connection; private IModel channel; private EventingBasicConsumer consumer; /// /// 開(kāi)始創(chuàng)建連接對(duì)象 /// public void StartReceiving(string ListenIp,string queueName, string QueueUserName,string QueueUserPassword, int Port = 5672) {

    2024年02月11日
    瀏覽(25)
  • RabbitMQ(二) - RabbitMQ與消息發(fā)布確認(rèn)與返回、消費(fèi)確認(rèn)

    SpringBoot與RabbitMQ整合后,對(duì)RabbitClient的“確認(rèn)”進(jìn)行了封裝、使用方式與RabbitMQ官網(wǎng)不一致; 生產(chǎn)者給交換機(jī)發(fā)送消息后、若是不管了,則會(huì)出現(xiàn)消息丟失; 解決方案1: 交換機(jī)接受到消息、給生產(chǎn)者一個(gè)答復(fù)ack, 若生產(chǎn)者沒(méi)有收到ack, 可能出現(xiàn)消息丟失,因此重新發(fā)送消息;

    2024年02月14日
    瀏覽(21)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包