国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

rocketMq消息隊(duì)列詳細(xì)使用與實(shí)踐整合spring

這篇具有很好參考價(jià)值的文章主要介紹了rocketMq消息隊(duì)列詳細(xì)使用與實(shí)踐整合spring。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、RocketMQ原生API使用

使用RocketMQ的原生API開發(fā)是最簡單也是目前看來最牢靠的方式。這里用SpringBoot來搭建一系列消息生產(chǎn)者和消息消費(fèi)者,來訪問之前搭建的RocketMQ集群。

1、測(cè)試環(huán)境搭建

首先創(chuàng)建一個(gè)基于Maven的SpringBoot工程,引入如下依賴:

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.7.1</version>
</dependency>

RocketMQ的官網(wǎng)上有很多經(jīng)典的測(cè)試代碼,這些代碼雖然依賴的版本比較老,但是還是都可以運(yùn)行的。以官網(wǎng)上的順序進(jìn)行學(xué)習(xí)。

2、RocketMQ的編程模型

然后RocketMQ的生產(chǎn)者和消費(fèi)者的編程模型都是有個(gè)比較固定的步驟的,掌握這個(gè)固定的步驟,對(duì)于我們學(xué)習(xí)源碼以及以后使用都是很有幫助的。

  • 消息發(fā)送者的固定步驟

    1.創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名

    2.指定Nameserver地址

    3.啟動(dòng)producer

    4.創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體

    5.發(fā)送消息

    6.關(guān)閉生產(chǎn)者producer

  • 消息消費(fèi)者的固定步驟

    1.創(chuàng)建消費(fèi)者Consumer,制定消費(fèi)者組名

    2.指定Nameserver地址

    3.訂閱主題Topic和Tag

    4.設(shè)置回調(diào)函數(shù),處理消息

    5.啟動(dòng)消費(fèi)者consumer

3、RocketMQ的消息樣例

RocketMQ都支持類型的消息:

3.1 基本樣例

同步發(fā)送:能夠?qū)崟r(shí)知道消息是否從生產(chǎn)者推送到broke

異步發(fā)送:能夠異步回調(diào)的方式知道消息是否從生產(chǎn)者推送的broke

單向發(fā)送:消息發(fā)送出去就不管了,無法感知消息是否從生產(chǎn)者推送到broke的狀態(tài)

基本樣例部分我們使用消息生產(chǎn)者分別通過三種方式發(fā)送消息同步發(fā)送、異步發(fā)送以及單向發(fā)送。

然后使用消費(fèi)者來消費(fèi)這些消息。

1、同步發(fā)送消息的樣例見:org.apache.rocketmq.example.simple.Producer

2、異步發(fā)送消息的樣例見:org.apache.rocketmq.example.simple.AsyncProducer

等待消息返回后再繼續(xù)進(jìn)行下面的操作。

這個(gè)示例有個(gè)比較有趣的地方就是引入了一個(gè)countDownLatch來保證所有消息回調(diào)方法都執(zhí)行完了再關(guān)閉Producer。 所以從這里可以看出,RocketMQ的Producer也是一個(gè)服務(wù)端,在往Broker發(fā)送消息的時(shí)候也要作為服務(wù)端提供服務(wù)。

3、單向發(fā)送消息的樣例:

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Wait for sending to complete
        Thread.sleep(5000);        
        producer.shutdown();
    }
}

關(guān)鍵點(diǎn)就是使用producer.sendOneWay方式來發(fā)送消息,這個(gè)方法沒有返回值,也沒有回調(diào)。就是只管把消息發(fā)出去就行了。

4、使用消費(fèi)者消費(fèi)消息。
消費(fèi)者消費(fèi)消息有兩種模式,

一種是消費(fèi)者主動(dòng)去Broker上拉取消息的拉模式,

另一種是消費(fèi)者等待Broker把消息推送過來的推模式。

拉模式的樣例見:org.apache.rocketmq.example.simple.PullConsumer

推模式的樣例見:org.apache.rocketmq.example.simple.PushConsumer

通常情況下,用推模式比較簡單。

實(shí)際上RocketMQ的推模式也是由拉模式封裝出來的。

4.7.1版本中DefaultMQPullConsumerImpl這個(gè)消費(fèi)者類已標(biāo)記為過期,但是還是可以使用的。替換的類是DefaultLitePullConsumerImpl。

3.2 順序消息

順序消息生產(chǎn)者樣例見:org.apache.rocketmq.example.order.Producer

順序消息消費(fèi)者樣例見:org.apache.rocketmq.example.order.Consumer

驗(yàn)證時(shí),可以啟動(dòng)多個(gè)Consumer實(shí)例,觀察下每一個(gè)訂單的消息分配以及每個(gè)訂單下多個(gè)步驟的消費(fèi)順序。
不管訂單在多個(gè)Consumer實(shí)例之前是如何分配的,每個(gè)訂單下的多條消息順序都是固定從0~5的。

