国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者)

這篇具有很好參考價值的文章主要介紹了RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、MQ基本介紹

MQ(message queue):本質(zhì)上是個隊列,遵循FIFO原則,隊列中存放的是message,是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。MQ提供“邏輯解耦+物理解耦”的消息通信服務(wù)。使用了MQ之后消息發(fā)送上游只需要依賴MQ,不需要依賴其它服務(wù)。

功能1:流量消峰

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

功能2:應(yīng)用解耦

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

功能3:異步處理

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

MQ的分類:

1.Kafka

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

2.RabbitMQ

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

RabbitMQ概念:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

四大核心概念:

交換機(jī):

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

隊列:?

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

?六大核心模式:

1.簡單模式。2.工作模式。3.發(fā)布訂閱模式。4.路由模式。5.主題模式。6.發(fā)布確認(rèn)模式。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

RabbitMQ工作原理:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

Channer:信道,發(fā)消息的通道。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

二、MQ下載安裝

下載:

1. 官網(wǎng)地址:https://www.rabbitmq.com/download.html。參考的下載地址如下:Linux下安裝RabbitMQ_rabbitmq下載_零碎de記憶的博客-CSDN博客

2.安裝Erlang環(huán)境

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz tcp_wrappers

3.下載Erlang,方式1:找到下面網(wǎng)址,在網(wǎng)址中下載rpm文件:

el/7/erlang-22.3.4.12-1.el7.x86_64.rpm - rabbitmq/erlang · packagecloud

或者直接輸入下面指令下載rpm文件:?

wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm

然后輸入下面的命令安裝已下載的安裝包:

yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm

4.下載RabbitMQ,輸入下面的下載

wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm

?輸入下面的命令進(jìn)行本地安裝:

yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm

5. 下載socat,檢查是否已下載:

yum install socat -y

注意以下的操作都要在 /usr/local/software目錄下查看:?

6.添加開機(jī)啟動RabbitMQ服務(wù):chkconfig rabbitmq-server on。啟動rabbitmq /sbin/service rabbitmq-server start。

7.查看服務(wù)狀態(tài) /sbin/service rabbitmq-server status

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

8.停止服務(wù) /sbin/service rabbitmq-server stop。重新查看服務(wù)狀態(tài)。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

10.開啟web管理界面 sudo rabbitmq-plugins enable rabbitmq_management

11.查看防火墻狀態(tài):systemctl status firewalld。關(guān)閉防火墻:systemctl stop firewalld。關(guān)閉rabbit服務(wù)器輸入:sudo rabbitmqctl stop。開啟rabbit服務(wù)器輸入:sudo rabbitmq-server -detached。

12.在瀏覽器中輸入地址:Linux服務(wù)器ip地址:15672,可訪問web管理界面。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

13.用戶名guest,密碼默認(rèn),但無法登陸,無權(quán)限。

14.rabbitmqctl list_users查看用戶。創(chuàng)建賬號 rabbitmqctl add_user admin 123。設(shè)置用戶角色為管理員 rabbitmqctl set_user_tags admin administrator。設(shè)置用戶權(quán)限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

15.再經(jīng)嘗試可以重新登錄:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

三、創(chuàng)建Java開發(fā)環(huán)境

1.創(chuàng)建1個新項目,命名atguigu-rabbitmq,然后創(chuàng)建模塊Module。GroupId可以填寫:com.atguigu.rabbitmq,ArtifactId可以填rabbitmq-hello,選擇quickstart:

導(dǎo)入依賴如下:

  <dependencies>
    <!--rabbitmq依賴客戶端-->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
    </dependency>
    <!--操作文件流的依賴-->
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.6</version>
    </dependency>
      <dependency>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version> <!-- 根據(jù)你的需求指定版本號 -->
      </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

在下圖中,P是生產(chǎn)者,C是消費者。中間的框是一個隊列-RabbitMQ代表使用者保留的消息緩存區(qū)。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

生產(chǎn)者代碼

public class producer {
    //隊列名稱
    public static final String QUEUE_NAME = "hello";
    //發(fā)消息
    public static void main( String[] args ) throws IOException, TimeoutException {
        //第1步:創(chuàng)建一個連接工程
        ConnectionFactory factory = new ConnectionFactory();
        //第2步:輸入工廠IP,用戶名和密碼——連接RabbitMQd隊列
        factory.setHost("192.168.182.136");
        factory.setUsername("admin");
        factory.setPassword("123");
        //第3步:創(chuàng)建連接
        Connection connection = factory.newConnection();
        //第4步:獲取信道
        Channel channel = connection.createChannel();
        //第5步:生成一個隊列(隊列名稱,是否持久化,是否排他,自動刪除,隊列參數(shù))
        //持久化:是否存儲入磁盤,默認(rèn)是將消息存儲在內(nèi)存中
        //排他:隊列是否只供一個消費者消費,是否進(jìn)行消息共享,true可以供多個消費者消費
        //自動刪除:最后一個消費者斷開連接后,該隊列是否自動刪除
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //第6步:發(fā)消息,(交換機(jī),路由key本次是隊列名,參數(shù),發(fā)送的消息)
        String message = "hello world";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息發(fā)送成功!??!");
    }
}

