国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【RabbitMQ】RabbitMQ詳解(一)

這篇具有很好參考價(jià)值的文章主要介紹了【RabbitMQ】RabbitMQ詳解(一)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

RabbitMQ介紹

RabbitMQ是一種開(kāi)源的消息中間件軟件,它實(shí)現(xiàn)了高度可靠的消息傳遞機(jī)制。它基于AMQP(Advanced Message Queuing Protocol)協(xié)議,可以在分布式系統(tǒng)中傳遞、存儲(chǔ)和接收消息。

RabbitMQ提供了一個(gè)可靠的消息隊(duì)列系統(tǒng),用于在應(yīng)用程序之間進(jìn)行異步通信。它的主要特點(diǎn)包括:

消息隊(duì)列:消息發(fā)送者將消息發(fā)布到隊(duì)列中,然后接收者從隊(duì)列中獲取消息進(jìn)行處理。
可靠性:RabbitMQ使用持久化消息以確保消息不會(huì)丟失,并提供了消息確認(rèn)機(jī)制來(lái)確保消息被正確接收。
靈活的路由:RabbitMQ支持多種交換機(jī)類(lèi)型,例如直接交換機(jī)、主題交換機(jī)和扇形交換機(jī),可以根據(jù)消息的路由鍵將消息發(fā)送到指定的隊(duì)列。
高可用性:RabbitMQ支持集群部署,通過(guò)復(fù)制隊(duì)列和交換機(jī)數(shù)據(jù)來(lái)提供高可用性和故障恢復(fù)能力。
擴(kuò)展性:RabbitMQ可以通過(guò)添加多個(gè)節(jié)點(diǎn)來(lái)擴(kuò)展性能和容量,以滿足大規(guī)模應(yīng)用程序的需求。
插件系統(tǒng):RabbitMQ提供了豐富的插件系統(tǒng),可以擴(kuò)展其功能,例如支持消息轉(zhuǎn)換、身份驗(yàn)證和授權(quán)等。

RabbitMQ廣泛應(yīng)用于分布式系統(tǒng)、微服務(wù)架構(gòu)、消息驅(qū)動(dòng)的應(yīng)用程序和異步任務(wù)處理等場(chǎng)景,它提供了一種可靠、靈活和可擴(kuò)展的消息傳遞機(jī)制,幫助開(kāi)發(fā)者構(gòu)建高效的應(yīng)用程序。

四大核心概念

生產(chǎn)者:產(chǎn)生數(shù)據(jù)發(fā)送消息的程序

交換機(jī):是 RabbitMQ 非常重要的一個(gè)部件,一方面它接收來(lái)自生產(chǎn)者的消息,另一方面它將消息 推送到隊(duì)列中。交換機(jī)必須確切知道如何處理它接收到的消息,是將這些消息推送到特定隊(duì)列還是推送到多個(gè)隊(duì)列,亦或者是把消息丟棄,這個(gè)得有交換機(jī)類(lèi)型決定

隊(duì)列:是 RabbitMQ 內(nèi)部使用的一種數(shù)據(jù)結(jié)構(gòu),盡管消息流經(jīng) RabbitMQ 和應(yīng)用程序,但它們只能存儲(chǔ)在隊(duì)列中。隊(duì)列僅受主機(jī)的內(nèi)存和磁盤(pán)限制的約束,本質(zhì)上是一個(gè)大的消息緩沖區(qū)。許多生產(chǎn)者可以將消息發(fā)送到一個(gè)隊(duì)列,許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。這就是我們使用隊(duì)列的方式

消費(fèi)者:消費(fèi)與接收具有相似的含義。消費(fèi)者大多時(shí)候是一個(gè)等待接收消息的程序。請(qǐng)注意生產(chǎn)者,消費(fèi)者和消息中間件很多時(shí)候并不在同一機(jī)器上。同一個(gè)應(yīng)用程序既可以是生產(chǎn)者又是可以是消費(fèi)者。

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
Broker:接收和分發(fā)消息的應(yīng)用,RabbitMQ Server 就是 Message Broker

Virtual host:出于多租戶(hù)和安全因素設(shè)計(jì)的,把 AMQP 的基本組件劃分到一個(gè)虛擬的分組中,類(lèi)似于網(wǎng)絡(luò)中的 namespace 概念。當(dāng)多個(gè)不同的用戶(hù)使用同一個(gè) RabbitMQ server 提供的服務(wù)時(shí),可以劃分出多個(gè) vhost,每個(gè)用戶(hù)在自己的 vhost 創(chuàng)建 exchange/queue 等

Connection:publisher/consumer 和 broker 之間的 TCP 連接

Channel:如果每一次訪問(wèn) RabbitMQ 都建立一個(gè) Connection,在消息量大的時(shí)候建立 TCP Connection 的開(kāi)銷(xiāo)將是巨大的,效率也較低。Channel 是在 connection 內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個(gè) thread 創(chuàng)建單獨(dú)的 channel 進(jìn)行通訊,AMQP method 包含了 channel id 幫助客 戶(hù)端和 message broker 識(shí)別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級(jí)的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開(kāi)銷(xiāo)

Exchange:message 到達(dá) broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢(xún)表中的 routing key,分發(fā) 消息到 queue 中去。常用的類(lèi)型有:direct (point-to-point),topic (publish-subscribe) and fanout (multicast)

Queue:消息最終被送到這里等待 consumer 取走

Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查詢(xún)表中,用于 message 的分發(fā)依據(jù)

RabbitMQ 入門(mén)案例

Hello RabbitMQ

用 Java 編寫(xiě)兩個(gè)程序。發(fā)送單個(gè)消息的生產(chǎn)者和接收消息并打印出來(lái)的消費(fèi)者

在下圖中,“ P” 是我們的生產(chǎn)者,“ C” 是我們的消費(fèi)者。中間的框是一個(gè)隊(duì)列 RabbitMQ 代表使用者保留的消息緩沖區(qū)

Java 進(jìn)行連接的時(shí)候,需要 Linux 開(kāi)放 5672 端口,否則會(huì)連接超時(shí)
訪問(wèn) Web 界面的端口是 15672,連接服務(wù)器的端口是 5672

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

先創(chuàng)建好 Maven 工程,pom.xml 添入依賴(lài):

    <dependencies>
        <!--rabbitmq 依賴(lài)客戶(hù)端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一個(gè)依賴(lài)-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

    <!--指定 jdk 編譯版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

生產(chǎn)者

創(chuàng)建一個(gè)類(lèi)作為生產(chǎn)者,最終生產(chǎn)消息到 RabbitMQ 的隊(duì)列里

步驟:

  1. 創(chuàng)建 RabbitMQ 連接工廠
  2. 進(jìn)行 RabbitMQ 工廠配置信息
  3. 創(chuàng)建 RabbitMQ 連接
  4. 創(chuàng)建 RabbitMQ 信道
  5. 生成一個(gè)隊(duì)列
  6. 發(fā)送一個(gè)消息到交換機(jī),交換機(jī)發(fā)送到隊(duì)列?!啊?代表默認(rèn)交換機(jī)
package one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//生產(chǎn)者
public class Producer {
    //隊(duì)列名稱(chēng)
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創(chuàng)建鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("62.234.167.47");
        factory.setUsername("root");
        factory.setPassword("123456");

        //2.創(chuàng)建鏈接
        Connection connection = factory.newConnection();
        //3.獲取信道
        Channel channel = connection.createChannel();