RocketMQ保證的是消息的局部有序,而不是全局有序。

mqs是什么(一個(gè)topic中對(duì)應(yīng)的多個(gè)隊(duì)列)。
再回看我們的樣例,實(shí)際上,RocketMQ也只保證了每個(gè)OrderID的所有消息有序(發(fā)到了同一個(gè)queue),而并不能保證所有消息都有序。所以這就涉及到了RocketMQ消息有序的原理。要保證最終消費(fèi)到的消息是有序的,需要從Producer、Broker、Consumer三個(gè)步驟都保證消息有序才行。

首先在發(fā)送者端:在默認(rèn)情況下,消息發(fā)送者會(huì)采取Round Robin輪詢方式把消息發(fā)送到不同的MessageQueue(分區(qū)隊(duì)列),而消費(fèi)者消費(fèi)的時(shí)候也從多個(gè)MessageQueue上拉取消息,這種情況下消息是不能保證順序的。而只有當(dāng)一組有序的消息發(fā)送到同一個(gè)MessageQueue上時(shí),才能利用MessageQueue先進(jìn)先出的特性保證這一組消息有序。

而Broker中一個(gè)隊(duì)列內(nèi)的消息是可以保證有序的。

然后在消費(fèi)者端:消費(fèi)者會(huì)從多個(gè)消息隊(duì)列上去拿消息。這時(shí)雖然每個(gè)消息隊(duì)列上的消息是有序的,但是多個(gè)隊(duì)列之間的消息仍然是亂序的。消費(fèi)者端要保證消息有序,就需要按隊(duì)列一個(gè)一個(gè)來取消息,即取完一個(gè)隊(duì)列的消息后,再去取下一個(gè)隊(duì)列的消息。而給consumer注入的MessageListenerOrderly對(duì)象,在RocketMQ內(nèi)部就會(huì)通過鎖隊(duì)列的方式保證消息是一個(gè)一個(gè)隊(duì)列來取的。MessageListenerConcurrently這個(gè)消息監(jiān)聽器則不會(huì)鎖隊(duì)列,每次都是從多個(gè)Message中取一批數(shù)據(jù)(默認(rèn)不超過32條)。因此也無法保證消息有序。

3.3 廣播消息

廣播消息的消息生產(chǎn)者樣例見:org.apache.rocketmq.example.broadcast.PushConsumer

廣播消息并沒有特定的消息消費(fèi)者樣例,這是因?yàn)檫@涉及到消費(fèi)者的集群消費(fèi)模式。在集群狀態(tài)(MessageModel.CLUSTERING)下,每一條消息只會(huì)被同一個(gè)消費(fèi)者組中的一個(gè)實(shí)例消費(fèi)到(這跟kafka和rabbitMQ的集群模式是一樣的)。而廣播模式則是把消息發(fā)給了所有訂閱了對(duì)應(yīng)主題的消費(fèi)者,而不管消費(fèi)者是不是同一個(gè)消費(fèi)者組。

3.4 延遲消息

延遲消息的生產(chǎn)者案例

public class ScheduledMessageProducer {
   
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }
   
        // Shutdown producer after use.
        producer.shutdown();
    }
       
}

延遲消息實(shí)現(xiàn)的效果就是在調(diào)用producer.send方法后,消息并不會(huì)立即發(fā)送出去,而是會(huì)等一段時(shí)間再發(fā)送出去。這是RocketMQ特有的一個(gè)功能。

那會(huì)延遲多久呢?延遲時(shí)間的設(shè)置就是在Message消息對(duì)象上設(shè)置一個(gè)延遲級(jí)別message.setDelayTimeLevel(3);

開源版本的RocketMQ中,對(duì)延遲消息并不支持任意時(shí)間的延遲設(shè)定(商業(yè)版本中支持),而是只支持18個(gè)固定的延遲級(jí)別,1到18分別對(duì)應(yīng)messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。這從哪里看出來的?其實(shí)從rocketmq-console控制臺(tái)就能看出來。而這18個(gè)延遲級(jí)別也支持自行定義,不過一般情況下最好不要自定義修改。

那這么好用的延遲消息是怎么實(shí)現(xiàn)的?這18個(gè)延遲級(jí)別除了在延遲消息中用,還有什么地方用到了?別急,我們會(huì)在后面部分進(jìn)行詳細(xì)講解。

3.5 批量消息

批量消息是指將多條消息合并成一個(gè)批量消息,一次發(fā)送出去。這樣的好處是可以減少網(wǎng)絡(luò)IO,提升吞吐量。

批量消息的消息生產(chǎn)者樣例見:org.apache.rocketmq.example.batch.SimpleBatchProducer和org.apache.rocketmq.example.batch.SplitBatchProducer

相信大家在官網(wǎng)以及測(cè)試代碼中都看到了關(guān)鍵的注釋:如果批量消息大于1MB就不要用一個(gè)批次發(fā)送,而要拆分成多個(gè)批次消息發(fā)送。也就是說,一個(gè)批次消息的大小不要超過1MB

