国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

SpringBoot集成RocketMQ實現(xiàn)三種消息發(fā)送方式

這篇具有很好參考價值的文章主要介紹了SpringBoot集成RocketMQ實現(xiàn)三種消息發(fā)送方式。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

一、pom文件引入依賴

二、application.yml文件添加內(nèi)容

三、創(chuàng)建producer生產(chǎn)者

四、創(chuàng)建Consumer消費(fèi)者(創(chuàng)建兩個消費(fèi)者,所屬一個Topic)

五、啟動項目測試


RocketMQ 支持3 種消息發(fā)送方式: 同步 (sync)、異步(async)、單向(oneway)。

  • 同步:發(fā)送者向 MQ 執(zhí)行發(fā)送消息API 時,同步等待,直到消息服務(wù)器返回發(fā)送結(jié)果。
  • 異步:發(fā)送者向MQ 執(zhí)行發(fā)送消息API 時,指定消息發(fā)送成功后的回調(diào)函數(shù),然后調(diào)用消息發(fā)送API 后,立即返回,消息發(fā)送者線程不阻塞,直到運(yùn)行結(jié)束,消息發(fā)送成功或失敗的回調(diào)任務(wù)在一個新的線程中返回。
  • 單向:消息發(fā)送者向MQ 執(zhí)行發(fā)送消息API 時,直接返回,不等待消息服務(wù)器的結(jié)果,也不注冊回調(diào)函數(shù),只管發(fā),不管是否成功存儲在消息服務(wù)器上。

前提:

運(yùn)行項目需要具備RocketMQ環(huán)境,參考Docker搭建RocketMQ集群

一、pom文件引入依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

二、application.yml文件添加內(nèi)容

rocketmq:
  name-server: IP:9876  #IP為rocketmq訪問的地址
  producer:
    group: first1-group  #事務(wù)消息才會用到

三、創(chuàng)建producer生產(chǎn)者

rocketmq發(fā)送異步消息,spring boot,java-rocketmq,rocketmq

package com.tlxy.lhn.controller.rocketmq;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RocketController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @RequestMapping(value = "/rocket", method = RequestMethod.GET)
    public void noTag() {
        // convertAndSend() 發(fā)送普通字符串消息
        rocketMQTemplate.convertAndSend("sendMessage_topic", "Hello Word");
    }

    @RequestMapping(value = "/tagA", method = RequestMethod.GET)
    public void tagA() {
        rocketMQTemplate.convertAndSend("sendMessage_topic:tagA", "hello world tagA");
    }

    @RequestMapping(value = "/tagB", method = RequestMethod.GET)
    public void tagB() {
        rocketMQTemplate.convertAndSend("sendMessage_topic:tagB", "hello world tagB");
    }

    @RequestMapping(value = "/syncSend", method = RequestMethod.GET)
    public void syncSend() {
        String json = "發(fā)送同步消息";
        SendResult sendResult = rocketMQTemplate.syncSend("sendMessage_topic:1", json);
        System.out.println(sendResult);
    }

    @RequestMapping(value = "/aSyncSend", method = RequestMethod.GET)
    public void aSyncSend() {
        String json = "發(fā)送異步消息";
        SendCallback callback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("發(fā)送消息成功");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("發(fā)送消息失敗");
            }
        };
        rocketMQTemplate.asyncSend("sendMessage_topic", json, callback);
    }

    @RequestMapping(value = "/sendOneWay", method = RequestMethod.GET)
    public void sendOneWay() {
        rocketMQTemplate.sendOneWay("sendMessage_topic", "發(fā)送單向消息");
    }
}

四、創(chuàng)建Consumer消費(fèi)者(創(chuàng)建兩個消費(fèi)者,所屬一個Topic)

Consumer1:

package com.tlxy.lhn.controller.rocketmq;

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * topic 是主題
 * consumerGroup 是消費(fèi)者組,一條消息只能被同一個消費(fèi)者組里的一個消費(fèi)者消費(fèi)。
 * selectorExpression 是用于消息過濾的,以 TAG 方式為例:
 * 默認(rèn)為 "*",表示不過濾,消費(fèi)此 topic 下所有消息
 * 配置為 "tagA",表示只消費(fèi)此 topic 下 TAG = tagA 的消息
 * 配置為 "tagA || tagB",表示消費(fèi)此 topic 下 TAG = tagA 或  TAG = tagB 的消息,以此類推
 * 消費(fèi)模式:默認(rèn) CLUSTERING ( CLUSTERING:負(fù)載均衡 )( BROADCASTING:廣播機(jī)制 )
 */
@RocketMQMessageListener(topic = "sendMessage_topic",
        consumerGroup = "consumer-group-test1",
//        selectorExpression = "tagA || tagB",
        messageModel = MessageModel.CLUSTERING)