消費者代碼

public class consumer {
    public static final String QUEUE_NAME = "hello";

    public static void main(String [] args) throws IOException, TimeoutException {
    //第1步:創(chuàng)建一個連接工程
    ConnectionFactory factory = new ConnectionFactory();
    //第2步:輸入工廠IP,用戶名和密碼——連接RabbitMQd隊列
        factory.setHost("192.168.182.136");
        factory.setUsername("admin");
        factory.setPassword("123");
    //第3步:創(chuàng)建連接
    Connection connection = factory.newConnection();
    //第4步:獲取信道
    Channel channel = connection.createChannel();
    //第5步:聲明,接收消息
    DeliverCallback deliverCallback = (consumerTag,message)->{
        System.out.println(new String(message.getBody()));
    };
    //第6步:取消消息時的回調(diào)
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消費被中斷");
        };
    //第7步:接收,(隊列名,自動or手動,接收消息,回調(diào))
    //1.消費哪個隊列;2.消費成功后是否要自動應(yīng)答true代表自動應(yīng)答,false表示手動應(yīng)答
    //3.消費者未成功消費的回調(diào)
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    }
}

注意幾點:1.確保rabbitmq處于開啟的狀態(tài)(開啟方式見前面)2.最好讓防火墻處于關(guān)閉的狀態(tài) 3.最好通過方法左側(cè)的開關(guān)按鈕進(jìn)行啟動,確保啟動是選擇Current File。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式?RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

四、工作隊列

工作隊列:又稱任務(wù)隊列,主要思想是避免立即執(zhí)行資源密集型任務(wù),而不得不等待它完成。相反我們安排任務(wù)在之后執(zhí)行。我們把任務(wù)封裝為消息并將其發(fā)送到隊列。在后臺運行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個工作線程時,這些工作線程將一起處理這些任務(wù)。

情況:生產(chǎn)者大量分發(fā)消息給隊列,工作線程接收隊列的消息,工作線程不止一個,三者關(guān)系時競爭關(guān)系,你一條我一條他一條,但要注意一個消息只能被處理一次,不能被處理多次。

重復(fù)性的代碼可以被抽取成為工具類。

在java — com — atguigu —?rabbitmq下創(chuàng)建utils包,工具類起名RabbitMqUtils,放入如下代碼:

public class RabbitMqUtils {
    public static Channel getChannel() throws Exception{
        //第1步:創(chuàng)建一個連接工程
        ConnectionFactory factory = new ConnectionFactory();
        //第2步:輸入工廠IP,用戶名和密碼——連接RabbitMQd隊列
        factory.setHost("192.168.182.137");
        factory.setUsername("admin");
        factory.setPassword("123");
        //第3步:創(chuàng)建連接
        Connection connection = factory.newConnection();
        //第4步:獲取信道
        Channel channel =  connection.createChannel();
        return channel;
    }
}

工作線程的更新后,worker01的代碼如下:

public static final String QUEUE_NAME = "hello";
    public static void main(String [] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);//聲明隊列,沒有會報錯
        //消息接收
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println("接收到的消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag + "消息被取消消費接口回調(diào)邏輯");
        };
        System.out.println("c1等待接收消息......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }

重復(fù)利用one包下的consumer類,將其更改為c2工作線程:

Task01作為生產(chǎn)者用于生產(chǎn)數(shù)據(jù),與前面不同的是,Task01支持從IDEA控制臺輸入數(shù)據(jù):

public class Task01 {
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //從控制臺當(dāng)中接收信息
        Scanner scanner = new Scanner(System.in); //掃描控制臺輸入內(nèi)容
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("發(fā)送消息完成..");
        }
    }
}

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式?RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式?RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

五、消息應(yīng)答

概念:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

自動應(yīng)答:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

手動應(yīng)答:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

手動應(yīng)答好處,建議不批量應(yīng)答,選擇false:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

?消息自動重新入隊:

