MQ基本結構
- message: 消息數(shù)據(jù)對象
- product: 程序代碼,生成消息,發(fā)送消息到隊列
- consumer: 程序代碼,監(jiān)聽(綁定)隊列,獲取消息,執(zhí)行消費代碼
- queue: Rocketmq rabbitmq kafka這些消息隊列中間件軟件.
依賴
<dependency>
<!--2.2.2底層rocketmq客戶端4.9.1-->
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
案例:
product
public class MyProducer {
/**
* 向rocketmq發(fā)送第一條消息
*/
@Test
public void sendTest01() throws Exception {
//1.準備一個生產者對象,開啟長鏈接
DefaultMQProducer producer=new DefaultMQProducer();
//對當前producer設置分組
producer.setProducerGroup("first-producer-group");
//連接nameserver localhost:9876
producer.setNamesrvAddr("localhost:9876");
//開啟長鏈接
producer.start();
//2.封裝一個消息對象,我們想要發(fā)送的內容,只是消息的一部分
//創(chuàng)建一個消息對象
Message message=new Message();
//消息攜帶的內容 body
String msg="當前發(fā)送的第一條消息";
message.setBody(msg.getBytes(StandardCharsets.UTF_8));
//設置消息主題,分類,按業(yè)務分類
message.setTopic("first-topic-a");
//主題標簽 和key標識
//3.調用api方法將消息發(fā)送,接收返回結果,查看發(fā)送的信息比如狀態(tài)
//分為異步發(fā)送,同步發(fā)送,異步發(fā)送性能速度更高,但是無法保證成功.
//同步發(fā)送,性能速度沒有異步快,但是可以接收反饋結果
SendResult send = producer.send(message);
//result解析獲取發(fā)送相關的信息
System.out.println("發(fā)送狀態(tài):"+send.getSendStatus());
System.out.println("消息到達主題,隊列,broker信息:"+send.getMessageQueue());
}
}
Consumer
public class MyConsumer1 {
@Test
public void consumerTest01() throws Exception {
//1.構建一個消費者對象,連接nameserver創(chuàng)建長鏈接
// push pull的區(qū)別 push消費端,消費的消息是隊列推送給他的
// pull 消費端代碼執(zhí)行一次pull 拉取過來一條消息
// 收郵件 推的, 搶紅包 拉取的
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer();
//設置nameserver地址
consumer.setNamesrvAddr("localhost:9876");
//消費者分組
consumer.setConsumerGroup("first-consumer-group-a");
//定義監(jiān)聽的主題,消費端代碼會根據(jù)定義的主題尋找nameserver路由信息,找到主題的隊列進行綁定
//topic 主題名稱,subExpression 定義過濾邏輯 *表示匹配所有
consumer.subscribe("first-topic-a","*");
//2.執(zhí)行api開始監(jiān)聽主題,實現(xiàn)隊列的消費
//提供給consumer一個監(jiān)聽器
consumer.setMessageListener(new MessageListenerConcurrently() {
/**
* 推送過來的消息,都會調用consumerMessage執(zhí)行消費邏輯
* @param list 消息數(shù)據(jù) list表示可以批量處理的消息,不是批量消息,list元素只有1個
* @param consumeConcurrentlyContext
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//獲取消息 由于不是批量發(fā)送只有l(wèi)ist一個元素
MessageExt messageExt = list.get(0);
messageExt.getMsgId();//唯一的一個標識,每次消息組裝的對象都會在發(fā)送時,生成一個msgId
byte[] body = messageExt.getBody();
//將消息轉化
String message=new String(body, StandardCharsets.UTF_8);
System.out.println("消費端獲取到消息:"+message);
//context 控制返回確認信息 ackIndex順序
//返回消費狀態(tài) success 隊列會將消息對應當前消費組,移動偏移量,記錄消費完成
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//開啟長連接
consumer.start();
while (true);
}
}
核心概念
1.nameserver
NameServer是一個簡單的 Topic 路由注冊中心,支持 Topic、Broker 的動態(tài)注冊與發(fā)現(xiàn)。
主要包括兩個功能:
- Broker管理,NameServer接受Broker集群的注冊信息并且保存下來作為路由信息的基本數(shù)據(jù)。然后提供心跳檢測機制,檢查Broker是否還存活;
- 路由信息管理,每個NameServer將保存關于 Broker 集群的整個路由信息和用于客戶端查詢的隊列信息。Producer和Consumer通過NameServer就可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費。
NameServer通常會有多個實例部署,各實例間相互不進行信息通訊。Broker是向每一臺NameServer注冊自己的路由信息,所以每一個NameServer實例上面都保存一份完整的路由信息。當某個NameServer因某種原因下線了,客戶端仍然可以向其它NameServer獲取路由信息。
- 總之: nameserver作為協(xié)調器, 誰的信息被用到,就要到nameserver注冊,誰要用注冊信息,就要到nameserver同步抓取.
broker要作為rocketmq容器被生產者和消費者代碼使用.
2.broker
Broker主要負責消息的存儲、投遞和查詢以及服務高可用保證。
- NameServer幾乎無狀態(tài)節(jié)點,因此可集群部署,節(jié)點之間無任何信息同步。Broker部署相對復雜。
- 在 Master-Slave 架構中,Broker 分為 Master 與 Slave。一個Master可以對應多個Slave,但是一個Slave只能對應一個Master。Master 與 Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId 來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。
3.主題隊列
-
簡單理解,主題是一類消息的集合,每次我們發(fā)送消息必須指定消息綁定某一個主題.
-
生產者發(fā)送的某一條消息,只能指向一個主題,多條消息可以指向同一個主題,同一個主題中有多個消息隊列保存消息,消費端可以根據(jù)訂閱的主題消費不同主題的消息.這樣可以實現(xiàn)業(yè)務隔離.
-
比如電商主題可以是order,主題也可以是cart,還可以是product相關的…
-
一類消息,從數(shù)據(jù)的格式,攜帶body格式,都是完全一致的. 不會出現(xiàn)"第一條消息的"body是普通字符串,第二條消息是個對象Json,不可能第一條消息延遲消息(支付訂單倒計時取消),第二條消息普通同步消息.
4.queue隊列
存儲消息的物理實體(最小單位)。一個Topic中可以包含多個Queue(分布式體現(xiàn)的關鍵),每個Queue中存放的就是該Topic的消息。一個Topic的Queue也被稱為一個Topic中消息的分區(qū)**(Partition**)。
注意:一個Topic的Queue中的消息只能被一個消費者組中的一個消費者消費(消費點位邏輯)。一個Queue中的消息不允許同一個消費者組中的多個消費者同時消費。
5. 生產者
問題:通過上述概念的了解,生產者,nameserver,broker之間是如何交互的.
- 啟動 nameserver 保存broker路由信息
- 主題一旦創(chuàng)建,保存broker里,同時生成隊列,這些數(shù)據(jù),作為路由信息保存nameserver
- 生產者從nameserver拿到當前集群所注冊信息(路由)
- 發(fā)送消息的時候,連接具體的那個broker找具體的topic的具體queue實現(xiàn)消息發(fā)送,使用的具體信息,在返回的SendResult中體現(xiàn)
6.消費者分組和生產者分組
消息生產者,負責生產消息。本質上是程序中的一段代碼.Producer投遞消息到broker代理中.找到主題,負載均衡存放到隊列中.
RocketMQ中的消息生產者都是以生產者組(Producer Group)的形式出現(xiàn)的。生產者組是同一類生產者的集合,產生同一類型的消息,這類Producer發(fā)送相同Topic類型的消息。一個生產者組可以同時向多個主題發(fā)送消息。
RocketMQ中的消息消費者都是以消費者組(Consumer Group)的形式出現(xiàn)的。消費者組是同一類消費者的集合,這類Consumer消費的是同一個Topic類型的消息,對應同一類消息數(shù)據(jù)。消費者組使得在消息消費方面,實現(xiàn)負載均衡(將一個Topic中的不同的Queue平均分配給同一個Consumer Group的不同的Consumer,注意,并不是將消息負載均衡)和容錯(一個Consmer掛了,該Consumer Group中的其它Consumer可以接著執(zhí)行消費邏輯.
由于主題中有多個隊列,一組消費者,最多有和隊列一樣數(shù)量的消費成員,再多,無法綁定隊列消費消息了.
7.消費點位
在隊列中記錄了所有和偏移量有關的數(shù)據(jù)比如:文章來源:http://www.zghlxwxcb.cn/news/detail-616238.html
- 最小偏移量:都是0
- 最大偏移量:當前消息的個數(shù)
在消費者中也在記錄偏移量文章來源地址http://www.zghlxwxcb.cn/news/detail-616238.html
- 當前組對應主題隊列的消費最小偏移量,和隊列的最大偏移量(通過這兩個值,能夠知道當前消費者消費到哪個消息,還有多少沒消費)
到了這里,關于RocketMQ基本概念與入門的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!