實(shí)際使用時(shí),這個(gè)1MB的限制可以稍微擴(kuò)大點(diǎn),實(shí)際最大的限制是4194304字節(jié),大概4MB。但是使用批量消息時(shí),這個(gè)消息長度確實(shí)是必須考慮的一個(gè)問題。而且批量消息的使用是有一定限制的,這些消息應(yīng)該有相同的Topic,相同的waitStoreMsgOK。而且不能是延遲消息、事務(wù)消息等。

3.6 過濾消息

在大多數(shù)情況下,可以使用Message的Tag屬性來簡單快速的過濾信息。

使用Tag過濾消息的消息生產(chǎn)者案例見:org.apache.rocketmq.example.filter.TagFilterProducer

使用Tag過濾消息的消息消費(fèi)者案例見:org.apache.rocketmq.example.filter.TagFilterConsumer

主要是看消息消費(fèi)者。consumer.subscribe(“TagFilterTest”, “TagA || TagC”); 這句只訂閱TagA和TagC的消息。

TAG是RocketMQ中特有的一個(gè)消息屬性。RocketMQ的最佳實(shí)踐中就建議,使用RocketMQ時(shí),一個(gè)應(yīng)用可以就用一個(gè)Topic,而應(yīng)用中的不同業(yè)務(wù)就用TAG來區(qū)分。

但是,這種方式有一個(gè)很大的限制,就是一個(gè)消息只能有一個(gè)TAG,這在一些比較復(fù)雜的場(chǎng)景就有點(diǎn)不足了。 這時(shí)候,可以使用SQL表達(dá)式來對(duì)消息進(jìn)行過濾。

SQL過濾的消息生產(chǎn)者案例見:org.apache.rocketmq.example.filter.SqlFilterProducer

SQL過濾的消息消費(fèi)者案例見:org.apache.rocketmq.example.filter.SqlFilterConsumer

,>=,<,<=,BETWEEN,=;**
* 字符比較,比如:=,<>,IN;
* IS NULL 或者 IS NOT NULL;
* 邏輯符號(hào) AND,OR,NOT;

常量支持類型為:

* 數(shù)值,比如:123,3.1415;
* 字符,比如:‘a(chǎn)bc’,必須用單引號(hào)包裹起來;
* NULL,特殊的常量
* 布爾值,TRUEFALSE

使用注意:只有推模式的消費(fèi)者可以使用SQL過濾。拉模式是用不了的。

大家想一下,這個(gè)消息過濾是在Broker端進(jìn)行的還是在Consumer端進(jìn)行的?

3.7 事務(wù)消息

這個(gè)事務(wù)消息是RocketMQ提供的一個(gè)非常有特色的功能,需要著重理解。

首先,我們了解下什么是事務(wù)消息。官網(wǎng)的介紹是:事務(wù)消息是在分布式系統(tǒng)中保證最終一致性的兩階段提交的消息實(shí)現(xiàn)。他可以保證本地事務(wù)執(zhí)行與消息發(fā)送兩個(gè)操作的原子性,也就是這兩個(gè)操作一起成功或者一起失敗。

其次,我們來理解下事務(wù)消息的編程模型。事務(wù)消息只保證消息發(fā)送者的本地事務(wù)與發(fā)消息這兩個(gè)操作的原子性,因此,事務(wù)消息的示例只涉及到消息發(fā)送者,對(duì)于消息消費(fèi)者來說,并沒有什么特別的。

事務(wù)消息生產(chǎn)者的案例見:org.apache.rocketmq.example.transaction.TransactionProducer

事務(wù)消息的關(guān)鍵是在TransactionMQProducer中指定了一個(gè)TransactionListener事務(wù)監(jiān)聽器,這個(gè)事務(wù)監(jiān)聽器就是事務(wù)消息的關(guān)鍵控制器。源碼中的案例有點(diǎn)復(fù)雜,我這里準(zhǔn)備了一個(gè)更清晰明了的事務(wù)監(jiān)聽器示例