原本正常傳輸,C1突然失去連接,檢測到C1斷開連接,于是會讓消息重新入隊,原本的消息交由C2進(jìn)行處理。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

實驗思路:寫1個生產(chǎn)者,2個消費者,當(dāng)關(guān)閉掉其中1個工作線程,消息不丟失,還被另一個工作線程接收。消費在手動應(yīng)答時不丟失、放回隊列中重新消費。

消息手動應(yīng)答(生產(chǎn)者):

public class Task2 {
public static final String task_queue_name = "ack_queue";
public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    channel.queueDeclare(task_queue_name,false,false,false,null);
    Scanner scanner = new Scanner(System.in);
    while(scanner.hasNext()){
        String message = scanner.next();
        channel.basicPublish("",task_queue_name,null,message.getBytes("UTF-8"));
        System.out.println("生產(chǎn)者發(fā)出消息:"+message);
    }
}
}

消息手動應(yīng)答(消費者):

public class Work03 {
    public static final String task_queue_name = "ack_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息處理時間較短");
        DeliverCallback deliverCallback = (consumerTag,message)->{
            SleepUtils.sleep(1);
            System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
            //1.消息的標(biāo)記tag 2.是否批量應(yīng)答
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        //采用手動應(yīng)答
        boolean autoAck = false;
        channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag->{
            System.out.println(consumerTag + "消費者取消消費接口回調(diào)邏輯");
        }));
    }
}
public class Work04 {
    public static final String task_queue_name = "ack_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息處理時間較短");
        DeliverCallback deliverCallback = (consumerTag,message)->{
            SleepUtils.sleep(30);
            System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
            //1.消息的標(biāo)記tag 2.是否批量應(yīng)答
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        //采用手動應(yīng)答
        boolean autoAck = false;
        channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag->{
            System.out.println(consumerTag + "消費者取消消費接口回調(diào)邏輯");
        }));
    }
}

實現(xiàn)效果:在生產(chǎn)者輸入AA BB CC DD EE等消息,消費者1接收速度快,會立即打印AA CC EE等消息,消費者2接收速度慢,會在一段時間后接收到BB,此時如果關(guān)閉消費者2,則消費者1輸出DD,表明消費在手動應(yīng)答時不丟失、放回隊列中重新消費。

六、持久化與分發(fā)

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

如果報錯,說明原本的隊列就是不持久化,此時無法設(shè)定持久化,只能先將隊列刪除然后再重新設(shè)定。

控制隊列持久化,需要修改生產(chǎn)者聲明函數(shù)的第2個參數(shù):

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

消息持久化:

隊列持久化和消息持久化不同,隊列是MQ里的一個組件,消息是生產(chǎn)者發(fā)送的消息。

如果要讓消息持久化,在發(fā)消息的時候就要通知隊列。

更改的是生產(chǎn)者的信道的basicPublish的第3個參數(shù),添加MessageProperties.PERSISTENT_TEXT_PLAIN

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

不公平分發(fā):?

消費者處理任務(wù)的速度不一致,為了不讓速度快的消費者長時間處于空閑狀態(tài),因此采用不公平分發(fā)。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

int prefetchCount = 1;
channel.basicQos(prefetchCount);

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

預(yù)取值:

前面N條數(shù)據(jù)分別交給誰處理,如下圖就是前7條數(shù)據(jù)中,2條給C1,5條給C2

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

七、發(fā)布確認(rèn)

原理:

1.設(shè)置要求隊列必須持久化:就算服務(wù)器宕機(jī),隊列也不至于消失。

2.設(shè)置要求隊列中的消息也必須持久化。

3. 發(fā)布確認(rèn),消息保存到磁盤上之后,隊列要告知生產(chǎn)者。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

Channel channel = connection.createChannel();
channel.confirmSelect();

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

public static void main(String[] args){
}

單個發(fā)布確認(rèn):

是一種同步確認(rèn)發(fā)布的方式,發(fā)布消息-確認(rèn)消息-發(fā)布消息...必須要確認(rèn)后才能繼續(xù)發(fā)布,類似于一手交錢一手交貨,缺點是發(fā)布速度很慢。

1. 創(chuàng)建com/atguigu/rabbitmq/four文件夾下的ConfirmMessage

