国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

一文讀懂RabbitMQ消息隊列

這篇具有很好參考價值的文章主要介紹了一文讀懂RabbitMQ消息隊列。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一.什么是消息隊列

1.簡介

在介紹消息隊列之前,應該先了解什么是 AMQP(Advanced Message Queuing Protocol, 高級消息隊列協(xié)議,點擊查看)
消息(Message)是指在應用間 傳送的數據,消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象;而 消息隊列(Message Queue)是一種 應用間通信方式,消息發(fā)送后可以 立即返回,由 消息系統(tǒng)來確保消息的 可靠傳遞, 消息發(fā)布者只管把消息發(fā)布到 MQ 中而不用管誰來取, 消息使用者只管從 MQ 中取消息而不管是誰發(fā)布的,這樣發(fā)布者和使用者都不用知道對方的存在,它是典型的 生產者-消費者模型,生產者不斷向消息隊列生產消息,消費者不斷的從隊列獲取消息。因為消息的生產和消費都是 異步的,并且只關心消息的發(fā)送和接收,沒有業(yè)務邏輯的浸入,這樣就實現了生產者和消費者的 解耦

2.總結

(1).消息隊列是隊列結構的 中間件
(2).消息發(fā)送后,不需要立即處理,而是由消息系統(tǒng)來處理
(3).消息處理是 消息使用者(消費者) 按順序處理的

3.結構圖

一文讀懂RabbitMQ消息隊列

二.為什么要使用消息隊列

消息隊列是一種應用間的異步協(xié)作機制,是分布式系統(tǒng)中的重要的組件,主要目的是為了解決應用 藕合, 異步消息, 流城削鋒, 冗余,擴展性,排序保證等問題,實現 高性能, 高并發(fā), 可伸縮最終一致性架構,下面舉例說明
  1. 業(yè)務解耦

以常見的訂單系統(tǒng)為例,用戶點擊【下單】按鈕之后的業(yè)務邏輯可能包括:扣減庫存、生成相應單據、發(fā)紅包、發(fā)短信通知,商品配送等業(yè)務;在業(yè)務發(fā)展初期這些邏輯可能放在一起同步執(zhí)行,隨著業(yè)務的發(fā)展訂單量增長,需要提升系統(tǒng)服務的性能,這時可以將一些不需要立即生效的操作拆分出來異步執(zhí)行,或者單獨拆分出來作為一個獨立的系統(tǒng),比如生成相應單據為訂單系統(tǒng),扣減庫存為庫存系統(tǒng),發(fā)放紅包獨立為紅包系統(tǒng)、發(fā)短信通知為短信系統(tǒng),商品配送為配送系統(tǒng)等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之后發(fā)送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發(fā)現 MQ 中有發(fā)紅包或發(fā)短信或商品配送之類的消息時,執(zhí)行相應的業(yè)務系統(tǒng)邏輯,這樣各個業(yè)務系統(tǒng)相互獨立,就很方便進行分離部署,防止某一系統(tǒng)故障引起的連鎖故障

一文讀懂RabbitMQ消息隊列
  1. 流量削峰

流量削峰一般在秒殺或者團搶活動中廣泛使用

(1).由來

主要是還是來自于互聯(lián)網的業(yè)務場景,例如:春節(jié)火車票搶購,大量的用戶需要同一時間去搶購;以及雙11秒殺, 短時間上億的用戶涌入,瞬間流量巨大(高并發(fā)),比如:200萬人準備在凌晨12:00準備搶購一件商品,但是商品的數量缺是有限的100-500件左右。這樣真實能購買到該件商品的用戶也只有幾百人左右, 但是從業(yè)務上來說,秒殺活動是希望更多的人來參與,也就是搶購之前希望有越來越多的人來看購買商品。但是,在搶購時間達到后,用戶開始真正下單時,秒殺的服務器后端卻不希望同時有幾百萬人同時發(fā)起搶購請求。因為服務器的處理資源是有限的,所以出現峰值的時候,很容易導致服務器宕機,用戶無法訪問的情況出現。這就好比出行的時候存在早高峰和晚高峰的問題,為了解決這個問題,出行就有了錯峰限行的解決方案。同理,在線上的秒殺等業(yè)務場景,也需要類似的解決方案,需要平安度過同時搶購帶來的流量峰值的問題,這就是流量削峰的由來。

一文讀懂RabbitMQ消息隊列

(2).怎樣來實現流量削峰方案

削峰從本質上來說就是更多地延緩用戶請求,以及層層過濾用戶的訪問需求,遵從“最后落地到數據庫的請求數要盡量少”的原則。

1).消息隊列解決削峰

要對流量進行削峰,最容易想到的解決方案就是用消息隊列來緩沖瞬時流量,把同步的直接調用轉換成異步的間接推送,中間通過一個隊列在一端承接瞬時的流量洪峰,在另一端平滑地將消息推送出去。

一文讀懂RabbitMQ消息隊列

消息隊列中間件主要解決應用耦合,異步消息, 流量削鋒等問題;常用消息隊列系統(tǒng):目前在生產環(huán)境,使用較多的消息隊列有 ActiveMQ、RabbitMQ、 ZeroMQ、Kafka、MetaMQ、RocketMQ 等。

在這里,消息隊列就像“水庫”一樣,攔蓄上游的洪水,削減進入下游河道的洪峰流量,從而達到減免洪水災害的目的。

2).流量削峰漏斗:層層削峰

針對秒殺場景還有一種方法,就是對請求進行分層過濾,從而過濾掉一些無效的請求。分層過濾其實就是采用“漏斗”式設計來處理請求的,如下圖所示:

一文讀懂RabbitMQ消息隊列
這樣就像漏斗一樣,盡量把數據量和請求量一層一層地過濾和減少了
I.分層過濾的核心思想
  • 通過在不同的層次盡可能地過濾掉無效請求

  • 通過CDN過濾掉大量的圖片,靜態(tài)資源的請求

  • 再通過類似Redis這樣的分布式緩存,過濾請求等就是典型的在上游攔截讀請求

II.分層過濾的基本原則
  • 對寫數據進行基于時間合理分片,過濾掉過期的失效請求

  • 對寫請求做限流保護,將超出系統(tǒng)承載能力的請求過濾掉

  • 涉及到的讀數據不做強一致性校驗,減少因為一致性校驗產生瓶頸的問題

  • 對寫數據進行強一致性校驗,只保留最后有效的數據

最終,讓“漏斗”最末端(數據庫)的才是有效請求,例如:當用戶真實達到訂單和支付的流程,這個是需要數據強一致性的。

(3).總結

1).對于秒殺這樣的高并發(fā)場景業(yè)務,最基本的原則就是將請求攔截在系統(tǒng)上游,降低下游壓力。如果不在前端攔截很可能造成數據庫(mysql、oracle等)讀寫鎖沖突,甚至導致死鎖,最終還有可能出現雪崩等場景。

2).劃分好動靜資源,靜態(tài)資源使用CDN進行服務分發(fā)

3).充分利用緩存(redis等),增加QPS,從而加大整個集群的吞吐量

4).高峰值流量是壓垮系統(tǒng)很重要的原因,所以需要RabbitMQ等消息隊列在一端承接瞬時的流量洪峰,在另一端平滑地將消息推送出去

  1. 異步處理

用戶注冊后,需要發(fā)送注冊郵件和注冊短信
一文讀懂RabbitMQ消息隊列

三.RabbitMQ介紹

RabbitMQ是一個由 erlang語言開發(fā)的,實現了AMQP協(xié)議的標準的開源消息代理和隊列服務器( 消息隊列中間件)

1.常見的消息隊列中間件

一文讀懂RabbitMQ消息隊列

2.RabbitMQ特性

  • 可靠性(Reliability)
    RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發(fā)布確認

  • 靈活的路由(Flexible Routing)
    在消息進入隊列之前,通過 Exchange 來路由消息的。對于典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange

  • 消息集群(Clustering)
    多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker

  • 高可用(Highly Available Queues)
    隊列可以在集群中的機器上進行鏡像,使得在部分節(jié)點出問題的情況下隊列仍然可用

  • 多種協(xié)議(Multi-protocol)
    RabbitMQ 支持多種消息隊列協(xié)議,比如 STOMP、MQTT 等

  • 多語言客戶端(Many Clients)
    RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby ,PHP等

  • 管理界面(Management UI)
    RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監(jiān)控和管理消息 Broker 的許多方面

  • 跟蹤機制(Tracing)
    如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發(fā)生了什么

  • 插件機制(Plugin System)
    RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件

3.RabbitMQ工作原理

內部實際上也是 AMQP 中的基本概念
一文讀懂RabbitMQ消息隊列