public class TransactionListenerImpl implements TransactionListener {
  //在提交完事務(wù)消息后執(zhí)行。
  //返回COMMIT_MESSAGE狀態(tài)的消息會(huì)立即被消費(fèi)者消費(fèi)到。
  //返回ROLLBACK_MESSAGE狀態(tài)的消息會(huì)被丟棄。
  //返回UNKNOWN狀態(tài)的消息會(huì)由Broker過一段時(shí)間再來回查事務(wù)的狀態(tài)。
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tags = msg.getTags();
        //TagA的消息會(huì)立即被消費(fèi)者消費(fèi)到
        if(StringUtils.contains(tags,"TagA")){
            return LocalTransactionState.COMMIT_MESSAGE;
        //TagB的消息會(huì)被丟棄
        }else if(StringUtils.contains(tags,"TagB")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        //其他消息會(huì)等待Broker進(jìn)行事務(wù)狀態(tài)回查。
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
  //在對(duì)UNKNOWN狀態(tài)的消息進(jìn)行狀態(tài)回查時(shí)執(zhí)行。返回的結(jié)果是一樣的。
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    String tags = msg.getTags();
        //TagC的消息過一段時(shí)間會(huì)被消費(fèi)者消費(fèi)到
        if(StringUtils.contains(tags,"TagC")){
            return LocalTransactionState.COMMIT_MESSAGE;
        //TagD的消息也會(huì)在狀態(tài)回查時(shí)被丟棄掉
        }else if(StringUtils.contains(tags,"TagD")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        //剩下TagE的消息會(huì)在多次狀態(tài)回查后最終丟棄
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
}

然后,我們要了解下事務(wù)消息的使用限制:

1、事務(wù)消息不支持延遲消息和批量消息。

2、為了避免單個(gè)消息被檢查太多次而導(dǎo)致半隊(duì)列消息累積,我們默認(rèn)將單個(gè)消息的檢查次數(shù)限制為 15 次,但是用戶可以通過 Broker 配置文件的 transactionCheckMax參數(shù)來修改此限制。如果已經(jīng)檢查某條消息超過 N 次的話( N = transactionCheckMax ) 則 Broker 將丟棄此消息,并在默認(rèn)情況下同時(shí)打印錯(cuò)誤日志。用戶可以通過重寫 AbstractTransactionCheckListener 類來修改這個(gè)行為。

回查次數(shù)是由BrokerConfig.transactionCheckMax這個(gè)參數(shù)來配置的,默認(rèn)15次,可以在broker.conf中覆蓋。
然后實(shí)際的檢查次數(shù)會(huì)在message中保存一個(gè)用戶屬性MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES。這個(gè)屬性值大于transactionCheckMax,就會(huì)丟棄。 這個(gè)用戶屬性值會(huì)按回查次數(shù)遞增,也可以在Producer中自行覆蓋這個(gè)屬性。

? 3、事務(wù)消息將在 Broker 配置文件中的參數(shù) transactionMsgTimeout 這樣的特定時(shí)間長度之后被檢查。當(dāng)發(fā)送事務(wù)消息時(shí),用戶還可以通過設(shè)置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個(gè)限制,該參數(shù)優(yōu)先于 transactionMsgTimeout 參數(shù)。

由BrokerConfig.transactionTimeOut這個(gè)參數(shù)來配置。默認(rèn)6秒,可以在broker.conf中進(jìn)行修改。
另外,也可以給消息配置一個(gè)MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS屬性來給消息指定一個(gè)特定的消息回查時(shí)間。
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, “10000”); 這樣就是10秒。

4、事務(wù)性消息可能不止一次被檢查或消費(fèi)。

5、提交給用戶的目標(biāo)主題消息可能會(huì)失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機(jī)制來保證,如果希望確保事務(wù)消息不丟失、并且事務(wù)完整性得到保證,建議使用同步的雙重寫入機(jī)制。

6、事務(wù)消息的生產(chǎn)者 ID 不能與其他類型消息的生產(chǎn)者 ID 共享。與其他類型的消息不同,事務(wù)消息允許反向查詢、MQ服務(wù)器能通過它們的生產(chǎn)者 ID 查詢到消費(fèi)者。

接下來,我們還要了解下事務(wù)消息的實(shí)現(xiàn)機(jī)制,參見下圖:

rocketMq消息隊(duì)列詳細(xì)使用與實(shí)踐整合spring,java-rocketmq,rocketmq,spring,springboot,spring cloud,消息隊(duì)列

事務(wù)消息機(jī)制的關(guān)鍵是在發(fā)送消息時(shí),會(huì)將消息轉(zhuǎn)為一個(gè)half半消息,并存入RocketMQ內(nèi)部的一個(gè) RMQ_SYS_TRANS_HALF_TOPIC 這個(gè)Topic,這樣對(duì)消費(fèi)者是不可見的。再經(jīng)過一系列事務(wù)檢查通過后,再將消息轉(zhuǎn)存到目標(biāo)Topic,這樣對(duì)消費(fèi)者就可見了。

最后,我們還需要思考下事務(wù)消息的作用。

大家想一下這個(gè)事務(wù)消息跟分布式事務(wù)有什么關(guān)系?為什么扯到了分布式事務(wù)相關(guān)的兩階段提交上了?事務(wù)消息只保證了發(fā)送者本地事務(wù)和發(fā)送消息這兩個(gè)操作的原子性,但是并不保證消費(fèi)者本地事務(wù)的原子性,所以,事務(wù)消息只保證了分布式事務(wù)的一半。但是即使這樣,對(duì)于復(fù)雜的分布式事務(wù),RocketMQ提供的事務(wù)消息也是目前業(yè)內(nèi)最佳的降級(jí)方案。

3.8 ACL權(quán)限控制

權(quán)限控制(ACL)主要為RocketMQ提供Topic資源級(jí)別的用戶訪問控制。用戶在使用RocketMQ權(quán)限控制時(shí),可以在Client客戶端通過 RPCHook注入AccessKey和SecretKey簽名;同時(shí),將對(duì)應(yīng)的權(quán)限控制屬性(包括Topic訪問權(quán)限、IP白名單和AccessKey和SecretKey簽名等)設(shè)置在$ROCKETMQ_HOME/conf/plain_acl.yml的配置文件中。Broker端對(duì)AccessKey所擁有的權(quán)限進(jìn)行校驗(yàn),校驗(yàn)不過,拋出異常; ACL客戶端可以參考:org.apache.rocketmq.example.simple包下面的AclClient代碼。

注意,如果要在自己的客戶端中使用RocketMQ的ACL功能,還需要引入一個(gè)單獨(dú)的依賴包

<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.7.1</version>
</dependency>

? 而Broker端具體的配置信息可以參見源碼包下docs/cn/acl/user_guide.md。主要是在broker.conf中打開acl的標(biāo)志:aclEnable=true。然后就可以用plain_acl.yml來進(jìn)行權(quán)限配置了。并且這個(gè)配置文件是熱加載的,也就是說要修改配置時(shí),只要修改配置文件就可以了,不用重啟Broker服務(wù)。我們來簡單分析下源碼中的plan_acl.yml的配置:

#全局白名單,不受ACL控制
#通常需要將主從架構(gòu)中的所有節(jié)點(diǎn)加進(jìn)來
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*

accounts:
#第一個(gè)賬戶
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false 
  defaultTopicPerm: DENY #默認(rèn)Topic訪問策略是拒絕
  defaultGroupPerm: SUB #默認(rèn)Group訪問策略是只允許訂閱
  topicPerms:
  - topicA=DENY #topicA拒絕
  - topicB=PUB|SUB #topicB允許發(fā)布和訂閱消息
  - topicC=SUB #topicC只允許訂閱
  groupPerms:
  # the group should convert to retry topic
  - groupA=DENY
  - groupB=PUB|SUB
  - groupC=SUB
#第二個(gè)賬戶,只要是來自192.168.1.*的IP,就可以訪問所有資源
- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true

二、SpringBoot整合RocketMQ

1、快速實(shí)戰(zhàn)

這部分我們看下SpringBoot如何快速集成RocketMQ。

在使用SpringBoot的starter集成包時(shí),要特別注意版本。因?yàn)镾pringBoot集成RocketMQ的starter依賴是由Spring社區(qū)提供的,目前正在快速迭代的過程當(dāng)中,不同版本之間的差距非常大,甚至基礎(chǔ)的底層對(duì)象都會(huì)經(jīng)常有改動(dòng)。例如如果使用rocketmq-spring-boot-starter:2.0.4版本開發(fā)的代碼,升級(jí)到目前最新的rocketmq-spring-boot-starter:2.1.1后,基本就用不了了。