@Component
public class DemoConsumer1 implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("receive message1:" + s);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("處理完成");
    }
}

Consumer2:

package com.tlxy.lhn.controller.rocketmq;

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * topic 是主題
 * consumerGroup 是消費(fèi)者組,一條消息只能被同一個消費(fèi)者組里的一個消費(fèi)者消費(fèi)。
 * selectorExpression 是用于消息過濾的,以 TAG 方式為例:
 * 默認(rèn)為 "*",表示不過濾,消費(fèi)此 topic 下所有消息
 * 配置為 "tagA",表示只消費(fèi)此 topic 下 TAG = tagA 的消息
 * 配置為 "tagA || tagB",表示消費(fèi)此 topic 下 TAG = tagA 或  TAG = tagB 的消息,以此類推
 * 消費(fèi)模式:默認(rèn) CLUSTERING ( CLUSTERING:負(fù)載均衡 )( BROADCASTING:廣播機(jī)制 )
 */
@RocketMQMessageListener(topic = "sendMessage_topic",
        consumerGroup = "consumer-group-test1",
        messageModel = MessageModel.CLUSTERING)
@Component
public class DemoConsumer2 implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("receive message2:" + s);
        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("處理完成");
    }
}

五、啟動項目測試

1、發(fā)送同步消息:

rocketmq發(fā)送異步消息,spring boot,java-rocketmq,rocketmq

rocketmq發(fā)送異步消息,spring boot,java-rocketmq,rocketmq

?以上輸出可以看到:同步消息發(fā)送后,消息發(fā)送到broker后就返回結(jié)果了,消費(fèi)端還未處理完,兩者互互不影響。

2、發(fā)送異步消息:

rocketmq發(fā)送異步消息,spring boot,java-rocketmq,rocketmq

rocketmq發(fā)送異步消息,spring boot,java-rocketmq,rocketmq

?以上輸出:發(fā)送者向MQ 執(zhí)行發(fā)送消息API 時,指定消息發(fā)送成功后的回調(diào)函數(shù),然后調(diào)用消息發(fā)送API 后,立即返回,消息發(fā)送者線程不阻塞,直到運(yùn)行結(jié)束,消息發(fā)送成功或失敗的回調(diào)任務(wù)在一個新的線程中返回。

3、發(fā)送單向消息:

rocketmq發(fā)送異步消息,spring boot,java-rocketmq,rocketmq

rocketmq發(fā)送異步消息,spring boot,java-rocketmq,rocketmq

?以上輸出:消息發(fā)送者向MQ 執(zhí)行發(fā)送消息API 時,直接返回,不等待消息服務(wù)器的結(jié)果,也不注冊回調(diào)函數(shù),只管發(fā),不管是否成功存儲在消息服務(wù)器上。文章來源地址http://www.zghlxwxcb.cn/news/detail-637819.html

