????????RocketMQ是阿里巴巴開源的消息分布中間件,在阿里內(nèi)部使用非常更廣泛,已經(jīng)經(jīng)過了“雙11”這種萬億級(jí)的應(yīng)用場(chǎng)景考驗(yàn)。
1.安裝
????????下載地址:http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
????????下載完成后解壓縮安裝包到指定目錄。
2.配置
? ?????????(1)環(huán)境變量:ROCKETMQ_HOME
???????????????
? ? ? ? ? ?(2)Path
???????????????
3.啟動(dòng)
? ? ? ? ? 切換到當(dāng)前下載RocketMQ的bin目錄下
????????(1)啟動(dòng)NameServer ---> start mqnamesrv.cmd
?
? ? ? ? ? ? ???
????????(2)啟動(dòng)Broker --->?? start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
?
4.RocketMQ的架構(gòu)及概念
? ? ? ? 看下面這張圖
?????????
?文章來源地址http://www.zghlxwxcb.cn/news/detail-708056.html
? ? ? ? RocketMQ整體分為4個(gè)角色,NameServer、Broker、Producer、Consumer、
? ? ? ? Broker:為RocketMQ的核心,負(fù)責(zé)消息的接受,存儲(chǔ),投遞等功能。
? ? ? ? NameServer:消息隊(duì)列的協(xié)商者,Broker向它注冊(cè)路由信息,同時(shí)Producer和Consumer向其獲取路由信息。
? ? ? ? Producer:消息的生產(chǎn)者,需要從NameServer獲取Borker信息,然后與Broker建立連接,向Broker發(fā)送消息。
? ? ? ? Consumer:消息的消費(fèi)者,需要從NameServer獲取Broker信息,然后與Broker建立連接,從Broker獲取消息。
? ? ? ? 另外還包括別的組件,
? ? ? ? Topic:用來區(qū)分不同的消息類型,發(fā)送和接收消息前都要先創(chuàng)建Topic,針對(duì)Topic來發(fā)送和接收消息。
? ? ? ? Message Queue:消息隊(duì)列,一個(gè)Topic可以設(shè)置一個(gè)或多個(gè)MessageQueue,這樣消息就可以并行往各個(gè)Message Queue發(fā)送消息,消費(fèi)者也可以并行的從多個(gè)Message Queue讀取消息,提高性能和吞吐量。
? ? ? ? Message:消息的載體。
5.消息發(fā)送和接收(應(yīng)用)
? ? ? ?搭建好SpringBoot項(xiàng)目后,先導(dǎo)入RocketMQ所需的依賴:
<!--MQ-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
5.1 同步發(fā)送? ? ? ??
? ? ? ? 同步發(fā)送方式比較可靠,應(yīng)用也比較廣泛,比如:重要的消息通知,短信通知。
消息發(fā)送方:
//發(fā)送同步消息
public class RocketMQSendTest1 {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//1.創(chuàng)建消息生產(chǎn)者,指定生產(chǎn)者所屬的組名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2.指定Nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3.啟動(dòng)生產(chǎn)者
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("myTopic", "myTag", ("十行代碼九個(gè)錯(cuò)誤八個(gè)警告竟敢說七日精通六天學(xué)會(huì)五湖四海也不見如此三心二意之程序簡(jiǎn)直一等下流" + i).getBytes());
//發(fā)送消息
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
//關(guān)閉生產(chǎn)者
producer.shutdown();
}
}
消息接收方:
//接收消息
public class RocketMQReceiveTest1 {
public static void main(String[] args) throws MQClientException {
//創(chuàng)建消息消費(fèi)者。指定消費(fèi)者所屬的組名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
//指定Nameserver地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//指定消費(fèi)者訂閱的主題和標(biāo)簽
consumer.subscribe("myTopic", "*");
//設(shè)置回調(diào)函數(shù),編寫處理消息的方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("獲取到的消費(fèi)數(shù)據(jù):" + list);
System.out.println(new String(list.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動(dòng)消息消費(fèi)者
consumer.start();
System.out.println("Consumer Starting.");
}
}
? ? ? ? 啟動(dòng)時(shí),切記先啟動(dòng)消費(fèi)者(接收方),再啟動(dòng)生產(chǎn)者(發(fā)送方)。
????????啟動(dòng)測(cè)試,同步發(fā)送消息結(jié)果:?
?????????
5.2 異步消息
????????異步消息通常用在對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景,即發(fā)送端不能容忍長時(shí)間地等待Broker的響應(yīng)。
發(fā)送方:
//發(fā)送異步消息
//異步消息比較浪費(fèi)性能,經(jīng)常會(huì)失敗,所以多發(fā)送幾次并且讓線程休眠幾秒
public class RocketMQSendTest2 {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
//1.創(chuàng)建消息生產(chǎn)者,指定生產(chǎn)者所屬的組名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2.設(shè)置NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3.啟動(dòng)生產(chǎn)者
producer.start();
for (int i = 0; i < 10; i++) {
//4.創(chuàng)建消息對(duì)象,制定主題、標(biāo)簽、消息體
Message message = new Message("myTopic", "myTag2", ("十行代碼九個(gè)錯(cuò)誤八個(gè)警告竟敢說七日精通六天學(xué)會(huì)五湖四海也不見如此三心二意之程序簡(jiǎn)直一等下流").getBytes());
//5.發(fā)送消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("發(fā)送成功:" + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("發(fā)送異常:" + e);
}
});
//休眠
TimeUnit.SECONDS.sleep(3);
}
//6.關(guān)閉生產(chǎn)者
producer.shutdown();
}
}
接收方:
public class RocketMQReceiveTest2 {
public static void main(String[] args) throws MQClientException {
//1.創(chuàng)建消息消費(fèi)者,指定消費(fèi)者所屬的組名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
//2.指定Nameserver地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//3.指定消費(fèi)者訂閱的主題和標(biāo)簽
consumer.subscribe("myTopic", "*");
consumer.setMessageModel(MessageModel.BROADCASTING);
//4.設(shè)置回調(diào)函數(shù),編寫處理請(qǐng)求的方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(new String(list.get(0).getBody()));
//返回消費(fèi)狀態(tài)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.啟動(dòng)消息消費(fèi)者
consumer.start();
System.out.println("Consumer Starting.");
}
}
啟動(dòng)測(cè)試,異步發(fā)送消息結(jié)果:?
?
5.3 單行發(fā)送消息
? ? ? 該方式用在不關(guān)注發(fā)送結(jié)果的場(chǎng)景,比如日志發(fā)送。
//單行發(fā)送消息
public class RocketMQSendTest3 {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("myTopic", "myTag3", ("蕪湖").getBytes());
producer.sendOneway(message);
TimeUnit.SECONDS.sleep(3);
}
producer.shutdown();
}
}
啟動(dòng)兩個(gè)消費(fèi)者接收10條消息,結(jié)果如下:
?
?
存在消息丟失的情況。
以上就是對(duì)RocketMQ的初步認(rèn)識(shí)啦!文章來源:http://www.zghlxwxcb.cn/news/detail-708056.html
?
到了這里,關(guān)于消息中間件-RocketMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!