public static void publishMessageIndividually() throws Exception{
        Channel channel = RabbitMqUtils.getChannel(); //獲取信道
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        channel.confirmSelect();//開啟發(fā)布確認(rèn)
        long begin = System.currentTimeMillis();
        for(int i=0;i<MESSAGE_COUNT;i++){
            String message = i +"";
            channel.basicPublish("",queueName,null,message.getBytes());
            boolean flag = channel.waitForConfirms();
            if(flag){
                System.out.println("消息發(fā)送成功");
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("發(fā)布"+MESSAGE_COUNT+"個單獨確認(rèn)消息,耗時:"+(end-begin)+"ms");
    }

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

批量發(fā)布確認(rèn):

public static void publishMessageBatch() throws Exception{
        Channel channel = RabbitMqUtils.getChannel(); //獲取信道
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        channel.confirmSelect();//開啟發(fā)布確認(rèn)
        long begin = System.currentTimeMillis();
        int batchSize = 100; //批量確認(rèn)消息的大小
        //批量發(fā)送消息,批量發(fā)布確認(rèn)
        for(int i=0;i<MESSAGE_COUNT;i++){
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            //判斷達(dá)到100條消息的時候,批量確認(rèn)一次
            if(i%batchSize==0) channel.waitForConfirms();
        }

        long end = System.currentTimeMillis();
        System.out.println("發(fā)布"+MESSAGE_COUNT+"個批量確認(rèn)消息,耗時:"+(end-begin)+"ms");
}

異步發(fā)布確認(rèn):

map序列,key是消息序號(deliveryTag是消息的標(biāo)識,multiple是是否為批量),value是消息內(nèi)容,將消息每一條都編號,broker會對消息進(jìn)行應(yīng)答,分為兩種一種是確認(rèn)應(yīng)答,另一種是未確認(rèn)應(yīng)答。消息生產(chǎn)者不需要等待接收方的消息,只需要負(fù)責(zé)發(fā)送消息即可,消息是否應(yīng)答最終會以異步的形式回傳,也就是說確認(rèn)的時間可以是稍后的。

addConfirmListener單參數(shù)的是只能監(jiān)聽成功的,多參數(shù)的是可以監(jiān)聽成功也可以監(jiān)聽失敗的,都是接口需要自己來寫。

public static void publishMessageAsync() throws Exception{
    Channel channel = RabbitMqUtils.getChannel(); //獲取信道
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName,false,false,false,null);
    channel.confirmSelect();//開啟發(fā)布確認(rèn)
    long begin = System.currentTimeMillis();
    //消息確認(rèn)成功,回調(diào)函數(shù)
    ConfirmCallback ackCallback = (deliveryTag, multiple)->{
        System.out.println("確認(rèn)的消息:"+deliveryTag);
    };
    //消息確認(rèn)失敗回調(diào)函數(shù)
    ConfirmCallback nackCallback = (deliveryTag, multiple)->{
        System.out.println("未確認(rèn)的消息:"+deliveryTag);
    };
    //準(zhǔn)備消息的監(jiān)聽器,監(jiān)聽哪些消息成功了,哪些消息失敗了
    channel.addConfirmListener(ackCallback,nackCallback);
    //批量發(fā)送消息
    for(int i=0;i<MESSAGE_COUNT;i++){
        String message="消息"+i;
        channel.basicPublish("",queueName,null,message.getBytes());
        //發(fā)布確認(rèn)
    }
    long end = System.currentTimeMillis();
    System.out.println("發(fā)布"+MESSAGE_COUNT+"個異步確認(rèn)消息,耗時:"+(end-begin)+"ms");
}

?處理異步未確認(rèn)消息:

最好的解決方案就是把未確認(rèn)的消息放到一個基于內(nèi)存的能被發(fā)布線程訪問的隊列,比如說用ConcurrentLinkedQueue這個隊列在confirm callbacks與發(fā)布線程之間進(jìn)行消息的傳遞。

public static void publishMessageAsync() throws Exception{
    Channel channel = RabbitMqUtils.getChannel(); //獲取信道
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName,false,false,false,null);
    channel.confirmSelect();//開啟發(fā)布確認(rèn)
    /*線程安全有序的一個哈希表,適用于高并發(fā)的情況下
    1.輕松地將序號與消息進(jìn)行關(guān)聯(lián)
    2.輕松地批量刪除條目只要給到序號
    3.支持高并發(fā)(多線程)*/
    ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
    //消息確認(rèn)成功,回調(diào)函數(shù)
    ConfirmCallback ackCallback = (deliveryTag, multiple)->{
        if(multiple){
            //2.刪除掉已經(jīng)確認(rèn)的消息,剩下的就是未確認(rèn)的消息
            ConcurrentNavigableMap<Long, String> confirmd =
                    outstandingConfirms.headMap(deliveryTag);
        }else{
            outstandingConfirms.remove(deliveryTag);
        }

        System.out.println("確認(rèn)的消息:"+deliveryTag);
    };
    //消息確認(rèn)失敗回調(diào)函數(shù)
    ConfirmCallback nackCallback = (deliveryTag, multiple)->{
        //3.打印一下未確認(rèn)的消息都有哪些
        String message = outstandingConfirms.get(deliveryTag);
        System.out.println("未確認(rèn)的消息是:"+message+"未確認(rèn)的消息:"+deliveryTag);
    };
    //準(zhǔn)備消息的監(jiān)聽器,監(jiān)聽哪些消息成功了,哪些消息失敗了
    channel.addConfirmListener(ackCallback,nackCallback);
    long begin = System.currentTimeMillis();
    //批量發(fā)送消息
    for(int i=0;i<MESSAGE_COUNT;i++){
        String message="消息"+i;
        channel.basicPublish("",queueName,null,message.getBytes());
        //1.此處記錄下所有發(fā)送的消息,消息的總和
        outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
    }
    long end = System.currentTimeMillis();
    System.out.println("發(fā)布"+MESSAGE_COUNT+"個異步確認(rèn)消息,耗時:"+(end-begin)+"ms");
    }
}