        //4.生產(chǎn)一個(gè)隊(duì)列
        /**
         * 參數(shù)說(shuō)明
         * 1.queue 要聲明的隊(duì)列名稱(chēng)
         * 2.durable 是否將隊(duì)列標(biāo)記為持久化 true服務(wù)器重啟后仍然存在 默認(rèn)為false
         * 3.exclusive: 是否將隊(duì)列標(biāo)記為獨(dú)占 ture 只能由聲明它的連接使用 并且在連接關(guān)閉后自動(dòng)刪除 默認(rèn)為false
         * 4.auto_delete 是否將隊(duì)列標(biāo)記為自動(dòng)刪除 如果設(shè)置為T(mén)rue 則在最后一個(gè)消費(fèi)者斷開(kāi)連接后 隊(duì)列將被自動(dòng)刪除 默認(rèn)為false
         * 5 argument 自選參數(shù)
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //5.發(fā)消息
        String message = "Hello RabbitMQ";

        /**
         * 發(fā)送一個(gè)消息
         * 1.發(fā)送到哪個(gè)交換機(jī)
         * 2.路由的key值是哪個(gè)本次是隊(duì)列的名稱(chēng)
         * 3.其他參數(shù)信息
         * 4.發(fā)送消息的消息體
         */
        channel.basicPublish("", QUEUE_NAME,null,message.getBytes());
        System.out.println("消息發(fā)送完畢");
    }
}

消費(fèi)者