上圖各個模塊的說明:

  • Message: 消息,消息是不具名的,它由消息頭消息體組成,消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優(yōu)先權)、delivery-mode(指出該消息可能需要持久性存儲)

  • Publisher: 消息的生產者,也是一個向交換器發(fā)布消息客戶端應用程序

  • Consumer:消息的消費者,表示一個從消息隊列中取得消息客戶端應用程序

  • Broker: 接收和分發(fā)消息的應用,表示消息隊列服務器實體,RabbitMQ Server就是Message Broker

  • Virtual host: 虛擬主機(共享相同的身份認證和加密環(huán)境的獨立服務器域),表示一批交換器、消息隊列和相關對象,類似于mysql的數據庫,當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以劃分出多個vhost,每個用戶在自己的vhost創(chuàng)建exchange、queue等.每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制,vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 /

  • Connection: publisher、consumer和broker之間的網絡連接,比如:TCP連接,斷開連接的操作只會在client端進行,Broker不會斷開連接,除非出現網絡故障broker服務出現問題

  • Channel: 管道,多路復用連接中的一條獨立的雙向數據流通道,信道是建立在真實的TCP連接內虛擬連接(邏輯連接),如果應用程序支持多線程,通常每個多線程創(chuàng)建單獨的channel進行通訊, 因為AMQP 方法中包含了channel id幫助客戶端和broker識別channel,所以channel之間是完全隔離的,AMQP 命令都是通過管道發(fā)出去的,不管是發(fā)布消息、訂閱隊列還是接收消息,這些動作都是通過管道完成。因為對于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低,所以引入了管道的概念,目的是為了減少操作系統(tǒng)建立TCP 連接的開銷,以復用一條 TCP 連接

  • Exchange: 交換器,用來接收生產者發(fā)送的消息并將這些消息路由給服務器中的隊列,message到達broker的第一站,根據分發(fā)規(guī)則,匹配查詢表中的路由鍵(routing key),分發(fā)消息到queue中去,常用的類型有:直連交換機-direct (point-to-point), 主題交換機-topic (publish-subscribe),扇型交換機-fanout (multicast), 頭交換機-headers(amq.match (and amq.headers in RabbitMQ))

  • Queue: 消息隊列,用來保存消息直到發(fā)送給消費者,它是消息的容器,也是消息的終點,一個消息可投入一個或多個隊列,消息最終被送到這里等待消費者連接到這個隊列將其取走

  • Binding: 綁定,消息隊列(queue)和交換器(exchange)之間的虛擬連接, binding中可以包含routing key, Binding信息被保存到exchange中的查詢表中,用于message的分發(fā)依據,一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規(guī)則,所以可以將交換器理解成一個由綁定構成的路由表

四.RabbitMQ安裝與啟動

見linux下,docker-compose 安裝nignx,php,mysql,redis,rabbitmq,mongo

端口說明:安裝完rabbitmq后,有幾個常見端口:
4369: epmd(Erlang Port Mapper Daemon),erlang服務端口
5672: client端通信端口
15672:http Api客戶端,管理UI(僅在啟用了管理插件的情況下使用)
25672:用于節(jié)點間通信(Erlang分發(fā)服務器端口)

五.RabbitMQ幾個重要特性概念講解

  • 隊列模式-簡單隊列模式,工作隊列模式

  • ACK&NACK消費確認機制&重回隊列機制

  • 消息持久化

  • 公平調度(限流機制)

  • 冪等性

  • return機制

  • 消息的可靠性投遞

下面是RabbitMQ和消息所涉及到的一些 術語
  • 生產(Producing)的意思就是發(fā)送:發(fā)送消息的程序就是一個生產者(producer),一般用"P"來表示:

一文讀懂RabbitMQ消息隊列
  • 隊列(queue)就是存在于RabbitMQ中郵箱的名稱:雖然消息的傳輸經過了RabbitMQ和你的應用程序,但是它只能被存儲于隊列當中。實質上隊列就是個巨大的消息緩沖區(qū),它的大小只受主機內存和硬盤限制。多個生產者(producers)可以把消息發(fā)送給同一個隊列,同樣,多個消費者(consumers)也能夠從同一個隊列(queue)中獲取數據。隊列可以繪制成這樣(圖上是隊列的名稱):

一文讀懂RabbitMQ消息隊列
  • 消費(Consuming)和接收(receiving)是同一個意思:一個消費者(consumer)就是一個等待獲取消息的程序。把它繪制為"C":

一文讀懂RabbitMQ消息隊列
以php框架yii2為參照
  1. 簡單隊列模式(simple queue)

發(fā)送 單個消息的生產者,以及 接收消息并將其 打印出來的消費者。將忽略RabbitMQ API中的一些細節(jié)。 在下圖中,“P”是生產者,“C”是消費者。中間的框是一個隊列(保存消息的地方)
一文讀懂RabbitMQ消息隊列

(1).生產者發(fā)布消息步驟

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 創(chuàng)建連接
$connection = new AMQPStreamConnection($host, $port, $user, $pass, $v_host="/");

// 創(chuàng)建channel
$channel = $connection->channel();

// 初始化隊列,并持久化(聲明隊列)
$channel->queue_declare($queue_name, false, true, false, false);

//消息內容
$data = "this is message2";
// 聲明消息,并持久化(創(chuàng)建消息)
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

// 把消息推到隊列里(發(fā)布消息)
$channel->basic_publish($mes, '', $queue_name);

//關閉通道和連接
$channel->close();
$connection->close();
上面聲明隊列方法queue_declare()參數詳解
一文讀懂RabbitMQ消息隊列

(2).消費者消費消息步驟

//核心代碼
basic_consume($queue = '', $consumer_tag = '', $no_local = false,$no_ack = false,$exclusive = false,$nowait = false,$callback = null,$ticket = null,$arguments = array())
上面消費消息方法basic_consume()參數詳解
一文讀懂RabbitMQ消息隊列

(3).具體代碼展示

rabbitmq配置
"rabbitMq" => [
    "base" => [
        'host' => '192.168.0.5',  // host地址
        'port' => 5672,  // 端口
        "user" => "user",  // 賬戶
        'pass' => 123456,  // 密碼
        "v_host" => "order",  // 對應Virtual Hosts
    ],
    "queue_name" => [
        "name1" => "goods",  // 隊列名稱
    ],
]
生產者代碼
<?php
/**
 * 生產者生產消息
 */

namespace console\controllers\simple;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation=false;

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name1"]; // 隊列名稱
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 初始化隊列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);
        //消息
        $data = "this is message2";
        // 聲明消息,并持久化
        $mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        // 把消息推到隊列里
        $channel->basic_publish($mes, '', $queue_name);
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}
消費者代碼
<?php
/**
 * 消費者消費消息
 */

namespace console\controllers\simple;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{
    public $enableCsrfValidation=false;
    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name1"]; // 隊列名稱
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 初始化隊列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);
        // 消費消息
        $callback = function ($msg) {
            echo "reviced: " . $msg->body . "\n";
        };
        $channel->basic_consume($queue_name, "", false, true, false, false,$callback);
        // 監(jiān)控
        while ($channel->is_open()){
            $channel->wait();
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}
  1. 工作隊列模式(worker queue)

創(chuàng)建一個 工作隊列(Work Queue),它會發(fā)送一些 耗時的任務多個工作者(Worker),工作隊列(又稱: 任務隊列——Task Queues)是為了 避免等待一些占用大量資源、時間的操作,當把任務(Task)當作消息發(fā)送到隊列中,一個運行在后臺的工作者(worker)進程就會取出任務然后處理,當運行多個工作者(workers),任務就會在它們之間 共享。這個概念在網絡應用中是非常有用的,它可以在短暫的HTTP請求中處理一些復雜的任務,使用工作隊列的一個 好處就是它能夠 并行的處理隊列。如果堆積了很多任務,只需要添加 更多的工作者(workers)就可以了,這就是所謂的 循環(huán)調度,擴展很簡單
一文讀懂RabbitMQ消息隊列

(1).具體代碼展示

rabbitmq配置
"rabbitMq" => [
    "base" => [
        'host' => '192.168.0.5',  // host地址
        'port' => 5672,  // 端口
        "user" => "user",  // 賬戶
        'pass' => 123456,  // 密碼
        "v_host" => "order",  // 對應Virtual Hosts
    ],
    "queue_name" => [
        "name1" => "goods",  // 隊列名稱
        "name2" => "task_queue",  // 隊列名稱
    ],
]
生產者代碼
<?php
/**
 * 生產者生產消息
 */

