一.什么是消息隊列
1.簡介
在介紹消息隊列之前,應該先了解什么是 AMQP(Advanced Message Queuing Protocol, 高級消息隊列協(xié)議,點擊查看)
消息(Message)是指在應用間 傳送的數據,消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象;而 消息隊列(Message Queue)是一種 應用間的 通信方式,消息發(fā)送后可以 立即返回,由 消息系統(tǒng)來確保消息的 可靠傳遞, 消息發(fā)布者只管把消息發(fā)布到 MQ 中而不用管誰來取, 消息使用者只管從 MQ 中取消息而不管是誰發(fā)布的,這樣發(fā)布者和使用者都不用知道對方的存在,它是典型的 生產者-消費者模型,生產者不斷向消息隊列生產消息,消費者不斷的從隊列獲取消息。因為消息的生產和消費都是 異步的,并且只關心消息的發(fā)送和接收,沒有業(yè)務邏輯的浸入,這樣就實現了生產者和消費者的 解耦
2.總結
(1).消息隊列是隊列結構的 中間件
(2).消息發(fā)送后,不需要立即處理,而是由消息系統(tǒng)來處理
(3).消息處理是 消息使用者(消費者) 按順序處理的
3.結構圖

二.為什么要使用消息隊列
消息隊列是一種應用間的異步協(xié)作機制,是分布式系統(tǒng)中的重要的組件,主要目的是為了解決應用 藕合, 異步消息, 流城削鋒, 冗余,擴展性,排序保證等問題,實現 高性能, 高并發(fā), 可伸縮和 最終一致性架構,下面舉例說明
業(yè)務解耦
以常見的訂單系統(tǒng)為例,用戶點擊【下單】按鈕之后的業(yè)務邏輯可能包括:扣減庫存、生成相應單據、發(fā)紅包、發(fā)短信通知,商品配送等業(yè)務;在業(yè)務發(fā)展初期這些邏輯可能放在一起同步執(zhí)行,隨著業(yè)務的發(fā)展訂單量增長,需要提升系統(tǒng)服務的性能,這時可以將一些不需要立即生效的操作拆分出來異步執(zhí)行,或者單獨拆分出來作為一個獨立的系統(tǒng),比如生成相應單據為訂單系統(tǒng),扣減庫存為庫存系統(tǒng),發(fā)放紅包獨立為紅包系統(tǒng)、發(fā)短信通知為短信系統(tǒng),商品配送為配送系統(tǒng)等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之后發(fā)送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發(fā)現 MQ 中有發(fā)紅包或發(fā)短信或商品配送之類的消息時,執(zhí)行相應的業(yè)務系統(tǒng)邏輯,這樣各個業(yè)務系統(tǒng)相互獨立,就很方便進行分離部署,防止某一系統(tǒng)故障引起的連鎖故障

流量削峰
流量削峰一般在秒殺或者團搶活動中廣泛使用
(1).由來
主要是還是來自于互聯(lián)網的業(yè)務場景,例如:春節(jié)火車票搶購,大量的用戶需要同一時間去搶購;以及雙11秒殺, 短時間上億的用戶涌入,瞬間流量巨大(高并發(fā)),比如:200萬人準備在凌晨12:00準備搶購一件商品,但是商品的數量缺是有限的100-500件左右。這樣真實能購買到該件商品的用戶也只有幾百人左右, 但是從業(yè)務上來說,秒殺活動是希望更多的人來參與,也就是搶購之前希望有越來越多的人來看購買商品。但是,在搶購時間達到后,用戶開始真正下單時,秒殺的服務器后端卻不希望同時有幾百萬人同時發(fā)起搶購請求。因為服務器的處理資源是有限的,所以出現峰值的時候,很容易導致服務器宕機,用戶無法訪問的情況出現。這就好比出行的時候存在早高峰和晚高峰的問題,為了解決這個問題,出行就有了錯峰限行的解決方案。同理,在線上的秒殺等業(yè)務場景,也需要類似的解決方案,需要平安度過同時搶購帶來的流量峰值的問題,這就是流量削峰的由來。

(2).怎樣來實現流量削峰方案
削峰從本質上來說就是更多地延緩用戶請求,以及層層過濾用戶的訪問需求,遵從“最后落地到數據庫的請求數要盡量少”的原則。
1).消息隊列解決削峰
要對流量進行削峰,最容易想到的解決方案就是用消息隊列來緩沖瞬時流量,把同步的直接調用轉換成異步的間接推送,中間通過一個隊列在一端承接瞬時的流量洪峰,在另一端平滑地將消息推送出去。

消息隊列中間件主要解決應用耦合,異步消息, 流量削鋒等問題;常用消息隊列系統(tǒng):目前在生產環(huán)境,使用較多的消息隊列有 ActiveMQ、RabbitMQ、 ZeroMQ、Kafka、MetaMQ、RocketMQ 等。
在這里,消息隊列就像“水庫”一樣,攔蓄上游的洪水,削減進入下游河道的洪峰流量,從而達到減免洪水災害的目的。
2).流量削峰漏斗:層層削峰
針對秒殺場景還有一種方法,就是對請求進行分層過濾,從而過濾掉一些無效的請求。分層過濾其實就是采用“漏斗”式設計來處理請求的,如下圖所示:

這樣就像漏斗一樣,盡量把數據量和請求量一層一層地過濾和減少了
I.分層過濾的核心思想
通過在不同的層次盡可能地過濾掉無效請求
通過CDN過濾掉大量的圖片,靜態(tài)資源的請求
再通過類似Redis這樣的分布式緩存,過濾請求等就是典型的在上游攔截讀請求
II.分層過濾的基本原則
對寫數據進行基于時間的合理分片,過濾掉過期的失效請求
對寫請求做限流保護,將超出系統(tǒng)承載能力的請求過濾掉
涉及到的讀數據不做強一致性校驗,減少因為一致性校驗產生瓶頸的問題
對寫數據進行強一致性校驗,只保留最后有效的數據
最終,讓“漏斗”最末端(數據庫)的才是有效請求,例如:當用戶真實達到訂單和支付的流程,這個是需要數據強一致性的。
(3).總結
1).對于秒殺這樣的高并發(fā)場景業(yè)務,最基本的原則就是將請求攔截在系統(tǒng)上游,降低下游壓力。如果不在前端攔截很可能造成數據庫(mysql、oracle等)讀寫鎖沖突,甚至導致死鎖,最終還有可能出現雪崩等場景。
2).劃分好動靜資源,靜態(tài)資源使用CDN進行服務分發(fā)
3).充分利用緩存(redis等),增加QPS,從而加大整個集群的吞吐量
4).高峰值流量是壓垮系統(tǒng)很重要的原因,所以需要RabbitMQ等消息隊列在一端承接瞬時的流量洪峰,在另一端平滑地將消息推送出去
在
異步處理
用戶注冊后,需要發(fā)送注冊郵件和注冊短信

三.RabbitMQ介紹
RabbitMQ是一個由 erlang語言開發(fā)的,實現了AMQP協(xié)議的標準的開源消息代理和隊列服務器( 消息隊列中間件)
1.常見的消息隊列中間件

