国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 來配置發(fā)送和消費(fèi) RocketMQ 消息

這篇具有很好參考價值的文章主要介紹了Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 來配置發(fā)送和消費(fèi) RocketMQ 消息。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

? ? ? ?本文解析將 RocketMQ Client 端集成為 spring-boot-starter 框架的開發(fā)細(xì)節(jié),然后通過一個簡單的示例來一步一步的講解如何使用這個 spring-boot-starter 工具包來配置,發(fā)送和消費(fèi) RocketMQ 消息。

一、使用方法

添加maven依賴:

<!--在pom.xml中添加依賴-->
<dependency>
? ? <groupId>org.apache.rocketmq</groupId>
? ? <artifactId>rocketmq-spring-boot-starter</artifactId>
? ? <version>${RELEASE.VERSION}</version>
</dependency>

二、發(fā)送消息

修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

注意:

請將上述示例配置中的127.0.0.1:9876替換成真實(shí)RocketMQ的NameServer地址與端口

1、編寫代碼

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    
    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }
    
    public void run(String... args) throws Exception {
      	//send message synchronously
        rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
      	//send spring message
        rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
        //send messgae asynchronously
      	rocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                System.out.printf("async onSucess SendResult=%s %n", var1);
            }

            @Override
            public void onException(Throwable var1) {
                System.out.printf("async onException Throwable=%s %n", var1);
            }

        });
      	//Send messages orderly
      	rocketMQTemplate.syncSendOrderly("orderly_topic",MessageBuilder.withPayload("Hello, World").build(),"hashkey")
        
        //rocketMQTemplate.destroy(); // notes:  once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate
    }
    
    @Data
    @AllArgsConstructor
    public class OrderPaidEvent implements Serializable{
        private String orderId;
        
        private BigDecimal paidMoney;
    }
}

三、接收消息

1、Push模式

修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876

注意:

請將上述示例配置中的127.0.0.1:9876替換成真實(shí)RocketMQ的NameServer地址與端口

編寫代碼

@SpringBootApplication
public class ConsumerApplication{
    
    public static void main(String[] args){
        SpringApplication.run(ConsumerApplication.class, args);
    }
    
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public class MyConsumer1 implements RocketMQListener<String>{
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    }
    
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
    public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{
        public void onMessage(OrderPaidEvent orderPaidEvent) {
            log.info("received orderPaidEvent: {}", orderPaidEvent);
        }
    }
}

2、Pull模式

RocketMQ Spring 2.2.0開始,RocketMQ Srping支持Pull模式消費(fèi)

修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876
# When set rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic, rocketmqTemplate will start lite pull consumer
# If you do not want to use lite pull consumer, please do not set rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic
rocketmq.pull-consumer.group=my-group1
rocketmq.pull-consumer.topic=test

注意之前l(fā)ite pull consumer的生效配置為rocketmq.consumer.group和rocketmq.consumer.topic,但由于非常容易與push-consumer混淆,因此在2.2.3版本之后修改為rocketmq.pull-consumer.group和rocketmq.pull-consumer.topic.

編寫代碼

@SpringBootApplication
public class ConsumerApplication implements CommandLineRunner {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Resource(name = "extRocketMQTemplate")
    private RocketMQTemplate extRocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        //This is an example of pull consumer using rocketMQTemplate.
        List<String> messages = rocketMQTemplate.receive(String.class);
        System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);

        //This is an example of pull consumer using extRocketMQTemplate.
        messages = extRocketMQTemplate.receive(String.class);
        System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);
    }
}

四、事務(wù)消息

修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

注意:

請將上述示例配置中的127.0.0.1:9876替換成真實(shí)RocketMQ的NameServer地址與端口

編寫代碼

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }

    public void run(String... args) throws Exception {
        try {
            // Build a SpringMessage for sending in transaction
            Message msg = MessageBuilder.withPayload(..)...;
            // In sendMessageInTransaction(), the first parameter transaction name ("test")
            // must be same with the @RocketMQTransactionListener's member field 'transName'
            rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
        } catch (MQClientException e) {
            e.printStackTrace(System.out);
        }
    }

    // Define transaction listener with the annotation @RocketMQTransactionListener
    @RocketMQTransactionListener
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
          @Override
          public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // ... local transaction process, return bollback, commit or unknown
            return RocketMQLocalTransactionState.UNKNOWN;
          }

          @Override
          public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // ... check transaction status and return bollback, commit or unknown
            return RocketMQLocalTransactionState.COMMIT;
          }
    }
}

五、消息軌跡

Producer 端要想使用消息軌跡,需要多配置兩個配置項(xiàng):

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=my-trace-topic