到了這里,關(guān)于SpringBoot集成RocketMQ實現(xiàn)三種消息發(fā)送方式的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • RocketMQ如何實現(xiàn)消息軌跡:消息何時發(fā)送的?耗時多久?誰消費(fèi)的?存在哪個broker了?

    RocketMQ如何實現(xiàn)消息軌跡:消息何時發(fā)送的?耗時多久?誰消費(fèi)的?存在哪個broker了?

    更多RocketMQ內(nèi)容,見專欄:https://blog.csdn.net/saintmm/category_11280399.html 消息軌跡簡單來說就是日志,其把消息的生產(chǎn)、存儲、消費(fèi)等所有的訪問和操作日志。 在項目中存在發(fā)送方與消費(fèi)方相互“扯皮”的情況: 發(fā)送方說消息已經(jīng)發(fā)送成功,而消費(fèi)方說沒有消費(fèi)到。 這時我們就希

    2024年01月17日
    瀏覽(26)
  • java實現(xiàn)阿里云rocketMQ消息的發(fā)送與消費(fèi)(tcp協(xié)議sdk)

    java實現(xiàn)阿里云rocketMQ消息的發(fā)送與消費(fèi)(tcp協(xié)議sdk)

    登錄阿里云官網(wǎng),先申請rocketMQ,再申請Topic、Group ID,然后就是參考阿里云的JAVA SDK進(jìn)行編程實現(xiàn)。 環(huán)境要求: 安裝JDK 1.8或以上版本 安裝Maven 安裝Java SDK 參照 阿里云 官方文檔,來一步一步操作。 文檔提供的SDK有 TCP 和Http協(xié)議,這里使用 TCP協(xié)議 來實現(xiàn)rocketMQ消息的發(fā)送與消

    2024年02月07日
    瀏覽(30)
  • 【Spring Boot】集成Kafka實現(xiàn)消息發(fā)送和訂閱

    【Spring Boot】集成Kafka實現(xiàn)消息發(fā)送和訂閱

    最近忙著搞低代碼開發(fā),好久沒新建spring項目了,結(jié)果今天心血來潮準(zhǔn)備建個springboot項目 注意Type選Maven,java選8,其他默認(rèn) 點下一步后完成就新建了一個spring boot項目,配置下Maven環(huán)境,主要是settings.xml文件,里面要包含阿里云倉庫,不然可能依賴下載不下來 在maven配置沒問

    2024年02月09日
    瀏覽(32)
  • RocketMQ 發(fā)送批量消息、過濾消息和事務(wù)消息

    RocketMQ 發(fā)送批量消息、過濾消息和事務(wù)消息

    前面我們知道RocketMQ 發(fā)送延時消息與順序消息,現(xiàn)在我們看下怎么發(fā)送批量消息、過濾消息和事務(wù)消息。 限制是這些批量消息應(yīng)該有相同的 topic,相同的 waitStoreMsgOK,而且不能是延時消息。 此外,這一批消息的總大小不應(yīng)超過4MB。 消息的生產(chǎn)者 消息的消費(fèi)者 消息分割 如果

    2023年04月21日
    瀏覽(20)
  • SpringBoot實現(xiàn)WebSocket發(fā)送接收消息 + Vue實現(xiàn)SocketJs接收發(fā)送消息

    1、https://www.mchweb.net/index.php/dev/887.html 2、https://itonline.blog.csdn.net/article/details/81221103?spm=1001.2101.3001.6661.1utm_medium=distribute.pc_relevant_t0.none-task-blog-2~default~CTRLIST~default-1-81221103-blog-121078449.pc_relevant_aadepth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2~default~CTRLIST~default-1-81221103-blog-12107

    2024年02月05日
    瀏覽(19)
  • RocketMQ發(fā)送消息失敗排查

    RocketMQ發(fā)送消息失敗排查

    錯誤信息: 錯誤截圖: 查看結(jié)果: 說明:發(fā)現(xiàn)對應(yīng)的訂閱組已經(jīng)離線(查看對應(yīng)的項目MQ地址和配置都是正確的),然后從服務(wù)日志中也看不出更多的問題 說明:調(diào)整服務(wù)日志級別到info,通過詳細(xì)的日志信息定位發(fā)送失敗的原因 日志截圖: 說明:日志不斷打印 closeChanne

    2024年02月04日
    瀏覽(25)
  • RocketMQ發(fā)送消息超時異常

    RocketMQ發(fā)送消息超時異常

    說明:在使用RocketMQ發(fā)送消息時,出現(xiàn)下面這個異常(org.springframework.messging.MessgingException:sendDefaultImpl call timeout……); 解決:修改RocketMQ中broke.conf配置,添加下面這兩行配置,重啟服務(wù)后再試就可以了; 啟動時,注意使用下面的命令,帶上配置文件

    2024年02月13日
    瀏覽(22)
  • RocketMQ發(fā)送消息

    RocketMQ發(fā)送消息

    目錄 一.消費(fèi)模式?編輯 二.發(fā)送消息 1.普通消息 同步消息(***)? 異步消息(***) 單向消息(*) 日志服務(wù)的編寫思路 2.延遲消息(***) 延遲等級? 3.批量消息 4.順序消息(*) 三.Tag過濾 訂閱關(guān)系的一致性 ①訂閱一個Topic且訂閱一個Tag ②訂閱一個Topic且訂閱多個Tag ③訂閱多個Topic且訂閱多

    2024年02月11日
    瀏覽(23)
  • rocketMQ-console 發(fā)送消息

    rocketMQ-console 發(fā)送消息

    rocketMQ-console是一款非常使用的rocketMQ擴(kuò)展工具 工具代碼倉 mirrors / apache / rocketmq-externals · GitCode 安裝詳細(xì)教程 ??????rocketMQ學(xué)習(xí)筆記二:RocketMQ-Console安裝、使用詳解_麥田里的碼農(nóng)-CSDN博客_rocketmq-consoled 直接來到工具頁面 ,右上角可以切換語言 發(fā)送消息流程 1.點擊 最

    2024年02月14日
    瀏覽(19)
  • RocketMQ如何安全的批量發(fā)送消息?

    優(yōu)點: 批量發(fā)送消息可以提高rocketmq的生產(chǎn)者性能和吞吐量。 使用場景: 發(fā)送大量小型消息時; 需要降低消息發(fā)送延遲時; 需要提高生產(chǎn)者性能時; 注意事項: 消息列表的大小不能超過broker設(shè)置的最大消息大小; 消息列表的大小不能超過生產(chǎn)證設(shè)置的maxMessageSize 參數(shù),此參

    2024年02月03日
    瀏覽(48)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包