我們創(chuàng)建一個(gè)maven工程,引入關(guān)鍵依賴:

<dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>

rocketmq-spring-boot-starter:2.1.1引入的SpringBoot包版本是2.0.5.RELEASE,這里把SpringBoot的依賴包升級(jí)了一下。

然后我們以SpringBoot的方式,快速創(chuàng)建一個(gè)簡單的Demo

啟動(dòng)類:

@SpringBootApplication
public class RocketMQScApplication {

    public static void main(String[] args) {
        SpringApplication.run(RocketMQScApplication.class,args);
    }
}

配置文件 application.properties

#NameServer地址
rocketmq.name-server=192.168.232.128:9876
#默認(rèn)的消息生產(chǎn)者組
rocketmq.producer.group=springBootGroup

消息生產(chǎn)者

package com.roy.rocket.basic;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@Component
public class SpringProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;
  //發(fā)送普通消息的示例
    public void sendMessage(String topic,String msg){
        this.rocketMQTemplate.convertAndSend(topic,msg);
    }
  //發(fā)送事務(wù)消息的示例
    public void sendMessageInTransaction(String topic,String msg) throws InterruptedException {
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            Message<String> message = MessageBuilder.withPayload(msg).build();
            String destination =topic+":"+tags[i % tags.length];
            SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message,destination);
            System.out.printf("%s%n", sendResult);

            Thread.sleep(10);
        }
    }
}

消息消費(fèi)者

package com.roy.rocket.basic;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message : "+ message);
    }
}

SpringBoot集成RocketMQ,消費(fèi)者部分的核心就在這個(gè)@RocketMQMessageListener注解上。所有消費(fèi)者的核心功能也都會(huì)集成到這個(gè)注解中。所以我們還要注意下這個(gè)注解里面的屬性:

例如:消息過濾可以由里面的selectorType屬性和selectorExpression來定制

消息有序消費(fèi)還是并發(fā)消費(fèi)則由consumeMode屬性定制。