三種方式對比:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

八、交換機(jī)

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

一個消息可以被消費多次,需要通過交換機(jī),仍舊遵循隊列中的消息只能被消費一次。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

?生產(chǎn)者生產(chǎn)的消息從不會直接發(fā)送到隊列。生產(chǎn)者將消息發(fā)送到交換機(jī)。交換機(jī)負(fù)責(zé)接收來自生產(chǎn)者的消息,將消息推入隊列。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

Exchanges的類型:直接(direct),主題(topic),標(biāo)題(headers),扇出(fanout)

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

消息能路由發(fā)送到隊列中其實是由routingKey(bindingkey)綁定key指定的。

創(chuàng)建臨時隊列:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

String queueName = channel.queueDedare().getQueue();

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

綁定:

根據(jù)Routing key來確定消息要發(fā)給哪個隊列,如果Routing Key相同消息就可以發(fā)送給多個隊列。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

先添加一個隊列queue1,再添加一個交換機(jī)exchange1,最后點擊exchange1交換機(jī),進(jìn)入綁定菜單,然后輸入綁定的隊列是queue1,然后Routing key隨便設(shè)置為123。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

Fanout交換機(jī)(廣播)

Fanout(扇出)是將接收到的所有消息廣播到它知道的所有隊列中。如果Routing Key相同則發(fā)送給隊列以相同消息。

生產(chǎn)者:

public class EmitLog {
    public static final String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
            System.out.println("生產(chǎn)者發(fā)出消息"+message);
        }
    }
}

消費者:

public class ReceiveLogs01 {
    public static final String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//聲明一個交換機(jī)
        //聲明一個隊列臨時隊列,隊列的名稱是隨機(jī)的,當(dāng)消費者斷開與隊列的連接時候,隊列就刪除了
        String queueName = channel.queueDeclare().getQueue();
        //綁定交換機(jī)與隊列
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("等待接收消息,把接收到消息打印在屏幕上......");
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println("ReceiveLogs01控制臺打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
    }
}

效果:實現(xiàn)廣播的功能

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

Direct路由交換機(jī)

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

消費者1:

public class ReceiveLogsDirect01 {
    public static final String EXCHANGE_NAME="direct_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare("console",false,false,false,null);
        channel.queueBind("console",EXCHANGE_NAME,"info"); //隊列名稱,交換機(jī)名稱,Routingkey
        DeliverCallback deliverCallback =(consumerTag,message)->{
            System.out.println("ReceiveLogsDirect01控制臺打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume("console",true,deliverCallback,consumerTag->{});
    }
}

消費者2:

public class ReceiveLogsDirect02 {
    public static final String EXCHANGE_NAME="direct_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare("disk",false,false,false,null);
        channel.queueBind("disk",EXCHANGE_NAME,"error"); //隊列名稱,交換機(jī)名稱,Routingkey
        DeliverCallback deliverCallback =(consumerTag,message)->{
            System.out.println("ReceiveLogsDirect02控制臺打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume("disk",true,deliverCallback,consumerTag->{});
    }
}

生產(chǎn)者:?

public class DirectLogs {
    public static final String EXCHANGE_NAME="direct_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
            System.out.println("生產(chǎn)者發(fā)出消息"+message);
        }
    }
}

效果:

如果【channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));】的第2個參數(shù)填info就只會發(fā)送消息給消費者1,填寫error就只會發(fā)送消息給消費者2。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

Topics主題交換機(jī)

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

發(fā)布(生產(chǎn)者)訂閱(消費者)模式:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

