這篇具有很好參考價值的文章主要介紹了阿里云RockMQ與SpringBoot的整合。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。
前言:
開源版本Rocket和商業(yè)版本的RocketMQ有些不同,研究的是商業(yè)版本的RocketMQ,阿里云的官方文檔,感覺有點亂??床徽γ靼?,網(wǎng)上雖然有教程,大都還是有點缺少,有時候會突然跳了步驟,抹去了一些細節(jié)。
前置步驟
阿里云MQ開通及子Access賬號的權限的生成
阿里云MQ開通
開通阿里云MQ(現(xiàn)在叫阿里云RocketMQ)百度的教程夠用,不多記錄,需要的參考該地址http://mtw.so/5Q5nHp,進行開通。PS:頁面由于開發(fā)人員一直在更新,教程的頁面不一定和現(xiàn)有頁面完全一樣,所以不要死腦筋。
子Access賬號
阿里云可以為賬號,創(chuàng)建兩個字段,用于你身份的驗證,下圖中可以進入申請子賬戶

跳出提示,選擇開始使用子用戶AccessKey

點擊 創(chuàng)建用戶


點擊確定,會要你驗證手機,輸入驗證碼即可
創(chuàng)建完以后會給你兩個字段的值,一個是AccessKey ID
和AccessKey Secret
一定要及時妥善保存,雖然可以重新創(chuàng)建

ps:這里別忘了給賬戶賦予MQ的權限,不然無法進行消息的訂閱和發(fā)送
如何設置權限?


點擊添加權限,添加以下權限

Topic和Group的創(chuàng)建(在阿里云控制臺頁面進行)
首先創(chuàng)建實例,點擊創(chuàng)建實例


點擊確定

按提示創(chuàng)建Group和Topic 即可,然后將Group和Topic的名稱,填入到application.properties
對應字段中
nameSrvAddr的獲取,在創(chuàng)建好Group和Topic后,從這進入到接入點的獲取頁面


接入點有兩個,分別對應了不同的接入方式。TCP和HTTP,我這里用的TCP協(xié)議的接入方式
這里只能獲取到公網(wǎng)的接入地址,沒有內(nèi)網(wǎng)
?
開始開發(fā)
SpringBoot整合阿里云RocketMQ(普通消息為例)
Maven工程文章來源:http://www.zghlxwxcb.cn/news/detail-553336.html
POM文件依賴文章來源地址http://www.zghlxwxcb.cn/news/detail-553336.html
<dependencies>
<!--主要用來寫WEB接口,這里用來測試MQ的生產(chǎn)者-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--阿里云ons,方便的接入到云服務-->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.4.Final</version>
</dependency>
<!--神器,這里主要用來輸出日志@Slf4j-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<!--測試用,主要是目的是讓功能帶著spring容器中進行測試-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.properties
#啟動測試之前請?zhí)鎿Q如下 XXX 為您的配置,從阿里云MQ里獲取,具體獲取方式,看下前置步驟
rocketmq.accessKey=xxx
rocketmq.secretKey=xxx
rocketmq.nameSrvAddr=xxx
rocketmq.topic=TpMQTest
rocketmq.groupId=GID_MQTEST
rocketmq.tag=*
rocketmq.orderTopic=XXX
rocketmq.orderGroupId=XXX
rocketmq.orderTag=*
配置類,用于讀取application.properties中相應字段的值
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfig {
private String accessKey;
private String secretKey;
private String nameSrvAddr;
private String topic;
private String groupId;
private String tag;
private String orderTopic;
private String orderGroupId;
private String orderTag;
public Properties getMqPropertie() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
return properties;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getNameSrvAddr() {
return nameSrvAddr;
}
public void setNameSrvAddr(String nameSrvAddr) {
this.nameSrvAddr = nameSrvAddr;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getOrderTopic() {
return orderTopic;
}
public void setOrderTopic(String orderTopic) {
this.orderTopic = orderTopic;
}
public String getOrderGroupId() {
return orderGroupId;
}
public void setOrderGroupId(String orderGroupId) {
this.orderGroupId = orderGroupId;
}
public String getOrderTag() {
return orderTag;
}
public void setOrderTag(String orderTag) {
this.orderTag = orderTag;
}
}
消費者的注冊類
消費者的build,主要目的是將配置文件里的配置設置到ConsumerBean中,使其在Spring啟動時,一同啟動。
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.aliyun.openservices.springboot.example.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@Configuration
public class ConsumerClient {
@Autowired
private MqConfig mqConfig;
@Autowired
private DemoMessageListener messageListener;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
ConsumerBean consumerBean = new ConsumerBean();
Properties properties = mqConfig.getMqPropertie();
properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
consumerBean.setProperties(properties);
Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
Subscription subscription = new Subscription();
subscription.setTopic(mqConfig.getTopic());
subscription.setExpression(mqConfig.getTag());
subscriptionTable.put(subscription, messageListener);
consumerBean.setSubscriptionTable(subscriptionTable);
return consumerBean;
}
}
注冊完成以后,開啟監(jiān)聽,在消息隊列有消息時就會進行消費 @Component這個注解,阿里云官方的Demo,并沒有出現(xiàn),導致一直消費者消費不到消息。后來加上以后就能正常消費消息了
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class DemoMessageListener implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
log.info("Receive: " + message);
try {
return Action.CommitMessage;
} catch (Exception e) {
return Action.ReconsumeLater;
}
}
}
生產(chǎn)者注冊類
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.springboot.example.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ProducerClient {
@Autowired
private MqConfig mqConfig;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean buildProducer() {
ProducerBean producer = new ProducerBean();
producer.setProperties(mqConfig.getMqPropertie());
return producer;
}
}
生產(chǎn)者生產(chǎn)消息工具類
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.springboot.example.config.MqConfig;
import org.springframework.stereotype.Component;
@Component
public class RocketMessageProducer {
private static ProducerBean producer;
private static MqConfig mqConfig;
public RocketMessageProducer(ProducerBean producer, MqConfig mqConfig) {
this.producer = producer;
this.mqConfig = mqConfig;
}
public static void producerMsg(String tag, String key, String body) {
Message msg = new Message(mqConfig.getTopic(), tag, key, body.getBytes());
long time = System.currentTimeMillis();
try {
SendResult sendResult = producer.send(msg);
assert sendResult != null;
System.out.println(time
+ " Send mq message success.Topic is:" + msg.getTopic()
+ " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()
+ " msgId is:" + sendResult.getMessageId());
} catch (ONSClientException e) {
e.printStackTrace();
System.out.println(time + " Send mq message failed. Topic is:" + msg.getTopic());
}
}
}
WEB接口,測試Controller類
import com.aliyun.openservices.springboot.example.normal.RocketMessageProducer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@RequestMapping(value = {"/useRocketMQ"}, method = RequestMethod.GET)
public String useRocketMQ() {
RocketMessageProducer.producerMsg("RocketProdTagTest","RocketProdKeyTest","RocketProdBodyTest");
return "請求成功!";
}
}