package one;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    //隊(duì)列的名稱(chēng)
    public static final String QUEUE_NAME = "hello";
    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("62.234.167.47");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //聲明接收消息
        //聲明接收消息
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println(new String(message.getBody()));
        };
        //取消消息時(shí)的回調(diào)
        CancelCallback cancelCallback = consumerTag ->{
            System.out.println("消息消費(fèi)被中斷");
        };

        /**
         * 消費(fèi)者消費(fèi)消息
         * 1.消費(fèi)哪個(gè)隊(duì)列
         * 2.消費(fèi)成功之后是否要自動(dòng)應(yīng)答true:代表自動(dòng)應(yīng)答false:代表手動(dòng)應(yīng)答
         * 3.消費(fèi)者未成功消費(fèi)的回調(diào)
         * 4.消費(fèi)者取消消費(fèi)的回調(diào)
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

值得一提的是,basicConsume 的參數(shù)中,第三個(gè)和第四個(gè)參數(shù)都是接口,所以需要實(shí)現(xiàn)該接口的方法
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

Work Queues

Work Queues 是工作隊(duì)列(又稱(chēng)任務(wù)隊(duì)列)的主要思想是避免立即執(zhí)行資源密集型任務(wù),而不得不等待它完成。相反我們安排任務(wù)在之后執(zhí)行。我們把任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個(gè)工作線程時(shí),這些工作線程將一起處理這些任務(wù)。

輪詢(xún)消費(fèi)

輪詢(xún)消費(fèi)消息指的是輪流消費(fèi)消息,即每個(gè)工作隊(duì)列都會(huì)獲取一個(gè)消息進(jìn)行消費(fèi),并且獲取的次數(shù)按照順序依次往下輪流。

案例中生產(chǎn)者叫做 Task,一個(gè)消費(fèi)者就是一個(gè)工作隊(duì)列,啟動(dòng)兩個(gè)工作隊(duì)列消費(fèi)消息,這個(gè)兩個(gè)工作隊(duì)列會(huì)以輪詢(xún)的方式消費(fèi)消息。

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
首先把 RabbitMQ 的配置參數(shù)封裝為一個(gè)工具類(lèi):RabbitMQUtils

package utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQUtils {

    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("62.234.167.47");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();

        return connection.createChannel();
    }
}

先創(chuàng)建工作隊(duì)列

package two;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Work01 {
    //隊(duì)列的名稱(chēng)
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        //回調(diào)函數(shù) 消息的接收
        DeliverCallback deliverCallback = (consumerTag, message) ->{
            System.out.println("接收到的消息:" + new String(message.getBody()));
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消息被消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };

        /**
         * 消費(fèi)者消費(fèi)消息
         * 1.消費(fèi)哪個(gè)隊(duì)列
         * 2.消費(fèi)成功之后是否要自動(dòng)應(yīng)答true:代表自動(dòng)應(yīng)答false:代表手動(dòng)應(yīng)答
         * 3.消費(fèi)者未成功消費(fèi)的回調(diào)
         * 4.消費(fèi)者取消消費(fèi)的回調(diào)
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

啟動(dòng)兩個(gè)工作線程

創(chuàng)建生產(chǎn)者

package two;

import com.rabbitmq.client.Channel;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class Task01 {

    public static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        //隊(duì)列的聲明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //發(fā)送消息
        //從控制臺(tái)獲取信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.nextLine();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息發(fā)送完成:" + message);
        }
    }
}

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
通過(guò)程序執(zhí)行發(fā)現(xiàn)生產(chǎn)者總共發(fā)送 6 個(gè)消息,消費(fèi)者 first 和消費(fèi)者 second 分別分得兩個(gè)消息,并且是按照有序的一個(gè)接收一次消息

RabbitMQ消息應(yīng)答與發(fā)布

消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成了部分突然它掛掉了,會(huì)發(fā)生什么情況。RabbitMQ 一旦向消費(fèi)者傳遞了一條消息,便立即將該消息標(biāo)記為刪除。在這種情況下,突然有個(gè)消費(fèi)者掛掉了,我們將丟失正在處理的消息。以及后續(xù)發(fā)送給該消費(fèi)者的消息,因?yàn)樗鼰o(wú)法接收到。

為了保證消息在發(fā)送過(guò)程中不丟失,引入消息應(yīng)答機(jī)制,消息應(yīng)答就是:消費(fèi)者在接收到消息并且處理該消息之后,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。

自動(dòng)應(yīng)答

消息發(fā)送后立即被認(rèn)為已經(jīng)傳送成功,這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán)衡,因?yàn)檫@種模式如果消息在接收到之前,消費(fèi)者那邊出現(xiàn)連接或者 channel 關(guān)閉,那么消息就丟失了,當(dāng)然另一方面這種模式消費(fèi)者那邊可以傳遞過(guò)載的消息,沒(méi)有對(duì)傳遞的消息數(shù)量進(jìn)行限制,當(dāng)然這樣有可能使得消費(fèi)者這邊由于接收太多還來(lái)不及處理的消息,導(dǎo)致這些消息的積壓,最終使得內(nèi)存耗盡,最終這些消費(fèi)者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費(fèi)者可以高效并以 某種速率能夠處理這些消息的情況下使用。

手動(dòng)應(yīng)答的方法

首先先將自動(dòng)應(yīng)答關(guān)閉
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
將此處的true改成false

肯定確認(rèn)應(yīng)答

Channel.basicAck (肯定確認(rèn)應(yīng)答):

basicAck(long deliveryTag, boolean multiple);

第一個(gè)參數(shù)是消息的標(biāo)記,第二個(gè)參數(shù)表示是否應(yīng)用于多消息,RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了

否定確認(rèn)應(yīng)答

Channel.basicReject (否定確認(rèn)應(yīng)答)

basicReject(long deliveryTag, boolean requeue);

第一個(gè)參數(shù)表示拒絕 deliveryTag 對(duì)應(yīng)的消息,第二個(gè)參數(shù)表示是否 requeue:true 則重新入隊(duì)列,false 則丟棄或者進(jìn)入死信隊(duì)列。

拒絕處理該消息

Channel.basicNack (用于否定確認(rèn)):示己拒絕處理該消息,可以將其丟棄了

basicNack(long deliveryTag, boolean multiple, boolean requeue);

第一個(gè)參數(shù)表示拒絕 deliveryTag 對(duì)應(yīng)的消息,第二個(gè)參數(shù)是表示否應(yīng)用于多消息,第三個(gè)參數(shù)表示是否 requeue,與 basicReject 區(qū)別就是同時(shí)支持多個(gè)消息,可以 拒絕簽收 該消費(fèi)者先前接收未 ack 的所有消息。拒絕簽收后的消息也會(huì)被自己消費(fèi)到。

恢復(fù)到消息隊(duì)列

Channel.basicRecover

basicRecover(boolean requeue);

是否恢復(fù)消息到隊(duì)列,參數(shù)是是否 requeue,true 則重新入隊(duì)列,并且盡可能的將之前 recover 的消息投遞給其他消費(fèi)者消費(fèi),而不是自己再次消費(fèi)。false 則消息會(huì)重新被投遞給自己。

Multiple的解釋

手動(dòng)應(yīng)答的好處是可以批量應(yīng)答并且減少網(wǎng)絡(luò)擁堵

  • true 代表批量應(yīng)答 channel 上未應(yīng)答的消息

比如說(shuō) channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8 那么此時(shí) 5-8
的這些還未應(yīng)答的消息都會(huì)被確認(rèn)收到消息應(yīng)答

  • false 同上面相比只會(huì)應(yīng)答 tag=8 的消息 5,6,7 這三個(gè)消息依然不會(huì)被確認(rèn)收到消息應(yīng)答

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

消息重新入隊(duì)

如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn),RabbitMQ 將了解到消息未完全處理,并將對(duì)其重新排隊(duì)。如果此時(shí)其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個(gè)消費(fèi)者。這樣,即使某個(gè)消費(fèi)者偶爾死亡,也可以確保不會(huì)丟失任何消息。

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

手動(dòng)應(yīng)答案例

默認(rèn)消息采用的是自動(dòng)應(yīng)答,所以我們要想實(shí)現(xiàn)消息消費(fèi)過(guò)程中不丟失,需要把自動(dòng)應(yīng)答改為手動(dòng)應(yīng)答

消費(fèi)者啟用兩個(gè)線程,消費(fèi) 1 一秒消費(fèi)一個(gè)消息,消費(fèi)者 2 十秒消費(fèi)一個(gè)消息,然后在消費(fèi)者 2
消費(fèi)消息的時(shí)候,停止運(yùn)行,這時(shí)正在消費(fèi)的消息是否會(huì)重新進(jìn)入隊(duì)列,而后給消費(fèi)者 1 消費(fèi)呢?

工具類(lèi)

package utils;

public class SleepUtils {
    public static void sleep(int second){
        try {
            Thread.sleep(1000L * second);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

生產(chǎn)者

public class Task02 {

    //隊(duì)列名稱(chēng)
    public static final String TASK_QUEUE_NAME = "ACK_QUEUE";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        //聲明隊(duì)列
        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        //在控制臺(tái)中輸入信息
        Scanner scanner = new Scanner(System.in);
        System.out.println("請(qǐng)輸入信息:");
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("生產(chǎn)者發(fā)出消息:"+ message);
        }

    }

}

消費(fèi)者1

public class Work03 {

    //隊(duì)列名稱(chēng)
    public static final String TASK_QUEUE_NAME = "ACK_QUEUE";

    //接受消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("C1等待接受消息處理時(shí)間較短");

        DeliverCallback deliverCallback =(consumerTag,message) ->{

            //沉睡1S
            SleepUtils.sleep(1);
            System.out.println("接受到的消息:"+new String(message.getBody(),"UTF-8"));
            //手動(dòng)應(yīng)答
            /**
             * 1.消息的標(biāo)記Tag
             * 2.是否批量應(yīng)答 false表示不批量應(yīng)答信道中的消息
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };

        CancelCallback cancelCallback = (consumerTag -> {
            System.out.println(consumerTag + "消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

        });
        //采用手動(dòng)應(yīng)答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

消費(fèi)者2

public class Work04 {

    //隊(duì)列名稱(chēng)
    public static final String TASK_QUEUE_NAME = "ACK_QUEUE";

    //接受消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("C1等待接受消息處理時(shí)間較短");

        DeliverCallback deliverCallback =(consumerTag,message) ->{

            //沉睡1S
            SleepUtils.sleep(10);
            System.out.println("接受到的消息:"+new String(message.getBody(),"UTF-8"));
            //手動(dòng)應(yīng)答
            /**
             * 1.消息的標(biāo)記Tag
             * 2.是否批量應(yīng)答 false表示不批量應(yīng)答信道中的消息
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };

        CancelCallback cancelCallback = (consumerTag -> {
            System.out.println(consumerTag + "消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

        });
        //采用手動(dòng)應(yīng)答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

兩個(gè)消費(fèi)者的區(qū)別就是睡眠時(shí)間不同 消費(fèi)者1為1秒 消費(fèi)者2為10秒

RabbitMQ持久化

當(dāng) RabbitMQ 服務(wù)停掉以后,消息生產(chǎn)者發(fā)送過(guò)來(lái)的消息不丟失要如何保障? 默認(rèn)情況下 RabbitMQ 退出或由于某種原因崩潰時(shí),它忽視隊(duì)列和消息,除非告知它不要這樣做。確保消息不會(huì)丟失需要做兩件事:我們需要將隊(duì)列消息都標(biāo)記為持久化。

隊(duì)列持久化

之前我們創(chuàng)建的隊(duì)列都是非持久化的,RabbitMQ 如果重啟的化,該隊(duì)列就會(huì)被刪除掉,**如果要隊(duì)列實(shí)現(xiàn)持久化需要在聲明隊(duì)列的時(shí)候把 durable 參數(shù)設(shè)置為true,代表開(kāi)啟持久化

public class Task02 {

    //隊(duì)列名稱(chēng)
    public static final String TASK_QUEUE_NAME = "ACK_QUEUE";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        //開(kāi)啟持久化
        boolean durable = true;
        //聲明隊(duì)列
        channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
        //在控制臺(tái)中輸入信息
        Scanner scanner = new Scanner(System.in);
        System.out.println("請(qǐng)輸入信息:");
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("生產(chǎn)者發(fā)出消息:"+ message);
        }

    }
}

如果之前聲明的隊(duì)列不是持久化的,需要把原先隊(duì)列先刪除,或者重新創(chuàng)建一個(gè)持久化的隊(duì)列 否則會(huì)出現(xiàn)報(bào)錯(cuò)

消息持久化

需要在消息生產(chǎn)者發(fā)布消息的時(shí)候,開(kāi)啟消息的持久化

在 basicPublish 方法的第二個(gè)參數(shù)添加這個(gè)屬性: MessageProperties.PERSISTENT_TEXT_PLAIN

channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));

將消息標(biāo)記為持久化并不能完全保證不會(huì)丟失消息。盡管它告訴 RabbitMQ 將消息保存到磁盤(pán),但是這里依然存在當(dāng)消息剛準(zhǔn)備存儲(chǔ)在磁盤(pán)的時(shí)候 但是還沒(méi)有存儲(chǔ)完,消息還在緩存的一個(gè)間隔點(diǎn)。此時(shí)并沒(méi) 有真正寫(xiě)入磁盤(pán)。持久性保證并不強(qiáng),但是對(duì)于我們的簡(jiǎn)單任務(wù)隊(duì)列而言,這已經(jīng)綽綽有余了。

不公平分發(fā)

在最開(kāi)始的時(shí)候我們學(xué)習(xí)到 RabbitMQ 分發(fā)消息采用的輪詢(xún)分發(fā),但是在某種場(chǎng)景下這種策略并不是很好,比方說(shuō)有兩個(gè)消費(fèi)者在處理任務(wù),其中有個(gè)消費(fèi)者 1 處理任務(wù)的速度非???,而另外一個(gè)消費(fèi)者 2 處理速度卻很慢,這個(gè)時(shí)候我們還是采用輪詢(xún)分發(fā)的化就會(huì)到這處理速度快的這個(gè)消費(fèi)者很大一部分時(shí)間處于空閑狀態(tài),而處理慢的那個(gè)消費(fèi)者一直在干活,這種分配方式在這種情況下其實(shí)就不太好,但是 RabbitMQ 并不知道這種情況它依然很公平的進(jìn)行分發(fā)。

為了避免這種情況,在消費(fèi)者中消費(fèi)消息之前,設(shè)置參數(shù) channel.basicQos(1);

不公平分發(fā)思想:如果一個(gè)工作隊(duì)列還沒(méi)有處理完或者沒(méi)有應(yīng)答簽收一個(gè)消息,則不拒絕 RabbitMQ 分配新的消息到該工作隊(duì)列。此時(shí) RabbitMQ 會(huì)優(yōu)先分配給其他已經(jīng)處理完消息或者空閑的工作隊(duì)列。如果所有的消費(fèi)者都沒(méi)有完成手上任務(wù),隊(duì)列還在不停的添加新任務(wù),隊(duì)列有可能就會(huì)遇到隊(duì)列被撐滿的情況,這個(gè)時(shí)候就只能添加新的 worker (工作隊(duì)列)或者改變其他存儲(chǔ)任務(wù)的策略。

package three;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMQUtils;
import utils.SleepUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer02 {
    public static final String QUEUE_NAME = "ACK_QUEUE";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("c2等待接收消息處理時(shí)間較長(zhǎng)");

        DeliverCallback deliverCallback =(consumerTag,message)->{
            SleepUtils.sleep(10);
            System.out.println("接收到的消息:"+new String(message.getBody(), StandardCharsets.UTF_8));
            //手動(dòng)應(yīng)答
            //第二個(gè)參數(shù) false 表示只確認(rèn)當(dāng)前指定的 delivery tag 對(duì)應(yīng)的消息。
            //如果該參數(shù)為 true,將會(huì)確認(rèn)所有小于等于當(dāng)前 delivery tag 的消息。在這里,false 表示只確認(rèn)當(dāng)前的消息。
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag -> {
            System.out.println(consumerTag + "消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        });

        //設(shè)置不公平分發(fā)
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);

        //采用手動(dòng)應(yīng)答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

package three;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMQUtils;
import utils.SleepUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer01 {
    public static final String QUEUE_NAME = "ACK_QUEUE";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("c1等待接收消息處理時(shí)間較短");

        DeliverCallback deliverCallback =(consumerTag,message)->{
            SleepUtils.sleep(1);
            System.out.println("接收到的消息:"+new String(message.getBody(), StandardCharsets.UTF_8));
            //手動(dòng)應(yīng)答
            //第二個(gè)參數(shù) false 表示只確認(rèn)當(dāng)前指定的 delivery tag 對(duì)應(yīng)的消息。
            //如果該參數(shù)為 true,將會(huì)確認(rèn)所有小于等于當(dāng)前 delivery tag 的消息。在這里,false 表示只確認(rèn)當(dāng)前的消息。
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag -> {
            System.out.println(consumerTag + "消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        });
        //設(shè)置不公平分發(fā) 設(shè)置完成之后 此時(shí)他的工作做完 還有其他線程的工作沒(méi)有做完 就會(huì)做其他線程的工作
        //不公平分發(fā)和預(yù)取值分發(fā)都用到 basic.qos 方法,
        //如果取值為 1,代表不公平分發(fā),取值不為1,代表預(yù)取值分發(fā)
//        int prefetchCount = 1;
//        channel.basicQos(prefetchCount);

        //采用手動(dòng)應(yīng)答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

package three;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class Task02 {
    public static final String QUEUE_NAME = "ACK_QUEUE";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        //聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //在控制臺(tái)輸入信息
        Scanner scanner =new Scanner(System.in);
        System.out.println("請(qǐng)輸入信息:");
        while (scanner.hasNext()){
            String message = scanner.nextLine();
            //第二個(gè)參數(shù) 消息持久化

            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生產(chǎn)者發(fā)出消息" + message);
        }

    }
}

**將消費(fèi)時(shí)間長(zhǎng)的消費(fèi)者設(shè)置為channel.basicQos(1); 消費(fèi)時(shí)間短的不做設(shè)置 **

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
此時(shí)就是不公平分發(fā) 消費(fèi)時(shí)間長(zhǎng)的只收到了一條消息 消費(fèi)時(shí)間短的收到了其他所有消息

預(yù)取值分發(fā)

帶權(quán)的消息分發(fā)

默認(rèn)消息的發(fā)送是異步發(fā)送的,所以在任何時(shí)候,channel 上不止只有一個(gè)消息來(lái)自消費(fèi)者的手動(dòng)確認(rèn),所以本質(zhì)上是異步的。因此這里就存在一個(gè)未確認(rèn)的消息緩沖區(qū),因此希望開(kāi)發(fā)人員能限制此緩沖區(qū)的大小,以避免緩沖區(qū)里面無(wú)限制的未確認(rèn)消息問(wèn)題。這個(gè)時(shí)候就可以通過(guò)使用 basic.qos 方法設(shè)置「預(yù)取計(jì)數(shù)」值來(lái)完成的。

該值定義通道上允許的未確認(rèn)消息的最大數(shù)量。一旦數(shù)量達(dá)到配置的數(shù)量, RabbitMQ
將停止在通道上傳遞更多消息,除非至少有一個(gè)未處理的消息被確認(rèn),例如,假設(shè)在通道上有未確認(rèn)的消息 5、6、7,8,并且通道的預(yù)取計(jì)數(shù)設(shè)置為4,此時(shí) RabbitMQ 將不會(huì)在該通道上再傳遞任何消息,除非至少有一個(gè)未應(yīng)答的消息被 ack。比方說(shuō) tag=6 這個(gè)消息剛剛被確認(rèn)ACK,RabbitMQ 將會(huì)感知這個(gè)情況到并再發(fā)送一條消息。消息應(yīng)答和 QoS 預(yù)取值對(duì)用戶(hù)吞吐量有重大影響。

通常,增加預(yù)取將提高向消費(fèi)者傳遞消息的速度。雖然自動(dòng)應(yīng)答傳輸消息速率是最佳的,但是,在這種情況下已傳遞但尚未處理的消息的數(shù)量也會(huì)增加,從而增加了消費(fèi)者的 RAM 消耗(隨機(jī)存取存儲(chǔ)器)應(yīng)該小心使用具有無(wú)限預(yù)處理的自動(dòng)確認(rèn)模式或手動(dòng)確認(rèn)模式,消費(fèi)者消費(fèi)了大量的消息如果沒(méi)有確認(rèn)的話,會(huì)導(dǎo)致消費(fèi)者連接節(jié)點(diǎn)的內(nèi)存消耗變大,所以找到合適的預(yù)取值是一個(gè)反復(fù)試驗(yàn)的過(guò)程,不同的負(fù)載該值取值也不同 100 到 300 范圍內(nèi)的值通??商峁┳罴训耐掏铝?,并且不會(huì)給消費(fèi)者帶來(lái)太大的風(fēng)險(xiǎn)。

預(yù)取值為 1 是最保守的。當(dāng)然這將使吞吐量變得很低,特別是消費(fèi)者連接延遲很?chē)?yán)重的情況下,特別是在消費(fèi)者連接等待時(shí)間較長(zhǎng)的環(huán)境 中。對(duì)于大多數(shù)應(yīng)用來(lái)說(shuō),稍微高一點(diǎn)的值將是最佳的。

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
將剛才消費(fèi)時(shí)間長(zhǎng)的消費(fèi)者的basic.qos 修改為2
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

發(fā)布確認(rèn)

生產(chǎn)者發(fā)布消息到 RabbitMQ 后,需要 RabbitMQ 返回「ACK(已收到)」給生產(chǎn)者,這樣生產(chǎn)者才知道自己生產(chǎn)的消息成功發(fā)布出去。

發(fā)布確認(rèn)邏輯

生產(chǎn)者將信道設(shè)置成 confirm 模式,一旦信道進(jìn)入 confirm 模式,所有在該信道上面發(fā)布的消息都將會(huì)被指派一個(gè)唯一的 ID(從 1 開(kāi)始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker 就會(huì)發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息的唯一 ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在將消息寫(xiě)入磁盤(pán)之后發(fā)出,broker 回傳給生產(chǎn)者的確認(rèn)消息中 delivery-tag 域包含了確認(rèn)消息的序列號(hào),此外 broker 也可以設(shè)置 basic.ack 的 multiple 域,表示到這個(gè)序列號(hào)之前的所有消息都已經(jīng)得到了處理。

confirm 模式最大的好處在于是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時(shí)繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過(guò)回調(diào)方法來(lái)處理該確認(rèn)消息,如果RabbitMQ 因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失,就會(huì)發(fā)送一條 nack 消息, 生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該 nack 消息。

開(kāi)啟確認(rèn)發(fā)布

發(fā)布確認(rèn)默認(rèn)是沒(méi)有開(kāi)啟的,如果要開(kāi)啟需要調(diào)用方法 confirmSelect,每當(dāng)你要想使用發(fā)布確認(rèn),都需要在 channel 上調(diào)用該方法

//開(kāi)啟發(fā)布確認(rèn)
channel.confirmSelect();

單個(gè)確認(rèn)發(fā)布

這是一種簡(jiǎn)單的確認(rèn)方式,它是一種同步確認(rèn)發(fā)布的方式,也就是發(fā)布一個(gè)消息之后只有它被確認(rèn)發(fā)布,后續(xù)的消息才能繼續(xù)發(fā)布,waitForConfirmsOrDie(long) 這個(gè)方法只有在消息被確認(rèn)的時(shí)候才返回,如果在指定時(shí)間范圍內(nèi)這個(gè)消息沒(méi)有被確認(rèn)那么它將拋出異常。

這種確認(rèn)方式有一個(gè)最大的缺點(diǎn)就是:發(fā)布速度特別的慢,因?yàn)槿绻麤](méi)有確認(rèn)發(fā)布的消息就會(huì)阻塞所有后續(xù)消息的發(fā)布,這種方式最多提供每秒不超過(guò)數(shù)百條發(fā)布消息的吞吐量。當(dāng)然對(duì)于某些應(yīng)用程序來(lái)說(shuō)這可能已經(jīng)足夠了。

package four;

import com.rabbitmq.client.Channel;
import utils.RabbitMQUtils;


import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ConfirmMessage {

    //單個(gè)發(fā)消息的個(gè)數(shù)
    public static final int MESSAGE_COUNT = 1000; //Ctrl+Shift+U 變大寫(xiě)

    public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        publishMessageIndividually();//發(fā)布1000個(gè)單獨(dú)確認(rèn)消息,耗時(shí):599ms
    }

    //單個(gè)確認(rèn)
    public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException, IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        //隊(duì)列的聲明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, true, false, null);

        //開(kāi)啟發(fā)布確認(rèn)
        channel.confirmSelect();

        //開(kāi)始時(shí)間
        long begin = System.currentTimeMillis();

        //批量發(fā)消息
        for (int i = 0; i < 1000; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            //單個(gè)消息就馬上進(jìn)行發(fā)布確認(rèn)
            boolean flag = channel.waitForConfirms();
            if (flag) {
                System.out.println("消息發(fā)送成功: " + i);
            }
        }
        //結(jié)束時(shí)間
        long end = System.currentTimeMillis();
        System.out.println("發(fā)布" + MESSAGE_COUNT + "個(gè)單獨(dú)確認(rèn)消息,耗時(shí):" + (end - begin) + "ms");

    }
}

確認(rèn)發(fā)布指的是成功發(fā)送到了隊(duì)列,并不是消費(fèi)者消費(fèi)了消息

批量確認(rèn)發(fā)布

package four;

import com.rabbitmq.client.Channel;
import utils.RabbitMQUtils;


import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ConfirmMessage {

    //單個(gè)發(fā)消息的個(gè)數(shù)
    public static final int MESSAGE_COUNT = 1000; //Ctrl+Shift+U 變大寫(xiě)

    public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        publishMessageIndividually();//發(fā)布1000個(gè)單獨(dú)確認(rèn)消息,耗時(shí):599ms
        publishMessageBatch();
    }

    public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMQUtils.getChannel();
        //隊(duì)列的聲明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, true, false, null);

        //開(kāi)啟發(fā)布確認(rèn)
        channel.confirmSelect();
        //開(kāi)始時(shí)間
        long begin = System.currentTimeMillis();

        //批量確認(rèn)消息大小
        int batchSize = 100;

        //批量發(fā)送消息,批量發(fā)布確認(rèn)
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());

            //判斷達(dá)到100條消息的時(shí)候,批量確認(rèn)一次
            if ((i + 1) % batchSize == 0) {
                //發(fā)布確認(rèn)
                channel.waitForConfirms();
            }
        }
        //結(jié)束時(shí)間
        long end = System.currentTimeMillis();
        System.out.println("發(fā)布" + MESSAGE_COUNT + "個(gè)批量確認(rèn)消息,耗時(shí):" + (end - begin) + "ms");
    }
}

異步確認(rèn)發(fā)布

異步確認(rèn)雖然編程邏輯比上兩個(gè)要復(fù)雜,但是性?xún)r(jià)比最高,無(wú)論是可靠性還是效率都很好,利用了回調(diào)函數(shù)來(lái)達(dá)到消息可靠性傳遞的,這個(gè)中間件也是通過(guò)函數(shù)回調(diào)來(lái)保證是否投遞成功,下面詳細(xì)講解異步確認(rèn)是怎么實(shí)現(xiàn)的。
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

    public static void publicMessageAsync() throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        //聲明隊(duì)列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,true,false,null);

        //開(kāi)啟發(fā)布確認(rèn)
        channel.confirmSelect();

        //開(kāi)始時(shí)間
        long begin = System.currentTimeMillis();

        //消息確認(rèn)的回調(diào)函數(shù)
        ConfirmCallback ackCallback = (deliveryTag, multiple) ->{
            System.out.println("確認(rèn)的消息" + deliveryTag);
        };
        //消息確認(rèn)失敗的回調(diào)函數(shù)
        ConfirmCallback nackCallback = (deliveryTag, multiple) ->{
            System.out.println("未確認(rèn)的消息" + deliveryTag);
        };
        //準(zhǔn)備消息的監(jiān)聽(tīng)器
        //異步通知
        channel.addConfirmListener(ackCallback,nackCallback);

        //批量發(fā)送消息
        for (int i = 0; i < 1000; i++) {
            String message = String.valueOf(i);
            channel.basicPublish("",queueName,null, message.getBytes(StandardCharsets.UTF_8));
        }
        long end = System.currentTimeMillis();

        System.out.println("發(fā)布"+MESSAGE_COUNT+"個(gè)異步發(fā)送確認(rèn)此消息,耗時(shí):"+(end-begin)+"ms");
    }
    

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
如何處理異步未確認(rèn)消息?

最好的解決的解決方案就是把未確認(rèn)的消息放到一個(gè)基于內(nèi)存的能被發(fā)布線程訪問(wèn)的隊(duì)列,比如說(shuō)用 ConcurrentLinkedQueue 這個(gè)隊(duì)列在 confirm callbacks 與發(fā)布線程之間進(jìn)行消息的傳遞。

此時(shí)修改代碼結(jié)構(gòu) 重寫(xiě)兩個(gè)回調(diào)函數(shù)和發(fā)消息的邏輯

    public static void publicMessageAsync() throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        //聲明隊(duì)列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,true,false,null);

        //開(kāi)啟發(fā)布確認(rèn)
        channel.confirmSelect();

        //開(kāi)始時(shí)間
        long begin = System.currentTimeMillis();

        /**
         * 線程安全有序的一個(gè)哈希表,適用于高并發(fā)的情況下
         * 1.輕松的將序號(hào)與消息進(jìn)行關(guān)聯(lián)
         * 2.輕松批量刪除條目 只要給到序號(hào)
         * 3.支持高并發(fā)(多線程)
         */
        ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();

        //消息確認(rèn)的回調(diào)函數(shù)
        ConfirmCallback ackCallback = (deliveryTag, multiple) ->{
            //如果是批量處理
            if(multiple){
                //刪除已確認(rèn)的消息
                ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = outstandingConfirms.headMap(deliveryTag);
                longStringConcurrentNavigableMap.clear();;
            }else {
                //不是批量刪除 就在原表上刪除一條
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("確認(rèn)的消息:" + deliveryTag);
        };
        //消息確認(rèn)失敗的回調(diào)函數(shù)
        ConfirmCallback nackCallback = (deliveryTag, multiple) ->{
            //打印未確認(rèn)的消息有哪些
            String message = outstandingConfirms.remove(deliveryTag);
            System.out.println("未確認(rèn)的消息是:" + message + "### 未確認(rèn)的消息tag:" + deliveryTag);
        };
        //準(zhǔn)備消息的監(jiān)聽(tīng)器
        //異步通知
        channel.addConfirmListener(ackCallback,nackCallback);

        //批量發(fā)送消息
        for (int i = 0; i < 1000; i++) {
            String message = String.valueOf(i);
            channel.basicPublish("",queueName,null, message.getBytes(StandardCharsets.UTF_8));
            outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
        }
        long end = System.currentTimeMillis();

        System.out.println("發(fā)布"+MESSAGE_COUNT+"個(gè)異步發(fā)送確認(rèn)此消息,耗時(shí):"+(end-begin)+"ms");
    }