消費者1:

public class ReceiveLogsTopic01 {
    public static final String EXCHANGE_NAME="topic_logs";//交換機(jī)名稱
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        String queueName="Q1";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接收隊列:"+queueName+" 綁定鍵:"+message.getEnvelope().getRoutingKey());
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
    }
}

消費者2:

public class ReceiveLogsTopic02 {
    public static final String EXCHANGE_NAME="topic_logs";//交換機(jī)名稱
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        String queueName="Q2";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接收隊列:"+queueName+" 綁定鍵:"+message.getEnvelope().getRoutingKey());
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
    }
}

?生產(chǎn)者1:

public class EmitLogTopic {
    public static final String EXCHANGE_NAME="topic_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        Map<String,String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit","被隊列Q1Q2接收到");
        bindingKeyMap.put("lazy.orange.elephant","被隊列Q1Q2接收到");
        bindingKeyMap.put("quick.orange.fox","被隊列Q1接收到");
        bindingKeyMap.put("lazy.brown.fox","被隊列Q2接收到");
        bindingKeyMap.put("lazy.pink.rabbit","雖然滿足兩個綁定但只被隊列Q2接收一次");
        bindingKeyMap.put("quick.brown.fox","不匹配任何綁定不會被任何隊列接收到會被丟棄");
        bindingKeyMap.put("quick.orange.male.rabbit","是四個單詞不匹配任何綁定會被丟棄");
        bindingKeyMap.put("lazy.orange.male.rabbit","是四個單詞但匹配Q2");
        for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String message =  bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
            System.out.println("生產(chǎn)者發(fā)出消息:"+message);
        }
    }
}

結(jié)果:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

九、死信

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

無法被消費消息被稱為死信

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

死信的來源:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

死信實戰(zhàn)架構(gòu)圖:

1個生產(chǎn)者2個消費者。生產(chǎn)者原本走正常交換機(jī),消息走正常隊列,被C1消費。當(dāng)滿足消息被拒絕,消息TTL過期,隊列達(dá)到最大長度三者其一時,消息成為死信,會進(jìn)入dead_exchange交換機(jī),進(jìn)入dead_queue死信隊列,死信隊列的信息由C2消費。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

消費者1:

public class Consumer01 {
    //普通交換機(jī)的名稱
    public static final String NORMAL_EXCHANGE ="normal_exchange";
    //死信交換機(jī)的名稱
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通隊列的名稱
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信隊列的名稱
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws Exception {
        Channel channel1 = RabbitMqUtils.getChannel();
        //聲明死信和普通交換機(jī),類型為direct
        channel1.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel1.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        Map<String,Object> arguments = new HashMap<>(); //設(shè)置參數(shù)
        //正常隊列設(shè)置死信交換機(jī)
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //****相當(dāng)于正常的C1不能消費掉就通過這個交換機(jī)進(jìn)行轉(zhuǎn)發(fā)
        //設(shè)置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "lisi");
        //聲明普通隊列
        channel1.queueDeclare(NORMAL_QUEUE,false,false,false,arguments); //正常交換機(jī)不正常,需要將死信轉(zhuǎn)發(fā)給死信隊列
        //聲明死信隊列
        channel1.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //綁定普通的交換機(jī)與隊列
        channel1.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //綁定死信的交換機(jī)與死信的隊列
        channel1.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println("Consumer01接收的消息是:" + new String(message.getBody(),"UTF-8"));
        };
        channel1.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag->{});
    }
}

消費者2:

public class Consumer02 {
    //死信隊列的名稱
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws Exception {
        Channel channel1 = RabbitMqUtils.getChannel();
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println("Consumer02接收的消息是:" + new String(message.getBody(),"UTF-8"));
        };
        channel1.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag->{});
    }
}