namespace console\controllers\worker;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation=false;

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 隊列名稱
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 初始化隊列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);

        // 生產多條消息
        for ($i = 0; $i <= 10; ++$i) {
            //消息
            $data = "this is " . $i. " message";
            // 聲明消息,并持久化
            $mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
            // 把消息推到隊列里
            $channel->basic_publish($mes, '', $queue_name);
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}
消費者代碼
當生產者生產了多條費時的消息時,一個消費者不能滿足需要,可以添加多個消費者處理生產者的消息,多個消費者之間采用 輪詢的方式獲取隊列的消息,并把該消息發(fā)送給對應的用戶
比如:可以對一個隊列的消息開多個消費者,這里我們開了兩個消費者,里面的代碼都是一致的
<?php
/**
 * 消費者消費消息
 */

namespace console\controllers\worker;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{
    public $enableCsrfValidation=false;
    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 隊列名稱
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 初始化隊列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);
        // 消費消息
        $callback = function ($msg) {
            echo "reviced: " . $msg->body . "\n";
        };
        $channel->basic_consume($queue_name, "", false, true, false, false,$callback);
        // 監(jiān)控
        while ($channel->is_open()){
            $channel->wait();
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}
<?php
/**
 * 消費者2消費消息
 */

namespace console\controllers\worker;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class Consumer2Controller extends Controller
{
    public $enableCsrfValidation=false;
    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 隊列名稱
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 初始化隊列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);
        // 消費消息
        $callback = function ($msg) {
            echo "reviced: " . $msg->body . "\n";
        };
        $channel->basic_consume($queue_name, "", false, true, false, false,$callback);
        // 監(jiān)控
        while ($channel->is_open()){
            $channel->wait();
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}
  1. ACK消費確認機制&NACK&重回隊列機制

ACK消費確認機制

當處理一個 比較耗時得任務的時候,想知道消費者(consumers) 運行到一半就掛掉時, 正在處理的消息/發(fā)送給當前工作者的消息會怎樣,當消息在隊列中 沒有進行持久化操作時,消息被RabbitMQ發(fā)送給消費者(consumers)之后,馬上就會在內存中 移除。這種情況,只要把一個 工作者(worker)停止,正在處理的消息就會 丟失。同時,所有發(fā)送到這個工作者的還沒有處理的消息 都會丟失。所以,如果不想消息丟失,當一個工作者(worker)掛掉了,希望任務會重新發(fā)送給其他的工作者(worker),RabbitMQ就提供了 消息響應acknowledgments)。消費者會通過一個 ack(響應),告訴RabbitMQ已經收到并處理了某條消息,然后RabbitMQ就會 釋放并刪除這條消息。如果消費者(consumer) 掛掉了, 沒有發(fā)送響應,RabbitMQ就會認為消息 沒有被完全處理,然后 重新發(fā)送其他消費者(consumer)。這樣,即使工作者(workers)偶爾的掛掉,也不會丟失消息。消息是沒有超時這個概念的,當工作者與它斷開連的時候,RabbitMQ會重新發(fā)送消息,這樣在處理一個耗時非常長的消息任務的時候就不會出問題了。在該講解中,將使用手動消息確認,通過為 no_ack參數傳遞 false,一旦有任務完成,使用d.ack()(false)向RabbitMQ服務器發(fā)送消費完成的確認(這個
確認消息是單次傳遞的)
一文讀懂RabbitMQ消息隊列
//核心代碼
basic_consume($queue = '', $consumer_tag = '', $no_local = false, $no_ack = false,$exclusive = false,$nowait = false,$callback = null,$ticket = null,$arguments = array())
// 消費消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
// 消費ack
$msg->ack();
};
// 第四個參數: 需要ack確認,這里我們在callback手動確認
$channel->basic_consume($queue_name, "", false, false, false, false,$callback);
一文讀懂RabbitMQ消息隊列
rabbitmq配置
"rabbitMq" => [
    "base" => [
        'host' => '192.168.0.5',  // host地址
        'port' => 5672,  // 端口
        "user" => "user",  // 賬戶
        'pass' => 123456,  // 密碼
        "v_host" => "order",  // 對應Virtual Hosts
    ],
    "queue_name" => [
        "name1" => "goods",  // 隊列名稱
        "name2" => "task_queue",  // 隊列名稱
        "name3" => "task_ack",  // 隊列名稱
    ],
]
生產者代碼
<?php
/**
 * 生產者生產消息
 */

namespace console\controllers\ack;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation=false;

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name3"]; // 隊列名稱
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 初始化隊列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);

        // 生產多條消息
        for ($i = 0; $i <= 10; ++$i) {
            //消息
            $data = "this is " . $i. " message";
            // 聲明消息,并持久化
            $mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
            // 把消息推到隊列里
            $channel->basic_publish($mes, '', $queue_name);
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}
消費者代碼
<?php
/**
 * 消費者消費消息
 */

namespace console\controllers\ack;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{
    public $enableCsrfValidation=false;
    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name3"]; // 隊列名稱
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 初始化隊列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);
        // 消費消息
        $callback = function ($msg) {
            echo "reviced: " . $msg->body . "\n";
            // 消費ack
            $msg->ack();
        };
        // 第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者
        $channel->basic_qos(null, 1, null);
        // 第四個參數: 需要ack確認,這里我們在callback手動確認
        $channel->basic_consume($queue_name, "", false, false, false, false,$callback);
        // 監(jiān)控
        while ($channel->is_open()){
            $channel->wait();
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}

NACK&重回隊列機制

當設置了方法 basic_consume$no_ack = false 時,使用手工 ACK 方式,除了ACK外,其實還有 NACK 方式,當手工 AcK 時,會發(fā)送給Broker( 服務器)一個應答,代表消息處理成功,Broker就可回送響應給生產端 .
NACK 則表示消息處理失敗,如果設設置了重回隊列, Broker 端就會將沒有成功處理的消息重新發(fā)送
通俗來講:
手工ACK:消費成功了,向發(fā)起者確認
NACK:消費失敗,讓生產者重新發(fā)
一般在實際應用中,都會關閉重回隊列,也就是設置為false
使用方式
  • 消費端消費時.如果由于業(yè)務異常,可以手工 NACK 記錄日志,然后進行補償

  • API :basic_nack($delivery_tag, $multiple = false, $requeue = false)

  • 如果由于服務器宕機等嚴里問題,就需要手工 ACK 保障消費端消費成功

  • API :basic_ack($delivery_tag, $multiple = false)

4.消息持久化

如果沒有特意告訴RabbitMQ,那么在它退出或者崩潰的時候,將會丟失所有隊列和消息,為了確保消息不會丟失,有兩個事情是需要注意的:必須把“隊列”“消息”設為持久化首先,為了不讓隊列消失,需要把隊列聲明為持久化(durable),但這里面會有一定的問題,它會返回一個錯誤,可以使用一個快捷的解決方法——用不同名字的隊列,例如task_queue

代碼如上面生產者/消費者所示:

// 初始化隊列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
消息持久化配置: " delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT

5.公平調度(限流機制)

為什么要限流

  • 當工作者處理消息時,會出現這么一個問題:比如有兩個工作者(workers),處理奇數消息的比較繁忙,處理偶數消息的比較輕松,然而RabbitMQ并不知道這些,它仍然一如既往的派發(fā)消息,這時因為RabbitMQ只管分發(fā)進入隊列的消息,不會關心有多少消費者(consumer)沒有作出響應,它盲目的把第n-th條消息發(fā)給第n-th個消費者

  • 假設還有這樣的場景:RabbitMQ服務器有上萬條未處理的消息,隨便打開一個消費者Client ,會造成巨量的消息瞬間全部推送過來,然而單個客戶端無法同時處理這么多數據,此時很有可能導致服務器崩潰,嚴重的可能導致線上的故障

  • 還有一些其他的場景:比如說單個 生產端一分鐘產生了幾百條數據,但是單個消費端 一分鐘可能只能處理 60 條,這個時候生產端-消費端肯定是不平衡的,通常生產端是沒辦法做限制的,所以消費端肯定需要做一些限流措施,否則如果超出最大負載,可能導致 消費端 性能下降,服務器卡頓甚至崩潰等一系列嚴重后果

RabbitMQ 提供了一種 qos (服務質質量保證)功能,即在 非自動確認消息的前提下,如果一定數目的消息 ( 通過基于 生產者或者 channel設置 Qos 的值) 未被確認前,不消費新的消息
不能設置自動簽收功能( auto_ack = false ),如果消息未被確認,就不會到達 消費端 ,目的就是給 生產端 減壓
一文讀懂RabbitMQ消息隊列

這是可以設置預取計數值為1,告訴RabbitMQ一次只向一個worker發(fā)送一條消息,換句話說,在處理并確認前一個消息之前,不要向工作人員發(fā)送新消息

如上面ACK消費確認機制中消費者代碼:

// 第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者
//basic_qos($prefetch_size, $prefetch_count, $a_global)
$channel->basic_qos(null, 1, null);
參數說明:
$prefetch_size:單條消息的大小限制, 通常設置為 0 ,表示不做限制
$prefetch_count:一次最多能處理多少條消息
$a_global:是否將上面設置: true 應用于 channel 級別, false 代表消費者級別
$prefetch_size,$a_global這兩項, RabbitMQ 沒有實現,暫且不研究.$prefetch_count 在auto_ack = f alse 的情況下生效,即在自動應答的情況下該值無效
一文讀懂RabbitMQ消息隊列

6.冪等性概念

一句話概括: 用戶對于同一操作發(fā)起的一次請求或者多次請求的結果是一致的
比如:對一個SQL執(zhí)行100次1000次,我們可以借鑒數據庫的樂觀鎖機制:比如我們執(zhí)行一條更新庫存的SQL語句:update T_reps set count = count -1, version = version +1 where version = 1,數據庫的樂觀鎖在執(zhí)行更新操作前一先去數據庫查詢此version ,然后執(zhí)行更新語句,以此version作為條件,如果執(zhí)行更新時有其他人先更新了這張表的數據,那么這個條件就不生效了,也就不會執(zhí)行操作了,通過這種 樂觀鎖的機制來保障幕等性

消費端 - 冪等性保障

在海量訂單產生的業(yè)務高峰期,如何避免 消息的重復消費問題?
在業(yè)務高峰期:容易產生 消息重復消費問題,當消費端消費完消息時,在給生產者端返回ack時由于 網絡中斷,導致生產端 未收到確認信息,該條消息就會 重新發(fā)送被消費端消費,但實際上該消費端已成功消費了該條消息,這就造成了重復消費.而 消費端實現 冪等性,就意味著:消息不會被多次消費,即使收到了很多一樣的消息

業(yè)界主流的冪等性操作解決方案:

(1)唯一Id + 指紋碼 機制,核心:利用數據庫主鍵去重
  • 唯一Id: 業(yè)務表主鍵

  • 指紋碼: 為了區(qū)別每次正常操作的碼,每次操作時生成指紋碼;可以用時間截+業(yè)務編號或者標志位(具體視業(yè)務場景而定 )

select count(1) from t_order where id = 唯一Id + 指紋碼

  • 優(yōu)勢:實現簡單

  • 弊端:高并發(fā)下有數據庫寫入的性能瓶頸

  • 解決方案:根據ID進行分庫分表進行算法路由

(2)利用Redis的原子性去實現
  • 第一:是否要進行數據落庫,如果落庫的話,關鍵解決的問題是數據庫和緩存如何做到原子性?

  • 第二:如果不進行落庫,那么都存儲到緩存中,如何設置定時同步的策略?

7.return機制

  • Return Listener用于處理一些不可路由的消息,也是生產階段添加的一個監(jiān)聽

  • 消息生產者通過指定一個Exchange和Routing Key,把消息送達到某一個隊列中去,然后消費者監(jiān)聽隊列,進行消費處理操作

  • 但是在某些情況下,如果在發(fā)送消息的時候,當前的Exchange不存在或者指定的路由key路由不到,這個時候如果需要監(jiān)聽這種不可達的消息,就要使用Return Listener

  • 在API中有一個關鍵的配置項 Mandatory:如果為true,則監(jiān)聽器會接收到路由不可達的消息,然后進行后續(xù)處理,如果為false,那么Broker(服務器)會自動刪除該消息

一文讀懂RabbitMQ消息隊列

8.消息的可靠性投遞

(1).什么是生產端的可靠性投遞?

  • 保障消息的成功發(fā)出

  • 保障MQ節(jié)點的成功接收

  • 發(fā)送端收到MQ節(jié)點(Broker)確認應答

  • 完善的消息進行補償機制(在大廠一般都不會加事務,都是進行補償操作)

在實際生產中,很難保障前三點的完全可靠,比如在 極端的環(huán)境中,生產者發(fā)送消息失敗了,發(fā)送端在接受確認應答時突然發(fā)生網絡閃斷等等情況,很難保障可靠性投遞,所以就需要有第四點完善的 消息補償機制

(2).解決方案