以上 3 種發(fā)布確認(rèn)速度對(duì)比:

單獨(dú)發(fā)布消息

同步等待確認(rèn),簡(jiǎn)單,但吞吐量非常有限。

批量發(fā)布消息

批量同步等待確認(rèn),簡(jiǎn)單,合理的吞吐量,一旦出現(xiàn)問(wèn)題但很難推斷出是那條消息出現(xiàn)了問(wèn)題。

異步處理

最佳性能和資源使用,在出現(xiàn)錯(cuò)誤的情況下可以很好地控制,但是實(shí)現(xiàn)起來(lái)稍微難些

應(yīng)答和發(fā)布區(qū)別

應(yīng)答功能屬于消費(fèi)者,消費(fèi)完消息告訴 RabbitMQ 已經(jīng)消費(fèi)成功。

發(fā)布功能屬于生產(chǎn)者,生產(chǎn)消息到 RabbitMQ,RabbitMQ 需要告訴生產(chǎn)者已經(jīng)收到消息。

RabbitMQ交換機(jī)

Exchanges

RabbitMQ 消息傳遞模型的核心思想是: 生產(chǎn)者生產(chǎn)的消息從不會(huì)直接發(fā)送到隊(duì)列。實(shí)際上,通常生產(chǎn)者甚至都不知道這些消息傳遞傳遞到了哪些隊(duì)列中。