2.RabbitMQ特性
可靠性(Reliability)
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發(fā)布確認
靈活的路由(Flexible Routing)
在消息進入隊列之前,通過 Exchange 來路由消息的。對于典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange
消息集群(Clustering)
多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker
高可用(Highly Available Queues)
隊列可以在集群中的機器上進行鏡像,使得在部分節(jié)點出問題的情況下隊列仍然可用
多種協(xié)議(Multi-protocol)
RabbitMQ 支持多種消息隊列協(xié)議,比如 STOMP、MQTT 等
多語言客戶端(Many Clients)
RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby ,PHP等
管理界面(Management UI)
RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監(jiān)控和管理消息 Broker 的許多方面
跟蹤機制(Tracing)
如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發(fā)生了什么
插件機制(Plugin System)
RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件
3.RabbitMQ工作原理
內部實際上也是 AMQP 中的基本概念

上圖各個模塊的說明:
Message: 消息,消息是不具名的,它由消息頭和消息體組成,消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優(yōu)先權)、delivery-mode(指出該消息可能需要持久性存儲)等
Publisher: 消息的生產者,也是一個向交換器發(fā)布消息的客戶端應用程序
Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序
Broker: 接收和分發(fā)消息的應用,表示消息隊列服務器實體,RabbitMQ Server就是Message Broker
Virtual host: 虛擬主機(共享相同的身份認證和加密環(huán)境的獨立服務器域),表示一批交換器、消息隊列和相關對象,類似于mysql的數據庫,當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以劃分出多個vhost,每個用戶在自己的vhost創(chuàng)建exchange、queue等.每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制,vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 /
Connection: publisher、consumer和broker之間的網絡連接,比如:TCP連接,斷開連接的操作只會在client端進行,Broker不會斷開連接,除非出現網絡故障或broker服務出現問題
Channel: 管道,多路復用連接中的一條獨立的雙向數據流通道,信道是建立在真實的TCP連接內地虛擬連接(邏輯連接),如果應用程序支持多線程,通常每個多線程創(chuàng)建單獨的channel進行通訊, 因為AMQP 方法中包含了channel id幫助客戶端和broker識別channel,所以channel之間是完全隔離的,AMQP 命令都是通過管道發(fā)出去的,不管是發(fā)布消息、訂閱隊列還是接收消息,這些動作都是通過管道完成。因為對于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低,所以引入了管道的概念,目的是為了減少操作系統(tǒng)建立TCP 連接的開銷,以復用一條 TCP 連接
Exchange: 交換器,用來接收生產者發(fā)送的消息并將這些消息路由給服務器中的隊列,message到達broker的第一站,根據分發(fā)規(guī)則,匹配查詢表中的路由鍵(routing key),分發(fā)消息到queue中去,常用的類型有:直連交換機-direct (point-to-point), 主題交換機-topic (publish-subscribe),扇型交換機-fanout (multicast), 頭交換機-headers(amq.match (and amq.headers in RabbitMQ))
Queue: 消息隊列,用來保存消息直到發(fā)送給消費者,它是消息的容器,也是消息的終點,一個消息可投入一個或多個隊列,消息最終被送到這里等待消費者連接到這個隊列將其取走
Binding: 綁定,消息隊列(queue)和交換器(exchange)之間的虛擬連接, binding中可以包含routing key, Binding信息被保存到exchange中的查詢表中,用于message的分發(fā)依據,一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規(guī)則,所以可以將交換器理解成一個由綁定構成的路由表
四.RabbitMQ安裝與啟動
見linux下,docker-compose 安裝nignx,php,mysql,redis,rabbitmq,mongo
端口說明:安裝完rabbitmq后,有幾個常見端口:
4369: epmd(Erlang Port Mapper Daemon),erlang服務端口
5672: client端通信端口
15672:http Api客戶端,管理UI(僅在啟用了管理插件的情況下使用)
25672:用于節(jié)點間通信(Erlang分發(fā)服務器端口)
五.RabbitMQ幾個重要特性概念講解
隊列模式-簡單隊列模式,工作隊列模式
ACK&NACK消費確認機制&重回隊列機制
消息持久化
公平調度(限流機制)
冪等性
return機制
消息的可靠性投遞
下面是RabbitMQ和消息所涉及到的一些 術語
生產(Producing)的意思就是發(fā)送:發(fā)送消息的程序就是一個生產者(producer),一般用"P"來表示:

隊列(queue)就是存在于RabbitMQ中郵箱的名稱:雖然消息的傳輸經過了RabbitMQ和你的應用程序,但是它只能被存儲于隊列當中。實質上隊列就是個巨大的消息緩沖區(qū),它的大小只受主機內存和硬盤限制。多個生產者(producers)可以把消息發(fā)送給同一個隊列,同樣,多個消費者(consumers)也能夠從同一個隊列(queue)中獲取數據。隊列可以繪制成這樣(圖上是隊列的名稱):

消費(Consuming)和接收(receiving)是同一個意思:一個消費者(consumer)就是一個等待獲取消息的程序。把它繪制為"C":

以php框架yii2為參照
簡單隊列模式(simple queue)
發(fā)送 單個消息的生產者,以及 接收消息并將其 打印出來的消費者。將忽略RabbitMQ API中的一些細節(jié)。 在下圖中,“P”是生產者,“C”是消費者。中間的框是一個隊列(保存消息的地方)

(1).生產者發(fā)布消息步驟
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($host, $port, $user, $pass, $v_host="/");
// 創(chuàng)建channel
$channel = $connection->channel();
// 初始化隊列,并持久化(聲明隊列)
$channel->queue_declare($queue_name, false, true, false, false);
//消息內容
$data = "this is message2";
// 聲明消息,并持久化(創(chuàng)建消息)
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 把消息推到隊列里(發(fā)布消息)
$channel->basic_publish($mes, '', $queue_name);
//關閉通道和連接
$channel->close();
$connection->close();
上面聲明隊列方法queue_declare()參數詳解

(2).消費者消費消息步驟
//核心代碼
basic_consume($queue = '', $consumer_tag = '', $no_local = false,$no_ack = false,$exclusive = false,$nowait = false,$callback = null,$ticket = null,$arguments = array())
上面消費消息方法basic_consume()參數詳解