Consumer 端消息軌跡的功能需要在?@RocketMQMessageListener?中進(jìn)行配置對應(yīng)的屬性:

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    enableMsgTrace = true,
    customizedTraceTopic = "my-trace-topic"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}

注意:

默認(rèn)情況下 Producer 和 Consumer 的消息軌跡功能是開啟的且 trace-topic 為 RMQ_SYS_TRACE_TOPIC Consumer 端的消息軌跡 trace-topic 可以在配置文件中配置?rocketmq.consumer.customized-trace-topic?配置項(xiàng),不需要為在每個?@RocketMQMessageListener?配置。

若需使用阿里云消息軌跡,則需要在@RocketMQMessageListener中將accessChannel配置為CLOUD。

五、ACL功能

Producer 端要想使用 ACL 功能,需要多配置兩個配置項(xiàng):

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK

Consumer 端 ACL 功能需要在?@RocketMQMessageListener?中進(jìn)行配置:

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    accessKey = "AK",
    secretKey = "SK"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}

注意:

可以不用為每個?@RocketMQMessageListener?注解配置 AK/SK,在配置文件中配置?rocketmq.consumer.access-key?和?rocketmq.consumer.secret-key?配置項(xiàng),這兩個配置項(xiàng)的值就是默認(rèn)值

六、請求 應(yīng)答語義支持

RocketMQ-Spring 提供 請求/應(yīng)答 語義支持。

  • Producer端

發(fā)送Request消息使用SendAndReceive方法

注意

同步發(fā)送需要在方法的參數(shù)中指明返回值類型

異步發(fā)送需要在回調(diào)的接口中指明返回值類型

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    
    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }
    
    public void run(String... args) throws Exception {
        // 同步發(fā)送request并且等待String類型的返回值
        String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
        System.out.printf("send %s and receive %s %n", "request string", replyString);

        // 異步發(fā)送request并且等待User類型的返回值
        rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
            @Override public void onSuccess(User message) {
                System.out.printf("send user object and receive %s %n", message.toString());
            }

            @Override public void onException(Throwable e) {
                e.printStackTrace();
            }
        }, 5000);
    }
    
    @Data
    @AllArgsConstructor
    public class User implements Serializable{
        private String userName;
    		private Byte userAge;
    }
}
  • Consumer端

需要實(shí)現(xiàn)RocketMQReplyListener<T, R> 接口,其中T表示接收值的類型,R表示返回值的類型。文章來源地址http://www.zghlxwxcb.cn/news/detail-842461.html

@SpringBootApplication
public class ConsumerApplication{
    
    public static void main(String[] args){
        SpringApplication.run(ConsumerApplication.class, args);
    }
    
    @Service
    @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
    public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
        @Override
        public String onMessage(String message) {
          System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
          return "reply string";
        }
      }
   
    @Service
    @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
    public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User>{
        public User onMessage(User user) {
          	System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
          	User replyUser = new User("replyUserName",(byte) 10);	
          	return replyUser;
        }
    }

    @Data
    @AllArgsConstructor
    public class User implements Serializable{
        private String userName;
    		private Byte userAge;
    }
}