好了 代碼就放在下面了
Gitee代碼地址
前言:
開源版本Rocket和商業(yè)版本的RocketMQ有些不同,研究的是商業(yè)版本的RocketMQ,阿里云的官方文檔,感覺有點亂??床徽γ靼祝W(wǎng)上雖然有教程,大都還是有點缺少,有時候會突然跳了步驟,抹去了一些細節(jié)。
前置步驟
阿里云MQ開通及子Access賬號的權限的生成
阿里云MQ開通
開通阿里云MQ(現(xiàn)在叫阿里云RocketMQ)百度的教程夠用,不多記錄,需要的參考該地址http://mtw.so/5Q5nHp,進行開通。PS:頁面由于開發(fā)人員一直在更新,教程的頁面不一定和現(xiàn)有頁面完全一樣,所以不要死腦筋。
子Access賬號
阿里云可以為賬號,創(chuàng)建兩個字段,用于你身份的驗證,下圖中可以進入申請子賬戶

跳出提示,選擇開始使用子用戶AccessKey

點擊 創(chuàng)建用戶


點擊確定,會要你驗證手機,輸入驗證碼即可
創(chuàng)建完以后會給你兩個字段的值,一個是AccessKey ID
和AccessKey Secret
一定要及時妥善保存,雖然可以重新創(chuàng)建

ps:這里別忘了給賬戶賦予MQ的權限,不然無法進行消息的訂閱和發(fā)送
如何設置權限?


點擊添加權限,添加以下權限

Topic和Group的創(chuàng)建(在阿里云控制臺頁面進行)
首先創(chuàng)建實例,點擊創(chuàng)建實例


點擊確定

按提示創(chuàng)建Group和Topic 即可,然后將Group和Topic的名稱,填入到application.properties
對應字段中
nameSrvAddr的獲取,在創(chuàng)建好Group和Topic后,從這進入到接入點的獲取頁面


接入點有兩個,分別對應了不同的接入方式。TCP和HTTP,我這里用的TCP協(xié)議的接入方式
這里只能獲取到公網(wǎng)的接入地址,沒有內(nèi)網(wǎng)
?
開始開發(fā)
SpringBoot整合阿里云RocketMQ(普通消息為例)
Maven工程
POM文件依賴
到了這里,關于阿里云RockMQ與SpringBoot的整合的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!
本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!