(3).具體代碼展示
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 賬戶
'pass' => 123456, // 密碼
"v_host" => "order", // 對應Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 隊列名稱
],
]
生產者代碼
<?php
/**
* 生產者生產消息
*/
namespace console\controllers\simple;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name1"]; // 隊列名稱
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 初始化隊列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
//消息
$data = "this is message2";
// 聲明消息,并持久化
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 把消息推到隊列里
$channel->basic_publish($mes, '', $queue_name);
//關閉通道和連接
$channel->close();
$connection->close();
}
}
消費者代碼
<?php
/**
* 消費者消費消息
*/
namespace console\controllers\simple;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name1"]; // 隊列名稱
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 初始化隊列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 消費消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
};
$channel->basic_consume($queue_name, "", false, true, false, false,$callback);
// 監(jiān)控
while ($channel->is_open()){
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
工作隊列模式(worker queue)
創(chuàng)建一個 工作隊列(Work Queue),它會發(fā)送一些 耗時的任務給 多個工作者(Worker),工作隊列(又稱: 任務隊列——Task Queues)是為了 避免等待一些占用大量資源、時間的操作,當把任務(Task)當作消息發(fā)送到隊列中,一個運行在后臺的工作者(worker)進程就會取出任務然后處理,當運行多個工作者(workers),任務就會在它們之間 共享。這個概念在網絡應用中是非常有用的,它可以在短暫的HTTP請求中處理一些復雜的任務,使用工作隊列的一個 好處就是它能夠 并行的處理隊列。如果堆積了很多任務,只需要添加 更多的工作者(workers)就可以了,這就是所謂的 循環(huán)調度,擴展很簡單

(1).具體代碼展示
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 賬戶
'pass' => 123456, // 密碼
"v_host" => "order", // 對應Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 隊列名稱
"name2" => "task_queue", // 隊列名稱
],
]
生產者代碼
<?php
/**
* 生產者生產消息
*/
namespace console\controllers\worker;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 隊列名稱
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 初始化隊列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 生產多條消息
for ($i = 0; $i <= 10; ++$i) {
//消息
$data = "this is " . $i. " message";
// 聲明消息,并持久化
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 把消息推到隊列里
$channel->basic_publish($mes, '', $queue_name);
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
消費者代碼
當生產者生產了多條費時的消息時,一個消費者不能滿足需要,可以添加多個消費者處理生產者的消息,多個消費者之間采用 輪詢的方式獲取隊列的消息,并把該消息發(fā)送給對應的用戶
比如:可以對一個隊列的消息開多個消費者,這里我們開了兩個消費者,里面的代碼都是一致的
<?php
/**
* 消費者消費消息
*/
namespace console\controllers\worker;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 隊列名稱
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 初始化隊列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 消費消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
};
$channel->basic_consume($queue_name, "", false, true, false, false,$callback);
// 監(jiān)控
while ($channel->is_open()){
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
<?php
/**
* 消費者2消費消息
*/
namespace console\controllers\worker;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Consumer2Controller extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 隊列名稱
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 初始化隊列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 消費消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
};
$channel->basic_consume($queue_name, "", false, true, false, false,$callback);
// 監(jiān)控
while ($channel->is_open()){
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
ACK消費確認機制&NACK&重回隊列機制
ACK消費確認機制
當處理一個 比較耗時得任務的時候,想知道消費者(consumers) 運行到一半就掛掉時, 正在處理的消息/發(fā)送給當前工作者的消息會怎樣,當消息在隊列中 沒有進行持久化操作時,消息被RabbitMQ發(fā)送給消費者(consumers)之后,馬上就會在內存中 移除。這種情況,只要把一個 工作者(worker)停止,正在處理的消息就會 丟失。同時,所有發(fā)送到這個工作者的還沒有處理的消息 都會丟失。所以,如果不想消息丟失,當一個工作者(worker)掛掉了,希望任務會重新發(fā)送給其他的工作者(worker),RabbitMQ就提供了 消息響應( acknowledgments)。消費者會通過一個 ack(響應),告訴RabbitMQ已經收到并處理了某條消息,然后RabbitMQ就會 釋放并刪除這條消息。如果消費者(consumer) 掛掉了, 沒有發(fā)送響應,RabbitMQ就會認為消息 沒有被完全處理,然后 重新發(fā)送給 其他消費者(consumer)。這樣,即使工作者(workers)偶爾的掛掉,也不會丟失消息。消息是沒有超時這個概念的,當工作者與它斷開連的時候,RabbitMQ會重新發(fā)送消息,這樣在處理一個耗時非常長的消息任務的時候就不會出問題了。在該講解中,將使用手動消息確認,通過為 no_ack參數傳遞 false,一旦有任務完成,使用d.ack()(false)向RabbitMQ服務器發(fā)送消費完成的確認(這個
確認消息是單次傳遞的)

//核心代碼
basic_consume($queue = '', $consumer_tag = '', $no_local = false, $no_ack = false,$exclusive = false,$nowait = false,$callback = null,$ticket = null,$arguments = array())
// 消費消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
// 消費ack
$msg->ack();
};
// 第四個參數: 需要ack確認,這里我們在callback手動確認
$channel->basic_consume($queue_name, "", false, false, false, false,$callback);

rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 賬戶
'pass' => 123456, // 密碼
"v_host" => "order", // 對應Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 隊列名稱
"name2" => "task_queue", // 隊列名稱
"name3" => "task_ack", // 隊列名稱
],
]
生產者代碼
<?php
/**
* 生產者生產消息
*/
namespace console\controllers\ack;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name3"]; // 隊列名稱
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 初始化隊列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 生產多條消息
for ($i = 0; $i <= 10; ++$i) {
//消息
$data = "this is " . $i. " message";
// 聲明消息,并持久化
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 把消息推到隊列里
$channel->basic_publish($mes, '', $queue_name);
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
消費者代碼
<?php
/**
* 消費者消費消息
*/
namespace console\controllers\ack;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name3"]; // 隊列名稱
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 初始化隊列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 消費消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
// 消費ack
$msg->ack();
};
// 第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者
$channel->basic_qos(null, 1, null);
// 第四個參數: 需要ack確認,這里我們在callback手動確認
$channel->basic_consume($queue_name, "", false, false, false, false,$callback);
// 監(jiān)控
while ($channel->is_open()){
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
NACK&重回隊列機制
當設置了方法 basic_consume中 $no_ack = false 時,使用手工 ACK 方式,除了ACK外,其實還有 NACK 方式,當手工 AcK 時,會發(fā)送給Broker( 服務器)一個應答,代表消息處理成功,Broker就可回送響應給生產端 .
NACK 則表示消息處理失敗,如果設設置了重回隊列, Broker 端就會將沒有成功處理的消息重新發(fā)送
通俗來講:
手工ACK:消費成功了,向發(fā)起者確認
NACK:消費失敗,讓生產者重新發(fā)
一般在實際應用中,都會關閉重回隊列,也就是設置為false
使用方式
消費端消費時.如果由于業(yè)務異常,可以手工 NACK 記錄日志,然后進行補償
API :basic_nack($delivery_tag, $multiple = false, $requeue = false)
如果由于服務器宕機等嚴里問題,就需要手工 ACK 保障消費端消費成功
API :basic_ack($delivery_tag, $multiple = false)
4.消息持久化
如果沒有特意告訴RabbitMQ,那么在它退出或者崩潰的時候,將會丟失所有隊列和消息,為了確保消息不會丟失,有兩個事情是需要注意的:必須把“隊列”和“消息”設為持久化首先,為了不讓隊列消失,需要把隊列聲明為持久化(durable),但這里面會有一定的問題,它會返回一個錯誤,可以使用一個快捷的解決方法——用不同名字的隊列,例如task_queue
代碼如上面生產者/消費者所示:
// 初始化隊列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
消息持久化配置: " delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT
5.公平調度(限流機制)
為什么要限流
當工作者處理消息時,會出現這么一個問題:比如有兩個工作者(workers),處理奇數消息的比較繁忙,處理偶數消息的比較輕松,然而RabbitMQ并不知道這些,它仍然一如既往的派發(fā)消息,這時因為RabbitMQ只管分發(fā)進入隊列的消息,不會關心有多少消費者(consumer)沒有作出響應,它盲目的把第n-th條消息發(fā)給第n-th個消費者
假設還有這樣的場景:RabbitMQ服務器有上萬條未處理的消息,隨便打開一個消費者Client ,會造成巨量的消息瞬間全部推送過來,然而單個客戶端無法同時處理這么多數據,此時很有可能導致服務器崩潰,嚴重的可能導致線上的故障
還有一些其他的場景:比如說單個 生產端一分鐘產生了幾百條數據,但是單個消費端 一分鐘可能只能處理 60 條,這個時候生產端-消費端肯定是不平衡的,通常生產端是沒辦法做限制的,所以消費端肯定需要做一些限流措施,否則如果超出最大負載,可能導致 消費端 性能下降,服務器卡頓甚至崩潰等一系列嚴重后果
RabbitMQ 提供了一種 qos (服務質質量保證)功能,即在 非自動確認消息的前提下,如果一定數目的消息 ( 通過基于 生產者或者 channel設置 Qos 的值) 未被確認前,不消費新的消息
不能設置自動簽收功能( auto_ack = false ),如果消息未被確認,就不會到達 消費端 ,目的就是給 生產端 減壓

這是可以設置預取計數值為1,告訴RabbitMQ一次只向一個worker發(fā)送一條消息,換句話說,在處理并確認前一個消息之前,不要向工作人員發(fā)送新消息
如上面ACK消費確認機制中消費者代碼:
// 第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者
//basic_qos($prefetch_size, $prefetch_count, $a_global)
$channel->basic_qos(null, 1, null);
參數說明:
$prefetch_size:單條消息的大小限制, 通常設置為 0 ,表示不做限制
$prefetch_count:一次最多能處理多少條消息
$a_global:是否將上面設置: true 應用于 channel 級別, false 代表消費者級別
$prefetch_size,$a_global這兩項, RabbitMQ 沒有實現,暫且不研究.$prefetch_count 在auto_ack = f alse 的情況下生效,即在自動應答的情況下該值無效

6.冪等性概念
一句話概括: 用戶對于同一操作發(fā)起的一次請求或者多次請求的結果是一致的
比如:對一個SQL執(zhí)行100次1000次,我們可以借鑒數據庫的樂觀鎖機制:比如我們執(zhí)行一條更新庫存的SQL語句:update T_reps set count = count -1, version = version +1 where version = 1,數據庫的樂觀鎖在執(zhí)行更新操作前一先去數據庫查詢此version ,然后執(zhí)行更新語句,以此version作為條件,如果執(zhí)行更新時有其他人先更新了這張表的數據,那么這個條件就不生效了,也就不會執(zhí)行操作了,通過這種 樂觀鎖的機制來保障幕等性
消費端 - 冪等性保障
在海量訂單產生的業(yè)務高峰期,如何避免 消息的重復消費問題?
在業(yè)務高峰期:容易產生 消息重復消費問題,當消費端消費完消息時,在給生產者端返回ack時由于 網絡中斷,導致生產端 未收到確認信息,該條消息就會 重新發(fā)送并 被消費端消費,但實際上該消費端已成功消費了該條消息,這就造成了重復消費.而 消費端實現 冪等性,就意味著:消息不會被多次消費,即使收到了很多一樣的消息
業(yè)界主流的冪等性操作解決方案:
(1)唯一Id + 指紋碼 機制,核心:利用數據庫主鍵去重
唯一Id: 業(yè)務表主鍵
指紋碼: 為了區(qū)別每次正常操作的碼,每次操作時生成指紋碼;可以用時間截+業(yè)務編號或者標志位(具體視業(yè)務場景而定 )
select count(1) from t_order where id = 唯一Id + 指紋碼
優(yōu)勢:實現簡單
弊端:高并發(fā)下有數據庫寫入的性能瓶頸
解決方案:根據ID進行分庫分表進行算法路由
(2)利用Redis的原子性去實現
第一:是否要進行數據落庫,如果落庫的話,關鍵解決的問題是數據庫和緩存如何做到原子性?
第二:如果不進行落庫,那么都存儲到緩存中,如何設置定時同步的策略?
7.return機制
Return Listener用于處理一些不可路由的消息,也是生產階段添加的一個監(jiān)聽
消息生產者通過指定一個Exchange和Routing Key,把消息送達到某一個隊列中去,然后消費者監(jiān)聽隊列,進行消費處理操作
但是在某些情況下,如果在發(fā)送消息的時候,當前的Exchange不存在或者指定的路由key路由不到,這個時候如果需要監(jiān)聽這種不可達的消息,就要使用Return Listener
在API中有一個關鍵的配置項 Mandatory:如果為true,則監(jiān)聽器會接收到路由不可達的消息,然后進行后續(xù)處理,如果為false,那么Broker(服務器)會自動刪除該消息

8.消息的可靠性投遞
(1).什么是生產端的可靠性投遞?
保障消息的成功發(fā)出
保障MQ節(jié)點的成功接收
發(fā)送端收到MQ節(jié)點(Broker)確認應答
完善的消息進行補償機制(在大廠一般都不會加事務,都是進行補償操作)
在實際生產中,很難保障前三點的完全可靠,比如在 極端的環(huán)境中,生產者發(fā)送消息失敗了,發(fā)送端在接受確認應答時突然發(fā)生網絡閃斷等等情況,很難保障可靠性投遞,所以就需要有第四點完善的 消息補償機制
(2).解決方案
方案一 消息信息落庫,對消息狀態(tài)進行打標(常見方案)
將消息持久化到 DB中并設置狀態(tài)值,收到 Consumer 的應答就改變當前記錄的狀態(tài)
再輪詢重新發(fā)送沒接收到應答的消息,注意這里要設置重試次數

方案實現流程
比如下單成功
步驟1
對訂單數據入ORDER_DB 訂單庫,并對因此生成的業(yè)務消息入 MSG_DB 消息庫,此處由于采用了兩個數據庫,需要兩次持久化操作,為了保證數據的一致性,有人可能就想采用分布式事務,但在大廠實踐中,基本都是采用補償機制
這里一定要保證步驟1中消息都存儲成功了,沒有出現任問異常情況,然后生產端再進行消息發(fā)送.如果失敗了就進行快速失敗機制
對業(yè)務數據和消息入庫完畢就進入步驟2
步驟2
發(fā)送消息到MQ服務上,如果一切正常無誤消費者監(jiān)聽到該消息,進入步驟3
步驟3
生產端有一個 confi rm Listener ,異步監(jiān)聽 Broker(服務端) 回送的響應,從而判斷消息是否投遞成功
步驟4
如果成功,去數據庫查詢該消息.并將消息狀態(tài)更新為 1
步驟5
如果出現意外情況,消費者未接收到或者Listener 接收確認時發(fā)生網絡閃斷,導致生產端的Listener 就永遠收不到這條消息的 confi rm應答了,也就是說這條消息的狀態(tài)就一直為0 了,這時候就需要用到分布式定時任務來從 MSG_DB 數據庫抓取那些超時了還未被消費的消息,重新發(fā)送一遍。此時需要設置一個規(guī)則,比如說消息在入庫時候設置一個臨界值 timeout , 5 分鐘之后如果還是0的狀態(tài)那就需要把消息抽取出來。這里使用的是分布式定時任務,去定時抓取 MSG_DB中距離消息創(chuàng)建時間超過 5 分鐘的且狀態(tài)為0 的消息
步驟6
把抓取出來的消息進行重新投遞( Retry Send ) ,也就是從第二步開始繼續(xù)往下走
步驟7
當然有些消息可能就是由于一些實際的問題無法路由到 Broker ,比如Routing Key設置不對,對應的隊列被誤刪除了,那么這種消息即使重試多次也仍然無法投遞成功,所以需要對重復次數做限制,比如限制 3 次,如果投遞次數大于3次,那么就將消息狀態(tài)更新為 2 ,表示這個消息最終投遞失敗,然后通過補償機制,人工去處理,實際生產中.這種情況還是比較少的,但是不能沒有這個補償機制,要不然就做不到可靠性了
缺點
在第一步需要更新或者插入操作數據庫2次
優(yōu)化
不需要消息進行持久化 只需要業(yè)務持久化
方案二 消息的延遲投遞,做二次確認,回調檢查(不常用,大廠在用的高并發(fā)方案)

方案實現流程
步驟1
(上游服務: Upstream service )業(yè)務入庫,然后send 消息到broker,這兩步是有先后順序的
步驟2
進行消息延遲發(fā)送到新的隊列(延遲時間為 5 分鐘:業(yè)務決定)
步驟3
(下游服務: Downstream service )監(jiān)聽到消息然后處理消息
步驟4
下游服務 send confirm生成新的消息到 broker (這里是一個新的隊列 )
步驟5
callback service 去監(jiān)聽這個消息,并且入庫,如果監(jiān)聽到,表示這個消息已經消費成功
步驟6
callback service 去檢查 步驟2投遞的延遲消息是否 在msgDB里面是否消費成功,不存在或者消費失敗就會 Resend command
如果在第 1 , 2 , 4 步失敗,如果成功broker 會給一個 confirm ,失敗當然沒有,這是消息可靠性投遞的里要保障
9.注意
關于隊列大小: 如果所有的工作者都處理繁忙狀態(tài),隊列就會被填滿,需要留意這個問題,要么添加更多的工作者(workers),要么使用其他策略
六.RabbitMQ幾種常見的交換器模式
1.消息模型基本介紹
前面的教程中,講的是 發(fā)送消息到隊列并從中取出消息,現在介紹RabbitMQ中 完整的消息模型
簡單的概括一下之前講的:
發(fā)布者(producer)是發(fā)布消息的應用程序
隊列(queue)用于消息存儲的緩沖
消費者(consumer)是接收消息的應用程序
RabbitMQ消息模型的 核心理念是:發(fā)布者(producer) 不會直接發(fā)送任何消息給 隊列,事實上,發(fā)布者(producer)甚至不知道消息是否已經被投遞到隊列。發(fā)布者(producer)只需要 把消息發(fā)送給一個交換機(exchange),交換機非常簡單,它一邊從發(fā)布者方 接收消息,一邊把消息 推送到隊列,交換機 必須知道如何處理它接收到的消息,是應該 路由到 指定的隊列還是是 多個隊列,或者是直接 忽略消息,這些規(guī)則是通過交換機類型(exchange type)來定義的

有幾個可供選擇的交換器類型:direct, topic, headers和fanout
direct(直連/定向交換器) |
消息與一個特定的路由鍵完全匹配 |
topic(主題交換器) |
使用通配符*,#,讓路由鍵和某種模式進行匹配 |
headers(頭交換器) |
不處理路由鍵,而是根據發(fā)送的消息內容中的 headers 屬性進行匹配 |
fanout(扇型交換器) |
發(fā)布/ 訂閱模式可以理解為廣播模式:即exchange會將消息轉發(fā)到所有綁定到這個exchange的隊列上,這種類型在發(fā)送消息,queue bind時,都將忽略route key,也就是說不需要設置 route key |
Routing Key(路由鍵): 生產者將消息發(fā)送給交換器,一般都會指定一個Routing Key,用來指定這個消息的路由規(guī)則,而這個Routing Key需要與交換器類型和綁定鍵(Binding Key)聯(lián)合使用才能生效
Binding(綁定):它是Exchange與Queue之間的虛擬連接,通俗的講就是交換器和隊列之間的聯(lián)系(這個隊列(Queue)對這個交換機(Exchange)的消息感興趣),實現了根據不同的Routing Key(路由規(guī)則),交換機將消息路由(發(fā)送)到對應的Queue上
2.交換器核心方法
//試探性申請一個交換器,若該交換器存在,則跳過,不存在則創(chuàng)建
exchange_declare($exchange,$type,$passive = false,$durable = false,$auto_delete = true,$internal = false,$nowait = false,$arguments = array(),$ticket = null)
參數名 |
默認值 |
解釋 |
$exchange |
交換器名稱 |
|
$type |
交換器類型: ’’ 默認交換機 匿名交換器 未顯示聲明類型都是該類型 fanout 扇形交換器 會發(fā)送消息到它所知道的所有隊列,每個消費者獲取的消息都是一致的 headers 頭部交換器 direct 直連交換器,該交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配 topic 主題交換器 該交換機會對路由鍵正則匹配,必須是*(一個單詞)、#(多個單詞,以.分割) ,eg:user.key .abc.* 類型的key |
|
$passive |
false |
只判斷不創(chuàng)建(一般用于判斷交換器是否存在) true: 1.如果exchange已存在則直接連接并且不檢查配置比如已存在的exchange是fanout,新需要建立的是direct,也不會報錯; 2.如果exchange不存在則直接報錯 false: 1.如果exchange不存在則創(chuàng)建新的exchange 2.如果exchange已存在則判斷配置是否相同,如果配置不相同則直接報錯,比如已存在的exchange是fanout,新需要建立的是direct,會報錯。 |
$durable |
false |
設置是否持久化,設置為true,表示持久化,反之非持久化,持久化可以將交換器存盤,在服務器重啟的時候不會丟失相關信息 |
$auto_delete |
true |
設置是否自動刪除,設置為true時,表示自動刪除。自動刪除的前提:至少有一個隊列或者交換器與這個交換器綁定,之后所有與這個交換器綁定的隊列或者交換器都與此解綁 |
$internal |
false |
設置是否為內置的,設置為true表示是內置的交換器,客戶端程序無法直接發(fā)送消息到這個交換器,只能通過交換器路由到這個交換器 |
$nowait |
false |
如果為true,表示不等待服務器回執(zhí)信息,函數將返回null,可以提高訪問速度 |
$arguments |
array() |
其他結構化參數 |
$ticket |
null |
3.fanout模式(廣播模式)
廣播模式可以理解為:發(fā)布/訂閱模式,即exchange會將消息轉發(fā)到所有綁定到這個exchange的隊列上。針對這種廣播模式,在發(fā)送消息,queue bind時,都將忽略route key,也就是說不需要設置 route key
案例一
一個生產者生產消息并發(fā)布消息到交換器上,多個消費者訂閱該交換器,并與之隊列綁定,消費消息
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 賬戶
'pass' => 123456, // 密碼
"v_host" => "order", // 對應Virtual Hosts
],
"exchange_name" => [
"name1" => "exch", // 交換器名稱
],
]
生產者
<?php
/**
* 交換器fanout(廣播)模式: 生產者生產消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明并初始化交換器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 聲明一個數據
$data = "this is a exchange message";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 發(fā)布消息到交換器
$channel->basic_publish($msg, $exchangeName);
//關閉通道和連接
$channel->close();
$connection->close();
}
}
消費者(多個)
消費者1
<?php
/**
* 交換器fanout(廣播)模式: 消費者消費消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
// $queueName = $rabbitMqConfig["queue_name"]["name4"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明對應的交換器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 聲明隊列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交換機與隊列綁定
$channel->queue_bind($queueName, $exchangeName);
// 消息回調處理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
$channel->basic_qos(null, 1, null);
// 消費者消費消息: 第四個參數: 需要ack確認
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 監(jiān)控
while ($channel->is_open()) {
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
消費者2
<?php
/**
* 交換器fanout(廣播)模式: 消費者消費消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Consumer2Controller extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
// $queueName = $rabbitMqConfig["queue_name"]["name4"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明對應的交換器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 聲明隊列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交換機與隊列綁定
$channel->queue_bind($queueName, $exchangeName);
// 消息回調處理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
$channel->basic_qos(null, 1, null);
// 消費者消費消息: 第四個參數: 需要ack確認
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 監(jiān)控
while ($channel->is_open()) {
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
案例二
舉個實際應用的場景:比方說用戶注冊(注銷,更改姓名等)新浪,同時需要開通微博、博客、郵箱等,如果不采用隊列,按照常規(guī)的線性處理,可能注冊用戶會特別的慢,因為在注冊的時候,需要調各種其他服務器接口,如果服務很多的話,可能客戶端就超時了。如果采用普通的隊列,可能在處理上也會特別的慢(不是最佳方案),如果采用訂閱模式,則是最優(yōu)的選擇
處理過程如下:
用戶提交username、pwd…等之類的基本信息,將數據提交register.php中
register.php對數據進行校驗,符合注冊要求,生成uid,并將和基本信息json后,發(fā)布一條消息到對應的交換器中,同時直接顯示用戶注冊成功
exchange中的多個隊列,如(queue.process、queue.boke、queue.weibo、queue.email)都訂閱了這個消息,根據各業(yè)務自身的邏輯來處理
總結:
1.不申明隊列,因為發(fā)布/訂閱模式下,是可以隨時添加新的訂閱隊列
2.exchange的Type指定為fanout(廣播模式)
3.隊列不需要指定route key,綁定exchange
代碼如下:
生產者
<?php
/**
* 交換器fanout(廣播)模式: 生產者生產消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public function actionIndex()
{
? ? ? ? /*
? ? ? ? ? ? 用戶注冊邏輯
? ? ? ? */
? ? ? ?
? ? ? ? //發(fā)送消息邏輯
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = "register";
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明并初始化交換器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 聲明一個數據
$data = "{uid:xxx,reg_time:xxx}";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 發(fā)布消息到交換器
$channel->basic_publish($msg, $exchangeName);
//關閉通道和連接
$channel->close();
$connection->close();
}
}
消費者(多個)
可以創(chuàng)建多個不同類型的消費者(開通微博、博客、郵箱)等邏輯功能的消費者
<?php
/**
* 交換器fanout(廣播)模式: 消費者消費消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = "register";
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明對應的交換器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 聲明一個匿名隊列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交換機與隊列綁定
$channel->queue_bind($queueName, $exchangeName);
// 消息回調處理
$callback = function ($meg) {
? ? ? ? ? ? //處理邏輯
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
$channel->basic_qos(null, 1, null);
// 消費者消費消息: 第四個參數: 需要ack確認
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 監(jiān)控
while ($channel->is_open()) {
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
4.Direct模式
Direct交換器將消息投遞到路由參數 完全匹配的隊列中

直接上代碼
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 賬戶
'pass' => 123456, // 密碼
"v_host" => "order", // 對應Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 隊列名稱
"name2" => "task_queue", // 隊列名稱
"name3" => "task_ack", // 隊列名稱
"name4" => "exchange_fanout_1", // 隊列名稱
],
"exchange_name" => [
"name1" => "exch", // 交換器名稱
"name2" => "exch_direct_log", // 交換器名稱
],
"routing_key" => [
"info_key" => "info", // 路由鍵
"error_key" => "error", // 路由鍵
"warn_key" => "warn", // 路由鍵
],
】
生產者
<?php
/**
* 交換器direct(routing_key-更詳細的bind模式)模式: 生產者生產消息
*/
namespace console\controllers\exchange\direct;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name2"];
$routingKey = $rabbitMqConfig["routing_key"]["error_key"];
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明并初始化交換器
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
// 聲明一個數據
$data = "this is a ". $routingKey . " message";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 發(fā)布消息到交換器, 并和路由鍵匹配
$channel->basic_publish($msg, $exchangeName, $routingKey);
//關閉通道和連接
$channel->close();
$connection->close();
}
}
消費者
<?php
/**
* 交換器direct(routing_key-更詳細的bind模式)模式: 消費者消費消息
*/
namespace console\controllers\exchange\direct;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerWarnController extends Controller
{
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name2"];
$routingKey = $rabbitMqConfig["routing_key"]["warn_key"]; //路由鍵可以修改為其他key,與生產者bind的關聯(lián)
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明對應的交換器
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
// 聲明一個匿名隊列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交換機與隊列綁定,并指定routing_key
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 消息回調處理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
$channel->basic_qos(null, 1, null);
// 消費者消費消息: 第四個參數: 需要ack確認
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 監(jiān)控
while ($channel->is_open()) {
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
5.topic模式
發(fā)送到topic交換器的消息不可以攜帶隨意routing_key,它的routing_key必須是一個由 .分隔開的 詞語列表,這些單詞隨便是什么都可以,但是最好是跟攜帶它們的消息有關系的詞匯,以下是幾個推薦的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",詞語的個數可以隨意,但是 不要超過255字節(jié)。binding key也必須擁有同樣的格式,topic交換器背后的邏輯跟direct交換機很相似 : 一個攜帶著特定routing_key的消息會被topic交換機投遞給綁定鍵與之想匹配的隊列,但是它的binding key和routing_key有兩個特殊應用方式:
* (星號) 用來表示一個單詞
# (井號) 用來表示任意數量(零個或多個)單詞
下邊用圖說明:
這個例子里,發(fā)送的所有消息都是用來描述小動物的,發(fā)送的消息所攜帶的路由鍵是由三個單詞所組成的,這三個單詞被兩個.分割開,路由鍵里的第一個單詞

描述的是動物的手腳的利索程度,第二個單詞是動物的顏色,第三個是動物的種類,所以它看起來是這樣的: <celerity>.<colour>.<species>。
創(chuàng)建了三個綁定:Q1的綁定鍵為 *.orange.*,Q2的綁定鍵為 *.*.rabbit 和 lazy.# 。
Q1-->綁定的是
中間帶 orange 帶 3 個單詞的字符串 (*.orange.*)
Q2-->綁定的是
最后一個單詞是 rabbit 的 3 個單詞 (*.*.rabbit)
第一個單詞是 lazy 的多個單詞 (lazy.#)
這三個綁定鍵被可以總結為:
Q1 對所有的桔黃色動物都感興趣
Q2 則是對所有的兔子和所有懶惰的動物感興趣
一個攜帶有 quick.orange.rabbit 的消息將會被分別投遞給這兩個隊列,攜帶著 lazy.orange.elephant 的消息同樣也會給兩個隊列都投遞過去。另一方面攜帶有 quick.orange.fox 的消息會投遞給第一個隊列,攜帶有 lazy.brown.fox 的消息會投遞給第二個隊列。攜帶有 lazy.pink.rabbit 的消息只會被投遞給第二個隊列一次,即使它同時匹配第二個隊列的兩個綁定。攜帶著 quick.brown.fox 的消息不會投遞給任何一個隊列。
注意:
如果違反約定,發(fā)送了一個攜帶有一個單詞或者四個單詞("orange" or "quick.orange.male.rabbit")的消息時,發(fā)送的消息不會投遞給任何一個隊列,而且會丟 失掉
但是另一方面,即使 "lazy.orange.male.rabbit" 有四個單詞,他還是會匹配最后一個綁定,并且被投遞到第二個隊列中。
topic交換機是很強大的,它可以表現出跟其他交換機類似的行為 當一個隊列的binding key為 "#"(井號) 的時候,這個隊列將會無視消息的routing key,接收所有的消息。 當 * (星號) 和 # (井號) 這兩個特殊字符都未在binding key中出現的時候,此時Topic交換機就擁有的direct交換機的行為
代碼如下:
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 賬戶
'pass' => 123456, // 密碼
"v_host" => "order", // 對應Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 隊列名稱
"name2" => "task_queue", // 隊列名稱
"name3" => "task_ack", // 隊列名稱
"name4" => "exchange_fanout_1", // 隊列名稱
],
"exchange_name" => [
"name1" => "exch", // 交換器名稱
"name2" => "exch_direct_log", // 交換器名稱
"name3" => "exch_topic_log", // 交換器名稱
],
"routing_key" => [
"info_key" => "info", // 路由鍵
"error_key" => "error", // 路由鍵
"warn_key" => "warn", // 路由鍵
"all_key" => "#", // 所有的路由鍵
"user_info" => "user.info", // 路由鍵
"user_warn" => "user.warn", // 路由鍵
"user_all" => "user.*", // 匹配以user.開頭的路由鍵
],
]
】
生產者
<?php
/**
* 交換器topic(通配符)模式: 生產者生產消息
*/
namespace console\controllers\exchange\topic;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name3"];
$routingKey = $rabbitMqConfig["routing_key"]["user_info"];
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明并初始化交換器
$channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, false, false);
// 聲明一個數據
$data = "this is a ". $routingKey . " message";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 發(fā)布消息到交換器, 并和路由鍵匹配
$channel->basic_publish($msg, $exchangeName, $routingKey);
//關閉通道和連接
$channel->close();
$connection->close();
}
}
消費者
<?php
/**
* 交換器topic(通配符)模式: 消費者消費消息
*
*/
namespace console\controllers\exchange\topic;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name3"];
$routingKey = $rabbitMqConfig["routing_key"]["user_all"];
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明對應的交換器
$channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, false, false);
// 聲明隊列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交換機與隊列綁定,并指定routing_key
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 消息回調處理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
$channel->basic_qos(null, 1, null);
// 消費者消費消息: 第四個參數: 需要ack確認
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 監(jiān)控
while ($channel->is_open()) {
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
七.死信隊列,延時隊列
1.死信隊列
死信( Dead Letter )是RabbitMQ 中的一種 消息機制,當在消費消息時,如果隊列里的消息出現以下情況:
消息被拒絕
消息在隊列的存活時間超過設置的 TTL 時間
TTL (Time To Live),即生存時間
RabbitMQ支持消息的過期時間,在消息發(fā)送時可以進行指定
RabbitMQ 支持為每個隊列設號消息的超時時間,從消息入隊列開始計算,只要超過了隊列的超時時間配置,那么消息會被自動清除
消息隊列的消息數量已經超過最大隊列長度
那么該消息將成為“死信”,“死信”消息會被 RabbitMQ進行特殊處理,如果配置了死信隊列信息,那么該消息將會被丟進死信隊列中,如果沒有配置,則該消息將會被丟棄
RabbitMQ 中有一種交換器叫 DLX,全稱為 Dead 一 Letter 一 Exchange ,可以稱之為 死信交換器,當消息在一個隊列中變成死信( dead message )消息之后,它會被重新發(fā)送到另外一個交換器中,這個交換器就是DLX , 綁定在 DLX上的隊列就稱之為死信隊列, 程序就可以監(jiān)聽這個隊列中的消息,并做相應的處理,該特性 可以彌補RabbitMQ3.0以前支持的immediate參數的功能
消息變成死信有以下幾種情況:
消息被拒絕消息
TTL 過期(延遲隊列)
隊列達到最大長度


2.延時隊列
延時隊列就是用來存放需要在 指定時間被處理的元素的隊列.
一般可以利用 死信隊列的特性來 實現延遲隊列:只要給消息設置一個過期時間,消息過期就會自動進入死信隊列,消費者只要監(jiān)聽死信隊列就可以實現延遲隊列了
應用場景
訂單在十分鐘之內未支付則自動取消
賬單在一周內未支付,則自動結算
某個時間下發(fā)一條通知
用戶注冊成功后,如果三天內沒有登陸則進行短信提醒
用戶發(fā)起退款,如果三天內沒有得到處理則通知相關運營人員
下面通過一個案例來更進一步了解死信隊列,延時隊列
案例1
訂單在一段時間內未支付則自動取消,步驟:
(1).創(chuàng)建訂單操作
(2).訂單創(chuàng)建成功后,訂單相關數據json處理
(3).構建rabbitmq消息隊列,并設置消息過期時間為60秒,把訂單相關json數據發(fā)布到交換器, 并和路由鍵匹配,生產者生產消息60秒之后,消息會進入到死信隊列,消費者監(jiān)聽死信隊列,處理訂單
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 賬戶
'pass' => 123456, // 密碼
"v_host" => "order", // 對應Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 隊列名稱
"name2" => "task_queue", // 隊列名稱
"name3" => "task_ack", // 隊列名稱
"name4" => "exchange_fanout_1", // 隊列名稱
"name5" => "queue_pay", // 訂單支付隊列
],
"exchange_name" => [
"name1" => "exch", // 交換器名稱
"name2" => "exch_direct_log", // 交換器名稱
"name3" => "exch_topic_log", // 交換器名稱
"name4" => "exch_pay", // 訂單支付, 交換器名稱
],
"routing_key" => [
"info_key" => "info", // 路由鍵
"error_key" => "error", // 路由鍵
"warn_key" => "warn", // 路由鍵
"order_key" => "order_pay", // 訂單支付
"all_key" => "#", // 所有的路由鍵
"user_info" => "user.info", // 路由鍵
"user_warn" => "user.warn", // 路由鍵
"user_all" => "user.*", // 匹配以user.開頭的路由鍵
],
"dead_letter" => [ // 死信隊列
"exchange_name" => [ // 死信隊列交換機名稱
"pay" => "dead_exch_pay"
],
"queue_name" => [ // 死信隊列名稱
"pay" => "dead_queue_pay"
],
"routing_key" => [ // 死信隊列routing名稱
"pay" => "dead_routing_key_pay"
]
]
]
生產者
<?php
/**
* 死信隊列,延時隊列: 生產者推送消息到隊列,模擬訂單支付
*/
namespace console\controllers\exchange\dead;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$deadConfig = Yii::$app->params["rabbitMq"]["dead_letter"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name4"];
$queueName = $rabbitMqConfig["queue_name"]["name5"];
$routingKey = $rabbitMqConfig["routing_key"]["order_key"];
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明并初始化交換器, 交換機類型: routing_key-更詳細的bind模式
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
// 消息隊列的額外參數
$args = new AMQPTable([
'x-message-ttl' => 2000, // 消息的過期時間
"x-dead-letter-exchange" => $deadConfig["exchange_name"]["pay"], // 死信隊列交換機名稱
"x-dead-letter-routing-key" => $deadConfig["routing_key"]["pay"] // 死信隊列routing名稱
]);
// 聲明隊列
$channel->queue_declare($queueName, false, true, false, false, false, $args);
// 交換機和隊列綁定
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 聲明死信交換機
$channel->exchange_declare($deadConfig["exchange_name"]["pay"], AMQPExchangeType::DIRECT, false, false, false);
// 聲明死信隊列
$channel->queue_declare($deadConfig["queue_name"]["pay"], false, true, false, false, false);
// 死信交換機和隊列綁定
$channel->queue_bind($deadConfig["queue_name"]["pay"], $deadConfig["exchange_name"]["pay"], $deadConfig["routing_key"]["pay"]);
// 聲明一個數據:里面可以是用戶訂單相關json數據
$data = "this is a dead message";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 發(fā)布消息到交換器, 并和路由鍵匹配
$channel->basic_publish($msg, $exchangeName, $routingKey);
//關閉通道和連接
$channel->close();
$connection->close();
}
}
消費者
<?php
/**
* 死信隊列,延時隊列: 模擬訂單支付,消費者消費消息
*
*/
namespace console\controllers\exchange\dead;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["dead_letter"]["exchange_name"]["pay"];
$queueName = $rabbitMqConfig["dead_letter"]["queue_name"]["pay"];
$routingKey = $rabbitMqConfig["dead_letter"]["routing_key"]["pay"];
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明對應的交換器
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
// 交換機與隊列綁定,并指定routing_key
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 消息回調處理
$callback = function ($meg) {
? ? ? ? ? ? //處理訂單相關數據
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
$channel->basic_qos(null, 1, null);
// 消費者消費消息: 第四個參數: 需要ack確認
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 監(jiān)控
while ($channel->is_open()) {
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
問題
通過上面的案例就可以實現死信隊列,延時隊列操作,上面看上去似乎沒什么問題,實測一下就會發(fā)現 消息不會“如期死亡 ”。當先生產一個TTL為60s的消息,再生產一個TTL為5s的消息,第二個消息并不會再5s后過期進入死信隊列,而是需要等到第一個消息TTL到期后,與第一個消息一同進入死信隊列, 這是因為RabbitMQ 只會判斷隊列中的第一個消息是否過期
那么怎么來解決這個問題呢?
通過 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件來解決
此插件的原理是將消息在交換機處暫存儲在mnesia(一個分布式數據系統(tǒng))表中,延遲投遞到隊列中,等到消息到期再投遞到隊列當中
rabbitmq_delayed_message_exchange插件安裝
(1).下載地址:https://www.rabbitmq.com/community-plugins.html
注意: 要下載與rabbitmq相對應的版本

(2).把下載的插件放到指定位置
下載的文件為zip格式,將zip格式解壓,插件格式為ez,將文件復制到插件目錄:
Linux
/usr/lib/rabbitmq/lib/rabbitmq_server-xxx/plugins
rabbitmq-plugins list
Windows
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.17\plugins
(3).啟動插件
Linux
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Windows文章來源:http://www.zghlxwxcb.cn/news/detail-430819.html

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
啟動信息:文章來源地址http://www.zghlxwxcb.cn/news/detail-430819.html

(4).查看
進入:http://localhost:15672/#/exchanges

重構代碼
生產者
生產者實現的關鍵點:
1.在聲明交換機時不在是direct類型,而是x-delayed-message類型,這是由插件提供的類型
2.交換機要增加"x-delayed-type": "direct"參數設置
3.發(fā)布消息時,要在 Headers 中設置x-delay參數,來控制消息從交換機過期時間
<?php
/**
* 死信隊列,延時隊列插件使用: 模擬訂單支付
*/
namespace console\controllers\exchange\delay;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$delayConfig = Yii::$app->params["rabbitMq"]["delay"];
$config = $rabbitMqConfig["base"];
// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明并初始化交換器, 交換機類型: 延時插件名稱(x-delayed-message)
$channel->exchange_declare($delayConfig["exchange_name"]["pay"], "x-delayed-message", false, true, false);
// 消息隊列的額外參數
$args = new AMQPTable(["x-delayed-type" => "direct"]);
// 聲明隊列
$channel->queue_declare($delayConfig["queue_name"]["pay"], false, true, false, false, false, $args);
// 交換機和隊列綁定
$channel->queue_bind($delayConfig["queue_name"]["pay"], $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
// 聲明一個數據
$data = "this is a dead message";
// 初始化消息并持久化
$arr = [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT,
"application_headers" => new AMQPTable([
"x-delayed" => 2000 // 過期時間
])
];
$msg = new AMQPMessage($data, $arr);
// 發(fā)布消息到交換器, 并和路由鍵匹配
$channel->basic_publish($msg, $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
//關閉通道和連接
$channel->close();
$connection->close();
}
}
消費者
沒有啥特別的修改
<?php
/**
* 死信隊列,延時隊列插件使用: 模擬訂單支付
*
*/
namespace console\controllers\exchange\delay;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相關配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$delayConfig = Yii::$app->params["rabbitMq"]["delay"];
$config = $rabbitMqConfig["base"];// 創(chuàng)建連接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 創(chuàng)建channel
$channel = $connection->channel();
// 聲明并初始化交換器, 交換機類型: 延時插件名稱(x-delayed-message)
$channel->exchange_declare($delayConfig["exchange_name"]["pay"], "x-delayed-message", false, true, false);
// 交換機和隊列綁定
$channel->queue_bind($delayConfig["queue_name"]["pay"], $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
// 消息回調處理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
$channel->basic_qos(null, 1, null);
// 消費者消費消息: 第四個參數: 需要ack確認
$channel->basic_consume($delayConfig["queue_name"]["pay"], '', false, false, false, false, $callback);
// 監(jiān)控
while ($channel->is_open()) {
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
}
}
到了這里,關于一文讀懂RabbitMQ消息隊列的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!