方案一 消息信息落庫,對消息狀態(tài)進行打標(常見方案
將消息持久化到 DB中并設置狀態(tài)值,收到 Consumer 的應答就改變當前記錄的狀態(tài)
再輪詢重新發(fā)送沒接收到應答的消息,注意這里要設置重試次數
一文讀懂RabbitMQ消息隊列
方案實現流程
比如下單成功

步驟1

對訂單數據入ORDER_DB 訂單庫,并對因此生成的業(yè)務消息入 MSG_DB 消息庫,此處由于采用了兩個數據庫,需要兩次持久化操作,為了保證數據的一致性,有人可能就想采用分布式事務,但在大廠實踐中,基本都是采用補償機制

這里一定要保證步驟1中消息都存儲成功了,沒有出現任問異常情況,然后生產端再進行消息發(fā)送.如果失敗了就進行快速失敗機制

對業(yè)務數據和消息入庫完畢就進入步驟2

步驟2

發(fā)送消息到MQ服務上,如果一切正常無誤消費者監(jiān)聽到該消息,進入步驟3

步驟3

生產端有一個 confi rm Listener ,異步監(jiān)聽 Broker(服務端) 回送的響應,從而判斷消息是否投遞成功

步驟4

如果成功,去數據庫查詢該消息.并將消息狀態(tài)更新為 1

步驟5

如果出現意外情況,消費者未接收到或者Listener 接收確認時發(fā)生網絡閃斷,導致生產端的Listener 就永遠收不到這條消息的 confi rm應答了,也就是說這條消息的狀態(tài)就一直為0 了,這時候就需要用到分布式定時任務來從 MSG_DB 數據庫抓取那些超時了還未被消費的消息,重新發(fā)送一遍。此時需要設置一個規(guī)則,比如說消息在入庫時候設置一個臨界值 timeout , 5 分鐘之后如果還是0的狀態(tài)那就需要把消息抽取出來。這里使用的是分布式定時任務,去定時抓取 MSG_DB中距離消息創(chuàng)建時間超過 5 分鐘的且狀態(tài)為0 的消息

步驟6

把抓取出來的消息進行重新投遞( Retry Send ) ,也就是從第二步開始繼續(xù)往下走

步驟7

當然有些消息可能就是由于一些實際的問題無法路由到 Broker ,比如Routing Key設置不對,對應的隊列被誤刪除了,那么這種消息即使重試多次也仍然無法投遞成功,所以需要對重復次數做限制,比如限制 3 次,如果投遞次數大于3次,那么就將消息狀態(tài)更新為 2 ,表示這個消息最終投遞失敗,然后通過補償機制,人工去處理,實際生產中.這種情況還是比較少的,但是不能沒有這個補償機制,要不然就做不到可靠性了

缺點
在第一步需要更新或者插入操作數據庫2次
優(yōu)化
不需要消息進行持久化 只需要業(yè)務持久化
方案二 消息的延遲投遞,做二次確認,回調檢查(不常用,大廠在用的高并發(fā)方案)
一文讀懂RabbitMQ消息隊列
方案實現流程

步驟1

(上游服務: Upstream service )業(yè)務入庫,然后send 消息到broker,這兩步是有先后順序的

步驟2

進行消息延遲發(fā)送到新的隊列(延遲時間為 5 分鐘:業(yè)務決定)

步驟3

(下游服務: Downstream service )監(jiān)聽到消息然后處理消息

步驟4

下游服務 send confirm生成新的消息到 broker (這里是一個新的隊列 )

步驟5

callback service 去監(jiān)聽這個消息,并且入庫,如果監(jiān)聽到,表示這個消息已經消費成功

步驟6

callback service 去檢查 步驟2投遞的延遲消息是否 在msgDB里面是否消費成功,不存在或者消費失敗就會 Resend command

如果在第 1 , 2 , 4 步失敗,如果成功broker 會給一個 confirm ,失敗當然沒有,這是消息可靠性投遞的里要保障

9.注意

關于隊列大小: 如果所有的工作者都處理繁忙狀態(tài),隊列就會被填滿,需要留意這個問題,要么添加更多的工作者(workers),要么使用其他策略

六.RabbitMQ幾種常見的交換器模式

1.消息模型基本介紹

前面的教程中,講的是 發(fā)送消息到隊列并從中取出消息,現在介紹RabbitMQ中 完整的消息模型

簡單的概括一下之前講的:

  • 發(fā)布者(producer)是發(fā)布消息的應用程序

  • 隊列(queue)用于消息存儲的緩沖

  • 消費者(consumer)是接收消息的應用程序

RabbitMQ消息模型的 核心理念是:發(fā)布者(producer) 不會直接發(fā)送任何消息給 隊列,事實上,發(fā)布者(producer)甚至不知道消息是否已經被投遞到隊列。發(fā)布者(producer)只需要 把消息發(fā)送給一個交換機(exchange),交換機非常簡單,它一邊從發(fā)布者方 接收消息,一邊把消息 推送到隊列,交換機 必須知道如何處理它接收到的消息,是應該 路由指定的隊列還是是 多個隊列,或者是直接 忽略消息,這些規(guī)則是通過交換機類型(exchange type)來定義的
一文讀懂RabbitMQ消息隊列

有幾個可供選擇的交換器類型:direct, topic, headers和fanout

direct(直連/定向交換器)

消息與一個特定的路由鍵完全匹配

topic(主題交換器)

使用通配符*,#,讓路由鍵和某種模式進行匹配

headers(頭交換器)

不處理路由鍵,而是根據發(fā)送的消息內容中的 headers 屬性進行匹配

fanout(扇型交換器)

發(fā)布/ 訂閱模式可以理解為廣播模式:即exchange會將消息轉發(fā)到所有綁定到這個exchange的隊列上,這種類型在發(fā)送消息,queue bind時,都將忽略route key,也就是說不需要設置 route key

Routing Key(路由鍵): 生產者將消息發(fā)送給交換器,一般都會指定一個Routing Key,用來指定這個消息的路由規(guī)則,而這個Routing Key需要與交換器類型和綁定鍵(Binding Key)聯(lián)合使用才能生效

Binding(綁定):它是Exchange與Queue之間的虛擬連接,通俗的講就是交換器和隊列之間的聯(lián)系(這個隊列(Queue)對這個交換機(Exchange)的消息感興趣),實現了根據不同的Routing Key(路由規(guī)則),交換機將消息路由(發(fā)送)到對應的Queue上

2.交換器核心方法

//試探性申請一個交換器,若該交換器存在,則跳過,不存在則創(chuàng)建
exchange_declare($exchange,$type,$passive = false,$durable = false,$auto_delete = true,$internal = false,$nowait = false,$arguments = array(),$ticket = null)

參數名

默認值

解釋

$exchange

交換器名稱

$type

交換器類型:

’’ 默認交換機 匿名交換器 未顯示聲明類型都是該類型

fanout 扇形交換器 會發(fā)送消息到它所知道的所有隊列,每個消費者獲取的消息都是一致的

headers 頭部交換器

direct 直連交換器,該交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配

topic 主題交換器 該交換機會對路由鍵正則匹配,必須是*(一個單詞)、#(多個單詞,以.分割) ,eg:user.key .abc.* 類型的key

$passive

false

只判斷不創(chuàng)建(一般用于判斷交換器是否存在)

true:

1.如果exchange已存在則直接連接并且不檢查配置比如已存在的exchange是fanout,新需要建立的是direct,也不會報錯;

2.如果exchange不存在則直接報錯

false:

1.如果exchange不存在則創(chuàng)建新的exchange

2.如果exchange已存在則判斷配置是否相同,如果配置不相同則直接報錯,比如已存在的exchange是fanout,新需要建立的是direct,會報錯。

$durable

false

設置是否持久化,設置為true,表示持久化,反之非持久化,持久化可以將交換器存盤,在服務器重啟的時候不會丟失相關信息

$auto_delete

true

設置是否自動刪除,設置為true時,表示自動刪除。自動刪除的前提:至少有一個隊列或者交換器與這個交換器綁定,之后所有與這個交換器綁定的隊列或者交換器都與此解綁

$internal

false

設置是否為內置的,設置為true表示是內置的交換器,客戶端程序無法直接發(fā)送消息到這個交換器,只能通過交換器路由到這個交換器

$nowait

false

如果為true,表示不等待服務器回執(zhí)信息,函數將返回null,可以提高訪問速度

$arguments

array()

其他結構化參數

$ticket

null

3.fanout模式(廣播模式)

廣播模式可以理解為:發(fā)布/訂閱模式,即exchange會將消息轉發(fā)到所有綁定到這個exchange的隊列上。針對這種廣播模式,在發(fā)送消息,queue bind時,都將忽略route key,也就是說不需要設置 route key

案例一

一個生產者生產消息并發(fā)布消息到交換器上,多個消費者訂閱該交換器,并與之隊列綁定,消費消息
rabbitmq配置
 "rabbitMq" => [
        "base" => [
            'host' => '192.168.0.5',  // host地址
            'port' => 5672,  // 端口
            "user" => "user",  // 賬戶
            'pass' => 123456,  // 密碼
            "v_host" => "order",  // 對應Virtual Hosts
        ],
        "exchange_name" => [
            "name1" => "exch", // 交換器名稱
        ],
]
生產者
<?php
/**
 * 交換器fanout(廣播)模式: 生產者生產消息
 */

namespace console\controllers\exchange\fanout;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation=false;

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明并初始化交換器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
        // 聲明一個數據
        $data = "this is a exchange message";
        // 初始化消息并持久化
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
        // 發(fā)布消息到交換器
        $channel->basic_publish($msg, $exchangeName);

        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}
消費者(多個)

消費者1

<?php
/**
 *  交換器fanout(廣播)模式: 消費者消費消息
 */

namespace console\controllers\exchange\fanout;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{
    public $enableCsrfValidation = false;

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
//        $queueName = $rabbitMqConfig["queue_name"]["name4"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明對應的交換器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
        // 聲明隊列
        list($queueName) = $channel->queue_declare('', false, false, true, false);
        // 交換機與隊列綁定
        $channel->queue_bind($queueName, $exchangeName);
        // 消息回調處理
        $callback = function ($meg) {
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
        $channel->basic_qos(null, 1, null);
        // 消費者消費消息: 第四個參數: 需要ack確認
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 監(jiān)控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}

消費者2

<?php
/**
 *  交換器fanout(廣播)模式: 消費者消費消息
 */

namespace console\controllers\exchange\fanout;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class Consumer2Controller extends Controller
{
    public $enableCsrfValidation = false;

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
//        $queueName = $rabbitMqConfig["queue_name"]["name4"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明對應的交換器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
        // 聲明隊列
        list($queueName) = $channel->queue_declare('', false, false, true, false);
        // 交換機與隊列綁定
        $channel->queue_bind($queueName, $exchangeName);
        // 消息回調處理
        $callback = function ($meg) {
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
        $channel->basic_qos(null, 1, null);
        // 消費者消費消息: 第四個參數: 需要ack確認
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 監(jiān)控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}

案例二

舉個實際應用的場景:比方說用戶注冊(注銷,更改姓名等)新浪,同時需要開通微博、博客、郵箱等,如果不采用隊列,按照常規(guī)的線性處理,可能注冊用戶會特別的慢,因為在注冊的時候,需要調各種其他服務器接口,如果服務很多的話,可能客戶端就超時了。如果采用普通的隊列,可能在處理上也會特別的慢(不是最佳方案),如果采用訂閱模式,則是最優(yōu)的選擇
處理過程如下:
  1. 用戶提交username、pwd…等之類的基本信息,將數據提交register.php中

  1. register.php對數據進行校驗,符合注冊要求,生成uid,并將和基本信息json后,發(fā)布一條消息到對應的交換器中,同時直接顯示用戶注冊成功

  1. exchange中的多個隊列,如(queue.process、queue.boke、queue.weibo、queue.email)都訂閱了這個消息,根據各業(yè)務自身的邏輯來處理

總結:

1.不申明隊列,因為發(fā)布/訂閱模式下,是可以隨時添加新的訂閱隊列

2.exchange的Type指定為fanout(廣播模式)

3.隊列不需要指定route key,綁定exchange

代碼如下:
生產者
<?php
/**
 * 交換器fanout(廣播)模式: 生產者生產消息
 */

namespace console\controllers\exchange\fanout;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public function actionIndex()
    {
? ? ? ? /*
? ? ? ? ? ? 用戶注冊邏輯
? ? ? ? */
? ? ? ? 
? ? ? ? //發(fā)送消息邏輯
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = "register";
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明并初始化交換器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
        // 聲明一個數據
        $data = "{uid:xxx,reg_time:xxx}";
        // 初始化消息并持久化
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
        // 發(fā)布消息到交換器
        $channel->basic_publish($msg, $exchangeName);

        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}
消費者(多個)
可以創(chuàng)建多個不同類型的消費者(開通微博、博客、郵箱)等邏輯功能的消費者
<?php
/**
 *  交換器fanout(廣播)模式: 消費者消費消息
 */

namespace console\controllers\exchange\fanout;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{
    public $enableCsrfValidation = false;

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = "register";
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明對應的交換器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
        // 聲明一個匿名隊列
        list($queueName) = $channel->queue_declare('', false, false, true, false);
        // 交換機與隊列綁定
        $channel->queue_bind($queueName, $exchangeName);
        // 消息回調處理
        $callback = function ($meg) {
? ? ? ? ? ? //處理邏輯
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
        $channel->basic_qos(null, 1, null);
        // 消費者消費消息: 第四個參數: 需要ack確認
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 監(jiān)控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}

4.Direct模式

Direct交換器將消息投遞到路由參數 完全匹配的隊列中
一文讀懂RabbitMQ消息隊列

直接上代碼

rabbitmq配置

