rocketMq消息隊(duì)列
一、RocketMQ原生API使用
使用RocketMQ的原生API開發(fā)是最簡單也是目前看來最牢靠的方式。這里用SpringBoot來搭建一系列消息生產(chǎn)者和消息消費(fèi)者,來訪問之前搭建的RocketMQ集群。
1、測試環(huán)境搭建
首先創(chuàng)建一個(gè)基于Maven的SpringBoot工程,引入如下依賴:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
RocketMQ的官網(wǎng)上有很多經(jīng)典的測試代碼,這些代碼雖然依賴的版本比較老,但是還是都可以運(yùn)行的。以官網(wǎng)上的順序進(jìn)行學(xué)習(xí)。
2、RocketMQ的編程模型
然后RocketMQ的生產(chǎn)者和消費(fèi)者的編程模型都是有個(gè)比較固定的步驟的,掌握這個(gè)固定的步驟,對于我們學(xué)習(xí)源碼以及以后使用都是很有幫助的。
-
消息發(fā)送者的固定步驟
1.創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名
2.指定Nameserver地址
3.啟動producer
4.創(chuàng)建消息對象,指定主題Topic、Tag和消息體
5.發(fā)送消息
6.關(guān)閉生產(chǎn)者producer
-
消息消費(fèi)者的固定步驟
1.創(chuàng)建消費(fèi)者Consumer,制定消費(fèi)者組名
2.指定Nameserver地址
3.訂閱主題Topic和Tag
4.設(shè)置回調(diào)函數(shù),處理消息
5.啟動消費(fèi)者consumer
3、RocketMQ的消息樣例
RocketMQ都支持類型的消息:
3.1 基本樣例
同步發(fā)送:能夠?qū)崟r(shí)知道消息是否從生產(chǎn)者推送到broke
異步發(fā)送:能夠異步回調(diào)的方式知道消息是否從生產(chǎn)者推送的broke
單向發(fā)送:消息發(fā)送出去就不管了,無法感知消息是否從生產(chǎn)者推送到broke的狀態(tài)
基本樣例部分我們使用消息生產(chǎn)者分別通過三種方式發(fā)送消息,同步發(fā)送、異步發(fā)送以及單向發(fā)送。
然后使用消費(fèi)者來消費(fèi)這些消息。
1、同步發(fā)送消息的樣例見:org.apache.rocketmq.example.simple.Producer
2、異步發(fā)送消息的樣例見:org.apache.rocketmq.example.simple.AsyncProducer
等待消息返回后再繼續(xù)進(jìn)行下面的操作。
這個(gè)示例有個(gè)比較有趣的地方就是引入了一個(gè)countDownLatch來保證所有消息回調(diào)方法都執(zhí)行完了再關(guān)閉Producer。 所以從這里可以看出,RocketMQ的Producer也是一個(gè)服務(wù)端,在往Broker發(fā)送消息的時(shí)候也要作為服務(wù)端提供服務(wù)。
3、單向發(fā)送消息的樣例:
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
//Wait for sending to complete
Thread.sleep(5000);
producer.shutdown();
}
}
關(guān)鍵點(diǎn)就是使用producer.sendOneWay方式來發(fā)送消息,這個(gè)方法沒有返回值,也沒有回調(diào)。就是只管把消息發(fā)出去就行了。
4、使用消費(fèi)者消費(fèi)消息。
消費(fèi)者消費(fèi)消息有兩種模式,
一種是消費(fèi)者主動去Broker上拉取消息的拉模式,
另一種是消費(fèi)者等待Broker把消息推送過來的推模式。
拉模式的樣例見:org.apache.rocketmq.example.simple.PullConsumer
推模式的樣例見:org.apache.rocketmq.example.simple.PushConsumer
通常情況下,用推模式比較簡單。
實(shí)際上RocketMQ的推模式也是由拉模式封裝出來的。
4.7.1版本中DefaultMQPullConsumerImpl這個(gè)消費(fèi)者類已標(biāo)記為過期,但是還是可以使用的。替換的類是DefaultLitePullConsumerImpl。
3.2 順序消息
順序消息生產(chǎn)者樣例見:org.apache.rocketmq.example.order.Producer
順序消息消費(fèi)者樣例見:org.apache.rocketmq.example.order.Consumer
驗(yàn)證時(shí),可以啟動多個(gè)Consumer實(shí)例,觀察下每一個(gè)訂單的消息分配以及每個(gè)訂單下多個(gè)步驟的消費(fèi)順序。
不管訂單在多個(gè)Consumer實(shí)例之前是如何分配的,每個(gè)訂單下的多條消息順序都是固定從0~5的。RocketMQ保證的是消息的局部有序,而不是全局有序。
mqs是什么(一個(gè)topic中對應(yīng)的多個(gè)隊(duì)列)。
再回看我們的樣例,實(shí)際上,RocketMQ也只保證了每個(gè)OrderID的所有消息有序(發(fā)到了同一個(gè)queue),而并不能保證所有消息都有序。所以這就涉及到了RocketMQ消息有序的原理。要保證最終消費(fèi)到的消息是有序的,需要從Producer、Broker、Consumer三個(gè)步驟都保證消息有序才行。首先在發(fā)送者端:在默認(rèn)情況下,消息發(fā)送者會采取Round Robin輪詢方式把消息發(fā)送到不同的MessageQueue(分區(qū)隊(duì)列),而消費(fèi)者消費(fèi)的時(shí)候也從多個(gè)MessageQueue上拉取消息,這種情況下消息是不能保證順序的。而只有當(dāng)一組有序的消息發(fā)送到同一個(gè)MessageQueue上時(shí),才能利用MessageQueue先進(jìn)先出的特性保證這一組消息有序。
而Broker中一個(gè)隊(duì)列內(nèi)的消息是可以保證有序的。
然后在消費(fèi)者端:消費(fèi)者會從多個(gè)消息隊(duì)列上去拿消息。這時(shí)雖然每個(gè)消息隊(duì)列上的消息是有序的,但是多個(gè)隊(duì)列之間的消息仍然是亂序的。消費(fèi)者端要保證消息有序,就需要按隊(duì)列一個(gè)一個(gè)來取消息,即取完一個(gè)隊(duì)列的消息后,再去取下一個(gè)隊(duì)列的消息。而給consumer注入的MessageListenerOrderly對象,在RocketMQ內(nèi)部就會通過鎖隊(duì)列的方式保證消息是一個(gè)一個(gè)隊(duì)列來取的。MessageListenerConcurrently這個(gè)消息監(jiān)聽器則不會鎖隊(duì)列,每次都是從多個(gè)Message中取一批數(shù)據(jù)(默認(rèn)不超過32條)。因此也無法保證消息有序。
3.3 廣播消息
廣播消息的消息生產(chǎn)者樣例見:org.apache.rocketmq.example.broadcast.PushConsumer
廣播消息并沒有特定的消息消費(fèi)者樣例,這是因?yàn)檫@涉及到消費(fèi)者的集群消費(fèi)模式。在集群狀態(tài)(MessageModel.CLUSTERING)下,每一條消息只會被同一個(gè)消費(fèi)者組中的一個(gè)實(shí)例消費(fèi)到(這跟kafka和rabbitMQ的集群模式是一樣的)。而廣播模式則是把消息發(fā)給了所有訂閱了對應(yīng)主題的消費(fèi)者,而不管消費(fèi)者是不是同一個(gè)消費(fèi)者組。
3.4 延遲消息
延遲消息的生產(chǎn)者案例
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
延遲消息實(shí)現(xiàn)的效果就是在調(diào)用producer.send方法后,消息并不會立即發(fā)送出去,而是會等一段時(shí)間再發(fā)送出去。這是RocketMQ特有的一個(gè)功能。
那會延遲多久呢?延遲時(shí)間的設(shè)置就是在Message消息對象上設(shè)置一個(gè)延遲級別message.setDelayTimeLevel(3);
開源版本的RocketMQ中,對延遲消息并不支持任意時(shí)間的延遲設(shè)定(商業(yè)版本中支持),而是只支持18個(gè)固定的延遲級別,1到18分別對應(yīng)messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。這從哪里看出來的?其實(shí)從rocketmq-console控制臺就能看出來。而這18個(gè)延遲級別也支持自行定義,不過一般情況下最好不要自定義修改。
那這么好用的延遲消息是怎么實(shí)現(xiàn)的?這18個(gè)延遲級別除了在延遲消息中用,還有什么地方用到了?別急,我們會在后面部分進(jìn)行詳細(xì)講解。
3.5 批量消息
批量消息是指將多條消息合并成一個(gè)批量消息,一次發(fā)送出去。這樣的好處是可以減少網(wǎng)絡(luò)IO,提升吞吐量。
批量消息的消息生產(chǎn)者樣例見:org.apache.rocketmq.example.batch.SimpleBatchProducer和org.apache.rocketmq.example.batch.SplitBatchProducer
相信大家在官網(wǎng)以及測試代碼中都看到了關(guān)鍵的注釋:如果批量消息大于1MB就不要用一個(gè)批次發(fā)送,而要拆分成多個(gè)批次消息發(fā)送。也就是說,一個(gè)批次消息的大小不要超過1MB
實(shí)際使用時(shí),這個(gè)1MB的限制可以稍微擴(kuò)大點(diǎn),實(shí)際最大的限制是4194304字節(jié),大概4MB。但是使用批量消息時(shí),這個(gè)消息長度確實(shí)是必須考慮的一個(gè)問題。而且批量消息的使用是有一定限制的,這些消息應(yīng)該有相同的Topic,相同的waitStoreMsgOK。而且不能是延遲消息、事務(wù)消息等。
3.6 過濾消息
在大多數(shù)情況下,可以使用Message的Tag屬性來簡單快速的過濾信息。
使用Tag過濾消息的消息生產(chǎn)者案例見:org.apache.rocketmq.example.filter.TagFilterProducer
使用Tag過濾消息的消息消費(fèi)者案例見:org.apache.rocketmq.example.filter.TagFilterConsumer
主要是看消息消費(fèi)者。consumer.subscribe(“TagFilterTest”, “TagA || TagC”); 這句只訂閱TagA和TagC的消息。
TAG是RocketMQ中特有的一個(gè)消息屬性。RocketMQ的最佳實(shí)踐中就建議,使用RocketMQ時(shí),一個(gè)應(yīng)用可以就用一個(gè)Topic,而應(yīng)用中的不同業(yè)務(wù)就用TAG來區(qū)分。
但是,這種方式有一個(gè)很大的限制,就是一個(gè)消息只能有一個(gè)TAG,這在一些比較復(fù)雜的場景就有點(diǎn)不足了。 這時(shí)候,可以使用SQL表達(dá)式來對消息進(jìn)行過濾。
SQL過濾的消息生產(chǎn)者案例見:org.apache.rocketmq.example.filter.SqlFilterProducer
SQL過濾的消息消費(fèi)者案例見:org.apache.rocketmq.example.filter.SqlFilterConsumer
,>=,<,<=,BETWEEN,=;**
* 字符比較,比如:=,<>,IN;
* IS NULL 或者 IS NOT NULL;
* 邏輯符號 AND,OR,NOT;常量支持類型為:
* 數(shù)值,比如:123,3.1415;
* 字符,比如:‘a(chǎn)bc’,必須用單引號包裹起來;
* NULL,特殊的常量
* 布爾值,TRUE 或 FALSE使用注意:只有推模式的消費(fèi)者可以使用SQL過濾。拉模式是用不了的。
大家想一下,這個(gè)消息過濾是在Broker端進(jìn)行的還是在Consumer端進(jìn)行的?
3.7 事務(wù)消息
這個(gè)事務(wù)消息是RocketMQ提供的一個(gè)非常有特色的功能,需要著重理解。
首先,我們了解下什么是事務(wù)消息。官網(wǎng)的介紹是:事務(wù)消息是在分布式系統(tǒng)中保證最終一致性的兩階段提交的消息實(shí)現(xiàn)。他可以保證本地事務(wù)執(zhí)行與消息發(fā)送兩個(gè)操作的原子性,也就是這兩個(gè)操作一起成功或者一起失敗。
其次,我們來理解下事務(wù)消息的編程模型。事務(wù)消息只保證消息發(fā)送者的本地事務(wù)與發(fā)消息這兩個(gè)操作的原子性,因此,事務(wù)消息的示例只涉及到消息發(fā)送者,對于消息消費(fèi)者來說,并沒有什么特別的。
事務(wù)消息生產(chǎn)者的案例見:org.apache.rocketmq.example.transaction.TransactionProducer
事務(wù)消息的關(guān)鍵是在TransactionMQProducer中指定了一個(gè)TransactionListener事務(wù)監(jiān)聽器,這個(gè)事務(wù)監(jiān)聽器就是事務(wù)消息的關(guān)鍵控制器。源碼中的案例有點(diǎn)復(fù)雜,我這里準(zhǔn)備了一個(gè)更清晰明了的事務(wù)監(jiān)聽器示例
public class TransactionListenerImpl implements TransactionListener {
//在提交完事務(wù)消息后執(zhí)行。
//返回COMMIT_MESSAGE狀態(tài)的消息會立即被消費(fèi)者消費(fèi)到。
//返回ROLLBACK_MESSAGE狀態(tài)的消息會被丟棄。
//返回UNKNOWN狀態(tài)的消息會由Broker過一段時(shí)間再來回查事務(wù)的狀態(tài)。
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tags = msg.getTags();
//TagA的消息會立即被消費(fèi)者消費(fèi)到
if(StringUtils.contains(tags,"TagA")){
return LocalTransactionState.COMMIT_MESSAGE;
//TagB的消息會被丟棄
}else if(StringUtils.contains(tags,"TagB")){
return LocalTransactionState.ROLLBACK_MESSAGE;
//其他消息會等待Broker進(jìn)行事務(wù)狀態(tài)回查。
}else{
return LocalTransactionState.UNKNOW;
}
}
//在對UNKNOWN狀態(tài)的消息進(jìn)行狀態(tài)回查時(shí)執(zhí)行。返回的結(jié)果是一樣的。
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String tags = msg.getTags();
//TagC的消息過一段時(shí)間會被消費(fèi)者消費(fèi)到
if(StringUtils.contains(tags,"TagC")){
return LocalTransactionState.COMMIT_MESSAGE;
//TagD的消息也會在狀態(tài)回查時(shí)被丟棄掉
}else if(StringUtils.contains(tags,"TagD")){
return LocalTransactionState.ROLLBACK_MESSAGE;
//剩下TagE的消息會在多次狀態(tài)回查后最終丟棄
}else{
return LocalTransactionState.UNKNOW;
}
}
}
然后,我們要了解下事務(wù)消息的使用限制:
1、事務(wù)消息不支持延遲消息和批量消息。
2、為了避免單個(gè)消息被檢查太多次而導(dǎo)致半隊(duì)列消息累積,我們默認(rèn)將單個(gè)消息的檢查次數(shù)限制為 15 次,但是用戶可以通過 Broker 配置文件的 transactionCheckMax
參數(shù)來修改此限制。如果已經(jīng)檢查某條消息超過 N 次的話( N = transactionCheckMax
) 則 Broker 將丟棄此消息,并在默認(rèn)情況下同時(shí)打印錯誤日志。用戶可以通過重寫 AbstractTransactionCheckListener
類來修改這個(gè)行為。
回查次數(shù)是由BrokerConfig.transactionCheckMax這個(gè)參數(shù)來配置的,默認(rèn)15次,可以在broker.conf中覆蓋。
然后實(shí)際的檢查次數(shù)會在message中保存一個(gè)用戶屬性MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES。這個(gè)屬性值大于transactionCheckMax,就會丟棄。 這個(gè)用戶屬性值會按回查次數(shù)遞增,也可以在Producer中自行覆蓋這個(gè)屬性。
? 3、事務(wù)消息將在 Broker 配置文件中的參數(shù) transactionMsgTimeout 這樣的特定時(shí)間長度之后被檢查。當(dāng)發(fā)送事務(wù)消息時(shí),用戶還可以通過設(shè)置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個(gè)限制,該參數(shù)優(yōu)先于 transactionMsgTimeout
參數(shù)。
由BrokerConfig.transactionTimeOut這個(gè)參數(shù)來配置。默認(rèn)6秒,可以在broker.conf中進(jìn)行修改。
另外,也可以給消息配置一個(gè)MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS屬性來給消息指定一個(gè)特定的消息回查時(shí)間。
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, “10000”); 這樣就是10秒。
4、事務(wù)性消息可能不止一次被檢查或消費(fèi)。
5、提交給用戶的目標(biāo)主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機(jī)制來保證,如果希望確保事務(wù)消息不丟失、并且事務(wù)完整性得到保證,建議使用同步的雙重寫入機(jī)制。
6、事務(wù)消息的生產(chǎn)者 ID 不能與其他類型消息的生產(chǎn)者 ID 共享。與其他類型的消息不同,事務(wù)消息允許反向查詢、MQ服務(wù)器能通過它們的生產(chǎn)者 ID 查詢到消費(fèi)者。
接下來,我們還要了解下事務(wù)消息的實(shí)現(xiàn)機(jī)制,參見下圖:
事務(wù)消息機(jī)制的關(guān)鍵是在發(fā)送消息時(shí),會將消息轉(zhuǎn)為一個(gè)half半消息,并存入RocketMQ內(nèi)部的一個(gè) RMQ_SYS_TRANS_HALF_TOPIC 這個(gè)Topic,這樣對消費(fèi)者是不可見的。再經(jīng)過一系列事務(wù)檢查通過后,再將消息轉(zhuǎn)存到目標(biāo)Topic,這樣對消費(fèi)者就可見了。
最后,我們還需要思考下事務(wù)消息的作用。
大家想一下這個(gè)事務(wù)消息跟分布式事務(wù)有什么關(guān)系?為什么扯到了分布式事務(wù)相關(guān)的兩階段提交上了?事務(wù)消息只保證了發(fā)送者本地事務(wù)和發(fā)送消息這兩個(gè)操作的原子性,但是并不保證消費(fèi)者本地事務(wù)的原子性,所以,事務(wù)消息只保證了分布式事務(wù)的一半。但是即使這樣,對于復(fù)雜的分布式事務(wù),RocketMQ提供的事務(wù)消息也是目前業(yè)內(nèi)最佳的降級方案。
3.8 ACL權(quán)限控制
權(quán)限控制(ACL)主要為RocketMQ提供Topic資源級別的用戶訪問控制。用戶在使用RocketMQ權(quán)限控制時(shí),可以在Client客戶端通過 RPCHook注入AccessKey和SecretKey簽名;同時(shí),將對應(yīng)的權(quán)限控制屬性(包括Topic訪問權(quán)限、IP白名單和AccessKey和SecretKey簽名等)設(shè)置在$ROCKETMQ_HOME/conf/plain_acl.yml的配置文件中。Broker端對AccessKey所擁有的權(quán)限進(jìn)行校驗(yàn),校驗(yàn)不過,拋出異常; ACL客戶端可以參考:org.apache.rocketmq.example.simple包下面的AclClient代碼。
注意,如果要在自己的客戶端中使用RocketMQ的ACL功能,還需要引入一個(gè)單獨(dú)的依賴包
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.7.1</version>
</dependency>
? 而Broker端具體的配置信息可以參見源碼包下docs/cn/acl/user_guide.md。主要是在broker.conf中打開acl的標(biāo)志:aclEnable=true。然后就可以用plain_acl.yml來進(jìn)行權(quán)限配置了。并且這個(gè)配置文件是熱加載的,也就是說要修改配置時(shí),只要修改配置文件就可以了,不用重啟Broker服務(wù)。我們來簡單分析下源碼中的plan_acl.yml的配置:
#全局白名單,不受ACL控制
#通常需要將主從架構(gòu)中的所有節(jié)點(diǎn)加進(jìn)來
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*
accounts:
#第一個(gè)賬戶
- accessKey: 自定義1
secretKey: 自定義1
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY #默認(rèn)Topic訪問策略是拒絕
defaultGroupPerm: SUB #默認(rèn)Group訪問策略是只允許訂閱
topicPerms:
- topicA=DENY #topicA拒絕
- topicB=PUB|SUB #topicB允許發(fā)布和訂閱消息
- topicC=SUB #topicC只允許訂閱
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
#第二個(gè)賬戶,只要是來自192.168.1.*的IP,就可以訪問所有資源
- accessKey: 自定義2
secretKey: 自定義2
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true
二、SpringBoot整合RocketMQ
1、快速實(shí)戰(zhàn)
這部分我們看下SpringBoot如何快速集成RocketMQ。
在使用SpringBoot的starter集成包時(shí),要特別注意版本。因?yàn)镾pringBoot集成RocketMQ的starter依賴是由Spring社區(qū)提供的,目前正在快速迭代的過程當(dāng)中,不同版本之間的差距非常大,甚至基礎(chǔ)的底層對象都會經(jīng)常有改動。例如如果使用rocketmq-spring-boot-starter:2.0.4版本開發(fā)的代碼,升級到目前最新的rocketmq-spring-boot-starter:2.1.1后,基本就用不了了。
我們創(chuàng)建一個(gè)maven工程,引入關(guān)鍵依賴:
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>
rocketmq-spring-boot-starter:2.1.1引入的SpringBoot包版本是2.0.5.RELEASE,這里把SpringBoot的依賴包升級了一下。
然后我們以SpringBoot的方式,快速創(chuàng)建一個(gè)簡單的Demo
啟動類:
@SpringBootApplication
public class RocketMQScApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQScApplication.class,args);
}
}
配置文件 application.properties
#NameServer地址
rocketmq.name-server=192.168.232.128:9876
#默認(rèn)的消息生產(chǎn)者組
rocketmq.producer.group=springBootGroup
消息生產(chǎn)者
package com.roy.rocket.basic;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
/**
* @author :
* @date :Created in 2020/10/22
* @description:
**/
@Component
public class SpringProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
//發(fā)送普通消息的示例
public void sendMessage(String topic,String msg){
this.rocketMQTemplate.convertAndSend(topic,msg);
}
//發(fā)送事務(wù)消息的示例
public void sendMessageInTransaction(String topic,String msg) throws InterruptedException {
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
Message<String> message = MessageBuilder.withPayload(msg).build();
String destination =topic+":"+tags[i % tags.length];
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message,destination);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
}
}
}
消息消費(fèi)者
package com.roy.rocket.basic;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author :
* @date :Created in 2020/10/22
* @description:
**/
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message : "+ message);
}
}
SpringBoot集成RocketMQ,消費(fèi)者部分的核心就在這個(gè)@RocketMQMessageListener注解上。所有消費(fèi)者的核心功能也都會集成到這個(gè)注解中。所以我們還要注意下這個(gè)注解里面的屬性:
例如:消息過濾可以由里面的selectorType屬性和selectorExpression來定制
消息有序消費(fèi)還是并發(fā)消費(fèi)則由consumeMode屬性定制。
消費(fèi)者是集群部署還是廣播部署由messageModel屬性定制。
然后關(guān)于事務(wù)消息,還需要配置一個(gè)事務(wù)消息監(jiān)聽器:
package com.roy.rocket.config;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author :
* @date :Created in 2020/11/5
* @description:
**/
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object id = msg.getHeaders().get("id");
String destination = arg.toString();
localTrans.put(id,destination);
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
String tags = message.getTags();
if(StringUtils.contains(tags,"TagA")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagB")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
//SpringBoot的消息對象中,并沒有transactionId這個(gè)屬性。跟原生API不一樣。
// String destination = localTrans.get(msg.getTransactionId());
return RocketMQLocalTransactionState.COMMIT;
}
}
這樣我們啟動應(yīng)用后,就能夠通過訪問 http://localhost:8080/MQTest/sendMessage?message=123 接口來發(fā)送一條簡單消息。并在SpringConsumer中消費(fèi)到。
也可以通過訪問http://localhost:8080/MQTest/sendTransactionMessage?message=123 ,來發(fā)送一條事務(wù)消息。
這里可以看到,對事務(wù)消息,SpringBoot進(jìn)行封裝時(shí),就缺少了transactionId,這在事務(wù)控制中是非常關(guān)鍵的。
2、其他更多消息類型:
對于其他的消息類型,文檔中就不一一記錄了。具體可以參見源碼中的junit測試案例。
3、總結(jié):
- SpringBoot 引入org.apache.rocketmq:rocketmq-spring-boot-starter依賴后,就可以通過內(nèi)置的RocketMQTemplate來與RocketMQ交互。相關(guān)屬性都以rockemq.開頭。具體所有的配置信息可以參見org.apache.rocketmq.spring.autoconfigure.RocketMQProperties這個(gè)類。
- SpringBoot依賴中的Message對象和RocketMQ-client中的Message對象是兩個(gè)不同的對象,這在使用的時(shí)候要非常容易弄錯。例如RocketMQ-client中的Message里的TAG屬性,在SpringBoot依賴中的Message中就沒有。Tag屬性被移到了發(fā)送目標(biāo)中,與Topic一起,以Topic:Tag的方式指定。
- 最后強(qiáng)調(diào)一次,一定要注意版本。rocketmq-spring-boot-starter的更新進(jìn)度一般都會略慢于RocketMQ的版本更新,并且版本不同會引發(fā)很多奇怪的問題。apache有一個(gè)官方的rocketmq-spring示例,rocketmq-spring.git 以后如果版本更新了,可以參考下這個(gè)示例代碼。
三、SpringCloudStream整合RocketMQ
SpringCloudStream是Spring社區(qū)提供的一個(gè)統(tǒng)一的消息驅(qū)動框架,目的是想要以一個(gè)統(tǒng)一的編程模型來對接所有的MQ消息中間件產(chǎn)品。我們還是來看看SpringCloudStream如何來集成RocketMQ。
1、快速實(shí)戰(zhàn)
創(chuàng)建Maven工程,引入依賴:
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.7.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2.2.3.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.3.RELEASE</version>
</dependency>
</dependencies>
應(yīng)用啟動類:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
/**
* @author :
* @date :Created in 2020/10/22
* @description:
**/
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ScRocketMQApplication {
public static void main(String[] args) {
SpringApplication.run(ScRocketMQApplication.class,args);
}
}
注意這個(gè)@EnableBinding({Source.class, Sink.class})注解,這是SpringCloudStream引入的Binder配置。
然后增加配置文件application.properties
#ScStream通用的配置以spring.cloud.stream開頭
spring.cloud.stream.bindings.input.destination=TestTopic
spring.cloud.stream.bindings.input.group=scGroup
spring.cloud.stream.bindings.output.destination=TestTopic
#rocketMQ的個(gè)性化配置以spring.cloud.stream.rocketmq開頭
#spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876;192.168.232.129:9876;192.168.232.130:9876
spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876
SpringCloudStream中,一個(gè)binding對應(yīng)一個(gè)消息通道。這其中配置的input,是在Sink.class中定義的,對應(yīng)一個(gè)消息消費(fèi)者。而output,是在Source.class中定義的,對應(yīng)一個(gè)消息生產(chǎn)者。
然后就可以增加消息消費(fèi)者:
package com.roy.scrocket.basic;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
/**
* @author :
* @date :Created in 2020/10/22
* @description:
**/
@Component
public class ScConsumer {
@StreamListener(Sink.INPUT)
public void onMessage(String messsage){
System.out.println("received message:"+messsage+" from binding:"+ Sink.INPUT);
}
}
消息生產(chǎn)者:
package com.roy.scrocket.basic;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
* @author :
* @date :Created in 2020/10/22
* @description:
**/
@Component
public class ScProducer {
@Resource
private Source source;
public void sendMessage(String msg){
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_TAGS, "testTag");
MessageHeaders messageHeaders = new MessageHeaders(headers);
Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
this.source.output().send(message);
}
}
最后增加一個(gè)Controller類用于測試:文章來源:http://www.zghlxwxcb.cn/news/detail-524836.html
package com.roy.scrocket.controller;
import com.roy.scrocket.basic.ScProducer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author :
* @date :Created in 2020/10/27
* @description:
**/
@RestController
@RequestMapping("/MQTest")
public class MQTestController {
@Resource
private ScProducer producer;
@RequestMapping("/sendMessage")
public String sendMessage(String message){
producer.sendMessage(message);
return "消息發(fā)送完成";
}
}
啟動應(yīng)用后,就可以訪問http://localhost:8080/MQTest/sendMessage?message=123,給RocketMQ發(fā)送一條消息到TestTopic,并在ScConsumer中消費(fèi)到了。文章來源地址http://www.zghlxwxcb.cn/news/detail-524836.html
2、總結(jié)
- 關(guān)于SpringCloudStream。這是一套幾乎通用的消息中間件編程框架,例如從對接RocketMQ換到對接Kafka,業(yè)務(wù)代碼幾乎不需要動,只需要更換pom依賴并且修改配置文件就行了。但是,由于各個(gè)MQ產(chǎn)品都有自己的業(yè)務(wù)模型,差距非常大,所以使用使用SpringCloudStream時(shí)要注意業(yè)務(wù)模型轉(zhuǎn)換。并且在實(shí)際使用中,要非常注意各個(gè)MQ的個(gè)性化配置屬性。例如RocketMQ的個(gè)性化屬性都是以spring.cloud.stream.rocketmq開頭,只有通過這些屬性才能用上RocketMQ的延遲消息、排序消息、事務(wù)消息等個(gè)性化功能。
- SpringCloudStream是Spring社區(qū)提供的一套統(tǒng)一框架,但是官方目前只封裝了kafka、kafka Stream、RabbitMQ的具體依賴。而RocketMQ的依賴是交由廠商自己維護(hù)的, 也就是由阿里巴巴自己來維護(hù)。這個(gè)維護(hù)力度顯然是有不小差距的。所以一方面可以看到之前在使用SpringBoot時(shí)著重強(qiáng)調(diào)的版本問題,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本還是4.4.0。這個(gè)差距就非常大了。另一方面,RocketMQ這幫大神不屑于寫文檔的問題也特別嚴(yán)重,SpringCloudStream中關(guān)于RocketMQ的個(gè)性化配置幾乎很難找到完整的文檔。
- 總之,對于RocketMQ來說,SpringCloudStream目前來說還并不是一個(gè)非常好的集成方案。這方面跟kafka和Rabbit還沒法比。所以使用時(shí)要慎重。
到了這里,關(guān)于rocketMq消息隊(duì)列原生api使用以及rocketMq整合springboot的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!