1、RabbitMQ是什么
RabbitMQ是一個(gè)開(kāi)源的遵循AMQP協(xié)議
實(shí)現(xiàn)的基于Erlang語(yǔ)言編寫,支持多種客戶端(語(yǔ)言)。用于在分布式系統(tǒng)中存儲(chǔ)消息,轉(zhuǎn)發(fā)消息
,具有高可用
,高可擴(kuò)性
,易用性
等特征。
1.1、RabbitMQ—使用場(chǎng)景
一般場(chǎng)景
像一般的下訂單業(yè)務(wù)如下圖:
將訂單信息寫入數(shù)據(jù)庫(kù)成功后,發(fā)送注冊(cè)郵件,再發(fā)送注冊(cè)短信。以上三個(gè)任務(wù)全部完成后,返回給客戶端,
像這樣耗時(shí)就很大 = 所有服務(wù)操作的耗時(shí)總和,而且若是這一整條執(zhí)行鏈某個(gè)環(huán)節(jié)出了問(wèn)題觸發(fā)回滾,得不償失
public void makeOrder(){
// 1 :保存訂單
orderService.saveOrder();
// 2: 發(fā)送短信服務(wù)
messageService.sendSMS("order");//1-2 s
// 3: 發(fā)送email服務(wù)
emailService.sendEmail("order");//1-2 s
// 4: 發(fā)送APP服務(wù)
appService.sendApp("order");
}
那么當(dāng)我們開(kāi)辟一個(gè)線程池去異步處理
的話,也存在缺點(diǎn):(最大的原因就是自己去實(shí)現(xiàn)起來(lái),因素過(guò)多,實(shí)現(xiàn)復(fù)雜)
存在問(wèn)題:
1:耦合度高
2:需要自己寫線程池自己維護(hù)成本太高
3:出現(xiàn)了消息可能會(huì)丟失,需要你自己做消息補(bǔ)償
4:如何保證消息的可靠性你自己寫
5:如果服務(wù)器承載不了,你需要自己去寫高可用
所以MQ就誕生了
只管下單,下單后直接就給用戶提示下單成功,別的事交給mq去派發(fā),讓別的服務(wù)去mq拿消息處理
用戶響應(yīng)耗時(shí) = 下單(主要)耗時(shí)(50ms) 別的(次要)處理服務(wù)全放到消息隊(duì)列當(dāng)中等待處理
public void makeOrder(){
// 1 :保存訂單
orderService.saveOrder();
rabbitTemplate.convertSend("ex","2","消息內(nèi)容");
}
解耦
發(fā)送方將消息發(fā)送到消息隊(duì)列中,接收方從隊(duì)列中獲取消息進(jìn)行處理。這種松耦合的通信模式可以提高系統(tǒng)的可擴(kuò)展性和靈活性。
這樣使得下單服務(wù)并不受,發(fā)短信、發(fā)郵件、等等服務(wù)的影響(前提是下單不依賴任何一個(gè)服務(wù)的返回值)
削峰
當(dāng)系統(tǒng)面臨突然的請(qǐng)求高峰時(shí),消息隊(duì)列可以起到緩沖的作用。請(qǐng)求先進(jìn)入消息隊(duì)列排隊(duì),然后逐個(gè)被處理,使得系統(tǒng)能夠逐漸消化高峰期的請(qǐng)求壓力,避免過(guò)載和故障。
也就是如果某個(gè)時(shí)刻有大量的請(qǐng)求,此時(shí)都會(huì)到mq里面去,而不會(huì)瞬間開(kāi)啟很多線程去異步執(zhí)行,從而達(dá)到銷峰的效果,使得即便大量的用戶請(qǐng)求來(lái)了,那系統(tǒng)處理請(qǐng)求還是非常平滑的
異步
消息隊(duì)列支持異步處理,即發(fā)送方發(fā)送消息后,并不需要等待接收方立即處理完成,而是繼續(xù)執(zhí)行其他任務(wù)。接收方在合適的時(shí)間從隊(duì)列中獲取消息進(jìn)行處理。這種異步處理可以提高系統(tǒng)的性能和響應(yīng)速度,尤其適用于處理耗時(shí)的操作。
好處
1:完全解耦,用MQ建立橋接
2:有獨(dú)立的線程池和運(yùn)行模型
3:出現(xiàn)了消息可能會(huì)丟失,MQ有持久化功能
4:如何保證消息的可靠性,死信隊(duì)列和消息轉(zhuǎn)移的等
5:如果服務(wù)器承載不了,你需要自己去寫高可用,HA鏡像模型高可用。
按照以上約定,用戶的響應(yīng)時(shí)間相當(dāng)于是訂單信息寫入數(shù)據(jù)庫(kù)的時(shí)間,也就是50毫秒。注冊(cè)郵件,發(fā)送短信寫入消息隊(duì)列后,直接返回,因此寫入消息隊(duì)列的速度很快,基本可以忽略,因此用戶的響應(yīng)時(shí)間可能是50毫秒。因此架構(gòu)改變后,系統(tǒng)的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了兩倍
2、Dokcer安裝RabbitMQ
2.1安裝Dokcer
- yum 包更新到最新
> yum update
- 安裝軟件包,yum-util提供yum-config-manager功能,另外兩個(gè)是devicemapper驅(qū)動(dòng)依賴的
> yum install -y yum-utils device-mapper-persistent-data lvm2
- 設(shè)置yum源為阿里云
> yum-config-manager --add-repo
> http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
- 安裝docker
> yum install docker-ce-y
- 安裝后查看docker版本
> docker-v
- 安裝加速鏡像
從阿里云獲取鏡像加速器:https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://spukdfwp.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docke
2.2安裝rabbitmq
- 路徑:
https://www.rabbitmq.com/download.html
- 點(diǎn)擊上圖中標(biāo)紅線的 community Docker
image,跳轉(zhuǎn)到如下地址:https://registry.hub.docker.com/_/rabbitmq/
當(dāng)前可以看到安裝鏡像的時(shí)候可以設(shè)置用戶名,密碼,ip。就不用安裝完進(jìn)入容器內(nèi)部設(shè)置
3. 官網(wǎng)給的安裝案例
$ docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management
4.命令講解
docker run -id --hostname my-rabbit --name=myrabbit -p 15672:15672 rabbitmq:3-management
--hostname:指定容器主機(jī)名稱
--name:指定容器名稱
-p:將mq端口號(hào)映射到本地
-e 設(shè)置
5.修改命令創(chuàng)建并安裝
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:3-management
6.阿里云開(kāi)放上方命令 設(shè)置的端口號(hào)
-p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883
7.安裝成功
[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
rabbitmq 3-management 6c3c2a225947 7 months ago 253MB
[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1de1f1e10cb0 rabbitmq:3-management "docker-entrypoint.s…" 6 minutes ago Up 6 minutes 4369/tcp, 0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp, 0.0.0.0:25672->25672/tcp, :::25672->25672/tcp, 0.0.0.0:61613->61613/tcp, :::61613->61613/tcp, 15691-15692/tcp myrabbit
[root@iZbp1av1izm1qqcdfa0nndZ ~]#
8.停掉手動(dòng)安裝的rabbimq
systemctl stop rabbitmq-server
9.啟動(dòng)docker的rabbitmq容器
##查看容器
[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1de1f1e10cb0 rabbitmq:3-management "docker-entrypoint.s…" 9 minutes ago Up 9 minutes 4369/tcp, 0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp, 0.0.0.0:25672->25672/tcp, :::25672->25672/tcp, 0.0.0.0:61613->61613/tcp, :::61613->61613/tcp, 15691-15692/tcp myrabbit
##啟動(dòng)容器 docker start 容器id(CONTAINER ID)
[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker start 1de1f1e10cb0
1de1f1e10cb0
[root@iZbp1av1izm1qqcdfa0nndZ ~]#
10.通過(guò)服務(wù)器(虛擬機(jī)ip+端口號(hào)(15672)
)訪問(wèn)RabbitMQ主頁(yè)http://192.168.157.128:15672
默認(rèn)登錄賬號(hào)和密碼都是admin
并且在admin賬號(hào)下可以通過(guò)增加用戶,給用戶不同角色,也就對(duì)應(yīng)不同的操作權(quán)限:
詳情如下:
3、RabbitMQ入門案例 - Simple 簡(jiǎn)單模式
1.實(shí)現(xiàn)步驟:
1:jdk1.8
2:構(gòu)建一個(gè)maven工程
3:導(dǎo)入rabbitmq的maven依賴
4:?jiǎn)?dòng)rabbitmq-server服務(wù)
5:定義生產(chǎn)者
6:定義消費(fèi)者
7:觀察消息的在rabbitmq-server服務(wù)中的過(guò)程
2.構(gòu)建一個(gè)maven工程
3.導(dǎo)入rabbitmq的maven依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.啟動(dòng)rabbitmq-server服務(wù)
systemctl start rabbitmq-server
或者
docker start myrabbit
5、定義生產(chǎn)者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 1: 創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 設(shè)置連接屬性
connectionFactory.setHost("192.168.157.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產(chǎn)者");
// 4: 從連接中獲取通道channel
channel = connection.createChannel();
// 5: 申明隊(duì)列queue存儲(chǔ)消息
/*
* 如果隊(duì)列不存在,則會(huì)創(chuàng)建
* Rabbitmq不允許創(chuàng)建兩個(gè)相同的隊(duì)列名稱,否則會(huì)報(bào)錯(cuò)。
*
* @params1: queue 隊(duì)列的名稱
* @params2: durable 隊(duì)列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果為true,會(huì)對(duì)當(dāng)前隊(duì)列加鎖,其他的通道不能訪問(wèn),并且連接自動(dòng)關(guān)閉
* @params4: autoDelete 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開(kāi)連接之后是否自動(dòng)刪除消息。
* @params5: arguments 可以設(shè)置隊(duì)列附加參數(shù),設(shè)置隊(duì)列的有效期,消息的最大長(zhǎng)度,隊(duì)列的消息生命周期等等。
* */
channel.queueDeclare("queue1", false, false, true, null);
// 6: 準(zhǔn)備發(fā)送消息的內(nèi)容
String message = "你好,學(xué)相伴!??!";
// 7: 發(fā)送消息給中間件rabbitmq-server
// @params1: 交換機(jī)exchange
// @params2: 隊(duì)列名稱/routing
// @params3: 屬性配置
// @params4: 發(fā)送消息的內(nèi)容
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息發(fā)送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("發(fā)送消息出現(xiàn)異常...");
} finally {
// 7: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
// 8: 關(guān)閉連接
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
1:執(zhí)行發(fā)送,這個(gè)時(shí)候可以在web控制臺(tái)查看到這個(gè)隊(duì)列queue的信息
2:我們可以進(jìn)行對(duì)隊(duì)列的消息進(jìn)行預(yù)覽和測(cè)試如下:
3:進(jìn)行預(yù)覽和獲取消息進(jìn)行測(cè)試
NACK 只是做消息預(yù)覽,不會(huì)吧消息從隊(duì)列移除
ACK相當(dāng)于手動(dòng)的把消息處理了,這個(gè)時(shí)候就會(huì)把消息從隊(duì)列剔除,導(dǎo)致消息丟失
6、定義消費(fèi)者
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) {
// 所有的中間件技術(shù)都是基于tcp/ip協(xié)議基礎(chǔ)上構(gòu)建新型協(xié)議規(guī)范,只不過(guò)rabbitmq遵循的是amqp
// ip port
// 1: 創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 設(shè)置連接屬性
connectionFactory.setHost("192.168.157.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 從連接工廠中獲取連接
connection = connectionFactory.newConnection("消費(fèi)者");
// 4: 從連接中獲取通道channel
channel = connection.createChannel();
channel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到的消息是:" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("接收失敗了。。。");
}
});
System.out.println("開(kāi)始接收消息");
System.in.read();
}catch (Exception e){
e.printStackTrace();
System.out.println("發(fā)送消息出現(xiàn)異常...");
}finally {
// 7: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消費(fèi)者和生產(chǎn)者的區(qū)別在于,消費(fèi)者是從mq中取消息,而生產(chǎn)者是從mq中存消息
4、RabbitMQ的核心組成部分
核心概念:
- Server:又稱Broker ,接受客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù)。 安裝rabbitmq-server
- Connection:連接,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接 TCP/IP/ 三次握手和四次揮手
- Channel:網(wǎng)絡(luò)信道,幾乎所有的操作都在Channel中進(jìn)行,Channel是進(jìn)行消息讀寫的通道,客戶端可以建立對(duì)各Channel,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)。
- Message:消息:服務(wù)與應(yīng)用程序之間傳送的數(shù)據(jù),由Properties和body組成,Properties可是對(duì)消息進(jìn)行修飾,比如消息的優(yōu)先級(jí),延遲等高級(jí)特性,Body則就是消息體的內(nèi)容。
- Virtual Host 虛擬地址,用于進(jìn)行邏輯隔離,最上層的消息路由,一個(gè)虛擬主機(jī)理由可以有若干個(gè)Exhange和Queueu,同一個(gè)虛擬主機(jī)里面不能有相同名字的Exchange
- Exchange:交換機(jī),接受消息,根據(jù)路由鍵發(fā)送消息到綁定的隊(duì)列。(不具備消息存儲(chǔ)的能力)
- Bindings:Exchange和Queue之間的虛擬連接,binding中可以保護(hù)多個(gè)routing key.
- Routing key:是一個(gè)路由規(guī)則,虛擬機(jī)可以用它來(lái)確定如何路由一個(gè)特定消息。
- Queue:隊(duì)列:也成為Message Queue,消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者。
4.1 RabbitMQ整體架構(gòu)
4.2RabbitMQ的運(yùn)行流程
所以發(fā)送消息的時(shí)候沒(méi)有設(shè)置交換機(jī),rabbitmq發(fā)送消息一定會(huì)有默認(rèn)一個(gè)交換機(jī),并且消息不是直接到隊(duì)列當(dāng)中的,而是由交換機(jī)根據(jù)路由鍵發(fā)送消息到綁定的隊(duì)列
5、RabbitMQ的模式
5.1 發(fā)布訂閱模式–fanout
特點(diǎn):Fanout—發(fā)布與訂閱模式,是一種廣播機(jī)制
,它是沒(méi)有路由key的模式。
也就是只要生產(chǎn)者發(fā)送一條消息經(jīng)過(guò)交換機(jī)加入隊(duì)列中,左右的消費(fèi)者都能拿到消息
這里就直接用web界面演示
-
新建一個(gè)fanout模式的交換機(jī)(讓交換機(jī)代替生產(chǎn)者去發(fā)消息)
-
創(chuàng)建3個(gè)消息隊(duì)列q1、q2、q3
-
將隊(duì)列綁定到交換機(jī)上
-
由交換機(jī)代替生產(chǎn)者發(fā)送消息
-
然后三個(gè)隊(duì)列都會(huì)有一個(gè)交換機(jī)發(fā)來(lái)的消息
-
q1隊(duì)列消息正常被消費(fèi)者拾?。ㄆ渌?duì)列一樣)
-
q1隊(duì)列消息正常被消費(fèi)者拾取之后,隊(duì)列消息-1
ACK后 頁(yè)面在自動(dòng)會(huì)更新隊(duì)列消息條目,默認(rèn)5秒
5.2路由模式-direct模式
Direct模式是fanout模式上的一種疊加,增加了路由RoutingKey的模式
。
這樣就可以給指定設(shè)置了路由key的隊(duì)列發(fā)送消息,并且一個(gè)隊(duì)列可以有多個(gè)路由key,當(dāng)發(fā)送消息指定了路由key,則
只有設(shè)置了相對(duì)應(yīng)的路由key的隊(duì)列才能接收到消息
5.3路由模式-Topic模式
Topic模式是direct模式上的一種疊加,增加了模糊路由RoutingKey的模式。
*
代表一級(jí)(必須有一級(jí))#
代表0級(jí)或者多級(jí)
注意最好用代碼的形式來(lái)進(jìn)行綁定
在實(shí)際開(kāi)發(fā)中,我們既可以在
RabbitMq的web界面進(jìn)行交換機(jī)的創(chuàng)建,隊(duì)列的創(chuàng)建,綁定路由key等等操作。
還可以在生產(chǎn)者代碼里面通過(guò)channel.XXX的方式設(shè)置交換機(jī),設(shè)置隊(duì)列,設(shè)置路由key,等等,效果是一樣的
例如下面代碼:生產(chǎn)者(消費(fèi)者也可以聲明) 代碼實(shí)現(xiàn)【交換機(jī)和隊(duì)列】的聲明和綁定
package com.xxx.rabbitmq.all;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* RabbitMQ入門案例 - 完整的聲明方式創(chuàng)建
* 代碼實(shí)現(xiàn)創(chuàng)建交換機(jī)和隊(duì)列,并綁定關(guān)系
* 生產(chǎn)者
*/
public class Producer {
public static void main(String[] args) {
// 1: 創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 設(shè)置連接屬性
connectionFactory.setHost("121.196.153.197");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產(chǎn)者");
// 4: 從連接中獲取通道channel
channel = connection.createChannel();
// 5: 準(zhǔn)備發(fā)送消息的內(nèi)容
String message = "你好,交換機(jī)";
// 6:準(zhǔn)備交換機(jī)。取名規(guī)范 : 類型_業(yè)務(wù)模塊_交換機(jī)
String exchangeName = "direct_order_exchange";
// 7: 定義路由key
String routeKeyOrder = "order";
String routeKeyCourse = "course";
// 8: 指定交換機(jī)的類型
String exchangeType = "direct";
// 9: 聲明交換機(jī)/注冊(cè)交換機(jī)
// @params1: 交換機(jī)名稱
// @params2: 交換機(jī)類型
// @params3: 是否持久化 所謂的持久化就是值:交換機(jī)不會(huì)隨著服務(wù)器的重啟造成丟失,如果是true代表不丟失,false重啟丟失
channel.exchangeDeclare(exchangeName, exchangeType, true);
// 10: 聲明隊(duì)列/注冊(cè)隊(duì)列
// @params1: 隊(duì)列名稱
// @params2: 是否持久化
// @params3: 是不是排他性,是否是獨(dú)占獨(dú)立
// @params4: 是不是自動(dòng)刪除 隨著最后一個(gè)消費(fèi)者消息完畢消息以后是否把隊(duì)列自動(dòng)刪除
// @params5: 是不是有參數(shù) 參數(shù)攜帶可能會(huì)引發(fā)headers模式
channel.queueDeclare("queue5", true, false, false, null);
channel.queueDeclare("queue6", true, false, false, null);
channel.queueDeclare("queue7", true, false, false, null);
// 11: 綁定隊(duì)列
// @params1: 隊(duì)列名稱
// @params2: 交換機(jī)名稱
// @params3: routeKey
channel.queueBind("queue5", exchangeName, routeKeyOrder);
channel.queueBind("queue6", exchangeName, routeKeyOrder);
channel.queueBind("queue7", exchangeName, routeKeyCourse);
// 12: 發(fā)送消息給中間件rabbitmq-server
// @params1: 交換機(jī)exchange
// @params2: 隊(duì)列名稱
// @params3: 屬性配置
// @params4: 發(fā)送消息的內(nèi)容
channel.basicPublish(exchangeName, routeKeyOrder, null, message.getBytes());
System.out.println("消息發(fā)送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("發(fā)送消息出現(xiàn)異常...");
} finally {
// 13: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消費(fèi)者去隊(duì)列拿消息:
package com.xxx.rabbitmq.all;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* RabbitMQ入門案例 - 完整的聲明方式創(chuàng)建
* 消費(fèi)者
*/
public class Consumer {
private static Runnable runnable = new Runnable() {
public void run() {
// 1: 創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 設(shè)置連接屬性
connectionFactory.setHost("121.196.153.197");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//獲取隊(duì)列的名稱
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3: 從連接工廠中獲取連接
connection = connectionFactory.newConnection();
// 4: 從連接中獲取通道channel
channel = connection.createChannel();
// 5: 申明隊(duì)列queue存儲(chǔ)消息
/*
* 如果隊(duì)列不存在,則會(huì)創(chuàng)建
* Rabbitmq不允許創(chuàng)建兩個(gè)相同的隊(duì)列名稱,否則會(huì)報(bào)錯(cuò)。
*
* @params1: queue 隊(duì)列的名稱
* @params2: durable 隊(duì)列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果為true,會(huì)對(duì)當(dāng)前隊(duì)列加鎖,其他的通道不能訪問(wèn),并且連接自動(dòng)關(guān)閉
* @params4: autoDelete 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開(kāi)連接之后是否自動(dòng)刪除消息。
* @params5: arguments 可以設(shè)置隊(duì)列附加參數(shù),設(shè)置隊(duì)列的有效期,消息的最大長(zhǎng)度,隊(duì)列的消息生命周期等等。
* */
// 這里如果queue已經(jīng)被創(chuàng)建過(guò)一次了,可以不需要定義
//channel.queueDeclare("queue1", false, false, false, null);
// 6: 定義接受消息的回調(diào)
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println(queueName + ":開(kāi)始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("發(fā)送消息出現(xiàn)異常...");
} finally {
// 7: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
// 啟動(dòng)三個(gè)線程去執(zhí)行
new Thread(runnable, "queue5").start();
new Thread(runnable, "queue6").start();
new Thread(runnable, "queue7").start();
}
}
5.4輪詢模式 - Work模式
5.4.1Work模式 - 輪詢模式(Round-Robin)
輪詢模式的分發(fā):一個(gè)消費(fèi)者一條,按均分配;
生產(chǎn)者:
package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author: 學(xué)相伴-飛哥
* @description: Producer 簡(jiǎn)單隊(duì)列生產(chǎn)者
* @Date : 2021/3/2
*/
public class Producer {
public static void main(String[] args) {
// 1: 創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 設(shè)置連接屬性
connectionFactory.setHost("192.168.157.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產(chǎn)者");
// 4: 從連接中獲取通道channel
channel = connection.createChannel();
// 6: 準(zhǔn)備發(fā)送消息的內(nèi)容
//===============================end topic模式==================================
for (int i = 1; i <= 20; i++) {
//消息的內(nèi)容
String msg = "學(xué)相伴:" + i;
// 7: 發(fā)送消息給中間件rabbitmq-server
// @params1: 交換機(jī)exchange
// @params2: 隊(duì)列名稱/routingkey
// @params3: 屬性配置
// @params4: 發(fā)送消息的內(nèi)容
channel.basicPublish("", "queue1", null, msg.getBytes());
}
System.out.println("消息發(fā)送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("發(fā)送消息出現(xiàn)異常...");
} finally {
// 7: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消費(fèi)者work1:
package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author: 學(xué)相伴-飛哥
* @description: Consumer
* @Date : 2021/3/2
*/
public class Work1 {
public static void main(String[] args) {
// 1: 創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 設(shè)置連接屬性
connectionFactory.setHost("192.168.157.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 從連接工廠中獲取連接
connection = connectionFactory.newConnection("消費(fèi)者-Work1");
// 4: 從連接中獲取通道channel
channel = connection.createChannel();
// 5: 申明隊(duì)列queue存儲(chǔ)消息
/*
* 如果隊(duì)列不存在,則會(huì)創(chuàng)建
* Rabbitmq不允許創(chuàng)建兩個(gè)相同的隊(duì)列名稱,否則會(huì)報(bào)錯(cuò)。
*
* @params1: queue 隊(duì)列的名稱
* @params2: durable 隊(duì)列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果為true,會(huì)對(duì)當(dāng)前隊(duì)列加鎖,其他的通道不能訪問(wèn),并且連接自動(dòng)關(guān)閉
* @params4: autoDelete 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開(kāi)連接之后是否自動(dòng)刪除消息。
* @params5: arguments 可以設(shè)置隊(duì)列附加參數(shù),設(shè)置隊(duì)列的有效期,消息的最大長(zhǎng)度,隊(duì)列的消息生命周期等等。
* */
// 這里如果queue已經(jīng)被創(chuàng)建過(guò)一次了,可以不需要定義
// channel.queueDeclare("queue1", false, false, false, null);
// 同一時(shí)刻,服務(wù)器只會(huì)推送一條消息給消費(fèi)者
// 6: 定義接受消息的回調(diào)
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(2000);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-開(kāi)始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("發(fā)送消息出現(xiàn)異常...");
} finally {
// 7: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
work2
package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author: 學(xué)相伴-飛哥
* @description: Consumer
* @Date : 2021/3/2
*/
public class Work2 {
public static void main(String[] args) {
// 1: 創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 設(shè)置連接屬性
connectionFactory.setHost("192.168.157.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 從連接工廠中獲取連接
connection = connectionFactory.newConnection("消費(fèi)者-Work2");
// 4: 從連接中獲取通道channel
channel = connection.createChannel();
// 5: 申明隊(duì)列queue存儲(chǔ)消息
/*
* 如果隊(duì)列不存在,則會(huì)創(chuàng)建
* Rabbitmq不允許創(chuàng)建兩個(gè)相同的隊(duì)列名稱,否則會(huì)報(bào)錯(cuò)。
*
* @params1: queue 隊(duì)列的名稱
* @params2: durable 隊(duì)列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果為true,會(huì)對(duì)當(dāng)前隊(duì)列加鎖,其他的通道不能訪問(wèn),并且連接自動(dòng)關(guān)閉
* @params4: autoDelete 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開(kāi)連接之后是否自動(dòng)刪除消息。
* @params5: arguments 可以設(shè)置隊(duì)列附加參數(shù),設(shè)置隊(duì)列的有效期,消息的最大長(zhǎng)度,隊(duì)列的消息生命周期等等。
* */
// 這里如果queue已經(jīng)被創(chuàng)建過(guò)一次了,可以不需要定義
//channel.queueDeclare("queue1", false, true, false, null);
// 同一時(shí)刻,服務(wù)器只會(huì)推送一條消息給消費(fèi)者
//channel.basicQos(1);
// 6: 定義接受消息的回調(diào)
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(200);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work2-開(kāi)始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("發(fā)送消息出現(xiàn)異常...");
} finally {
// 7: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
work1和work2的消息處理能力不同,但是
最后處理的消息條數(shù)相同,是“按均分配
”。
往隊(duì)列發(fā)送20條消息,
結(jié)果就是輪詢給消費(fèi)者拿消息,即便有的消費(fèi)者消費(fèi)很快(也只能按照新順序拿消息),也只能按照輪詢一個(gè)一個(gè)拿,
也就是說(shuō),不會(huì)因?yàn)槟硞€(gè)消費(fèi)者所在的服務(wù)器滿,而導(dǎo)致少消費(fèi),一定是公平消費(fèi)
5.4.1Work模式 - 公平分發(fā)模式(Round-Robin)
根據(jù)消費(fèi)者的消費(fèi)能力進(jìn)行公平分發(fā),處理快的處理的多,處理慢的處理的少;按勞分配
;
相比較輪詢模式,公平分發(fā)的不同在于:修改應(yīng)答方式為手動(dòng)
qos設(shè)置為1,就代表消費(fèi)者拿到cpu的執(zhí)行權(quán)就每次從只拿走一條消息,一條一條的拿。若不設(shè)置,就是默認(rèn)輪詢拿一條
所以根據(jù)隊(duì)列堆積的消息條數(shù)以及內(nèi)存和磁盤空間來(lái)合理設(shè)置qos
這個(gè)時(shí)候,性能好的消費(fèi)者就會(huì)消費(fèi)得多,而性能差的消費(fèi)者就消費(fèi)得少,能者多勞文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-659284.html
更新中------
參考來(lái)自:狂神文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-659284.html
到了這里,關(guān)于【RabbitMQ】消息隊(duì)列-RabbitMQ篇章的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!