 "rabbitMq" => [
        "base" => [
            'host' => '192.168.0.5',  // host地址
            'port' => 5672,  // 端口
            "user" => "user",  // 賬戶
            'pass' => 123456,  // 密碼
            "v_host" => "order",  // 對應Virtual Hosts
        ],
        "queue_name" => [
            "name1" => "goods",  // 隊列名稱
            "name2" => "task_queue",  // 隊列名稱
            "name3" => "task_ack",  // 隊列名稱
            "name4" => "exchange_fanout_1",  // 隊列名稱
        ],
        "exchange_name" => [
            "name1" => "exch", // 交換器名稱
            "name2" => "exch_direct_log", // 交換器名稱
        ],
        "routing_key" => [
            "info_key" => "info",  // 路由鍵
            "error_key" => "error",  // 路由鍵
            "warn_key" => "warn",  // 路由鍵
        ],
】
生產者
<?php
/**
 * 交換器direct(routing_key-更詳細的bind模式)模式: 生產者生產消息
 */

namespace console\controllers\exchange\direct;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation=false;

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name2"];
        $routingKey = $rabbitMqConfig["routing_key"]["error_key"];
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明并初始化交換器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
        // 聲明一個數據
        $data = "this is a ". $routingKey . " message";
        // 初始化消息并持久化
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
        // 發(fā)布消息到交換器, 并和路由鍵匹配
        $channel->basic_publish($msg, $exchangeName, $routingKey);

        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}
消費者
<?php
/**
 *  交換器direct(routing_key-更詳細的bind模式)模式: 消費者消費消息
 */

namespace console\controllers\exchange\direct;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerWarnController extends Controller
{
   

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name2"];
        $routingKey = $rabbitMqConfig["routing_key"]["warn_key"]; //路由鍵可以修改為其他key,與生產者bind的關聯(lián)
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明對應的交換器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
        // 聲明一個匿名隊列
        list($queueName) = $channel->queue_declare('', false, false, true, false);
        // 交換機與隊列綁定,并指定routing_key
        $channel->queue_bind($queueName, $exchangeName, $routingKey);
        // 消息回調處理
        $callback = function ($meg) {
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
        $channel->basic_qos(null, 1, null);
        // 消費者消費消息: 第四個參數: 需要ack確認
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 監(jiān)控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}

5.topic模式

發(fā)送到topic交換器的消息不可以攜帶隨意routing_key,它的routing_key必須是一個由 .分隔開的 詞語列表,這些單詞隨便是什么都可以,但是最好是跟攜帶它們的消息有關系的詞匯,以下是幾個推薦的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",詞語的個數可以隨意,但是 不要超過255字節(jié)。binding key也必須擁有同樣的格式,topic交換器背后的邏輯跟direct交換機很相似 : 一個攜帶著特定routing_key的消息會被topic交換機投遞給綁定鍵與之想匹配的隊列,但是它的binding key和routing_key有兩個特殊應用方式:
  • * (星號) 用來表示一個單詞

  • # (井號) 用來表示任意數量(零個或多個)單詞

下邊用圖說明:

這個例子里,發(fā)送的所有消息都是用來描述小動物的,發(fā)送的消息所攜帶的路由鍵是由三個單詞所組成的,這三個單詞被兩個.分割開,路由鍵里的第一個單詞

一文讀懂RabbitMQ消息隊列

描述的是動物的手腳的利索程度,第二個單詞是動物的顏色,第三個是動物的種類,所以它看起來是這樣的: <celerity>.<colour>.<species>。

創(chuàng)建了三個綁定:Q1的綁定鍵為 *.orange.*,Q2的綁定鍵為 *.*.rabbit 和 lazy.# 。

  • Q1-->綁定的是

  • 中間帶 orange 帶 3 個單詞的字符串 (*.orange.*)

  • Q2-->綁定的是

  • 最后一個單詞是 rabbit 的 3 個單詞 (*.*.rabbit)

  • 第一個單詞是 lazy 的多個單詞 (lazy.#)

這三個綁定鍵被可以總結為:

  • Q1 對所有的桔黃色動物都感興趣

  • Q2 則是對所有的兔子所有懶惰的動物感興趣

一個攜帶有 quick.orange.rabbit 的消息將會被分別投遞給這兩個隊列,攜帶著 lazy.orange.elephant 的消息同樣也會給兩個隊列都投遞過去。另一方面攜帶有 quick.orange.fox 的消息會投遞給第一個隊列,攜帶有 lazy.brown.fox 的消息會投遞給第二個隊列。攜帶有 lazy.pink.rabbit 的消息只會被投遞給第二個隊列一次,即使它同時匹配第二個隊列的兩個綁定。攜帶著 quick.brown.fox 的消息不會投遞給任何一個隊列。

注意:

如果違反約定,發(fā)送了一個攜帶有一個單詞或者四個單詞("orange" or "quick.orange.male.rabbit")的消息時,發(fā)送的消息不會投遞給任何一個隊列,而且會丟 失掉

但是另一方面,即使 "lazy.orange.male.rabbit" 有四個單詞,他還是會匹配最后一個綁定,并且被投遞到第二個隊列中。

topic交換機是很強大的,它可以表現出跟其他交換機類似的行為 當一個隊列的binding key為 "#"(井號) 的時候,這個隊列將會無視消息的routing key,接收所有的消息。 當 * (星號) 和 # (井號) 這兩個特殊字符都未在binding key中出現的時候,此時Topic交換機就擁有的direct交換機的行為

代碼如下:

rabbitmq配置

"rabbitMq" => [
    "base" => [
        'host' => '192.168.0.5',  // host地址
        'port' => 5672,  // 端口
        "user" => "user",  // 賬戶
        'pass' => 123456,  // 密碼
        "v_host" => "order",  // 對應Virtual Hosts
    ],
    "queue_name" => [
        "name1" => "goods",  // 隊列名稱
        "name2" => "task_queue",  // 隊列名稱
        "name3" => "task_ack",  // 隊列名稱
        "name4" => "exchange_fanout_1",  // 隊列名稱
    ],
    "exchange_name" => [
        "name1" => "exch", // 交換器名稱
        "name2" => "exch_direct_log", // 交換器名稱
        "name3" => "exch_topic_log", // 交換器名稱
    ],
    "routing_key" => [
        "info_key" => "info",  // 路由鍵
        "error_key" => "error",  // 路由鍵
        "warn_key" => "warn",  // 路由鍵
        "all_key" => "#",  // 所有的路由鍵
        "user_info" => "user.info", // 路由鍵
        "user_warn" => "user.warn", // 路由鍵
        "user_all" => "user.*", // 匹配以user.開頭的路由鍵
    ],
]
】
生產者
<?php
/**
 * 交換器topic(通配符)模式: 生產者生產消息
 */

namespace console\controllers\exchange\topic;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
  
    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name3"];
        $routingKey = $rabbitMqConfig["routing_key"]["user_info"];
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明并初始化交換器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, false, false);
        // 聲明一個數據
        $data = "this is a ". $routingKey . " message";
        // 初始化消息并持久化
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
        // 發(fā)布消息到交換器, 并和路由鍵匹配
        $channel->basic_publish($msg, $exchangeName, $routingKey);

        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}
消費者
<?php
/**
 *  交換器topic(通配符)模式: 消費者消費消息
 *
 */

namespace console\controllers\exchange\topic;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name3"];
        $routingKey = $rabbitMqConfig["routing_key"]["user_all"];
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明對應的交換器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, false, false);
        // 聲明隊列
        list($queueName) = $channel->queue_declare('', false, false, true, false);
        // 交換機與隊列綁定,并指定routing_key
        $channel->queue_bind($queueName, $exchangeName, $routingKey);
        // 消息回調處理
        $callback = function ($meg) {
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
        $channel->basic_qos(null, 1, null);
        // 消費者消費消息: 第四個參數: 需要ack確認
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 監(jiān)控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}

七.死信隊列,延時隊列

1.死信隊列

死信( Dead Letter )是RabbitMQ 中的一種 消息機制,當在消費消息時,如果隊列里的消息出現以下情況:
  • 消息被拒絕