相反,生產(chǎn)者只能將消息發(fā)送到交換機(jī)(exchange),交換機(jī)工作的內(nèi)容非常簡(jiǎn)單,一方面它接收來(lái)自生產(chǎn)者的消息,另一方面將它們推入隊(duì)列。交換機(jī)必須確切知道如何處理收到的消息。是應(yīng)該把這些消息放到特定隊(duì)列還是說(shuō)把他們到許多隊(duì)列中還是說(shuō)應(yīng)該丟棄它們。這就的由交換機(jī)的類(lèi)型來(lái)決定。
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

交換機(jī)的類(lèi)型

  1. 直接(direct):處理路由鍵。需要將一個(gè)隊(duì)列綁定到交換機(jī)上,要求該消息與一個(gè)特定的路由鍵完全匹配。這是一個(gè)完整的匹配。如果一個(gè)隊(duì)列綁定到該交換機(jī)上要求路由鍵 abc ,則只有被標(biāo)記為 abc 的消息才被轉(zhuǎn)發(fā),不會(huì)轉(zhuǎn)發(fā) abc.def,也不會(huì)轉(zhuǎn)發(fā) dog.ghi,只會(huì)轉(zhuǎn)發(fā) abc。

  2. 主題(topic):將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上。符號(hào)“#”匹配一個(gè)或多個(gè)詞,符號(hào) * 匹配不多不少一個(gè)詞。因此 abc.# 能夠匹配到 abc.def.ghi,但是 abc.* 只會(huì)匹配到 abc.def。

  3. 標(biāo)題(headers):不處理路由鍵。而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配。在綁定 Queue 與 Exchange 時(shí)指定一組鍵值對(duì);當(dāng)消息發(fā)送到RabbitMQ 時(shí)會(huì)取到該消息的 headers 與 Exchange 綁定時(shí)指定的鍵值對(duì)進(jìn)行匹配;如果完全匹配則消息會(huì)路由到該隊(duì)列,否則不會(huì)路由到該隊(duì)列。headers 屬性是一個(gè)鍵值對(duì),可以是 Hashtable,鍵值對(duì)的值可以是任何類(lèi)型。而 fanout,direct,topic 的路由鍵都需要要字符串形式的。

