1.消息的基本概念
1.1.生產(chǎn)者和消費者
生產(chǎn)者(Producer)
消息的創(chuàng)建者。
負責(zé)創(chuàng)建和推送數(shù)據(jù)到消息服務(wù)器。
消費者(Consumer)
消息的接收方。
負責(zé)接收消息和處理數(shù)據(jù)。
1.2.消息隊列(Queue)
消息隊列是RabbitMQ的內(nèi)部對象,用于存儲生產(chǎn)者的消息直到發(fā)送給消費者,它是消費者接收消息的地方。
消息隊列的重要屬性:
持久性
broker重啟前都有效。
自動刪除
在所有消費者停止使用之后自動刪除。
惰性
沒有主動聲明隊列,調(diào)用會導(dǎo)致異常。
排他性
一旦啟用,聲明它的消費者才能使用。
1.3.交換機(Exchange)
交換機用于接收,分配消息。
- 生產(chǎn)者要先指定一個routing key,然后將消息發(fā)送到交換機。
- routing key需要與exchange type和binding key聯(lián)合使用才能最終生效。
- 交換機將消息路由到一個或多個隊列中,或丟棄。
交換機包含4中類型: direct, topic, fanout, headers。
direct(直連交換機)
具有路由功能的交換機,綁定到此交換機的時候需要指定一個routing_key,交換機發(fā)送消息的時候需要routing_key,會將消息發(fā)送道對應(yīng)的隊列。先匹配,再投送。Direct Exchange是RabbitMQ的默認交換機模式。這是最簡單的模式。它根據(jù)routing key全文匹配去尋找隊列。
1.4.消息確認
消息確認是指當(dāng)一個消息從隊列中投遞給消費者(consumer)后,消費者會通知一下消息代理(broker)。消息確認可以自動,也可以由處理消息的開發(fā)者手動執(zhí)行。當(dāng)啟用消息確認后,消息代理需要收到來自消費者的確認回執(zhí)后,才完全將消息從隊列中刪除。
2.七種隊列模式
2.1.簡單模式(Hello World)
做最簡單的事情,一個生產(chǎn)者對應(yīng)一個消費者,RabbitMQ相當(dāng)于一個消息代理,負責(zé)將A的消息轉(zhuǎn)發(fā)給B。
單生產(chǎn)者,單消費者,單隊列。
應(yīng)用場景:
將發(fā)送的電子郵件放到消息隊列,然后郵件服務(wù)在隊列中獲取郵件并發(fā)送給收件人。
2.2.工作隊列模式(Work queues)
在多個消費者之間分配任務(wù)(競爭的消費者模式),一個生產(chǎn)者對應(yīng)多個消費者。適用于資源密集型任務(wù), 單個消費者處理不過來,需要多個消費者進行處理的場景。單生產(chǎn)者,多消費者,單隊列。
應(yīng)用場景:
一個訂單的處理需要10s,有多個訂單可以同時放到消息隊列, 然后讓多個消費者同時并行處理,而不是單個消費者的串行消費。
2.3.發(fā)布訂閱模式(Publish/Subscribe)
一次向許多消費者發(fā)送消息,將消息將廣播到所有的消費者。單生產(chǎn)者,多消費者,多隊列。
應(yīng)用場景:
更新商品庫存后需要通知多個緩存和多個數(shù)據(jù)庫。
結(jié)構(gòu)如下:
一個fanout類型交換機扇出兩個消息隊列,分別為緩存消息隊列、數(shù)據(jù)庫消息隊列
一個緩存消息隊列對應(yīng)著多個緩存消費者
一個數(shù)據(jù)庫消息隊列對應(yīng)著多個數(shù)據(jù)庫消費者
2.4.路由模式(Routing)
根據(jù)Routing Key有選擇地接收消息。多消費者,選擇性多隊列,每個隊列通過routing key全文匹配。發(fā)送消息到交換機并且要指定路由鍵(Routing key) 。消費者將隊列綁定到交換機時需要指定路由key,僅消費指定路由key的消息。
應(yīng)用場景:
在商品庫存中增加了1臺iphone12,iphone12促銷活動消費者指定routing key為iphone12 promote,只有此促銷活動會接收到消息,其它促銷活動不關(guān)心也不會消費此routing key的消息。
2.5.主題模式(Topics)
主題交換機方式接收消息,將routing key和模式進行匹配。多消費者,選擇性多隊列,每個隊列通過模式匹配。隊列需要綁定在一個模式上。#匹配一個詞或多個詞,*只匹配一個詞。
應(yīng)用場景:
iphone促銷活動可以接收主題為多種iPhone的消息,如iphone12、iphone13等。
2.6.遠程過程調(diào)用(RPC)
在遠程計算機上運行功能并等待結(jié)果。
應(yīng)用場景:
需要等待接口返回數(shù)據(jù),如訂單支付。
2.7.發(fā)布者確認(Publisher Confirms)
與發(fā)布者進行可靠的發(fā)布確認,發(fā)布者確認是RabbitMQ擴展,可以實現(xiàn)可靠的發(fā)布。在通道上啟用發(fā)布者確認后,RabbitMQ將異步確認發(fā)送者發(fā)布的消息,這意味著它們已在服務(wù)器端處理。
應(yīng)用場景:
對于消息可靠性要求較高,比如錢包扣款。
3.rabbitmq控制臺管理
打開瀏覽器。訪問 http://127.0.0.1:15672
出現(xiàn)管理頁面:
賬號:guest
密碼:guest
登錄后,您將看到 RabbitMQ 的控制臺界面。該界面將顯示以下幾個主要部分:
Overview:概覽頁面提供了關(guān)于 RabbitMQ 節(jié)點、隊列和交換機等的統(tǒng)計信息。
Connections:連接頁面提供了有關(guān)當(dāng)前客戶端連接的詳細信息。
Channels:通道頁面提供了有關(guān)當(dāng)前打開通道的詳細信息。
Exchanges:交換機頁面提供了有關(guān) RabbitMQ 服務(wù)器上已聲明的交換機的詳細信息。
Queues:隊列頁面提供了有關(guān) RabbitMQ 服務(wù)器上已聲明的隊列的詳細信息。
Admin:管理頁面提供了一些高級管理功能,例如添加用戶、設(shè)置權(quán)限和定義策略等。
通過 RabbitMQ 控制臺,您可以執(zhí)行許多操作,例如創(chuàng)建和刪除隊列、交換機和綁定、查看隊列和交換機的詳細信息、監(jiān)視連接和通道等??刂婆_還提供了一些高級管理功能,例如添加用戶、設(shè)置權(quán)限和定義策略等。
3.1.三種基本隊列
隊列是具有兩個主要操作的順序數(shù)據(jù)結(jié)構(gòu):入隊和出隊。RabbitMQ中的隊列是FIFO(先進先出)。一些隊列因為一些特性,即消費者的優(yōu)先級和重新排隊,會影響消費者消費的順序。
3.1.2.Classic經(jīng)典隊列-單機環(huán)境隊列
這個是RabbitMQ最經(jīng)典的隊列類型。在單機環(huán)境中,擁有比較高的消息可靠性。我們在創(chuàng)建隊列的時候,根據(jù)上圖可以看到,經(jīng)典隊列可以選擇是否持久化(Durability)以及是否自動刪除(Auto delete)兩個屬性。
Durability:是否持久化,可選項為持久化(Durable)和 臨時(Transient)。Durable相對消息安全性更高。但是同時需要有更多的IO操作,所以生產(chǎn)和消費消息的性能,相比Transient會比較低。
Auto delete:是否自動刪除,如果選擇是,則消息會被其中一個消費者消費之后,隊列會自動銷毀,其他消費者也會斷開連接。一般不會自動刪除。
3.1.3.Quorum仲裁隊列-高可用隊列
仲裁隊列,是RabbitMQ從3.8.0版本引入的新的隊列類型,整個3.8.X版本,也是針對仲裁隊列進行完善和優(yōu)化。Quorum相比Classic在分布式環(huán)境下對消息的可靠性保障更高。官方文檔中表示,未來會使用Quorum代替Classic。
Quorum是基于Raft一致性協(xié)議實現(xiàn)的一種新型的分布式消息隊列,他實現(xiàn)了持久化,多備份的FIFO隊列,主要就是針對RabbitMQ的鏡像模式設(shè)計的。簡單理解就是Quorum隊列中的消息需要有集群中多半節(jié)點同意確認后,才會寫入到隊列中。這種隊列類似于RocketMQ當(dāng)中的DLedger集群。這種方式可以保證消息在集群內(nèi)部不會丟失。同時,Quorum是以犧牲很多高級隊列特性為代價,來進一步保證消息在分布式環(huán)境下的高可靠。從整體功能上來說,Quorum隊列是在Classic經(jīng)典隊列的基礎(chǔ)上做減法,因此對于RabbitMQ的長期使用者而言,其實是會影響使用體驗的。他與普通隊列的區(qū)別:
特性 | Classic |
Quorum |
---|---|---|
非持久化隊列(Non-durable queues ) |
支持 | 不支持 |
獨占隊列(Exclusivity ) |
支持 | 不支持 |
每條消息的持久化(Per message persistence ) |
每條消息 | 總是 |
會員變更(Membership changes ) |
自動 | 手動 |
消息TTL (Message TTL ) |
支持 | 支持(3.10版本開始) |
隊列TTL (Queue TTL ) |
支持 | 支持 |
隊列長度限制(Queue length limits ) |
支持 | 支持 |
懶加載(Lazy behaviour ) |
支持 | 始終 |
消息優(yōu)先級(Message priority ) |
支持 | 不支持 |
消費者優(yōu)先級(Consumer priority ) |
支持 | 支持 |
死信交換(Dead letter exchanges ) |
支持 | 支持 |
毒消息處理(Poison message handling ) |
不支持 | 支持 |
全局Qos (Global QoS Prefetch ) |
支持 | 不支持 |
Exclusivity表示獨占隊列,即表示隊列只能由聲明該隊列的Connection連接來進行使用,包括隊列創(chuàng)建、刪除、收發(fā)消息等,并且獨占隊列會在聲明該隊列的Connection斷開后自動刪除。其中有個特例就是這個Poison Message。所謂毒消息是指消息一直不能被消費者正常消費(可能是由于消費者失敗或者消費邏輯有問題等),就會導(dǎo)致消息不斷的重新入隊,這樣這些消息就成為了毒消息。這些讀消息應(yīng)該有保障機制進行標記并及時刪除。
Quorum隊列會持續(xù)跟蹤消息的失敗投遞嘗試次數(shù),并記錄在x-delivery-count這樣一個頭部參數(shù)中。然后,就可以通過設(shè)置 Delivery limit參數(shù)來定制一個毒消息的刪除策略。當(dāng)消息的重復(fù)投遞次數(shù)超過了Delivery limit參數(shù)閾值時,RabbitMQ就會刪除這些毒消息。當(dāng)然,如果配置了死信隊列的話,就會進入對應(yīng)的死信隊列。
Quorum隊列更適合于 隊列長期存在,并且對容錯、數(shù)據(jù)安全方面的要求比低延遲、不持久等高級隊列更能要求更嚴格的場景。例如 電商系統(tǒng)的訂單,引入MQ后,處理速度可以慢一點,但是訂單不能丟失。
也對應(yīng)以下一些不適合使用的場景:
一些臨時使用的隊列:比如transient臨時隊列,exclusive獨占隊列,或者經(jīng)常會修改和刪除的隊列。
對消息低延遲要求高: 一致性算法會影響消息的延遲。
對數(shù)據(jù)安全性要求不高:Quorum隊列需要消費者手動通知或者生產(chǎn)者手動確認。
隊列消息積壓嚴重 : 如果隊列中的消息很大,或者積壓的消息很多,就不要使用Quorum隊列。Quorum隊列當(dāng)前會將所有消息始終保存在內(nèi)存中,直到達到內(nèi)存使用極限。
3.1.3.Stream隊列-大規(guī)模隊列
Stream隊列是RabbitMQ自3.9.0版本開始引入的一種新的數(shù)據(jù)隊列類型,也是目前官方最為推薦的隊列類型。這種隊列類型的消息是持久化到磁盤并且具備分布式備份的,更適合于消費者多,讀消息非常頻繁的場景。
Stream隊列的核心是以append-only只添加的日志來記錄消息,整體來說,就是消息將以append-only的方式持久化到日志文件中,然后通過調(diào)整每個消費者的消費進度offset,來實現(xiàn)消息的多次分發(fā)。這種隊列提供了RabbitMQ已有的其他隊列類型不太好實現(xiàn)的四個特點:
大規(guī)模分發(fā)(large fan-outs)
當(dāng)想要向多個訂閱者發(fā)送相同的消息時,以往的隊列類型必須為每個消費者綁定一個專用的隊列。如果消費者的數(shù)量很大,這就會導(dǎo)致性能低下。而Stream隊列允許任意數(shù)量的消費者使用同一個隊列的消息,從而消除綁定多個隊列的需求。
消息回溯(Replay/Time-travelling)
RabbitMQ已有的這些隊列類型,在消費者處理完消息后,消息都會從隊列中刪除,因此,無法重新讀取已經(jīng)消費過的消息。而Stream隊列允許用戶在日志的任何一個連接點開始重新讀取數(shù)據(jù)。
高吞吐性能(Throughput Performance)
Stream隊列的設(shè)計以性能為主要目標,對消息傳遞吞吐量的提升非常明顯。
大日志(Large logs)
RabbitMQ一直以來有一個讓人詬病的地方,就是當(dāng)隊列中積累的消息過多時,性能下降會非常明顯。但是Stream隊列的設(shè)計目標就是以最小的內(nèi)存開銷高效地存儲大量的數(shù)據(jù)。
3.2.生產(chǎn)者和消費者
生產(chǎn)者是消息的提供方,消費者是消息的執(zhí)行方,代表的是行為的不同,rabbitmq通過登錄來驗證不同的用戶,不同的行為來代表不同人的特征。
3.3.交換機(exchange)
在RabbitMQ中,所有的producer都不會直接把message發(fā)送到queue中,甚至producer都不知道m(xù)essage在發(fā)出后有沒有發(fā)送到queue中,事實上,producer只能將message發(fā)送給exchange,由exchange來決定發(fā)送到哪個queue中。
exchange的一端用來從producer中接收message,另一端用來發(fā)送message到queue,exchange的類型規(guī)定了怎么處理接收到的message,發(fā)布訂閱模式使用到的exchange類型為 fanout ,這種exchange類型非常簡單,就是將接收到的message廣播給已知的(即綁定到此exchange的)所有consumer。
當(dāng)然,如果不想使用特定的exchange,可以使用 exchange=‘’ 表示使用默認的exchange,默認的exchange會將消息發(fā)送到 routing_key 指定的queue,可以參考工作(任務(wù))隊列模式和Hello world模式。
4.Python訪問rabbitmq
在python語言環(huán)境下,使用pika庫來訪問訪問操作rabbitmq中間件。
pip install pika
文章來源:http://www.zghlxwxcb.cn/news/detail-776777.html
4.1.生產(chǎn)者提交消息
# coding=utf-8
# producer
import pika
# 指定遠程 rabbitmq 的用戶名密碼并創(chuàng)建憑證
credentials = pika.PlainCredentials(username="guest", password="guest")
# 1. 創(chuàng)建 connect 連接
connect = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials))
# 2. 在 connect 上創(chuàng)建一個 channel
channel = connect.channel()
# 3. 在 channel 上聲明交換器 exchange
channel.exchange_declare(exchange='hello', exchange_type='direct', passive=False, durable=True, auto_delete=False)
# 4. 聲明一個隊列,生產(chǎn)者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另一方能正常運行
channel.queue_declare(queue='hello')
# 5. 通過鍵 'world' 將隊列和交換器綁定
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')
# 6. 創(chuàng)建純文本消息
msg_props = pika.BasicProperties()
msg_props.content_type = 'text/plain'
# 7. 將消息發(fā)送到 RabbitMQ
message = 'quit'
channel.basic_publish(exchange='hello', routing_key='world', properties=msg_props, body=message)
# 8. 關(guān)閉通道
channel.close()
# 9. 當(dāng)生產(chǎn)者發(fā)送完消息后,可選擇關(guān)閉連接
connect.close()
4.2.消費者執(zhí)行消息
#!/usr/bin/env python
# coding=utf-8
# consumer
import pika
# 指定遠程 rabbitmq 的用戶名密碼并創(chuàng)建憑證
credentials = pika.PlainCredentials(username="guest", password="guest")
# 1. 創(chuàng)建 connect 連接
connect = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials))
# 2. 在 connect 上創(chuàng)建一個 channel
channel = connect.channel()
# 3. 在 channel 上聲明交換器 exchange
channel.exchange_declare(exchange='hello', exchange_type='direct', passive=False, durable=True, auto_delete=False)
# 4. 聲明一個隊列,生產(chǎn)者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另一方能正常運行
channel.queue_declare(queue='hello')
# 5. 通過鍵 'world' 將隊列和交換器綁定
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')
# 6. 定義一個回調(diào)函數(shù),用來接收生產(chǎn)者發(fā)送的消息
'''
在 Python3 中,bytes 和 str 的互相轉(zhuǎn)換方式是
str.encode('utf-8')
bytes.decode('utf-8')
'''
def callback(channel, method, properties, body):
# 消息確認
channel.basic_ack(delivery_tag=method.delivery_tag)
if body.decode('utf-8') == "quit":
# 停止消費,并退出
channel.basic_cancel(consumer_tag='hello-consumer')
channel.close()
connect.close()
else:
print("msg is {}".format(body))
# print("msg type {}".format(type(body)))
# print("msg eval after type {}".format(type(eval(body))))
# 7. 消費者消費
channel.basic_consume('hello', callback, auto_ack=False)
# 8. 開始循環(huán)取消息
channel.start_consuming()
5.總結(jié)
通過使用rabbitmq技術(shù),可以實現(xiàn)生產(chǎn)者和消費者模式,并實現(xiàn)兩者的解耦,生產(chǎn)者負責(zé)通過交換機將數(shù)據(jù)存入隊列,而消費者從隊列中取數(shù)據(jù),并執(zhí)行相應(yīng)的消息。可以用在服務(wù)器復(fù)雜耗時任務(wù)的并行計算中使用,與常用的web服務(wù)器(如apache等)解耦,提高服務(wù)器計算資源的利用效率。文章來源地址http://www.zghlxwxcb.cn/news/detail-776777.html
到了這里,關(guān)于rabbitmq-常見七種消息隊列-控制臺界面管理-python-實現(xiàn)簡單訪問的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!