国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章

這篇具有很好參考價(jià)值的文章主要介紹了【RabbitMQ】消息隊(duì)列-RabbitMQ篇章。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

1、RabbitMQ是什么

RabbitMQ是一個(gè)開(kāi)源的遵循AMQP協(xié)議實(shí)現(xiàn)的基于Erlang語(yǔ)言編寫,支持多種客戶端(語(yǔ)言)。用于在分布式系統(tǒng)中存儲(chǔ)消息,轉(zhuǎn)發(fā)消息,具有高可用,高可擴(kuò)性,易用性等特征。

1.1、RabbitMQ—使用場(chǎng)景

一般場(chǎng)景

像一般的下訂單業(yè)務(wù)如下圖:

將訂單信息寫入數(shù)據(jù)庫(kù)成功后,發(fā)送注冊(cè)郵件,再發(fā)送注冊(cè)短信。以上三個(gè)任務(wù)全部完成后,返回給客戶端,
像這樣耗時(shí)就很大 = 所有服務(wù)操作的耗時(shí)總和,而且若是這一整條執(zhí)行鏈某個(gè)環(huán)節(jié)出了問(wèn)題觸發(fā)回滾,得不償失

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

public void makeOrder(){
    // 1 :保存訂單 
    orderService.saveOrder();
    // 2: 發(fā)送短信服務(wù)
    messageService.sendSMS("order");//1-2 s
    // 3: 發(fā)送email服務(wù)
    emailService.sendEmail("order");//1-2 s
    // 4: 發(fā)送APP服務(wù)
    appService.sendApp("order");    
}

那么當(dāng)我們開(kāi)辟一個(gè)線程池去異步處理的話,也存在缺點(diǎn):(最大的原因就是自己去實(shí)現(xiàn)起來(lái),因素過(guò)多,實(shí)現(xiàn)復(fù)雜)
存在問(wèn)題:
1:耦合度高
2:需要自己寫線程池自己維護(hù)成本太高
3:出現(xiàn)了消息可能會(huì)丟失,需要你自己做消息補(bǔ)償
4:如何保證消息的可靠性你自己寫
5:如果服務(wù)器承載不了,你需要自己去寫高可用
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

所以MQ就誕生了
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java
只管下單,下單后直接就給用戶提示下單成功,別的事交給mq去派發(fā),讓別的服務(wù)去mq拿消息處理

用戶響應(yīng)耗時(shí) = 下單(主要)耗時(shí)(50ms) 別的(次要)處理服務(wù)全放到消息隊(duì)列當(dāng)中等待處理

public void makeOrder(){
    // 1 :保存訂單 
    orderService.saveOrder();   
    rabbitTemplate.convertSend("ex","2","消息內(nèi)容");
}
解耦

發(fā)送方將消息發(fā)送到消息隊(duì)列中,接收方從隊(duì)列中獲取消息進(jìn)行處理。這種松耦合的通信模式可以提高系統(tǒng)的可擴(kuò)展性和靈活性。

這樣使得下單服務(wù)并不受,發(fā)短信、發(fā)郵件、等等服務(wù)的影響(前提是下單不依賴任何一個(gè)服務(wù)的返回值)
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

削峰

當(dāng)系統(tǒng)面臨突然的請(qǐng)求高峰時(shí),消息隊(duì)列可以起到緩沖的作用。請(qǐng)求先進(jìn)入消息隊(duì)列排隊(duì),然后逐個(gè)被處理,使得系統(tǒng)能夠逐漸消化高峰期的請(qǐng)求壓力,避免過(guò)載和故障。

也就是如果某個(gè)時(shí)刻有大量的請(qǐng)求,此時(shí)都會(huì)到mq里面去,而不會(huì)瞬間開(kāi)啟很多線程去異步執(zhí)行,從而達(dá)到銷峰的效果,使得即便大量的用戶請(qǐng)求來(lái)了,那系統(tǒng)處理請(qǐng)求還是非常平滑的
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

異步

消息隊(duì)列支持異步處理,即發(fā)送方發(fā)送消息后,并不需要等待接收方立即處理完成,而是繼續(xù)執(zhí)行其他任務(wù)。接收方在合適的時(shí)間從隊(duì)列中獲取消息進(jìn)行處理。這種異步處理可以提高系統(tǒng)的性能和響應(yīng)速度,尤其適用于處理耗時(shí)的操作。

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

好處
1:完全解耦,用MQ建立橋接
2:有獨(dú)立的線程池和運(yùn)行模型
3:出現(xiàn)了消息可能會(huì)丟失,MQ有持久化功能
4:如何保證消息的可靠性,死信隊(duì)列和消息轉(zhuǎn)移的等
5:如果服務(wù)器承載不了,你需要自己去寫高可用,HA鏡像模型高可用。
按照以上約定,用戶的響應(yīng)時(shí)間相當(dāng)于是訂單信息寫入數(shù)據(jù)庫(kù)的時(shí)間,也就是50毫秒。注冊(cè)郵件,發(fā)送短信寫入消息隊(duì)列后,直接返回,因此寫入消息隊(duì)列的速度很快,基本可以忽略,因此用戶的響應(yīng)時(shí)間可能是50毫秒。因此架構(gòu)改變后,系統(tǒng)的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了兩倍