匹配規(guī)則
x-match 有下列兩種類(lèi)型:
x-match = all :表示所有的鍵值對(duì)都匹配才能接受到消息
x-match = any:表示只要有鍵值對(duì)匹配就能接受到消息

  1. 扇出(fanout):不處理路由鍵。你只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上。一個(gè)發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。Fanout 交換機(jī)轉(zhuǎn)發(fā)消息是最快的。

默認(rèn)交換機(jī)

通過(guò)空字符串(“”)進(jìn)行標(biāo)識(shí)的交換機(jī)是默認(rèn)交換

channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

臨時(shí)隊(duì)列

之前的章節(jié)我們使用的是具有特定名稱(chēng)的隊(duì)列(還記得 hello 和 ack_queue 嗎?)。隊(duì)列的名稱(chēng)我們來(lái)說(shuō)至關(guān)重要,我們需要指定我們的消費(fèi)者去消費(fèi)哪個(gè)隊(duì)列的消息。

每當(dāng)我們連接到 Rabbit 時(shí),我們都需要一個(gè)全新的空隊(duì)列,為此我們可以創(chuàng)建一個(gè)具有隨機(jī)名稱(chēng)的隊(duì)列,或者能讓服務(wù)器為我們選擇一個(gè)隨機(jī)隊(duì)列名稱(chēng)那就更好了。其次一旦我們斷開(kāi)了消費(fèi)者的連接,隊(duì)列將被自動(dòng)刪除。

創(chuàng)建臨時(shí)隊(duì)列的方式如下:

String queueName = channel.queueDeclare().getQueue();

綁定bindings

什么是 bingding 呢,binding 其實(shí)是 exchange 和 queue 之間的橋梁,它告訴我們 exchange 和那個(gè)隊(duì)列進(jìn)行了綁定關(guān)系。比如說(shuō)下面這張圖告訴我們的就是 X 與 Q1 和 Q2 進(jìn)行了綁定
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

Fanout交換機(jī)

為了說(shuō)明這種模式,我們將構(gòu)建一個(gè)簡(jiǎn)單的日志系統(tǒng)。它將由兩個(gè)程序組成:第一個(gè)程序?qū)l(fā)出日志消息,第二個(gè)程序是消費(fèi)者。其中啟動(dòng)兩個(gè)消費(fèi)者,其中一個(gè)消費(fèi)者接收到消息后把日志存儲(chǔ)在磁盤(pán)

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
ReceiveLogs01

package five;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogs01 {

    //交換機(jī)名稱(chēng)
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        //聲明一個(gè)交換機(jī)
        //交換機(jī)名稱(chēng),交換機(jī)類(lèi)型
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //聲明一個(gè)臨時(shí)隊(duì)列
        String queueName = channel.queueDeclare().getQueue();
        //綁定交換機(jī)和隊(duì)列
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("等待接收消息 把接收到的消息打印在屏幕上");

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            System.out.println("控制臺(tái)打印接收到的消息:" + new String(message.getBody()));
        };
        //接收消息
        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});

    }
}