消費(fèi)者是集群部署還是廣播部署由messageModel屬性定制。

然后關(guān)于事務(wù)消息,還需要配置一個(gè)事務(wù)消息監(jiān)聽器:

package com.roy.rocket.config;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author :
 * @date :Created in 2020/11/5
 * @description:
 **/

@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {

    private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Object id = msg.getHeaders().get("id");
        String destination = arg.toString();
        localTrans.put(id,destination);
        org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
        String tags = message.getTags();
        if(StringUtils.contains(tags,"TagA")){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(StringUtils.contains(tags,"TagB")){
            return RocketMQLocalTransactionState.ROLLBACK;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        //SpringBoot的消息對(duì)象中,并沒有transactionId這個(gè)屬性。跟原生API不一樣。
//        String destination = localTrans.get(msg.getTransactionId());
        return RocketMQLocalTransactionState.COMMIT;
    }
}

這樣我們啟動(dòng)應(yīng)用后,就能夠通過訪問 http://localhost:8080/MQTest/sendMessage?message=123 接口來發(fā)送一條簡單消息。并在SpringConsumer中消費(fèi)到。

也可以通過訪問http://localhost:8080/MQTest/sendTransactionMessage?message=123 ,來發(fā)送一條事務(wù)消息。

這里可以看到,對(duì)事務(wù)消息,SpringBoot進(jìn)行封裝時(shí),就缺少了transactionId,這在事務(wù)控制中是非常關(guān)鍵的。

2、其他更多消息類型:

對(duì)于其他的消息類型,文檔中就不一一記錄了。具體可以參見源碼中的junit測(cè)試案例。

3、總結(jié):

  • SpringBoot 引入org.apache.rocketmq:rocketmq-spring-boot-starter依賴后,就可以通過內(nèi)置的RocketMQTemplate來與RocketMQ交互。相關(guān)屬性都以rockemq.開頭。具體所有的配置信息可以參見org.apache.rocketmq.spring.autoconfigure.RocketMQProperties這個(gè)類。
  • SpringBoot依賴中的Message對(duì)象和RocketMQ-client中的Message對(duì)象是兩個(gè)不同的對(duì)象,這在使用的時(shí)候要非常容易弄錯(cuò)。例如RocketMQ-client中的Message里的TAG屬性,在SpringBoot依賴中的Message中就沒有。Tag屬性被移到了發(fā)送目標(biāo)中,與Topic一起,以Topic:Tag的方式指定。
  • 最后強(qiáng)調(diào)一次,一定要注意版本。rocketmq-spring-boot-starter的更新進(jìn)度一般都會(huì)略慢于RocketMQ的版本更新,并且版本不同會(huì)引發(fā)很多奇怪的問題。apache有一個(gè)官方的rocketmq-spring示例,地址:https://github.com/apache/rocketmq-spring.git 以后如果版本更新了,可以參考下這個(gè)示例代碼。

三、SpringCloudStream整合RocketMQ

SpringCloudStream是Spring社區(qū)提供的一個(gè)統(tǒng)一的消息驅(qū)動(dòng)框架,目的是想要以一個(gè)統(tǒng)一的編程模型來對(duì)接所有的MQ消息中間件產(chǎn)品。我們還是來看看SpringCloudStream如何來集成RocketMQ。

1、快速實(shí)戰(zhàn)

創(chuàng)建Maven工程,引入依賴:

<dependencies>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.7.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-acl</artifactId>
      <version>4.7.1</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
      <version>2.2.3.RELEASE</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-acl</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      <version>2.3.3.RELEASE</version>
    </dependency>
  </dependencies>

應(yīng)用啟動(dòng)類:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ScRocketMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(ScRocketMQApplication.class,args);
    }
}

注意這個(gè)@EnableBinding({Source.class, Sink.class})注解,這是SpringCloudStream引入的Binder配置。

然后增加配置文件application.properties

#ScStream通用的配置以spring.cloud.stream開頭
spring.cloud.stream.bindings.input.destination=TestTopic
spring.cloud.stream.bindings.input.group=scGroup
spring.cloud.stream.bindings.output.destination=TestTopic
#rocketMQ的個(gè)性化配置以spring.cloud.stream.rocketmq開頭
#spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876;192.168.232.129:9876;192.168.232.130:9876
spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876

SpringCloudStream中,一個(gè)binding對(duì)應(yīng)一個(gè)消息通道。這其中配置的input,是在Sink.class中定義的,對(duì)應(yīng)一個(gè)消息消費(fèi)者。而output,是在Source.class中定義的,對(duì)應(yīng)一個(gè)消息生產(chǎn)者。

然后就可以增加消息消費(fèi)者:

package com.roy.scrocket.basic;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@Component
public class ScConsumer {

    @StreamListener(Sink.INPUT)
    public void onMessage(String messsage){
        System.out.println("received message:"+messsage+" from binding:"+ Sink.INPUT);
    }
}

消息生產(chǎn)者:

package com.roy.scrocket.basic;