生產(chǎn)者:

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //死信消息,設(shè)置TTL時間,time to live
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder().expiration("10000").build();
        for (int i = 1; i < 11; i++) {
            String message = "info"+i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

實驗步驟:首先確保rabbitMq網(wǎng)頁中normal_queue和dead_queue隊列都被刪除干凈,然后先啟動Consumer01將交換機(jī)和隊列聲明,然后關(guān)閉Consumer01模擬無法接收的場景,然后啟動Producer會發(fā)送10條消息,此時啟動Consumer02,最終發(fā)送的消息會在10秒后變成死信,Consumer02會接收到這些信息。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

**易錯點:假如隊列已存在,會報錯,我們可以點進(jìn)隊列,

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

然后點擊Delete Queue即可:?

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

隊列達(dá)到最大長度:

更改Producer代碼:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

更改Consumer01代碼:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

實驗步驟:首先確保rabbitMq網(wǎng)頁中normal_queue和dead_queue隊列都被刪除干凈,然后先啟動Consumer01將交換機(jī)和隊列聲明,然后關(guān)閉Consumer01模擬假死,此時啟動Consumer02和Producer,因為隊列的最大容量為6,所以有4條消息會被發(fā)送到死信隊列。

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式

拒絕應(yīng)答:

在Producer中確保設(shè)置TTL時間注釋調(diào),然后basicProperties改為null,

在Consumer01中把長度限制注釋掉,更改DeliverCallback后的代碼,如下:

 DeliverCallback deliverCallback = (consumerTag,message)->{ //接收消息
     String msg = new String(message.getBody(), "UTF-8");
     if(msg.equals("info5")){
         System.out.println("此消息是被Consumer01拒絕的消息是:" +msg);
         channel1.basicReject(message.getEnvelope().getDeliveryTag(),false);
      } else {
         System.out.println("Consumer01接收的消息是:" +msg);
         channel1.basicAck(message.getEnvelope().getDeliveryTag(),false);
      }
};
//開啟手動應(yīng)答,如果自動應(yīng)答了根本不存在拒絕的問題
channel1.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag->{});

注意一定要開啟手動應(yīng)答,手動拒絕消息info5,效果如下:

RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者),rabbitmq,ruby,centos,筆記,學(xué)習(xí),分布式文章來源地址http://www.zghlxwxcb.cn/news/detail-728024.html