2、Dokcer安裝RabbitMQ

2.1安裝Dokcer

  1. yum 包更新到最新
> yum update
  1. 安裝軟件包,yum-util提供yum-config-manager功能,另外兩個(gè)是devicemapper驅(qū)動(dòng)依賴的
> yum install -y yum-utils device-mapper-persistent-data lvm2
  1. 設(shè)置yum源為阿里云
> yum-config-manager --add-repo
> http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
  1. 安裝docker
> yum install docker-ce-y
  1. 安裝后查看docker版本
> docker-v
  1. 安裝加速鏡像

從阿里云獲取鏡像加速器:
https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors

sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
  "registry-mirrors": ["https://spukdfwp.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docke

2.2安裝rabbitmq

  1. 路徑:https://www.rabbitmq.com/download.html

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  1. 點(diǎn)擊上圖中標(biāo)紅線的 community Docker
    image,跳轉(zhuǎn)到如下地址:https://registry.hub.docker.com/_/rabbitmq/

當(dāng)前可以看到安裝鏡像的時(shí)候可以設(shè)置用戶名,密碼,ip。就不用安裝完進(jìn)入容器內(nèi)部設(shè)置
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java
3. 官網(wǎng)給的安裝案例

$ docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management

4.命令講解

docker run -id --hostname my-rabbit --name=myrabbit -p 15672:15672 rabbitmq:3-management

--hostname:指定容器主機(jī)名稱
--name:指定容器名稱
-p:將mq端口號(hào)映射到本地
-e 設(shè)置

5.修改命令創(chuàng)建并安裝

docker run -di  --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:3-management

6.阿里云開(kāi)放上方命令 設(shè)置的端口號(hào)

-p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

7.安裝成功

[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker images
REPOSITORY   TAG            IMAGE ID       CREATED        SIZE
rabbitmq     3-management   6c3c2a225947   7 months ago   253MB
[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker ps
CONTAINER ID   IMAGE                   COMMAND                  CREATED         STATUS         PORTS                                                                                                                                                                                                                                                                               NAMES
1de1f1e10cb0   rabbitmq:3-management   "docker-entrypoint.s…"   6 minutes ago   Up 6 minutes   4369/tcp, 0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp, 0.0.0.0:25672->25672/tcp, :::25672->25672/tcp, 0.0.0.0:61613->61613/tcp, :::61613->61613/tcp, 15691-15692/tcp   myrabbit
[root@iZbp1av1izm1qqcdfa0nndZ ~]#

8.停掉手動(dòng)安裝的rabbimq

systemctl stop rabbitmq-server

9.啟動(dòng)docker的rabbitmq容器

##查看容器
[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker ps
CONTAINER ID   IMAGE                   COMMAND                  CREATED         STATUS         PORTS                                                                                                                                                                                                                                                                               NAMES
1de1f1e10cb0   rabbitmq:3-management   "docker-entrypoint.s…"   9 minutes ago   Up 9 minutes   4369/tcp, 0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp, 0.0.0.0:25672->25672/tcp, :::25672->25672/tcp, 0.0.0.0:61613->61613/tcp, :::61613->61613/tcp, 15691-15692/tcp   myrabbit
##啟動(dòng)容器 docker start 容器id(CONTAINER ID)
[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker start 1de1f1e10cb0
1de1f1e10cb0
[root@iZbp1av1izm1qqcdfa0nndZ ~]#

10.通過(guò)服務(wù)器(虛擬機(jī)ip+端口號(hào)(15672))訪問(wèn)RabbitMQ主頁(yè)http://192.168.157.128:15672

默認(rèn)登錄賬號(hào)和密碼都是admin
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

并且在admin賬號(hào)下可以通過(guò)增加用戶,給用戶不同角色,也就對(duì)應(yīng)不同的操作權(quán)限:
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java
詳情如下:
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

3、RabbitMQ入門案例 - Simple 簡(jiǎn)單模式

1.實(shí)現(xiàn)步驟:

1:jdk1.8
2:構(gòu)建一個(gè)maven工程
3:導(dǎo)入rabbitmq的maven依賴
4:?jiǎn)?dòng)rabbitmq-server服務(wù)
5:定義生產(chǎn)者
6:定義消費(fèi)者
7:觀察消息的在rabbitmq-server服務(wù)中的過(guò)程

2.構(gòu)建一個(gè)maven工程
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java
3.導(dǎo)入rabbitmq的maven依賴

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>


<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>



<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>



4.啟動(dòng)rabbitmq-server服務(wù)

systemctl start rabbitmq-server
或者
docker start myrabbit

5、定義生產(chǎn)者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
    public static void main(String[] args) {
        // 1: 創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 設(shè)置連接屬性
        connectionFactory.setHost("192.168.157.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 從連接工廠中獲取連接
            connection = connectionFactory.newConnection("生產(chǎn)者");
            // 4: 從連接中獲取通道channel
            channel = connection.createChannel();
            // 5: 申明隊(duì)列queue存儲(chǔ)消息
            /*
             *  如果隊(duì)列不存在,則會(huì)創(chuàng)建
             *  Rabbitmq不允許創(chuàng)建兩個(gè)相同的隊(duì)列名稱,否則會(huì)報(bào)錯(cuò)。
             *
             *  @params1: queue 隊(duì)列的名稱
             *  @params2: durable 隊(duì)列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果為true,會(huì)對(duì)當(dāng)前隊(duì)列加鎖,其他的通道不能訪問(wèn),并且連接自動(dòng)關(guān)閉
             *  @params4: autoDelete 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開(kāi)連接之后是否自動(dòng)刪除消息。
             *  @params5: arguments 可以設(shè)置隊(duì)列附加參數(shù),設(shè)置隊(duì)列的有效期,消息的最大長(zhǎng)度,隊(duì)列的消息生命周期等等。
             * */
            channel.queueDeclare("queue1", false, false, true, null);
            // 6: 準(zhǔn)備發(fā)送消息的內(nèi)容
            String message = "你好,學(xué)相伴!??!";
            // 7: 發(fā)送消息給中間件rabbitmq-server
            // @params1: 交換機(jī)exchange
            // @params2: 隊(duì)列名稱/routing
            // @params3: 屬性配置
            // @params4: 發(fā)送消息的內(nèi)容
            channel.basicPublish("", "queue1", null, message.getBytes());
            System.out.println("消息發(fā)送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("發(fā)送消息出現(xiàn)異常...");
        } finally {
            // 7: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            // 8: 關(guān)閉連接
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

1:執(zhí)行發(fā)送,這個(gè)時(shí)候可以在web控制臺(tái)查看到這個(gè)隊(duì)列queue的信息

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

2:我們可以進(jìn)行對(duì)隊(duì)列的消息進(jìn)行預(yù)覽和測(cè)試如下:

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

3:進(jìn)行預(yù)覽和獲取消息進(jìn)行測(cè)試

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

NACK 只是做消息預(yù)覽,不會(huì)吧消息從隊(duì)列移除
ACK相當(dāng)于手動(dòng)的把消息處理了,這個(gè)時(shí)候就會(huì)把消息從隊(duì)列剔除,導(dǎo)致消息丟失

6、定義消費(fèi)者

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    public static void main(String[] args) {
        // 所有的中間件技術(shù)都是基于tcp/ip協(xié)議基礎(chǔ)上構(gòu)建新型協(xié)議規(guī)范,只不過(guò)rabbitmq遵循的是amqp
        // ip port

        // 1: 創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 設(shè)置連接屬性
        connectionFactory.setHost("192.168.157.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 從連接工廠中獲取連接
            connection = connectionFactory.newConnection("消費(fèi)者");
            // 4: 從連接中獲取通道channel
            channel = connection.createChannel();

            channel.basicConsume("queue1", true, new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到的消息是:" + new String(message.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                public void handle(String s) throws IOException {
                    System.out.println("接收失敗了。。。");
                }
            });
            System.out.println("開(kāi)始接收消息");
            System.in.read();
        }catch (Exception e){
            e.printStackTrace();
            System.out.println("發(fā)送消息出現(xiàn)異常...");
        }finally {
            // 7: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消費(fèi)者和生產(chǎn)者的區(qū)別在于,消費(fèi)者是從mq中取消息,而生產(chǎn)者是從mq中存消息
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

4、RabbitMQ的核心組成部分

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java
核心概念:

  1. Server:又稱Broker ,接受客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù)。 安裝rabbitmq-server
  2. Connection:連接,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接 TCP/IP/ 三次握手和四次揮手
  3. Channel:網(wǎng)絡(luò)信道,幾乎所有的操作都在Channel中進(jìn)行,Channel是進(jìn)行消息讀寫的通道,客戶端可以建立對(duì)各Channel,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)。
  4. Message:消息:服務(wù)與應(yīng)用程序之間傳送的數(shù)據(jù),由Properties和body組成,Properties可是對(duì)消息進(jìn)行修飾,比如消息的優(yōu)先級(jí),延遲等高級(jí)特性,Body則就是消息體的內(nèi)容。
  5. Virtual Host 虛擬地址,用于進(jìn)行邏輯隔離,最上層的消息路由,一個(gè)虛擬主機(jī)理由可以有若干個(gè)Exhange和Queueu,同一個(gè)虛擬主機(jī)里面不能有相同名字的Exchange
  6. Exchange:交換機(jī),接受消息,根據(jù)路由鍵發(fā)送消息到綁定的隊(duì)列。(不具備消息存儲(chǔ)的能力)
  7. Bindings:Exchange和Queue之間的虛擬連接,binding中可以保護(hù)多個(gè)routing key.
  8. Routing key:是一個(gè)路由規(guī)則,虛擬機(jī)可以用它來(lái)確定如何路由一個(gè)特定消息。
  9. Queue:隊(duì)列:也成為Message Queue,消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者。

4.1 RabbitMQ整體架構(gòu)

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

4.2RabbitMQ的運(yùn)行流程

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

所以發(fā)送消息的時(shí)候沒(méi)有設(shè)置交換機(jī),rabbitmq發(fā)送消息一定會(huì)有默認(rèn)一個(gè)交換機(jī),并且消息不是直接到隊(duì)列當(dāng)中的,而是由交換機(jī)根據(jù)路由鍵發(fā)送消息到綁定的隊(duì)列

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

5、RabbitMQ的模式

5.1 發(fā)布訂閱模式–fanout

特點(diǎn):Fanout—發(fā)布與訂閱模式,是一種廣播機(jī)制,它是沒(méi)有路由key的模式。

也就是只要生產(chǎn)者發(fā)送一條消息經(jīng)過(guò)交換機(jī)加入隊(duì)列中,左右的消費(fèi)者都能拿到消息

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java
這里就直接用web界面演示

  1. 新建一個(gè)fanout模式的交換機(jī)(讓交換機(jī)代替生產(chǎn)者去發(fā)消息)
    【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java
    【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  2. 創(chuàng)建3個(gè)消息隊(duì)列q1、q2、q3
    【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  3. 將隊(duì)列綁定到交換機(jī)上
    【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  4. 由交換機(jī)代替生產(chǎn)者發(fā)送消息
    【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  5. 然后三個(gè)隊(duì)列都會(huì)有一個(gè)交換機(jī)發(fā)來(lái)的消息
    【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  6. q1隊(duì)列消息正常被消費(fèi)者拾?。ㄆ渌?duì)列一樣)
    【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  7. q1隊(duì)列消息正常被消費(fèi)者拾取之后,隊(duì)列消息-1

ACK后  頁(yè)面在自動(dòng)會(huì)更新隊(duì)列消息條目,默認(rèn)5

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

5.2路由模式-direct模式

Direct模式是fanout模式上的一種疊加,增加了路由RoutingKey的模式。
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

這樣就可以給指定設(shè)置了路由key的隊(duì)列發(fā)送消息,并且一個(gè)隊(duì)列可以有多個(gè)路由key,當(dāng)發(fā)送消息指定了路由key,則只有設(shè)置了相對(duì)應(yīng)的路由key的隊(duì)列才能接收到消息

5.3路由模式-Topic模式

Topic模式是direct模式上的一種疊加,增加了模糊路由RoutingKey的模式。

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

* 代表一級(jí)(必須有一級(jí))
# 代表0級(jí)或者多級(jí)

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

注意最好用代碼的形式來(lái)進(jìn)行綁定

在實(shí)際開(kāi)發(fā)中,我們既可以在RabbitMq的web界面進(jìn)行交換機(jī)的創(chuàng)建,隊(duì)列的創(chuàng)建,綁定路由key等等操作。
還可以在生產(chǎn)者代碼里面通過(guò)channel.XXX的方式設(shè)置交換機(jī),設(shè)置隊(duì)列,設(shè)置路由key,等等,效果是一樣的

例如下面代碼:生產(chǎn)者(消費(fèi)者也可以聲明) 代碼實(shí)現(xiàn)【交換機(jī)和隊(duì)列】的聲明和綁定

package com.xxx.rabbitmq.all;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * RabbitMQ入門案例 - 完整的聲明方式創(chuàng)建
 * 代碼實(shí)現(xiàn)創(chuàng)建交換機(jī)和隊(duì)列,并綁定關(guān)系
 * 生產(chǎn)者
 */
public class Producer {
    public static void main(String[] args) {
        // 1: 創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 設(shè)置連接屬性
        connectionFactory.setHost("121.196.153.197");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 從連接工廠中獲取連接
            connection = connectionFactory.newConnection("生產(chǎn)者");
            // 4: 從連接中獲取通道channel
            channel = connection.createChannel();
            // 5: 準(zhǔn)備發(fā)送消息的內(nèi)容
            String message = "你好,交換機(jī)";
            // 6:準(zhǔn)備交換機(jī)。取名規(guī)范 :  類型_業(yè)務(wù)模塊_交換機(jī)
            String exchangeName = "direct_order_exchange";
            // 7: 定義路由key
            String routeKeyOrder = "order";
            String routeKeyCourse = "course";
            // 8: 指定交換機(jī)的類型
            String exchangeType = "direct";
            // 9: 聲明交換機(jī)/注冊(cè)交換機(jī)
            // @params1: 交換機(jī)名稱
            // @params2: 交換機(jī)類型
            // @params3: 是否持久化 所謂的持久化就是值:交換機(jī)不會(huì)隨著服務(wù)器的重啟造成丟失,如果是true代表不丟失,false重啟丟失
            channel.exchangeDeclare(exchangeName, exchangeType, true);
            // 10: 聲明隊(duì)列/注冊(cè)隊(duì)列
            // @params1: 隊(duì)列名稱
            // @params2: 是否持久化
            // @params3: 是不是排他性,是否是獨(dú)占獨(dú)立
            // @params4: 是不是自動(dòng)刪除 隨著最后一個(gè)消費(fèi)者消息完畢消息以后是否把隊(duì)列自動(dòng)刪除
            // @params5: 是不是有參數(shù) 參數(shù)攜帶可能會(huì)引發(fā)headers模式
            channel.queueDeclare("queue5", true, false, false, null);
            channel.queueDeclare("queue6", true, false, false, null);
            channel.queueDeclare("queue7", true, false, false, null);
            // 11: 綁定隊(duì)列
            // @params1: 隊(duì)列名稱
            // @params2: 交換機(jī)名稱
            // @params3: routeKey
            channel.queueBind("queue5", exchangeName, routeKeyOrder);
            channel.queueBind("queue6", exchangeName, routeKeyOrder);
            channel.queueBind("queue7", exchangeName, routeKeyCourse);
            // 12: 發(fā)送消息給中間件rabbitmq-server
            // @params1: 交換機(jī)exchange
            // @params2: 隊(duì)列名稱
            // @params3: 屬性配置
            // @params4: 發(fā)送消息的內(nèi)容
            channel.basicPublish(exchangeName, routeKeyOrder, null, message.getBytes());
            System.out.println("消息發(fā)送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("發(fā)送消息出現(xiàn)異常...");
        } finally {
            // 13: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }

    }
}


消費(fèi)者去隊(duì)列拿消息:

package com.xxx.rabbitmq.all;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * RabbitMQ入門案例 - 完整的聲明方式創(chuàng)建
 * 消費(fèi)者
 */
public class Consumer {
    private static Runnable runnable = new Runnable() {
        public void run() {
            // 1: 創(chuàng)建連接工廠
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2: 設(shè)置連接屬性
            connectionFactory.setHost("121.196.153.197");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            //獲取隊(duì)列的名稱
            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 3: 從連接工廠中獲取連接
                connection = connectionFactory.newConnection();
                // 4: 從連接中獲取通道channel
                channel = connection.createChannel();
                // 5: 申明隊(duì)列queue存儲(chǔ)消息
                /*
                 *  如果隊(duì)列不存在,則會(huì)創(chuàng)建
                 *  Rabbitmq不允許創(chuàng)建兩個(gè)相同的隊(duì)列名稱,否則會(huì)報(bào)錯(cuò)。
                 *
                 *  @params1: queue 隊(duì)列的名稱
                 *  @params2: durable 隊(duì)列是否持久化
                 *  @params3: exclusive 是否排他,即是否私有的,如果為true,會(huì)對(duì)當(dāng)前隊(duì)列加鎖,其他的通道不能訪問(wèn),并且連接自動(dòng)關(guān)閉
                 *  @params4: autoDelete 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開(kāi)連接之后是否自動(dòng)刪除消息。
                 *  @params5: arguments 可以設(shè)置隊(duì)列附加參數(shù),設(shè)置隊(duì)列的有效期,消息的最大長(zhǎng)度,隊(duì)列的消息生命周期等等。
                 * */
                // 這里如果queue已經(jīng)被創(chuàng)建過(guò)一次了,可以不需要定義
                //channel.queueDeclare("queue1", false, false, false, null);
                // 6: 定義接受消息的回調(diào)
                Channel finalChannel = channel;
                finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String s) throws IOException {
                    }
                });
                System.out.println(queueName + ":開(kāi)始接受消息");
                System.in.read();
            } catch (Exception ex) {
                ex.printStackTrace();
                System.out.println("發(fā)送消息出現(xiàn)異常...");
            } finally {
                // 7: 釋放連接關(guān)閉通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    };


    public static void main(String[] args) {
        // 啟動(dòng)三個(gè)線程去執(zhí)行
        new Thread(runnable, "queue5").start();
        new Thread(runnable, "queue6").start();
        new Thread(runnable, "queue7").start();
    }

}


5.4輪詢模式 - Work模式

5.4.1Work模式 - 輪詢模式(Round-Robin)

輪詢模式的分發(fā):一個(gè)消費(fèi)者一條,按均分配;

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

生產(chǎn)者:

package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @author: 學(xué)相伴-飛哥
 * @description: Producer 簡(jiǎn)單隊(duì)列生產(chǎn)者
 * @Date : 2021/3/2
 */
public class Producer {
    public static void main(String[] args) {
        // 1: 創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 設(shè)置連接屬性
        connectionFactory.setHost("192.168.157.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 從連接工廠中獲取連接
            connection = connectionFactory.newConnection("生產(chǎn)者");
            // 4: 從連接中獲取通道channel
            channel = connection.createChannel();
            // 6: 準(zhǔn)備發(fā)送消息的內(nèi)容
            //===============================end topic模式==================================
            for (int i = 1; i <= 20; i++) {
                //消息的內(nèi)容
                String msg = "學(xué)相伴:" + i;
                // 7: 發(fā)送消息給中間件rabbitmq-server
                // @params1: 交換機(jī)exchange
                // @params2: 隊(duì)列名稱/routingkey
                // @params3: 屬性配置
                // @params4: 發(fā)送消息的內(nèi)容
                channel.basicPublish("", "queue1", null, msg.getBytes());
            }
            System.out.println("消息發(fā)送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("發(fā)送消息出現(xiàn)異常...");
        } finally {
            // 7: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消費(fèi)者work1:

package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author: 學(xué)相伴-飛哥
 * @description: Consumer
 * @Date : 2021/3/2
 */
public class Work1 {
    public static void main(String[] args) {
        // 1: 創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 設(shè)置連接屬性
        connectionFactory.setHost("192.168.157.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 從連接工廠中獲取連接
            connection = connectionFactory.newConnection("消費(fèi)者-Work1");
            // 4: 從連接中獲取通道channel
            channel = connection.createChannel();
            // 5: 申明隊(duì)列queue存儲(chǔ)消息
            /*
             *  如果隊(duì)列不存在,則會(huì)創(chuàng)建
             *  Rabbitmq不允許創(chuàng)建兩個(gè)相同的隊(duì)列名稱,否則會(huì)報(bào)錯(cuò)。
             *
             *  @params1: queue 隊(duì)列的名稱
             *  @params2: durable 隊(duì)列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果為true,會(huì)對(duì)當(dāng)前隊(duì)列加鎖,其他的通道不能訪問(wèn),并且連接自動(dòng)關(guān)閉
             *  @params4: autoDelete 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開(kāi)連接之后是否自動(dòng)刪除消息。
             *  @params5: arguments 可以設(shè)置隊(duì)列附加參數(shù),設(shè)置隊(duì)列的有效期,消息的最大長(zhǎng)度,隊(duì)列的消息生命周期等等。
             * */
            // 這里如果queue已經(jīng)被創(chuàng)建過(guò)一次了,可以不需要定義
//            channel.queueDeclare("queue1", false, false, false, null);
            // 同一時(shí)刻,服務(wù)器只會(huì)推送一條消息給消費(fèi)者
            // 6: 定義接受消息的回調(diào)
            Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try{
                        System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(2000);
                    }catch(Exception ex){
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println("Work1-開(kāi)始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("發(fā)送消息出現(xiàn)異常...");
        } finally {
            // 7: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

work2

package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author: 學(xué)相伴-飛哥
 * @description: Consumer
 * @Date : 2021/3/2
 */
public class Work2 {
    public static void main(String[] args) {
        // 1: 創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 設(shè)置連接屬性
        connectionFactory.setHost("192.168.157.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 從連接工廠中獲取連接
            connection = connectionFactory.newConnection("消費(fèi)者-Work2");
            // 4: 從連接中獲取通道channel
            channel = connection.createChannel();
            // 5: 申明隊(duì)列queue存儲(chǔ)消息
            /*
             *  如果隊(duì)列不存在,則會(huì)創(chuàng)建
             *  Rabbitmq不允許創(chuàng)建兩個(gè)相同的隊(duì)列名稱,否則會(huì)報(bào)錯(cuò)。
             *
             *  @params1: queue 隊(duì)列的名稱
             *  @params2: durable 隊(duì)列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果為true,會(huì)對(duì)當(dāng)前隊(duì)列加鎖,其他的通道不能訪問(wèn),并且連接自動(dòng)關(guān)閉
             *  @params4: autoDelete 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開(kāi)連接之后是否自動(dòng)刪除消息。
             *  @params5: arguments 可以設(shè)置隊(duì)列附加參數(shù),設(shè)置隊(duì)列的有效期,消息的最大長(zhǎng)度,隊(duì)列的消息生命周期等等。
             * */
            // 這里如果queue已經(jīng)被創(chuàng)建過(guò)一次了,可以不需要定義
            //channel.queueDeclare("queue1", false, true, false, null);
            // 同一時(shí)刻,服務(wù)器只會(huì)推送一條消息給消費(fèi)者
            //channel.basicQos(1);
            // 6: 定義接受消息的回調(diào)
            Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try{
                        System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(200);
                    }catch(Exception ex){
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println("Work2-開(kāi)始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("發(fā)送消息出現(xiàn)異常...");
        } finally {
            // 7: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

work1和work2的消息處理能力不同,但是最后處理的消息條數(shù)相同,是“按均分配”。

往隊(duì)列發(fā)送20條消息,
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

結(jié)果就是輪詢給消費(fèi)者拿消息,即便有的消費(fèi)者消費(fèi)很快(也只能按照新順序拿消息),也只能按照輪詢一個(gè)一個(gè)拿,

也就是說(shuō),不會(huì)因?yàn)槟硞€(gè)消費(fèi)者所在的服務(wù)器滿,而導(dǎo)致少消費(fèi),一定是公平消費(fèi)
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

5.4.1Work模式 - 公平分發(fā)模式(Round-Robin)

根據(jù)消費(fèi)者的消費(fèi)能力進(jìn)行公平分發(fā),處理快的處理的多,處理慢的處理的少;按勞分配;

相比較輪詢模式,公平分發(fā)的不同在于:修改應(yīng)答方式為手動(dòng)
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

qos設(shè)置為1,就代表消費(fèi)者拿到cpu的執(zhí)行權(quán)就每次從只拿走一條消息,一條一條的拿。若不設(shè)置,就是默認(rèn)輪詢拿一條
所以根據(jù)隊(duì)列堆積的消息條數(shù)以及內(nèi)存和磁盤空間來(lái)合理設(shè)置qos

【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java
這個(gè)時(shí)候,性能好的消費(fèi)者就會(huì)消費(fèi)得多,而性能差的消費(fèi)者就消費(fèi)得少,能者多勞
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java
【RabbitMQ】消息隊(duì)列-RabbitMQ篇章,RabbitMq,rabbitmq,java

更新中------
參考來(lái)自:狂神文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-659284.html

到了這里,關(guān)于【RabbitMQ】消息隊(duì)列-RabbitMQ篇章的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Java中如何使用消息隊(duì)列實(shí)現(xiàn)異步(ActiveMQ,RabbitMQ,Kafka)

    在 Java 中,可以使用消息隊(duì)列實(shí)現(xiàn)異步處理。下面是一個(gè)簡(jiǎn)單的示例代碼,用于說(shuō)明如何使用 ActiveMQ 實(shí)現(xiàn)消息隊(duì)列異步處理: 添加 ActiveMQ 依賴 在 pom.xml 文件中添加以下依賴: 創(chuàng)建消息隊(duì)列 創(chuàng)建一個(gè)名為 “TestQueue” 的消息隊(duì)列,并配置 ActiveMQ 連接信息: 創(chuàng)建消息消費(fèi)者

    2024年02月16日
    瀏覽(33)
  • RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件

    RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件

    假設(shè)有一個(gè)業(yè)務(wù)場(chǎng)景:超過(guò)30分鐘未付款的訂單自動(dòng)關(guān)閉,這個(gè)功能應(yīng)該怎么實(shí)現(xiàn)? RabbitMQ使用死信隊(duì)列,可以實(shí)現(xiàn)消息的延遲接收。 隊(duì)列有一個(gè)消息過(guò)期屬性。就像豐巢超過(guò)24小時(shí)就收費(fèi)一樣,通過(guò)設(shè)置這個(gè)屬性,超過(guò)了指定事件的消息將會(huì)被丟棄。 這個(gè)屬性交:x-message

    2024年02月13日
    瀏覽(104)
  • 3.精通RabbitMQ—消息隊(duì)列、RabbitMQ

    3.精通RabbitMQ—消息隊(duì)列、RabbitMQ

    RabbitMQ面試題 (總結(jié)最全面的面試題) 入門RabbitMQ消息隊(duì)列,看這篇文章就夠了 消息隊(duì)列 是一種基于 隊(duì)列 ,用于解決 不同進(jìn)程或應(yīng)用 之間 通訊 的 消息中間件 。 支持多種 消息傳遞模式 ,如 隊(duì)列模型 、 發(fā)布/訂閱模型 等。 業(yè)務(wù)解耦 :通過(guò) 發(fā)布/訂閱 模式,減少系統(tǒng)的 耦

    2024年02月15日
    瀏覽(19)
  • 【RabbitMQ筆記10】消息隊(duì)列RabbitMQ之死信隊(duì)列的介紹

    【RabbitMQ筆記10】消息隊(duì)列RabbitMQ之死信隊(duì)列的介紹

    這篇文章,主要介紹消息隊(duì)列RabbitMQ之死信隊(duì)列。 目錄 一、RabbitMQ死信隊(duì)列 1.1、什么是死信隊(duì)列 1.2、設(shè)置過(guò)期時(shí)間TTL 1.3、配置死信交換機(jī)和死信隊(duì)列(代碼配置) (1)設(shè)置隊(duì)列過(guò)期時(shí)間 (2)設(shè)置單條消息過(guò)期時(shí)間 (3)隊(duì)列設(shè)置死信交換機(jī) (4)配置的基本思路 1.4、配置

    2024年02月16日
    瀏覽(95)
  • 消息隊(duì)列-RabbitMQ:延遲隊(duì)列、rabbitmq 插件方式實(shí)現(xiàn)延遲隊(duì)列、整合SpringBoot

    消息隊(duì)列-RabbitMQ:延遲隊(duì)列、rabbitmq 插件方式實(shí)現(xiàn)延遲隊(duì)列、整合SpringBoot

    1、延遲隊(duì)列概念 延時(shí)隊(duì)列內(nèi)部是有序的 , 最重要的特性 就體現(xiàn)在它的 延時(shí)屬性 上,延時(shí)隊(duì)列中的元素是希望在指定時(shí)間到了以后或之前取出和處理,簡(jiǎn)單來(lái)說(shuō), 延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列。 延遲隊(duì)列使用場(chǎng)景: 訂單在十分鐘之內(nèi)未支付則

    2024年02月22日
    瀏覽(19)
  • RabbitMq消息模型-隊(duì)列消息

    RabbitMq消息模型-隊(duì)列消息

    基本模型(SimpleQueue)、工作模型(WorkQueue) 隊(duì)列消息特點(diǎn): 消息不會(huì)丟失 并且 有先進(jìn)先出的順序。 消息接收是有順序的,不是隨機(jī)的,僅有一個(gè)消費(fèi)者能拿到數(shù)據(jù),而且不同消費(fèi)者拿不到同一份數(shù)據(jù)。 基本模型: SimpleQueue 在上圖的模型中,有以下幾個(gè)概念: P:為生產(chǎn)

    2024年02月09日
    瀏覽(30)
  • 【RabbitMQ】RabbitMQ 消息的堆積問(wèn)題 —— 使用惰性隊(duì)列解決消息的堆積問(wèn)題

    【RabbitMQ】RabbitMQ 消息的堆積問(wèn)題 —— 使用惰性隊(duì)列解決消息的堆積問(wèn)題

    消息的堆積問(wèn)題是指在消息隊(duì)列系統(tǒng)中,當(dāng)生產(chǎn)者以較快的速度發(fā)送消息,而消費(fèi)者處理消息的速度較慢,導(dǎo)致消息在隊(duì)列中積累并達(dá)到隊(duì)列的存儲(chǔ)上限。在這種情況下,最早被發(fā)送的消息可能會(huì)在隊(duì)列中滯留較長(zhǎng)時(shí)間,直到超過(guò)隊(duì)列的容量上限。當(dāng)隊(duì)列已滿且沒(méi)有更多的可

    2024年02月05日
    瀏覽(19)
  • 【圖解RabbitMQ-3】消息隊(duì)列RabbitMQ介紹及核心流程

    【圖解RabbitMQ-3】消息隊(duì)列RabbitMQ介紹及核心流程

    ?????作者名稱:DaenCode ??作者簡(jiǎn)介:CSDN實(shí)力新星,后端開(kāi)發(fā)兩年經(jīng)驗(yàn),曾擔(dān)任甲方技術(shù)代表,業(yè)余獨(dú)自創(chuàng)辦智源恩創(chuàng)網(wǎng)絡(luò)科技工作室。會(huì)點(diǎn)點(diǎn)Java相關(guān)技術(shù)棧、帆軟報(bào)表、低代碼平臺(tái)快速開(kāi)發(fā)。技術(shù)尚淺,閉關(guān)學(xué)習(xí)中······ ??人生感悟:嘗盡人生百味,方知世間冷暖。

    2024年02月09日
    瀏覽(20)
  • 消息隊(duì)列-RabbitMQ:workQueues—工作隊(duì)列、消息應(yīng)答機(jī)制、RabbitMQ 持久化、不公平分發(fā)(能者多勞)

    消息隊(duì)列-RabbitMQ:workQueues—工作隊(duì)列、消息應(yīng)答機(jī)制、RabbitMQ 持久化、不公平分發(fā)(能者多勞)

    Work Queues— 工作隊(duì)列 (又稱任務(wù)隊(duì)列) 的主要思想是避免立即執(zhí)行資源密集型任務(wù),而不得不等待它完成 。 我們把任務(wù)封裝為消息并將其發(fā)送到隊(duì)列,在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個(gè)工作線程時(shí),這些工作線程將一起處理這些任務(wù) 。 輪訓(xùn)分發(fā)消

    2024年02月21日
    瀏覽(23)
  • 【RabbitMQ】 RabbitMQ 消息的延遲 —— 深入探索 RabbitMQ 的死信交換機(jī),消息的 TTL 以及延遲隊(duì)列

    【RabbitMQ】 RabbitMQ 消息的延遲 —— 深入探索 RabbitMQ 的死信交換機(jī),消息的 TTL 以及延遲隊(duì)列

    消息隊(duì)列是現(xiàn)代分布式應(yīng)用中的關(guān)鍵組件,用于實(shí)現(xiàn)異步通信、解耦系統(tǒng)組件以及處理高并發(fā)請(qǐng)求。消息隊(duì)列可以用于各種應(yīng)用場(chǎng)景,包括任務(wù)調(diào)度、事件通知、日志處理等。在消息隊(duì)列的應(yīng)用中,有時(shí)需要實(shí)現(xiàn)消息的延遲處理、處理未能成功消費(fèi)的消息等功能。 本文將介紹

    2024年02月05日
    瀏覽(96)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包