import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@Component
public class ScProducer {

    @Resource
    private Source source;

    public void sendMessage(String msg){
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS, "testTag");
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
        this.source.output().send(message);
    }
}

最后增加一個(gè)Controller類用于測(cè)試:

package com.roy.scrocket.controller;

import com.roy.scrocket.basic.ScProducer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author :
 * @date :Created in 2020/10/27
 * @description:
 **/
@RestController
@RequestMapping("/MQTest")
public class MQTestController {

    @Resource
    private ScProducer producer;
    @RequestMapping("/sendMessage")
    public String sendMessage(String message){
        producer.sendMessage(message);
        return "消息發(fā)送完成";
    }
}

啟動(dòng)應(yīng)用后,就可以訪問http://localhost:8080/MQTest/sendMessage?message=123,給RocketMQ發(fā)送一條消息到TestTopic,并在ScConsumer中消費(fèi)到了。文章來源地址http://www.zghlxwxcb.cn/news/detail-635443.html

2、總結(jié)

  • 關(guān)于SpringCloudStream。這是一套幾乎通用的消息中間件編程框架,例如從對(duì)接RocketMQ換到對(duì)接Kafka,業(yè)務(wù)代碼幾乎不需要?jiǎng)?,只需要更換pom依賴并且修改配置文件就行了。但是,由于各個(gè)MQ產(chǎn)品都有自己的業(yè)務(wù)模型,差距非常大,所以使用使用SpringCloudStream時(shí)要注意業(yè)務(wù)模型轉(zhuǎn)換。并且在實(shí)際使用中,要非常注意各個(gè)MQ的個(gè)性化配置屬性。例如RocketMQ的個(gè)性化屬性都是以spring.cloud.stream.rocketmq開頭,只有通過這些屬性才能用上RocketMQ的延遲消息、排序消息、事務(wù)消息等個(gè)性化功能。
  • SpringCloudStream是Spring社區(qū)提供的一套統(tǒng)一框架,但是官方目前只封裝了kafka、kafka Stream、RabbitMQ的具體依賴。而RocketMQ的依賴是交由廠商自己維護(hù)的, 也就是由阿里巴巴自己來維護(hù)。這個(gè)維護(hù)力度顯然是有不小差距的。所以一方面可以看到之前在使用SpringBoot時(shí)著重強(qiáng)調(diào)的版本問題,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本還是4.4.0。這個(gè)差距就非常大了。另一方面,RocketMQ這幫大神不屑于寫文檔的問題也特別嚴(yán)重,SpringCloudStream中關(guān)于RocketMQ的個(gè)性化配置幾乎很難找到完整的文檔。
  • 總之,對(duì)于RocketMQ來說SpringCloudStream目前來說還并不是一個(gè)非常好的集成方案。這方面跟kafka和Rabbit還沒法比。所以使用時(shí)要慎重。