ReceiveLogs02

package five;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogs02 {
    private static String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        //聲明一個(gè)隊(duì)列
        String queueName = channel.queueDeclare().getQueue();

        //綁定
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("等待接收消息,把接收到的消息打印在屏幕上...");
        //接收消息
        //消費(fèi)者取消消息時(shí)回調(diào)接口
        DeliverCallback deliverCallback = (consumerTag, message) ->{
            System.out.println("控制臺(tái)打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});


    }
}

EmitLog

package five;

import com.rabbitmq.client.Channel;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class EmitLog {
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.nextLine();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生產(chǎn)者發(fā)出消息" + message);
        }
    }
}

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

一個(gè)發(fā)送,多個(gè)接受,發(fā)布/訂閱模式

Direct exchange

在上一節(jié)中,我們構(gòu)建了一個(gè)簡(jiǎn)單的日志記錄系統(tǒng)。我們能夠向許多接收者廣播日志消息。在本節(jié)我們將向其中添加一些特別的功能——讓某個(gè)消費(fèi)者訂閱發(fā)布的部分消息。例如我們只把嚴(yán)重錯(cuò)誤消息定向存儲(chǔ)到日志文件(以節(jié)省磁盤(pán)空間),同時(shí)仍然能夠在控制臺(tái)上打印所有日志消息。

我們?cè)俅蝸?lái)回顧一下什么是 bindings,綁定是交換機(jī)和隊(duì)列之間的橋梁關(guān)系。也可以這么理解: 隊(duì)列只對(duì)它綁定的交換機(jī)的消息感興趣。綁定用參數(shù):routingKey 來(lái)表示也可稱(chēng)該參數(shù)為 binding key, 創(chuàng)建綁定我們用代碼:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);

綁定之后的意義由其交換類(lèi)型決定。

上一節(jié)中的我們的日志系統(tǒng)將所有消息廣播給所有消費(fèi)者,對(duì)此我們想做一些改變,例如我們希望將日志消息寫(xiě)入磁盤(pán)的程序僅接收嚴(yán)重錯(cuò)誤(errros),而不存儲(chǔ)哪些警告(warning)或信息(info)日志 消息避免浪費(fèi)磁盤(pán)空間。Fanout 這種交換類(lèi)型并不能給我們帶來(lái)很大的靈活性-它只能進(jìn)行無(wú)意識(shí)的廣播,在這里我們將使用 direct 這種類(lèi)型來(lái)進(jìn)行替換,這種類(lèi)型的工作方式是,消息只去到它綁定的 routingKey 隊(duì)列中去。
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
多重綁定
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

當(dāng)然如果 exchange 的綁定類(lèi)型是direct,但是它綁定的多個(gè)隊(duì)列的 key 如果都相同,在這種情況下雖然綁定類(lèi)型是 direct 但是它表現(xiàn)的就和 fanout 有點(diǎn)類(lèi)似了,就跟廣播差不多,如上圖所示。

實(shí)現(xiàn)
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
生產(chǎn)者

package six;

import com.rabbitmq.client.Channel;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class DirectLogs {

    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()){
            String message = scanner.nextLine();
            channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生產(chǎn)者發(fā)出消息:" + message);
        }
    }
}

消費(fèi)者

package six;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsDirect01 {
    private static String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        channel.queueDeclare("disk",true,false,false,null);
        channel.queueBind("disk",EXCHANGE_NAME,"error");

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            System.out.println("消費(fèi)者消費(fèi)信息:" + message);
        };
        //接收消息
        channel.basicConsume("disk",true, deliverCallback, consumerTag -> {});

    }
}

package six;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsDirect02 {
    private static String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        channel.queueDeclare("console",true,false,false,null);
        channel.queueBind("console",EXCHANGE_NAME,"info");
        channel.queueBind("console",EXCHANGE_NAME,"warning");
        //消費(fèi)信息

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            System.out.println("消費(fèi)者接收到消息:" + message);
        };
        channel.basicConsume("console",true,deliverCallback,consumerTag -> {});
    }
}

Topics exchange

在上一個(gè)小節(jié)中,我們改進(jìn)了日志記錄系統(tǒng)。我們沒(méi)有使用只能進(jìn)行隨意廣播的 fanout 交換機(jī),而是使用了 direct 交換機(jī),從而有能實(shí)現(xiàn)有選擇性地接收日志。

盡管使用 direct 交換機(jī)改進(jìn)了我們的系統(tǒng),但是它仍然存在局限性——比方說(shuō)我們想接收的日志類(lèi)型有 info.base 和 info.advantage,某個(gè)隊(duì)列只想 info.base 的消息,那這個(gè)時(shí)候direct 就辦不到了。這個(gè)時(shí)候就只能使用 topic 類(lèi)型

Topic 的要求

發(fā)送到類(lèi)型是 topic 交換機(jī)的消息的 routing_key 不能隨意寫(xiě),必須滿足一定的要求,它必須是一個(gè)單詞列表,以點(diǎn)號(hào)分隔開(kāi)。這些單詞可以是任意單詞

比如說(shuō):“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit” 這種類(lèi)型的。

當(dāng)然這個(gè)單詞列表最多不能超過(guò) 255 個(gè)字節(jié)。

在這個(gè)規(guī)則列表中,其中有兩個(gè)替換符是大家需要注意的:

*(星號(hào))可以代替一個(gè)位置
#(井號(hào))可以替代零個(gè)或多個(gè)位置

案例
8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式
Q1–>綁定的是

  • 中間帶 orange 帶 3 個(gè)單詞的字符串 (.orange.)
    Q2–>綁定的是
  • 最后一個(gè)單詞是 rabbit 的 3 個(gè)單詞 (..rabbit)
  • 第一個(gè)單詞是 lazy 的多個(gè)單詞 (lazy.#)

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式

當(dāng)一個(gè)隊(duì)列綁定鍵是 #,那么這個(gè)隊(duì)列將接收所有數(shù)據(jù),就有點(diǎn)像 fanout 了

如果隊(duì)列綁定鍵當(dāng)中沒(méi)有 # 和 * 出現(xiàn),那么該隊(duì)列綁定類(lèi)型就是 direct 了

生產(chǎn)者

package seven;

import com.rabbitmq.client.Channel;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class EmitLogTopic {

    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        HashMap<String, String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit", "被隊(duì)列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant", "被隊(duì)列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox", "被隊(duì)列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox", "被隊(duì)列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit", "雖然滿足兩個(gè)綁定但只被隊(duì)列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox", "不匹配任何綁定不會(huì)被任何隊(duì)列接收到會(huì)被丟棄");
        bindingKeyMap.put("quick.orange.male.rabbit", "是四個(gè)單詞不匹配任何綁定會(huì)被丟棄");
        bindingKeyMap.put("lazy.orange.male.rabbit", "是四個(gè)單詞但匹配 Q2");
        for (Map.Entry<String,String> bindingKeyEntry : bindingKeyMap.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();

            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
            System.out.println("生產(chǎn)者發(fā)出消息: " + message);
        }

    }
}

消費(fèi)者

package seven;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsTopic01 {
    //交換機(jī)名稱(chēng)
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        String queueName = "Q1";
        channel.queueDeclare(queueName,false,false,false,null);
        //關(guān)鍵方法 設(shè)置匹配信息
        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
        System.out.println("等待接收消息");

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            System.out.println("接收隊(duì)列: " + queueName + "綁定鍵:" + message.getEnvelope().getRoutingKey());
        };
        //接收消息
        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
    }
}

