RocketMQ
一、MQ概述
Message Queue,是一種提供消息隊列服務(wù)的中間件。提供了消息生產(chǎn)、存儲、消費全過程API的軟件系統(tǒng)。
MQ的作用
- 限流削峰:當(dāng)用戶發(fā)送超量請求時,將請求暫存,以便后期慢慢處理。如果不使用MQ暫存直接請求到業(yè)務(wù)系統(tǒng)中容易引起系統(tǒng)崩潰。
- 異步解耦:若上游系統(tǒng)和下游系統(tǒng)為同步調(diào)用,會大大降低系統(tǒng)的吞吐量和并發(fā)量。MQ層實現(xiàn)兩個系統(tǒng)之間的異步調(diào)用
- 數(shù)據(jù)收集:分布式系統(tǒng)會產(chǎn)生海量數(shù)據(jù)流,如業(yè)務(wù)日志、監(jiān)控數(shù)據(jù)、用戶行為。針對這些數(shù)據(jù)流采集匯總,進行大數(shù)據(jù)分析。
主流應(yīng)用的MQ產(chǎn)品
- Kafka:Scala/Java語言開發(fā)。特點是高吞吐量,但會丟數(shù)據(jù),常用與大數(shù)據(jù)領(lǐng)域的實時計算、日志采集等場景。不遵循任何MQ協(xié)議,使用自研協(xié)議。
- RocketMQ:Java語言開發(fā)。經(jīng)過數(shù)年阿里雙十一考驗,性能與穩(wěn)定性非常高,功能全面。不遵循任何MQ協(xié)議,使用自研協(xié)議。開源版不如云上版(阿里商業(yè)版)
MQ常見協(xié)議
-
JMS:Java Messaging Service。Java平臺上有關(guān)MOM(Message Orientated Middleware)的技術(shù)規(guī)范。他便于Java應(yīng)用程序的消息交換,提供標(biāo)準(zhǔn)的接口簡化開發(fā)。ActiveMQ時典型實現(xiàn)
-
STOMP:Streaming Text Orientated Message Protocol。是一種MOM的簡單文本協(xié)議。STOMP提供一個可互操作的連接格式,允許 客戶端與任意STOMP消息代理進行交互。ActiveMQ時典型實現(xiàn)
-
AMQP:Advanced Message Queuing Protocol。一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn),是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn)。RabbitMQ是典型實現(xiàn)
-
MQTT:Message Queueing Telemetry Transport。IBM開發(fā)的一個即時通訊協(xié)議(二進制協(xié)議),主要用于服務(wù)器和低功耗IoT設(shè)備之間的通信
二、基本概念
主題(Topic):表示一類消息的集合(可以理解為消息的類型),每個消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。一個生產(chǎn)者可以同時發(fā)送多種Topic消息,而一個消費者只能接收一種Topic消息
標(biāo)簽(Tag):用于快速過濾消息
三、Linux部署RocketMQ服務(wù)
1、在官網(wǎng)下載編譯好的二進制壓縮包,版本5.0.0即可,上傳到Linux中
2、進行解壓
3、配置環(huán)境變量ROCKETMQ_HOME和NAMESRV_ADDR
4、配置bin目錄下的runserver.sh,根據(jù)實際情況修改JVM的內(nèi)存參數(shù)
5、配置bin目錄下的runbroker.sh,根據(jù)實際情況修改JVM的內(nèi)存參數(shù)
6、執(zhí)行nohup命令后臺運行RocketMQ服務(wù)(nameserver必須先啟動,broker需要再nameserver上注冊)
# 啟動nameserver
nohup bin/mqnamesrv &
# 啟動broker
nohup bin/mqbroker -c [confFile] & # -c可指定加載的配置文件,默認為conf/broker.conf
# 查看日志rocketmq是否成功啟動
tail nohup.out
# 查看進程
jps
# 停止broker
sh bin/mqshutdown broker
# 停止namesrv
sh bin/mqshutdown namesrv
7、執(zhí)行命令測試(rocketmq提供的測試樣例,生產(chǎn)者會發(fā)送一千條消息)
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
8、執(zhí)行命令測試(rocketmq提供的測試樣例,消費者會接受一千條消息)
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
四、RocketMQ API
生產(chǎn)者同步發(fā)送消息
public void test_SyncProducer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
//設(shè)置注冊服務(wù)的ip地址的端口
producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);
//啟動生產(chǎn)者
producer.start();
for(int i=0; i<3; i++){
try {
// 封裝消息,設(shè)置topic,tag(用于消息快速過濾),消息數(shù)據(jù)
Message message = new Message(
"TopicTest",
"TagA",
"ID04287777",
("Hello, RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//同步發(fā)送消息并獲取發(fā)送結(jié)果,producer從broker獲取發(fā)送結(jié)果
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
Thread.sleep(1500);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
producer.shutdown();
}
生產(chǎn)者異步發(fā)送消息
public void test_AsyncProducer() throws Exception{
DefaultMQProducer producer = new DefaultMQProducer(RocketMQConstant.PRODUCER_GROUP_NAME);
producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for(int i=0; i<messageCount; i++){
final int index = i;
// 封裝消息,設(shè)置topic,tag(用于消息快速過濾),消息數(shù)據(jù)
Message message = new Message(
"TopicTest",
"TagA",
"ID04287777",
("Hello, RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 異步發(fā)送消息,若broker有響應(yīng)會調(diào)用SendCallback中的方法
producer.send(message, new SendCallback() {
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.println(" Send Message "+ index +" OK: "+sendResult);
}
public void onException(Throwable throwable) {
countDownLatch.countDown();
System.out.println(" Send Message "+ index +" Exception: "+throwable);
}
});
//單向發(fā)送
producer.sendOneway(message);
System.out.println("Message "+index+" send done");
}
//在100條消息發(fā)送完后關(guān)閉
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
生產(chǎn)者單向發(fā)送消息
public void test_OneWayProducer() throws Exception{
DefaultMQProducer producer = new DefaultMQProducer(RocketMQConstant.PRODUCER_GROUP_NAME);
producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for(int i=0; i<messageCount; i++){
final int index = i;
// 封裝消息,設(shè)置topic,tag(用于消息快速過濾),消息數(shù)據(jù)
Message message = new Message(
"TopicTest",
"TagA",
"ID04287777",
("Hello, RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//單向發(fā)送
producer.sendOneway(message);
System.out.println("Message "+index+" send done");
}
//在100條消息發(fā)送完后關(guān)閉
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
消費者推模式
public static void test_PushConsumer() throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");
consumer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//消費者訂閱的消息topic和tag(subExpression,*表示任意)
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("Receive New Message : "+list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Start...");
}
消費者拉模式
不同于推模式消費者,拉模式下需要手動管理消息隊列MessageQueue和偏移量offset的映射關(guān)系。但是最新的LitePullConsumer底層源碼已經(jīng)實現(xiàn)對mq和offset的管理,比較方便。
//拉模式消費者
public static void test_LitePullConsumer() throws Exception{
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(RocketMQConstant.CONSUMER_GROUP_NAME);
litePullConsumer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {
while(true){
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
}finally {
litePullConsumer.shutdown();
}
}
RocketMQ傳遞對象,對象所屬類需要實現(xiàn)序列化接口,并且將對象轉(zhuǎn)換為字節(jié)數(shù)組存入消息體中。
順序消息
保證消息的局部有序(其中幾條消息的有序,不一定是全部消息都要有序),以防止受到網(wǎng)絡(luò)傳輸?shù)挠绊憽?/p>
實現(xiàn)原理
生產(chǎn)者將一組有序的消息一次發(fā)到同一個MessageQueue中(依靠隊列的特點保證局部有序性)。消費者消費完一個MessageQueue的消息后才會去消費下一個MessageQueue的消息。
public class OrderProducer {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME);
try {
producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);
producer.start();
for(int i=0; i<5; i++){
//用于指定順序的id
int orderId = i;
for(int j=0; j<5; j++){
Message message = new Message(
WanfengConstant.ORDER_TOPIC,
"order_"+orderId,
"KEY"+orderId,
("order_"+orderId+" step "+j).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//實現(xiàn)消息隊列選擇器對象,使同一個orderId的消息發(fā)送到同一個消息隊列
SendResult sendResult = producer.send(
message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
},
orderId
);
System.out.printf("%s%n", sendResult);
}
}
}catch(Exception e){
e.printStackTrace();
producer.shutdown();
}
}
}
public class OrderConsumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(WanfengConstant.CONSUMER_GROUP_NAME);
consumer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
try {
consumer.subscribe(WanfengConstant.ORDER_TOPIC, "*");
//實現(xiàn)順序消息監(jiān)聽者接口
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for(MessageExt messageExt : msgs){
System.out.println("Receive Message: " + new String(messageExt.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Start...");
} catch (Exception e) {
e.printStackTrace();
consumer.shutdown();
}
}
}
廣播消息
生產(chǎn)者發(fā)送的消息推送給所有g(shù)roup的消費者
實現(xiàn)原理:將消費者設(shè)置MessageModel為廣播模式。
public class BroadcastConsumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(WanfengConstant.CONSUMER_GROUP_NAME);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//設(shè)定消息模式為廣播
consumer.setMessageModel(MessageModel.BROADCASTING);
try {
consumer.subscribe(WanfengConstant.ARCHIVE_TOPIC, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
msgs.forEach(messageExt -> {
Archive archive = (Archive) WanfengObjectUtil.bytesToObject(messageExt.getBody());
System.out.println("Receive Message : "+archive.getId());
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Broadcast Consumer Start...");
}catch (Exception e){
e.printStackTrace();
consumer.shutdown();
}
}
}
若指定MessageModel為CLUSTERING,則生產(chǎn)者發(fā)送的消息會隨機指定消費者消費。
延遲消息
顧名思義就是消息發(fā)送到broker時延遲指定的時間后再發(fā)送給消費者。常用于定時發(fā)送
過濾消息
過濾消息通過tag實現(xiàn),在消費者端指定過濾的tag即可。
//消費者訂閱tag1或tag2的消息
consumer.subscribe("TopicTest", "tag1 || tag2");
在RocketMQ中,消費者指定過濾條件后,將其上推到Broker中,在Broker中進行tag過濾,以減少網(wǎng)絡(luò)IO,但同時也增加了Broker的繁忙。
事務(wù)消息
public class TransactionProducer {
public static void main(String[] args) {
TransactionMQProducer producer = new TransactionMQProducer(WanfengConstant.PRODUCER_GROUP_NAME);
TransactionListener transactionListener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("[WANFENG-INFO] TransactionProducer.executeLocalTransaction(): 執(zhí)行成功...");
String tags = msg.getTags();
if (StringUtils.contains(tags, "TagA")) {
//消息提交(發(fā)送出去)
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains(tags, "TagB")) {
//消息回滾(丟掉消息)
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("[WANFENG-INFO] TransactionProducer.checkLocalTransaction(): 執(zhí)行成功...");
String tags = msg.getTags();
if (StringUtils.contains(tags, "TagC")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
};
ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3)
);
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
try {
producer.start();
} catch (Exception e) {
e.printStackTrace();
}
String[] tags = new String[]{"TagA", "TagB", "TagC"};
CountDownLatch countDownLatch = new CountDownLatch(9);
for (int i = 0; i < 9; i++) {
try {
Message message = new Message("TopicTest", tags[i % tags.length], "Key" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(message, null);
System.out.println(sendResult);
Thread.sleep(1000);
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
producer.shutdown();
}
}
}
ACL權(quán)限控制
ACL對用戶對Topic資源的訪問權(quán)限進行控制
在pom依賴中引入acl的依賴包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>5.0.0</version>
</dependency>
在服務(wù)端的conf/broker.conf文件,添加配置,開啟acl
aclEnable=true
在服務(wù)端的conf/plain_acl.yml文件,配置具體權(quán)限規(guī)則(熱加載,不需要重啟mq)
accounts:
- accessKey: RocketMQ #用戶名
secretKey: 12345678 #密碼
whiteRemoteAddress: #訪問地址白名單
admin: false #是否為管理員(管理員可以訪問所有Topic)
defaultTopicPerm: DENY #默認Topic訪問權(quán)限
defaultGroupPerm: SUB #默認組權(quán)限
topicPerms: #Topic對應(yīng)的權(quán)限,若這里找不到則采用defaultTopicPerm
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
在創(chuàng)建生產(chǎn)者對象時需加入RPCHook(acl的用戶信息)
public class AclProducer {
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "12345678";
/**
* 通過用戶名和密碼獲取RPCHook
* @return
*/
public static RPCHook getAclRPCHook(){
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));
}
public static void main(String[] args) throws MQClientException, InterruptedException {
//創(chuàng)建生產(chǎn)者時加入用戶信息,即RPCHook
DefaultMQProducer producer = new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME, getAclRPCHook());
producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);
producer.start();
for (int i = 0; i < 20; i++) {
try {
Message message = new Message(
"TopicTest",
WanfengConstant.TAGS_NAME,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /*消息體轉(zhuǎn)換成二進制數(shù)組*/
);
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
}
}
消息軌跡
Producer,Consumer,Broker處理消息的相關(guān)信息
消息軌跡的實現(xiàn)原理是MQ把消息軌跡都往RMQ_SYS_TRACE_TOPIC的Topic中放
在Broker端配置文件開啟消息軌跡文章來源:http://www.zghlxwxcb.cn/news/detail-616237.html
traceTopicEnable=true
創(chuàng)建生產(chǎn)者時指定enableMsgTrace參數(shù)為true,開啟消息軌跡。也可以指定customizedTraceTopic參數(shù)來自定義消息軌跡的Topic。文章來源地址http://www.zghlxwxcb.cn/news/detail-616237.html
public class TraceProducer {
public static void main(String[] args) throws MQClientException {
//指定enableMsgTrace參數(shù)為true,開啟消息軌跡
DefaultMQProducer producer = new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME, true);
producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);
producer.start();
for (int i = 0; i < 20; i++) {
try {
Message message = new Message(
"TopicTest",
WanfengConstant.TAGS_NAME,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /*消息體轉(zhuǎn)換成二進制數(shù)組*/
);
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
到了這里,關(guān)于【Java中間件】RocketMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!