  • 消息在隊列的存活時間超過設置的 TTL 時間

  • TTL (Time To Live),即生存時間

  • RabbitMQ支持消息的過期時間,在消息發(fā)送時可以進行指定

  • RabbitMQ 支持為每個隊列設號消息的超時時間,從消息入隊列開始計算,只要超過了隊列的超時時間配置,那么消息會被自動清除

  • 消息隊列的消息數量已經超過最大隊列長度

那么該消息將成為“死信”,“死信”消息會被 RabbitMQ進行特殊處理,如果配置了死信隊列信息,那么該消息將會被丟進死信隊列中,如果沒有配置,則該消息將會被丟棄

RabbitMQ 中有一種交換器叫 DLX,全稱為 Dead 一 Letter 一 Exchange ,可以稱之為 死信交換器,當消息在一個隊列中變成死信( dead message )消息之后,它會被重新發(fā)送到另外一個交換器中,這個交換器就是DLX , 綁定在 DLX上的隊列就稱之為死信隊列, 程序就可以監(jiān)聽這個隊列中的消息,并做相應的處理,該特性 可以彌補RabbitMQ3.0以前支持的immediate參數的功能

消息變成死信有以下幾種情況:

  • 消息被拒絕消息

  • TTL 過期(延遲隊列)

  • 隊列達到最大長度

一文讀懂RabbitMQ消息隊列
一文讀懂RabbitMQ消息隊列

2.延時隊列

延時隊列就是用來存放需要在 指定時間被處理的元素的隊列.
一般可以利用 死信隊列的特性實現延遲隊列:只要給消息設置一個過期時間,消息過期就會自動進入死信隊列,消費者只要監(jiān)聽死信隊列就可以實現延遲隊列了

應用場景

  • 訂單在十分鐘之內未支付則自動取消

  • 賬單在一周內未支付,則自動結算

  • 某個時間下發(fā)一條通知

  • 用戶注冊成功后,如果三天內沒有登陸則進行短信提醒

  • 用戶發(fā)起退款,如果三天內沒有得到處理則通知相關運營人員

下面通過一個案例來更進一步了解死信隊列,延時隊列

案例1

訂單在一段時間內未支付則自動取消,步驟:
(1).創(chuàng)建訂單操作
(2).訂單創(chuàng)建成功后,訂單相關數據json處理
(3).構建rabbitmq消息隊列,并設置消息過期時間為60秒,把訂單相關json數據發(fā)布到交換器, 并和路由鍵匹配,生產者生產消息60秒之后,消息會進入到死信隊列,消費者監(jiān)聽死信隊列,處理訂單

rabbitmq配置

"rabbitMq" => [
    "base" => [
        'host' => '192.168.0.5',  // host地址
        'port' => 5672,  // 端口
        "user" => "user",  // 賬戶
        'pass' => 123456,  // 密碼
        "v_host" => "order",  // 對應Virtual Hosts
    ],
    "queue_name" => [
        "name1" => "goods",  // 隊列名稱
        "name2" => "task_queue",  // 隊列名稱
        "name3" => "task_ack",  // 隊列名稱
        "name4" => "exchange_fanout_1",  // 隊列名稱
        "name5" => "queue_pay",  // 訂單支付隊列
    ],
    "exchange_name" => [
        "name1" => "exch", // 交換器名稱
        "name2" => "exch_direct_log", // 交換器名稱
        "name3" => "exch_topic_log", // 交換器名稱
        "name4" => "exch_pay", // 訂單支付, 交換器名稱
    ],
    "routing_key" => [
        "info_key" => "info",  // 路由鍵
        "error_key" => "error",  // 路由鍵
        "warn_key" => "warn",  // 路由鍵
        "order_key" => "order_pay",  // 訂單支付
        "all_key" => "#",  // 所有的路由鍵
        "user_info" => "user.info", // 路由鍵
        "user_warn" => "user.warn", // 路由鍵
        "user_all" => "user.*", // 匹配以user.開頭的路由鍵
    ],
    "dead_letter" => [  // 死信隊列
        "exchange_name" => [  // 死信隊列交換機名稱
            "pay" => "dead_exch_pay"
        ],
        "queue_name" => [ // 死信隊列名稱
            "pay" => "dead_queue_pay"
        ],
        "routing_key" => [  // 死信隊列routing名稱
            "pay" => "dead_routing_key_pay"
        ]
    ]
]
生產者
<?php
/**
 * 死信隊列,延時隊列: 生產者推送消息到隊列,模擬訂單支付
 */

namespace console\controllers\exchange\dead;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation = false;

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $deadConfig = Yii::$app->params["rabbitMq"]["dead_letter"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name4"];
        $queueName = $rabbitMqConfig["queue_name"]["name5"];
        $routingKey = $rabbitMqConfig["routing_key"]["order_key"];
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明并初始化交換器, 交換機類型: routing_key-更詳細的bind模式
        $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
        // 消息隊列的額外參數
        $args = new AMQPTable([
            'x-message-ttl' => 2000, // 消息的過期時間
            "x-dead-letter-exchange" => $deadConfig["exchange_name"]["pay"], // 死信隊列交換機名稱
            "x-dead-letter-routing-key" => $deadConfig["routing_key"]["pay"]  // 死信隊列routing名稱
        ]);
        // 聲明隊列
        $channel->queue_declare($queueName, false, true, false, false, false, $args);
        // 交換機和隊列綁定
        $channel->queue_bind($queueName, $exchangeName, $routingKey);

        // 聲明死信交換機
        $channel->exchange_declare($deadConfig["exchange_name"]["pay"], AMQPExchangeType::DIRECT, false, false, false);
        // 聲明死信隊列
        $channel->queue_declare($deadConfig["queue_name"]["pay"], false, true, false, false, false);
         // 死信交換機和隊列綁定
        $channel->queue_bind($deadConfig["queue_name"]["pay"], $deadConfig["exchange_name"]["pay"], $deadConfig["routing_key"]["pay"]);

        // 聲明一個數據:里面可以是用戶訂單相關json數據
        $data = "this is a dead message";
        // 初始化消息并持久化
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
        // 發(fā)布消息到交換器, 并和路由鍵匹配
        $channel->basic_publish($msg, $exchangeName, $routingKey);

        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}
消費者
<?php
/**
 *  死信隊列,延時隊列: 模擬訂單支付,消費者消費消息
 *
 */

namespace console\controllers\exchange\dead;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{  

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["dead_letter"]["exchange_name"]["pay"];
        $queueName = $rabbitMqConfig["dead_letter"]["queue_name"]["pay"];
        $routingKey = $rabbitMqConfig["dead_letter"]["routing_key"]["pay"];
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明對應的交換器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
        // 交換機與隊列綁定,并指定routing_key
        $channel->queue_bind($queueName, $exchangeName, $routingKey);
        // 消息回調處理
        $callback = function ($meg) {
? ? ? ? ? ? //處理訂單相關數據
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
        $channel->basic_qos(null, 1, null);
        // 消費者消費消息: 第四個參數: 需要ack確認
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 監(jiān)控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}

問題

通過上面的案例就可以實現死信隊列,延時隊列操作,上面看上去似乎沒什么問題,實測一下就會發(fā)現 消息不會“如期死亡 。當先生產一個TTL為60s的消息,再生產一個TTL為5s的消息,第二個消息并不會再5s后過期進入死信隊列,而是需要等到第一個消息TTL到期后,與第一個消息一同進入死信隊列, 這是因為RabbitMQ 只會判斷隊列中的第一個消息是否過期

那么怎么來解決這個問題呢?

通過 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件來解決
此插件的原理是將消息在交換機處暫存儲在mnesia(一個分布式數據系統(tǒng))表中,延遲投遞到隊列中,等到消息到期再投遞到隊列當中
rabbitmq_delayed_message_exchange插件安裝
(1).下載地址:https://www.rabbitmq.com/community-plugins.html
注意: 要下載與rabbitmq相對應的版本
一文讀懂RabbitMQ消息隊列
(2).把下載的插件放到指定位置
下載的文件為zip格式,將zip格式解壓,插件格式為ez,將文件復制到插件目錄:
  • Linux

/usr/lib/rabbitmq/lib/rabbitmq_server-xxx/plugins
rabbitmq-plugins list
  • Windows

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.17\plugins
(3).啟動插件
  • Linux

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • Windows

一文讀懂RabbitMQ消息隊列
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

啟動信息:文章來源地址http://www.zghlxwxcb.cn/news/detail-430819.html

一文讀懂RabbitMQ消息隊列
(4).查看
進入:http://localhost:15672/#/exchanges
一文讀懂RabbitMQ消息隊列

重構代碼

生產者
生產者實現的關鍵點:
1.在聲明交換機時不在是direct類型,而是x-delayed-message類型,這是由插件提供的類型
2.交換機要增加"x-delayed-type": "direct"參數設置
3.發(fā)布消息時,要在 Headers 中設置x-delay參數,來控制消息從交換機過期時間
<?php
/**
 * 死信隊列,延時隊列插件使用: 模擬訂單支付
 */

namespace console\controllers\exchange\delay;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation = false;

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $delayConfig = Yii::$app->params["rabbitMq"]["delay"];
        $config = $rabbitMqConfig["base"];
        // 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明并初始化交換器, 交換機類型: 延時插件名稱(x-delayed-message)
        $channel->exchange_declare($delayConfig["exchange_name"]["pay"], "x-delayed-message", false, true, false);
        // 消息隊列的額外參數
        $args = new AMQPTable(["x-delayed-type" => "direct"]);
        // 聲明隊列
        $channel->queue_declare($delayConfig["queue_name"]["pay"], false, true, false, false, false, $args);
        // 交換機和隊列綁定
        $channel->queue_bind($delayConfig["queue_name"]["pay"], $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);