到了這里,關(guān)于rocketMq消息隊(duì)列詳細(xì)使用與實(shí)踐整合spring的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 消息隊(duì)列RocketMQ、Kafka小計(jì)

    消息隊(duì)列RocketMQ、Kafka小計(jì)

    點(diǎn)對(duì)點(diǎn)模式 (一對(duì)一,消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后消息清除)點(diǎn)對(duì)點(diǎn)模型通常是一個(gè)基于拉取或者輪詢的消息傳送模型,這種模型從隊(duì)列中請(qǐng)求信息,而不是將消息推送到客戶端。這個(gè)模型的特點(diǎn)是發(fā)送到隊(duì)列的消息被一個(gè)且只有一個(gè)接收者接收處理,即使有多個(gè)消息

    2023年04月22日
    瀏覽(12)
  • rocketMQ消息隊(duì)列簡介及其實(shí)例

    rocketMQ消息隊(duì)列簡介及其實(shí)例

    ?RocketMQ優(yōu)點(diǎn): 單機(jī)吞吐量:十萬級(jí) 可用性:非常高,分布式架構(gòu) 消息可靠性:經(jīng)過參數(shù)優(yōu)化配置,消息可以做到0丟失 功能支持:MQ功能較為完善,還是分布式的,擴(kuò)展性好 支持10億級(jí)別的消息堆積,不會(huì)因?yàn)槎逊e導(dǎo)致性能下降 缺點(diǎn):兼容性差點(diǎn) 一、RocketMQ 核心的四大組件

    2024年02月08日
    瀏覽(18)
  • 消息隊(duì)列中間件 MetaQ/RocketMQ

    消息隊(duì)列中間件 MetaQ/RocketMQ

    推薦電子書:云原生架構(gòu)白皮書 2022版-藏經(jīng)閣-阿里云開發(fā)者社區(qū) (aliyun.com) 簡介—— 消息隊(duì)列中間件 MetaQ/RocketMQ 中間件 MetaQ 是一種基于隊(duì)列模型的消息中間件,MetaQ 據(jù)說最早是受 Kafka 的影響開發(fā)的,第一版的名字?\\\"metamorphosis\\\",是奧地利作家卡夫卡的名作——《變形記》。

    2024年02月14日
    瀏覽(96)
  • 分布式消息隊(duì)列RocketMQ概念詳解

    分布式消息隊(duì)列RocketMQ概念詳解

    目錄 1.MQ概述 1.1 RocketMQ簡介 1.2 MQ用途 1.3 常見MQ產(chǎn)品 2.RocketMQ 基本概念 2.1 消息 2.2 主題 2.3 標(biāo)簽 2.4 隊(duì)列 ?2.5 Producer 2.6 Consumer 2.7 NameServer 2.8 Broker 2.9 RocketMQ 工作流程 ? RocketMQ 是阿里開源的分布式消息中間件,跟其它中間件相比,RocketMQ 的特點(diǎn)是純JAVA實(shí)現(xiàn),是一套提供了消息

    2024年02月03日
    瀏覽(27)
  • 【面試需了解之消息隊(duì)列】RocketMQ、kafka、RabbitMQ概述

    消息隊(duì)列說明:RocketMQ、kafka、RabbitMQ概述及關(guān)鍵概念 概述 消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合,異步消息,流量控制等問題。實(shí)現(xiàn)高性能、高可用、可伸縮和最終一致性架構(gòu),是大型分布式系統(tǒng)不可缺少的中間件 作用 異構(gòu)系統(tǒng)消息傳遞:上游系統(tǒng)

    2024年02月10日
    瀏覽(62)
  • RabbitMQ與RocketMQ:消息隊(duì)列的兩大強(qiáng)者對(duì)比

    在現(xiàn)代分布式系統(tǒng)中,消息隊(duì)列已成為不可或缺的一部分,它們幫助我們?cè)诓煌姆?wù)之間實(shí)現(xiàn)異步通信、解耦和流量削峰。在眾多消息隊(duì)列中間件中,RabbitMQ和RocketMQ是兩個(gè)備受矚目的選項(xiàng)。本文將對(duì)它們進(jìn)行深入對(duì)比,幫助大家根據(jù)實(shí)際需求選擇合適的消息隊(duì)列中間件。

    2024年04月28日
    瀏覽(22)
  • mq 消息隊(duì)列 mqtt emqx ActiveMQ RabbitMQ RocketMQ

    十幾年前,淘寶的notify,借鑒ActiveMQ。京東的ActiveMQ集群幾百臺(tái),后面改成JMQ。 Linkedin的kafka,因?yàn)槭莝cala,國內(nèi)很多人不熟。淘寶的人把kafka用java寫了一遍,取名metaq,后來再改名RocketMQ。 總的來說,三大原因,語言、潮流、生態(tài)。 MQ這種東西,當(dāng)你的消息量不大的時(shí)候,用啥

    2024年02月12日
    瀏覽(17)
  • 解析RocketMQ:高性能分布式消息隊(duì)列的原理與應(yīng)用

    什么是消息隊(duì)列 消息隊(duì)列是一種消息傳遞機(jī)制,用于在應(yīng)用程序和系統(tǒng)之間傳遞消息,實(shí)現(xiàn)解耦和異步通信。它通過將消息發(fā)送到一個(gè)中間代理(消息隊(duì)列),然后由消費(fèi)者從該隊(duì)列中獲取消息并處理。 RocketMQ簡介 RocketMQ是阿里巴巴開源的一款高性能分布式消息隊(duì)列系統(tǒng)。它

    2024年02月14日
    瀏覽(27)
  • SpringBoot整合消息中間件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    SpringBoot整合消息中間件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    消息的發(fā)送方:生產(chǎn)者 消息的接收方:消費(fèi)者 同步消息:發(fā)送方發(fā)送消息到接收方,接收方有所回應(yīng)后才能夠進(jìn)行下一次的消息發(fā)送 異步消息:不需要接收方回應(yīng)就可以進(jìn)行下一步的發(fā)送 什么是消息隊(duì)列? 當(dāng)此時(shí)有很多個(gè)用戶同時(shí)訪問服務(wù)器,需要服務(wù)器進(jìn)行操作,但此

    2024年04月27日
    瀏覽(53)
  • Sprint Cloud Stream整合RocketMq和websocket實(shí)現(xiàn)消息發(fā)布訂閱

    Sprint Cloud Stream整合RocketMq和websocket實(shí)現(xiàn)消息發(fā)布訂閱

    1. 引入RocketMQ依賴 :首先,在 pom.xml 文件中添加RocketMQ的依賴: 2. 配置RocketMQ連接信息 :在 application.properties 或 application.yml 中配置RocketMQ的連接信息,包括Name Server地址等: 3.消息發(fā)布組件 4.消息發(fā)布控制器 項(xiàng)目結(jié)構(gòu): 接下來是websocket模塊的搭建 1. 依賴添加 2.application.yml配

    2024年02月08日
    瀏覽(15)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包