到了這里,關(guān)于RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 消息隊列-RabbitMQ:發(fā)布確認(rèn)—發(fā)布確認(rèn)邏輯和發(fā)布確認(rèn)的策略

    消息隊列-RabbitMQ:發(fā)布確認(rèn)—發(fā)布確認(rèn)邏輯和發(fā)布確認(rèn)的策略

    生產(chǎn)者將信道設(shè)置成 confirm 模式,一旦信道進(jìn)入 confirm 模式,所有在該信道上面發(fā)布的消息都將會被指派一個唯一的 ID (從 1 開始),一旦消息被投遞到所有匹配的隊列之后,broker 就會發(fā)送一個確認(rèn)給生產(chǎn)者 (包含消息的唯一 ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊列

    2024年02月21日
    瀏覽(27)
  • 【學(xué)習(xí)日記2023.6.19】 之 RabbitMQ服務(wù)異步通信_消息可靠性_死信交換機(jī)_惰性隊列_MQ集群

    【學(xué)習(xí)日記2023.6.19】 之 RabbitMQ服務(wù)異步通信_消息可靠性_死信交換機(jī)_惰性隊列_MQ集群

    消息隊列在使用過程中,面臨著很多實際問題需要思考: 消息從發(fā)送,到消費者接收,會經(jīng)歷多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收

    2024年02月11日
    瀏覽(98)
  • 【RabbitMQ筆記10】消息隊列RabbitMQ之死信隊列的介紹

    【RabbitMQ筆記10】消息隊列RabbitMQ之死信隊列的介紹

    這篇文章,主要介紹消息隊列RabbitMQ之死信隊列。 目錄 一、RabbitMQ死信隊列 1.1、什么是死信隊列 1.2、設(shè)置過期時間TTL 1.3、配置死信交換機(jī)和死信隊列(代碼配置) (1)設(shè)置隊列過期時間 (2)設(shè)置單條消息過期時間 (3)隊列設(shè)置死信交換機(jī) (4)配置的基本思路 1.4、配置

    2024年02月16日
    瀏覽(95)
  • (七)「消息隊列」之 RabbitMQ 發(fā)布者確認(rèn)(使用 .NET 客戶端)

    (七)「消息隊列」之 RabbitMQ 發(fā)布者確認(rèn)(使用 .NET 客戶端)

    發(fā)布者確認(rèn) 是一個 RabbitMQ 擴(kuò)展,用于實現(xiàn)可靠的發(fā)布。當(dāng)在通道上啟用發(fā)布者確認(rèn)時,客戶端發(fā)布的消息將由代理 異步確認(rèn) ,這意味著它們已在服務(wù)器端得到處理。 先決條件 本教程假設(shè) RabbitMQ 已安裝并且正在 本地主機(jī) 的標(biāo)準(zhǔn)端口( 5672 )上運行。如果您使用了不同的主

    2024年02月16日
    瀏覽(19)
  • 【RabbitMQ筆記08】消息隊列RabbitMQ之防止消息丟失的三種方式(生產(chǎn)者消息確認(rèn)、消費者消息確認(rèn)、消息持久化)

    【RabbitMQ筆記08】消息隊列RabbitMQ之防止消息丟失的三種方式(生產(chǎn)者消息確認(rèn)、消費者消息確認(rèn)、消息持久化)

    這篇文章,主要介紹消息隊列RabbitMQ之防止消息丟失的三種方式(生產(chǎn)者消息確認(rèn)、消費者消息確認(rèn)、消息持久化)。 目錄 一、防止消息丟失 1.1、消息確認(rèn)機(jī)制(生產(chǎn)者) (1)生產(chǎn)者丟失消息 (2)生產(chǎn)者消息確認(rèn)機(jī)制 1.2、消息確認(rèn)機(jī)制(消費者) (1)消費者丟失消息

    2024年02月02日
    瀏覽(28)
  • springboot整合rabbitmq的發(fā)布確認(rèn),消費者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失

    springboot整合rabbitmq的發(fā)布確認(rèn),消費者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失

    目錄 1.生產(chǎn)者發(fā)消息到交換機(jī)時候的消息確認(rèn) 2.交換機(jī)給隊列發(fā)消息時候的消息確認(rèn) 3.備用隊列 3.消費者手動ack ? rabbitmq的發(fā)布確認(rèn)方式,可以有效的保證我們的數(shù)據(jù)不丟失。 ? 消息正常發(fā)送的流程是:生產(chǎn)者發(fā)送消息到交換機(jī),然后交換機(jī)通過路由鍵把消息發(fā)送給對應(yīng)的隊

    2024年02月09日
    瀏覽(28)
  • RabbitMQ實現(xiàn)延遲消息,RabbitMQ使用死信隊列實現(xiàn)延遲消息,RabbitMQ延時隊列插件

    RabbitMQ實現(xiàn)延遲消息,RabbitMQ使用死信隊列實現(xiàn)延遲消息,RabbitMQ延時隊列插件

    假設(shè)有一個業(yè)務(wù)場景:超過30分鐘未付款的訂單自動關(guān)閉,這個功能應(yīng)該怎么實現(xiàn)? RabbitMQ使用死信隊列,可以實現(xiàn)消息的延遲接收。 隊列有一個消息過期屬性。就像豐巢超過24小時就收費一樣,通過設(shè)置這個屬性,超過了指定事件的消息將會被丟棄。 這個屬性交:x-message

    2024年02月13日
    瀏覽(103)
  • RabbitMQ實現(xiàn)延遲消息的方式-死信隊列、延遲隊列和惰性隊列

    當(dāng)一條消息因為一些原因無法被成功消費,那么這這條消息就叫做死信,如果包含死信的隊列配置了dead-letter-exchange屬性指定了一個交換機(jī),隊列中的死信都會投遞到這個交換機(jī)內(nèi),這個交換機(jī)就叫死信交換機(jī),死信交換機(jī)再綁定一個隊列,死信最終會進(jìn)入到這個存放死信的

    2024年02月19日
    瀏覽(94)
  • RabbitMQ(二) - RabbitMQ與消息發(fā)布確認(rèn)與返回、消費確認(rèn)

    SpringBoot與RabbitMQ整合后,對RabbitClient的“確認(rèn)”進(jìn)行了封裝、使用方式與RabbitMQ官網(wǎng)不一致; 生產(chǎn)者給交換機(jī)發(fā)送消息后、若是不管了,則會出現(xiàn)消息丟失; 解決方案1: 交換機(jī)接受到消息、給生產(chǎn)者一個答復(fù)ack, 若生產(chǎn)者沒有收到ack, 可能出現(xiàn)消息丟失,因此重新發(fā)送消息;

    2024年02月14日
    瀏覽(21)
  • 【RabbitMQ】 RabbitMQ 消息的延遲 —— 深入探索 RabbitMQ 的死信交換機(jī),消息的 TTL 以及延遲隊列

    【RabbitMQ】 RabbitMQ 消息的延遲 —— 深入探索 RabbitMQ 的死信交換機(jī),消息的 TTL 以及延遲隊列

    消息隊列是現(xiàn)代分布式應(yīng)用中的關(guān)鍵組件,用于實現(xiàn)異步通信、解耦系統(tǒng)組件以及處理高并發(fā)請求。消息隊列可以用于各種應(yīng)用場景,包括任務(wù)調(diào)度、事件通知、日志處理等。在消息隊列的應(yīng)用中,有時需要實現(xiàn)消息的延遲處理、處理未能成功消費的消息等功能。 本文將介紹

    2024年02月05日
    瀏覽(95)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包