package seven;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsTopic02 {

    //交換機(jī)的名稱(chēng)
    public static final String EXCHANGE_NAME = "topic_logs";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        //聲明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //聲明隊(duì)列
        String queueName = "Q2";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");

        System.out.println("等待接收消息...");
        
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接收隊(duì)列:"+queueName+"  綁定鍵:"+message.getEnvelope().getRoutingKey());
        };
        //接收消息
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
    }
}

8:/樓^mq#rddrtvm^%,Java,Spring,RabbitMQ,rabbitmq,ruby,分布式文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-827681.html

到了這里,關(guān)于【RabbitMQ】RabbitMQ詳解(一)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • MQ 簡(jiǎn)介-RabbitMQ

    消息隊(duì)列作為高并發(fā)系統(tǒng)的核心組件之一,能夠幫助業(yè)務(wù)系統(tǒng)結(jié)構(gòu)提升開(kāi)發(fā)效率和系統(tǒng) 穩(wěn)定性,消息隊(duì)列主要具有以下特點(diǎn): 削峰填谷 :主要解決瞬時(shí)寫(xiě)壓力大于應(yīng)用服務(wù)能力導(dǎo)致消息丟失、系統(tǒng)奔潰等問(wèn)題 系統(tǒng)解耦 :解決不同重要程度、不同能力級(jí)別系統(tǒng)之間依賴(lài)導(dǎo)致一死

    2024年02月11日
    瀏覽(16)
  • MQ學(xué)習(xí)筆記--(RabbitMQ)

    初識(shí)MQ RabbitMQ快速入門(mén) SpringAMQP 同步通訊 異步通訊 MQ常見(jiàn)框架 同步通訊和異步通訊 同步通訊:比如微信視頻,同一時(shí)間只能跟一個(gè)人視頻,其他人想跟你視頻的話,得等你這個(gè)視頻結(jié)束之后才可以 異步通信:比如微信發(fā)消息,發(fā)了一個(gè)人后,別人可能還沒(méi)回你,但你還可以

    2024年02月08日
    瀏覽(15)
  • 整合MQ-----RabbitMQ

    整合MQ-----RabbitMQ

    應(yīng)用場(chǎng)景: 異步處理 。把消息放入消息中間件中,等到需要的時(shí)候再去處理。 流量削峰 例如秒殺活動(dòng),在短時(shí)間內(nèi)訪問(wèn)量急劇增加,使用消息隊(duì)列,當(dāng)消息隊(duì)列滿了就拒絕響應(yīng),跳轉(zhuǎn)到錯(cuò)誤頁(yè)面,這樣就可以使得系統(tǒng)不會(huì)因?yàn)槌?fù)載而崩潰 安裝rabbitMQ 管理后臺(tái) :http://IP

    2024年02月03日
    瀏覽(12)
  • MQ-消息隊(duì)列-RabbitMQ

    MQ-消息隊(duì)列-RabbitMQ

    MQ(Message Queue) 消息隊(duì)列 ,是基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)中“ 先進(jìn)先出 ”的一種 數(shù)據(jù)結(jié)構(gòu) 。指把要傳輸?shù)臄?shù)據(jù)(消息)放在隊(duì)列中,用隊(duì)列機(jī)制來(lái)實(shí)現(xiàn)消息傳遞——生產(chǎn)者產(chǎn)生消息并把消息放入隊(duì)列,然后由消費(fèi)者去處理。消費(fèi)者可以到指定隊(duì)列拉取消息,或者訂閱相應(yīng)的隊(duì)列,由

    2024年02月09日
    瀏覽(27)
  • MQ消息隊(duì)列,以及RabbitMQ詳細(xì)(中1)五種rabbitMQ實(shí)用模型

    MQ消息隊(duì)列,以及RabbitMQ詳細(xì)(中1)五種rabbitMQ實(shí)用模型

    書(shū)接上文,展示一下五種模型我使用的是spring could 微服務(wù)的框架 文章說(shuō)明: ? ? ? ? 本文章我會(huì)分享總結(jié)5種實(shí)用的rabbitMQ的實(shí)用模型 1、hello world簡(jiǎn)單模型 2、work queues工作隊(duì)列 3、Publish/Subscribe發(fā)布訂閱模型 4、Routing路由模型 5、Topics 主題模型 (贈(zèng)送) 6、消息轉(zhuǎn)換器 Rabbi

    2024年02月05日
    瀏覽(40)
  • RabbitMQ --- 惰性隊(duì)列、MQ集群

    RabbitMQ --- 惰性隊(duì)列、MQ集群

    當(dāng)生產(chǎn)者發(fā)送消息的速度超過(guò)了消費(fèi)者處理消息的速度,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。之后發(fā)送的消息就會(huì)成為死信,可能會(huì)被丟棄,這就是消息堆積問(wèn)題。 解決消息堆積有三種思路: 增加更多消費(fèi)者,提高消費(fèi)速度。也就是我們之前說(shuō)的work

    2024年02月03日
    瀏覽(39)
  • 【mq】RabbitMq批量刪除隊(duì)列

    ?由于部分公司同事使用RabbitMq時(shí),沒(méi)有將Client設(shè)置為autodelete,導(dǎo)致大量冗余隊(duì)列。其中這些隊(duì)列又是無(wú)routekey隊(duì)列,收到了批量的訂閱消息,占用服務(wù)器內(nèi)存。 ?如何將這些無(wú)用的隊(duì)列刪除成為一個(gè)問(wèn)題?經(jīng)過(guò)多次摸索,在rabbitmq management api里面找到了方案:

    2024年01月25日
    瀏覽(17)
  • 消息隊(duì)列-RabbitMQ:MQ作用分類(lèi)、RabbitMQ核心概念及消息生產(chǎn)消費(fèi)調(diào)試

    消息隊(duì)列-RabbitMQ:MQ作用分類(lèi)、RabbitMQ核心概念及消息生產(chǎn)消費(fèi)調(diào)試

    1)什么是 MQ MQ (message queue),從字面意思上看, 本質(zhì)是個(gè)隊(duì)列,F(xiàn)IFO 先入先出 ,只不過(guò)隊(duì)列中存放的內(nèi)容是 message 而已,還是一種 跨進(jìn)程的通信機(jī)制 , 用于上下游傳遞消息 。在互聯(lián)網(wǎng)架構(gòu)中,MQ 是一種非常常見(jiàn)的上下游 “ 邏輯解耦 + 物理解耦” 的消息通信服務(wù) 。 使用了

    2024年02月20日
    瀏覽(28)
  • RabbitMQ之MQ可靠性

    RabbitMQ之MQ可靠性

    RabbitMQ實(shí)現(xiàn)數(shù)據(jù)持久化包括3個(gè)方面 (1)交換機(jī)持久化 (2)隊(duì)列持久化 (3)消息持久化 注:開(kāi)啟持久化和生產(chǎn)者確認(rèn)時(shí),RabbitMQ只有在消息持久化完成后才會(huì)給生產(chǎn)者返回ACK回執(zhí) 從RabbitMQ的3.6.0版本開(kāi)始,就增加了Lazy Queue的概念,也就是惰性隊(duì)列 注:從3.12版本后,所有隊(duì)

    2024年01月21日
    瀏覽(25)
  • MQ消息隊(duì)列(主要介紹RabbitMQ)

    MQ消息隊(duì)列(主要介紹RabbitMQ)

    消息隊(duì)列概念:是在消息的傳輸過(guò)程中保存消息的容器。 作用:異步處理、應(yīng)用解耦、流量控制..... RabbitMQ: ? ? SpringBoot繼承RabbitMQ步驟: ? ? ? ? 1.加入依賴(lài) ? ? ? ? ?2.配置 ? ? ? ? 3.開(kāi)啟(如果不需要監(jiān)聽(tīng)消息也就是不消費(fèi)就不需要該注解開(kāi)啟) ? ? ? ? 4.創(chuàng)建隊(duì)列、

    2024年02月11日
    瀏覽(33)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包