        // 聲明一個數據
        $data = "this is a dead message";
        // 初始化消息并持久化
        $arr = [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT,
            "application_headers" => new AMQPTable([
                "x-delayed" => 2000  // 過期時間
            ])
        ];
        $msg = new AMQPMessage($data, $arr);
        // 發(fā)布消息到交換器, 并和路由鍵匹配
        $channel->basic_publish($msg,  $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);

        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}
消費者
沒有啥特別的修改
<?php
/**
 *  死信隊列,延時隊列插件使用: 模擬訂單支付
 *
 */

namespace console\controllers\exchange\delay;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{
    public $enableCsrfValidation = false;

    public function actionIndex()
    {
        //rabbitmq相關配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $delayConfig = Yii::$app->params["rabbitMq"]["delay"];
        $config = $rabbitMqConfig["base"];// 創(chuàng)建連接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 創(chuàng)建channel
        $channel = $connection->channel();
        // 聲明并初始化交換器, 交換機類型: 延時插件名稱(x-delayed-message)
        $channel->exchange_declare($delayConfig["exchange_name"]["pay"], "x-delayed-message", false, true, false);
        // 交換機和隊列綁定
        $channel->queue_bind($delayConfig["queue_name"]["pay"], $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
        // 消息回調處理
        $callback = function ($meg) {
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 設置消費者處理消息限制,第二個參數:同一時刻服務器只會發(fā)送1條消息給消費者消費
        $channel->basic_qos(null, 1, null);
        // 消費者消費消息: 第四個參數: 需要ack確認
        $channel->basic_consume($delayConfig["queue_name"]["pay"], '', false, false, false, false, $callback);
        // 監(jiān)控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //關閉通道和連接
        $channel->close();
        $connection->close();
    }
}

到了這里,關于一文讀懂RabbitMQ消息隊列的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯(lián)網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • RabbitMq消息模型-隊列消息

    RabbitMq消息模型-隊列消息

    基本模型(SimpleQueue)、工作模型(WorkQueue) 隊列消息特點: 消息不會丟失 并且 有先進先出的順序。 消息接收是有順序的,不是隨機的,僅有一個消費者能拿到數據,而且不同消費者拿不到同一份數據。 基本模型: SimpleQueue 在上圖的模型中,有以下幾個概念: P:為生產

    2024年02月09日
    瀏覽(30)
  • 【RabbitMQ】消息隊列-RabbitMQ篇章

    【RabbitMQ】消息隊列-RabbitMQ篇章

    RabbitMQ是一個開源的 遵循AMQP協(xié)議 實現的基于Erlang語言編寫,支持多種客戶端(語言)。用于在分布式系統(tǒng)中 存儲消息,轉發(fā)消息 ,具有 高可用 , 高可擴性 , 易用性 等特征。 1.1、RabbitMQ—使用場景 一般場景 像一般的下訂單業(yè)務如下圖: 將訂單信息寫入數據庫成功后,發(fā)

    2024年02月12日
    瀏覽(20)
  • 【RabbitMQ筆記10】消息隊列RabbitMQ之死信隊列的介紹

    【RabbitMQ筆記10】消息隊列RabbitMQ之死信隊列的介紹

    這篇文章,主要介紹消息隊列RabbitMQ之死信隊列。 目錄 一、RabbitMQ死信隊列 1.1、什么是死信隊列 1.2、設置過期時間TTL 1.3、配置死信交換機和死信隊列(代碼配置) (1)設置隊列過期時間 (2)設置單條消息過期時間 (3)隊列設置死信交換機 (4)配置的基本思路 1.4、配置

    2024年02月16日
    瀏覽(95)
  • 消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現延遲隊列、整合SpringBoot

    消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現延遲隊列、整合SpringBoot

    1、延遲隊列概念 延時隊列內部是有序的 , 最重要的特性 就體現在它的 延時屬性 上,延時隊列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說, 延時隊列就是用來存放需要在指定時間被處理的元素的隊列。 延遲隊列使用場景: 訂單在十分鐘之內未支付則

    2024年02月22日
    瀏覽(20)
  • RabbitMQ 消息中間件 消息隊列

    RabbitMQ 消息中間件 消息隊列

    RabbitMQ 1、RabbitMQ簡介 RabbiMQ是?Erang開發(fā)的,集群?常?便,因為Erlang天?就是??分布式語?,但其本身并 不?持負載均衡。支持高并發(fā),支持可擴展。支持AJAX,持久化,用于在分布式系統(tǒng)中存儲轉發(fā)消息,在易用性、擴展性、高可用性等方面表現不俗。 2、RabbitMQ 特點 可

    2024年02月03日
    瀏覽(93)
  • 3.精通RabbitMQ—消息隊列、RabbitMQ

    3.精通RabbitMQ—消息隊列、RabbitMQ

    RabbitMQ面試題 (總結最全面的面試題) 入門RabbitMQ消息隊列,看這篇文章就夠了 消息隊列 是一種基于 隊列 ,用于解決 不同進程或應用 之間 通訊 的 消息中間件 。 支持多種 消息傳遞模式 ,如 隊列模型 、 發(fā)布/訂閱模型 等。 業(yè)務解耦 :通過 發(fā)布/訂閱 模式,減少系統(tǒng)的 耦

    2024年02月15日
    瀏覽(19)
  • 消息隊列-RabbitMQ:workQueues—工作隊列、消息應答機制、RabbitMQ 持久化、不公平分發(fā)(能者多勞)

    消息隊列-RabbitMQ:workQueues—工作隊列、消息應答機制、RabbitMQ 持久化、不公平分發(fā)(能者多勞)

    Work Queues— 工作隊列 (又稱任務隊列) 的主要思想是避免立即執(zhí)行資源密集型任務,而不得不等待它完成 。 我們把任務封裝為消息并將其發(fā)送到隊列,在后臺運行的工作進程將彈出任務并最終執(zhí)行作業(yè)。當有多個工作線程時,這些工作線程將一起處理這些任務 。 輪訓分發(fā)消

    2024年02月21日
    瀏覽(23)
  • 一文讀懂什么是軟件供應鏈安全

    一文讀懂什么是軟件供應鏈安全

    今天的大部分軟件并不是完全從頭進行開發(fā)設計的。相反,現在的開發(fā)人員頻繁的依賴一系列第三方組件來創(chuàng)建他們的應用程序。通過使用預構建的庫,開發(fā)人員不需要重新發(fā)明輪子。他們可以使用已經存在的工具,花更多的時間在專有代碼上。這些工具有助于區(qū)分他們的軟

    2024年02月05日
    瀏覽(27)
  • 一文讀懂什么是 Web3 架構

    一文讀懂什么是 Web3 架構

    最近看了一些Web3.0的文章,總結了一些個人的理解: Web3.0 通過區(qū)塊鏈基礎設施管理用戶數據,重構用戶和互聯(lián)網平臺之間的關系和交互,重新定義了互聯(lián)網應用的架構方式和交互模式。 Web 1.0 中,以瀏覽性的網站(只能看)為代表,如Baidu、搜狐、163等。 Web 2.0 中,以可讀

    2024年02月06日
    瀏覽(28)
  • 一文讀懂OSS、NAS、EBS有什么區(qū)別?

    一文讀懂OSS、NAS、EBS有什么區(qū)別?

    近期,AIGC、GPT大模型、數據中臺等熱點話題備受關注,那么具體在不同的行業(yè)場景下,如何選擇對應的存儲介質呢?選型的時候該考慮哪些因素呢? 通過本文主要介紹常用的存儲類型及它們之間的對比差異,輔助幫助大家在不同需求和場景下選擇合適的存儲類型。 存儲類型

    2024年02月16日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包