到了這里,關(guān)于Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 來配置發(fā)送和消費(fèi) RocketMQ 消息的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • flink 整合rocketmq

    flink 整合rocketmq

    下面代碼路徑 :source-rocketmq-common-selector DefaultTopicSelector.java 類 TopicSelector.java 下面代碼路徑:source-rocketmq-common-serialization AlarmEventSerializationSchema.java KeyValueDeserializationSchema.java KeyValueSerializationSchema.java VehiclePosInfoDeserializationSchema.java 下面代碼路徑:source-rocketmq-example 這個只是

    2023年04月08日
    瀏覽(16)
  • Springboot 整合RocketMQ

    ?業(yè)務(wù)場景:該接口為H5、小程序提供,用于用戶提交信息,后臺計算用戶數(shù)據(jù),進(jìn)行審核。 根據(jù)用戶提交的手機(jī)號計算用戶數(shù)據(jù),計算用戶實(shí)時數(shù)據(jù)比較長,數(shù)據(jù)量大的3-5分鐘,數(shù)據(jù)小的1分鐘上下,移動端不需要實(shí)時返回用戶計算數(shù)據(jù),所以接口可以保存完用戶基本信息,

    2024年02月11日
    瀏覽(13)
  • Spring Cloud Alibaba整合RocketMQ架構(gòu)原理分析

    Spring Cloud Alibaba整合RocketMQ架構(gòu)原理分析

    關(guān)于RocketMQ的原理,本文就不做詳細(xì)分析了,這里就重點(diǎn)關(guān)注Spring Cloud Alibaba是如何整合RocketrMQ的。 RocketMQ提供了RocketMQ Client SDK,開發(fā)者可以直接依賴這個SDK,就可以完成消息的生產(chǎn)和消費(fèi)。 1.生產(chǎn)消息 RocketMQ Client SDK提供了生產(chǎn)消息的API接口DefaultMQProducer,開發(fā)者可以直接使

    2024年01月22日
    瀏覽(94)
  • SpringBoot整合RocketMQ,老鳥們都是這么玩的!

    SpringBoot整合RocketMQ,老鳥們都是這么玩的!

    今天我們來討論如何在項(xiàng)目開發(fā)中優(yōu)雅地使用RocketMQ。本文分為三部分,第一部分實(shí)現(xiàn)SpringBoot與RocketMQ的整合,第二部分解決在使用RocketMQ過程中可能遇到的一些問題并解決他們,第三部分介紹如何封裝RocketMQ以便更好地使用。 在SpringBoot中集成RocketMQ,只需要簡單四步: 引入

    2023年04月10日
    瀏覽(24)
  • SpringBoot整合消息中間件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    SpringBoot整合消息中間件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    消息的發(fā)送方:生產(chǎn)者 消息的接收方:消費(fèi)者 同步消息:發(fā)送方發(fā)送消息到接收方,接收方有所回應(yīng)后才能夠進(jìn)行下一次的消息發(fā)送 異步消息:不需要接收方回應(yīng)就可以進(jìn)行下一步的發(fā)送 什么是消息隊(duì)列? 當(dāng)此時有很多個用戶同時訪問服務(wù)器,需要服務(wù)器進(jìn)行操作,但此

    2024年04月27日
    瀏覽(53)
  • springboot整合rocketmq:一個消費(fèi)者組怎么訂閱多個topic

    springboot整合rocketmq:一個消費(fèi)者組怎么訂閱多個topic

    ????????一個消費(fèi)者組中的所有消費(fèi)者訂閱關(guān)系,可以多個topic,多個tag,但是必須一致,否則就倒沫子了,如下圖: ?下面貼了如下結(jié)構(gòu)的代碼 ?一個消費(fèi)組(消費(fèi)者)訂閱多個topic的代碼(只寫了一個消費(fèi)組的,其他類似): 結(jié)果:

    2024年02月15日
    瀏覽(26)
  • SpringBoot3.0整合RocketMQ時出現(xiàn)未能加載bean文件

    SpringBoot3.0整合RocketMQ時出現(xiàn)未能加載bean文件

    問題 APPLICATION FAILED TO START Description: Field rocketMQTemplate in com.spt.message.service.MqProducerService required a bean of type ‘org.apache.rocketmq.spring.core.RocketMQTemplate’ that could not be found. The injection point has the following annotations: - @org.springframework.beans.factory.annotation.Autowired(required=true) Action: Consider

    2024年02月12日
    瀏覽(24)
  • Sprint Cloud Stream整合RocketMq和websocket實(shí)現(xiàn)消息發(fā)布訂閱

    Sprint Cloud Stream整合RocketMq和websocket實(shí)現(xiàn)消息發(fā)布訂閱

    1. 引入RocketMQ依賴 :首先,在 pom.xml 文件中添加RocketMQ的依賴: 2. 配置RocketMQ連接信息 :在 application.properties 或 application.yml 中配置RocketMQ的連接信息,包括Name Server地址等: 3.消息發(fā)布組件 4.消息發(fā)布控制器 項(xiàng)目結(jié)構(gòu): 接下來是websocket模塊的搭建 1. 依賴添加 2.application.yml配

    2024年02月08日
    瀏覽(14)
  • 使用 Docker 安裝 RocketMQ 使用 docker 安裝 rocketmq

    Docker常用命令大全 RocketMQ 是一個分布式的消息中間件,由 NameServer 和Broker兩個角色組成,是一種典型的基于發(fā)布/訂閱模式的消息通信解決方案。 NameServer 是 RocketMQ 的命名服務(wù),可以理解為類似于 DNS 的服務(wù),它主要負(fù)責(zé)記錄 Topic 的路由信息和 Broker 的地址信息。每個 Rocket

    2024年02月13日
    瀏覽(25)
  • RocketMQ高階使用

    RocketMQ高階使用

    1. 流程 2. 探討功能點(diǎn) RocketMQ的順序消息 消息投遞策略 消息保障 3. 順序消息 3.1 順序類型 3.1.1 無序消息 無序消息也指普通的消息,Producer 只管發(fā)送消息,Consumer 只管接收消息,至于消息和消息之間的順序并沒有保證。 Producer 依次發(fā)送 orderId 為 1、2、3 的消息 Consumer 接到的消

    2024年02月16日
    瀏覽(14)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包