一、MQ基本介紹
MQ(message queue):本質(zhì)上是個隊列,遵循FIFO原則,隊列中存放的是message,是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。MQ提供“邏輯解耦+物理解耦”的消息通信服務(wù)。使用了MQ之后消息發(fā)送上游只需要依賴MQ,不需要依賴其它服務(wù)。
功能1:流量消峰
功能2:應(yīng)用解耦
功能3:異步處理
MQ的分類:
1.Kafka
2.RabbitMQ
RabbitMQ概念:
四大核心概念:
交換機(jī):
隊列:?
?六大核心模式:
1.簡單模式。2.工作模式。3.發(fā)布訂閱模式。4.路由模式。5.主題模式。6.發(fā)布確認(rèn)模式。
RabbitMQ工作原理:
Channer:信道,發(fā)消息的通道。
二、MQ下載安裝
下載:
1. 官網(wǎng)地址:https://www.rabbitmq.com/download.html。參考的下載地址如下:Linux下安裝RabbitMQ_rabbitmq下載_零碎de記憶的博客-CSDN博客
2.安裝Erlang環(huán)境
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz tcp_wrappers
3.下載Erlang,方式1:找到下面網(wǎng)址,在網(wǎng)址中下載rpm文件:
el/7/erlang-22.3.4.12-1.el7.x86_64.rpm - rabbitmq/erlang · packagecloud
或者直接輸入下面指令下載rpm文件:?
wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm
然后輸入下面的命令安裝已下載的安裝包:
yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm
4.下載RabbitMQ,輸入下面的下載
wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm
?輸入下面的命令進(jìn)行本地安裝:
yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm
5. 下載socat,檢查是否已下載:
yum install socat -y
注意以下的操作都要在 /usr/local/software目錄下查看:?
6.添加開機(jī)啟動RabbitMQ服務(wù):chkconfig rabbitmq-server on。啟動rabbitmq /sbin/service rabbitmq-server start。
7.查看服務(wù)狀態(tài) /sbin/service rabbitmq-server status
8.停止服務(wù) /sbin/service rabbitmq-server stop。重新查看服務(wù)狀態(tài)。
10.開啟web管理界面 sudo rabbitmq-plugins enable rabbitmq_management
11.查看防火墻狀態(tài):systemctl status firewalld。關(guān)閉防火墻:systemctl stop firewalld。關(guān)閉rabbit服務(wù)器輸入:sudo rabbitmqctl stop。開啟rabbit服務(wù)器輸入:sudo rabbitmq-server -detached。
12.在瀏覽器中輸入地址:Linux服務(wù)器ip地址:15672,可訪問web管理界面。
13.用戶名guest,密碼默認(rèn),但無法登陸,無權(quán)限。
14.rabbitmqctl list_users查看用戶。創(chuàng)建賬號 rabbitmqctl add_user admin 123。設(shè)置用戶角色為管理員 rabbitmqctl set_user_tags admin administrator。設(shè)置用戶權(quán)限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"。
15.再經(jīng)嘗試可以重新登錄:
三、創(chuàng)建Java開發(fā)環(huán)境
1.創(chuàng)建1個新項目,命名atguigu-rabbitmq,然后創(chuàng)建模塊Module。GroupId可以填寫:com.atguigu.rabbitmq,ArtifactId可以填rabbitmq-hello,選擇quickstart:
導(dǎo)入依賴如下:
<dependencies>
<!--rabbitmq依賴客戶端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的依賴-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version> <!-- 根據(jù)你的需求指定版本號 -->
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
在下圖中,P是生產(chǎn)者,C是消費者。中間的框是一個隊列-RabbitMQ代表使用者保留的消息緩存區(qū)。
生產(chǎn)者代碼
public class producer {
//隊列名稱
public static final String QUEUE_NAME = "hello";
//發(fā)消息
public static void main( String[] args ) throws IOException, TimeoutException {
//第1步:創(chuàng)建一個連接工程
ConnectionFactory factory = new ConnectionFactory();
//第2步:輸入工廠IP,用戶名和密碼——連接RabbitMQd隊列
factory.setHost("192.168.182.136");
factory.setUsername("admin");
factory.setPassword("123");
//第3步:創(chuàng)建連接
Connection connection = factory.newConnection();
//第4步:獲取信道
Channel channel = connection.createChannel();
//第5步:生成一個隊列(隊列名稱,是否持久化,是否排他,自動刪除,隊列參數(shù))
//持久化:是否存儲入磁盤,默認(rèn)是將消息存儲在內(nèi)存中
//排他:隊列是否只供一個消費者消費,是否進(jìn)行消息共享,true可以供多個消費者消費
//自動刪除:最后一個消費者斷開連接后,該隊列是否自動刪除
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//第6步:發(fā)消息,(交換機(jī),路由key本次是隊列名,參數(shù),發(fā)送的消息)
String message = "hello world";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息發(fā)送成功!??!");
}
}
消費者代碼
public class consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String [] args) throws IOException, TimeoutException {
//第1步:創(chuàng)建一個連接工程
ConnectionFactory factory = new ConnectionFactory();
//第2步:輸入工廠IP,用戶名和密碼——連接RabbitMQd隊列
factory.setHost("192.168.182.136");
factory.setUsername("admin");
factory.setPassword("123");
//第3步:創(chuàng)建連接
Connection connection = factory.newConnection();
//第4步:獲取信道
Channel channel = connection.createChannel();
//第5步:聲明,接收消息
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody()));
};
//第6步:取消消息時的回調(diào)
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消費被中斷");
};
//第7步:接收,(隊列名,自動or手動,接收消息,回調(diào))
//1.消費哪個隊列;2.消費成功后是否要自動應(yīng)答true代表自動應(yīng)答,false表示手動應(yīng)答
//3.消費者未成功消費的回調(diào)
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
注意幾點:1.確保rabbitmq處于開啟的狀態(tài)(開啟方式見前面)2.最好讓防火墻處于關(guān)閉的狀態(tài) 3.最好通過方法左側(cè)的開關(guān)按鈕進(jìn)行啟動,確保啟動是選擇Current File。
?
四、工作隊列
工作隊列:又稱任務(wù)隊列,主要思想是避免立即執(zhí)行資源密集型任務(wù),而不得不等待它完成。相反我們安排任務(wù)在之后執(zhí)行。我們把任務(wù)封裝為消息并將其發(fā)送到隊列。在后臺運行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個工作線程時,這些工作線程將一起處理這些任務(wù)。
情況:生產(chǎn)者大量分發(fā)消息給隊列,工作線程接收隊列的消息,工作線程不止一個,三者關(guān)系時競爭關(guān)系,你一條我一條他一條,但要注意一個消息只能被處理一次,不能被處理多次。
重復(fù)性的代碼可以被抽取成為工具類。
在java — com — atguigu —?rabbitmq下創(chuàng)建utils包,工具類起名RabbitMqUtils,放入如下代碼:
public class RabbitMqUtils {
public static Channel getChannel() throws Exception{
//第1步:創(chuàng)建一個連接工程
ConnectionFactory factory = new ConnectionFactory();
//第2步:輸入工廠IP,用戶名和密碼——連接RabbitMQd隊列
factory.setHost("192.168.182.137");
factory.setUsername("admin");
factory.setPassword("123");
//第3步:創(chuàng)建連接
Connection connection = factory.newConnection();
//第4步:獲取信道
Channel channel = connection.createChannel();
return channel;
}
}
工作線程的更新后,worker01的代碼如下:
public static final String QUEUE_NAME = "hello";
public static void main(String [] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);//聲明隊列,沒有會報錯
//消息接收
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("接收到的消息:" + new String(message.getBody()));
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag + "消息被取消消費接口回調(diào)邏輯");
};
System.out.println("c1等待接收消息......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
重復(fù)利用one包下的consumer類,將其更改為c2工作線程:
Task01作為生產(chǎn)者用于生產(chǎn)數(shù)據(jù),與前面不同的是,Task01支持從IDEA控制臺輸入數(shù)據(jù):
public class Task01 {
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//從控制臺當(dāng)中接收信息
Scanner scanner = new Scanner(System.in); //掃描控制臺輸入內(nèi)容
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("發(fā)送消息完成..");
}
}
}
?
?
五、消息應(yīng)答
概念:
自動應(yīng)答:
手動應(yīng)答:
手動應(yīng)答好處,建議不批量應(yīng)答,選擇false:
?消息自動重新入隊:
原本正常傳輸,C1突然失去連接,檢測到C1斷開連接,于是會讓消息重新入隊,原本的消息交由C2進(jìn)行處理。
實驗思路:寫1個生產(chǎn)者,2個消費者,當(dāng)關(guān)閉掉其中1個工作線程,消息不丟失,還被另一個工作線程接收。消費在手動應(yīng)答時不丟失、放回隊列中重新消費。
消息手動應(yīng)答(生產(chǎn)者):
public class Task2 {
public static final String task_queue_name = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(task_queue_name,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",task_queue_name,null,message.getBytes("UTF-8"));
System.out.println("生產(chǎn)者發(fā)出消息:"+message);
}
}
}
消息手動應(yīng)答(消費者):
public class Work03 {
public static final String task_queue_name = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1等待接收消息處理時間較短");
DeliverCallback deliverCallback = (consumerTag,message)->{
SleepUtils.sleep(1);
System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
//1.消息的標(biāo)記tag 2.是否批量應(yīng)答
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//采用手動應(yīng)答
boolean autoAck = false;
channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag->{
System.out.println(consumerTag + "消費者取消消費接口回調(diào)邏輯");
}));
}
}
public class Work04 {
public static final String task_queue_name = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2等待接收消息處理時間較短");
DeliverCallback deliverCallback = (consumerTag,message)->{
SleepUtils.sleep(30);
System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
//1.消息的標(biāo)記tag 2.是否批量應(yīng)答
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//采用手動應(yīng)答
boolean autoAck = false;
channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag->{
System.out.println(consumerTag + "消費者取消消費接口回調(diào)邏輯");
}));
}
}
實現(xiàn)效果:在生產(chǎn)者輸入AA BB CC DD EE等消息,消費者1接收速度快,會立即打印AA CC EE等消息,消費者2接收速度慢,會在一段時間后接收到BB,此時如果關(guān)閉消費者2,則消費者1輸出DD,表明消費在手動應(yīng)答時不丟失、放回隊列中重新消費。
六、持久化與分發(fā)
如果報錯,說明原本的隊列就是不持久化,此時無法設(shè)定持久化,只能先將隊列刪除然后再重新設(shè)定。
控制隊列持久化,需要修改生產(chǎn)者聲明函數(shù)的第2個參數(shù):
消息持久化:
隊列持久化和消息持久化不同,隊列是MQ里的一個組件,消息是生產(chǎn)者發(fā)送的消息。
如果要讓消息持久化,在發(fā)消息的時候就要通知隊列。
更改的是生產(chǎn)者的信道的basicPublish的第3個參數(shù),添加MessageProperties.PERSISTENT_TEXT_PLAIN
不公平分發(fā):?
消費者處理任務(wù)的速度不一致,為了不讓速度快的消費者長時間處于空閑狀態(tài),因此采用不公平分發(fā)。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
預(yù)取值:
前面N條數(shù)據(jù)分別交給誰處理,如下圖就是前7條數(shù)據(jù)中,2條給C1,5條給C2
七、發(fā)布確認(rèn)
原理:
1.設(shè)置要求隊列必須持久化:就算服務(wù)器宕機(jī),隊列也不至于消失。
2.設(shè)置要求隊列中的消息也必須持久化。
3. 發(fā)布確認(rèn),消息保存到磁盤上之后,隊列要告知生產(chǎn)者。
Channel channel = connection.createChannel();
channel.confirmSelect();
public static void main(String[] args){
}
單個發(fā)布確認(rèn):
是一種同步確認(rèn)發(fā)布的方式,發(fā)布消息-確認(rèn)消息-發(fā)布消息...必須要確認(rèn)后才能繼續(xù)發(fā)布,類似于一手交錢一手交貨,缺點是發(fā)布速度很慢。
1. 創(chuàng)建com/atguigu/rabbitmq/four文件夾下的ConfirmMessage
public static void publishMessageIndividually() throws Exception{
Channel channel = RabbitMqUtils.getChannel(); //獲取信道
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,false,false,false,null);
channel.confirmSelect();//開啟發(fā)布確認(rèn)
long begin = System.currentTimeMillis();
for(int i=0;i<MESSAGE_COUNT;i++){
String message = i +"";
channel.basicPublish("",queueName,null,message.getBytes());
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息發(fā)送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("發(fā)布"+MESSAGE_COUNT+"個單獨確認(rèn)消息,耗時:"+(end-begin)+"ms");
}
批量發(fā)布確認(rèn):
public static void publishMessageBatch() throws Exception{
Channel channel = RabbitMqUtils.getChannel(); //獲取信道
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,false,false,false,null);
channel.confirmSelect();//開啟發(fā)布確認(rèn)
long begin = System.currentTimeMillis();
int batchSize = 100; //批量確認(rèn)消息的大小
//批量發(fā)送消息,批量發(fā)布確認(rèn)
for(int i=0;i<MESSAGE_COUNT;i++){
String message = i+"";
channel.basicPublish("",queueName,null,message.getBytes());
//判斷達(dá)到100條消息的時候,批量確認(rèn)一次
if(i%batchSize==0) channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("發(fā)布"+MESSAGE_COUNT+"個批量確認(rèn)消息,耗時:"+(end-begin)+"ms");
}
異步發(fā)布確認(rèn):
map序列,key是消息序號(deliveryTag是消息的標(biāo)識,multiple是是否為批量),value是消息內(nèi)容,將消息每一條都編號,broker會對消息進(jìn)行應(yīng)答,分為兩種一種是確認(rèn)應(yīng)答,另一種是未確認(rèn)應(yīng)答。消息生產(chǎn)者不需要等待接收方的消息,只需要負(fù)責(zé)發(fā)送消息即可,消息是否應(yīng)答最終會以異步的形式回傳,也就是說確認(rèn)的時間可以是稍后的。
addConfirmListener單參數(shù)的是只能監(jiān)聽成功的,多參數(shù)的是可以監(jiān)聽成功也可以監(jiān)聽失敗的,都是接口需要自己來寫。
public static void publishMessageAsync() throws Exception{
Channel channel = RabbitMqUtils.getChannel(); //獲取信道
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,false,false,false,null);
channel.confirmSelect();//開啟發(fā)布確認(rèn)
long begin = System.currentTimeMillis();
//消息確認(rèn)成功,回調(diào)函數(shù)
ConfirmCallback ackCallback = (deliveryTag, multiple)->{
System.out.println("確認(rèn)的消息:"+deliveryTag);
};
//消息確認(rèn)失敗回調(diào)函數(shù)
ConfirmCallback nackCallback = (deliveryTag, multiple)->{
System.out.println("未確認(rèn)的消息:"+deliveryTag);
};
//準(zhǔn)備消息的監(jiān)聽器,監(jiān)聽哪些消息成功了,哪些消息失敗了
channel.addConfirmListener(ackCallback,nackCallback);
//批量發(fā)送消息
for(int i=0;i<MESSAGE_COUNT;i++){
String message="消息"+i;
channel.basicPublish("",queueName,null,message.getBytes());
//發(fā)布確認(rèn)
}
long end = System.currentTimeMillis();
System.out.println("發(fā)布"+MESSAGE_COUNT+"個異步確認(rèn)消息,耗時:"+(end-begin)+"ms");
}
?處理異步未確認(rèn)消息:
最好的解決方案就是把未確認(rèn)的消息放到一個基于內(nèi)存的能被發(fā)布線程訪問的隊列,比如說用ConcurrentLinkedQueue這個隊列在confirm callbacks與發(fā)布線程之間進(jìn)行消息的傳遞。
public static void publishMessageAsync() throws Exception{
Channel channel = RabbitMqUtils.getChannel(); //獲取信道
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,false,false,false,null);
channel.confirmSelect();//開啟發(fā)布確認(rèn)
/*線程安全有序的一個哈希表,適用于高并發(fā)的情況下
1.輕松地將序號與消息進(jìn)行關(guān)聯(lián)
2.輕松地批量刪除條目只要給到序號
3.支持高并發(fā)(多線程)*/
ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
//消息確認(rèn)成功,回調(diào)函數(shù)
ConfirmCallback ackCallback = (deliveryTag, multiple)->{
if(multiple){
//2.刪除掉已經(jīng)確認(rèn)的消息,剩下的就是未確認(rèn)的消息
ConcurrentNavigableMap<Long, String> confirmd =
outstandingConfirms.headMap(deliveryTag);
}else{
outstandingConfirms.remove(deliveryTag);
}
System.out.println("確認(rèn)的消息:"+deliveryTag);
};
//消息確認(rèn)失敗回調(diào)函數(shù)
ConfirmCallback nackCallback = (deliveryTag, multiple)->{
//3.打印一下未確認(rèn)的消息都有哪些
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未確認(rèn)的消息是:"+message+"未確認(rèn)的消息:"+deliveryTag);
};
//準(zhǔn)備消息的監(jiān)聽器,監(jiān)聽哪些消息成功了,哪些消息失敗了
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
//批量發(fā)送消息
for(int i=0;i<MESSAGE_COUNT;i++){
String message="消息"+i;
channel.basicPublish("",queueName,null,message.getBytes());
//1.此處記錄下所有發(fā)送的消息,消息的總和
outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("發(fā)布"+MESSAGE_COUNT+"個異步確認(rèn)消息,耗時:"+(end-begin)+"ms");
}
}
三種方式對比:
八、交換機(jī)
一個消息可以被消費多次,需要通過交換機(jī),仍舊遵循隊列中的消息只能被消費一次。
?生產(chǎn)者生產(chǎn)的消息從不會直接發(fā)送到隊列。生產(chǎn)者將消息發(fā)送到交換機(jī)。交換機(jī)負(fù)責(zé)接收來自生產(chǎn)者的消息,將消息推入隊列。
Exchanges的類型:直接(direct),主題(topic),標(biāo)題(headers),扇出(fanout)
消息能路由發(fā)送到隊列中其實是由routingKey(bindingkey)綁定key指定的。
創(chuàng)建臨時隊列:
String queueName = channel.queueDedare().getQueue();
綁定:
根據(jù)Routing key來確定消息要發(fā)給哪個隊列,如果Routing Key相同消息就可以發(fā)送給多個隊列。
先添加一個隊列queue1,再添加一個交換機(jī)exchange1,最后點擊exchange1交換機(jī),進(jìn)入綁定菜單,然后輸入綁定的隊列是queue1,然后Routing key隨便設(shè)置為123。
Fanout交換機(jī)(廣播)
Fanout(扇出)是將接收到的所有消息廣播到它知道的所有隊列中。如果Routing Key相同則發(fā)送給隊列以相同消息。
生產(chǎn)者:
public class EmitLog {
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println("生產(chǎn)者發(fā)出消息"+message);
}
}
}
消費者:
public class ReceiveLogs01 {
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//聲明一個交換機(jī)
//聲明一個隊列臨時隊列,隊列的名稱是隨機(jī)的,當(dāng)消費者斷開與隊列的連接時候,隊列就刪除了
String queueName = channel.queueDeclare().getQueue();
//綁定交換機(jī)與隊列
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("等待接收消息,把接收到消息打印在屏幕上......");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("ReceiveLogs01控制臺打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
}
}
效果:實現(xiàn)廣播的功能
Direct路由交換機(jī)
消費者1:
public class ReceiveLogsDirect01 {
public static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("console",false,false,false,null);
channel.queueBind("console",EXCHANGE_NAME,"info"); //隊列名稱,交換機(jī)名稱,Routingkey
DeliverCallback deliverCallback =(consumerTag,message)->{
System.out.println("ReceiveLogsDirect01控制臺打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume("console",true,deliverCallback,consumerTag->{});
}
}
消費者2:
public class ReceiveLogsDirect02 {
public static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("disk",false,false,false,null);
channel.queueBind("disk",EXCHANGE_NAME,"error"); //隊列名稱,交換機(jī)名稱,Routingkey
DeliverCallback deliverCallback =(consumerTag,message)->{
System.out.println("ReceiveLogsDirect02控制臺打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume("disk",true,deliverCallback,consumerTag->{});
}
}
生產(chǎn)者:?
public class DirectLogs {
public static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
System.out.println("生產(chǎn)者發(fā)出消息"+message);
}
}
}
效果:
如果【channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));】的第2個參數(shù)填info就只會發(fā)送消息給消費者1,填寫error就只會發(fā)送消息給消費者2。
Topics主題交換機(jī)
發(fā)布(生產(chǎn)者)訂閱(消費者)模式:
消費者1:
public class ReceiveLogsTopic01 {
public static final String EXCHANGE_NAME="topic_logs";//交換機(jī)名稱
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queueName="Q1";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接收隊列:"+queueName+" 綁定鍵:"+message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
}
}
消費者2:
public class ReceiveLogsTopic02 {
public static final String EXCHANGE_NAME="topic_logs";//交換機(jī)名稱
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queueName="Q2";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接收隊列:"+queueName+" 綁定鍵:"+message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
}
}
?生產(chǎn)者1:
public class EmitLogTopic {
public static final String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Map<String,String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被隊列Q1Q2接收到");
bindingKeyMap.put("lazy.orange.elephant","被隊列Q1Q2接收到");
bindingKeyMap.put("quick.orange.fox","被隊列Q1接收到");
bindingKeyMap.put("lazy.brown.fox","被隊列Q2接收到");
bindingKeyMap.put("lazy.pink.rabbit","雖然滿足兩個綁定但只被隊列Q2接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何綁定不會被任何隊列接收到會被丟棄");
bindingKeyMap.put("quick.orange.male.rabbit","是四個單詞不匹配任何綁定會被丟棄");
bindingKeyMap.put("lazy.orange.male.rabbit","是四個單詞但匹配Q2");
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
System.out.println("生產(chǎn)者發(fā)出消息:"+message);
}
}
}
結(jié)果:
九、死信
無法被消費消息被稱為死信
死信的來源:
死信實戰(zhàn)架構(gòu)圖:
1個生產(chǎn)者2個消費者。生產(chǎn)者原本走正常交換機(jī),消息走正常隊列,被C1消費。當(dāng)滿足消息被拒絕,消息TTL過期,隊列達(dá)到最大長度三者其一時,消息成為死信,會進(jìn)入dead_exchange交換機(jī),進(jìn)入dead_queue死信隊列,死信隊列的信息由C2消費。
消費者1:
public class Consumer01 {
//普通交換機(jī)的名稱
public static final String NORMAL_EXCHANGE ="normal_exchange";
//死信交換機(jī)的名稱
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通隊列的名稱
public static final String NORMAL_QUEUE = "normal_queue";
//死信隊列的名稱
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel1 = RabbitMqUtils.getChannel();
//聲明死信和普通交換機(jī),類型為direct
channel1.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel1.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
Map<String,Object> arguments = new HashMap<>(); //設(shè)置參數(shù)
//正常隊列設(shè)置死信交換機(jī)
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //****相當(dāng)于正常的C1不能消費掉就通過這個交換機(jī)進(jìn)行轉(zhuǎn)發(fā)
//設(shè)置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "lisi");
//聲明普通隊列
channel1.queueDeclare(NORMAL_QUEUE,false,false,false,arguments); //正常交換機(jī)不正常,需要將死信轉(zhuǎn)發(fā)給死信隊列
//聲明死信隊列
channel1.queueDeclare(DEAD_QUEUE,false,false,false,null);
//綁定普通的交換機(jī)與隊列
channel1.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
//綁定死信的交換機(jī)與死信的隊列
channel1.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("Consumer01接收的消息是:" + new String(message.getBody(),"UTF-8"));
};
channel1.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag->{});
}
}
消費者2:
public class Consumer02 {
//死信隊列的名稱
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel1 = RabbitMqUtils.getChannel();
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("Consumer02接收的消息是:" + new String(message.getBody(),"UTF-8"));
};
channel1.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag->{});
}
}
生產(chǎn)者:
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//死信消息,設(shè)置TTL時間,time to live
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String message = "info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}
實驗步驟:首先確保rabbitMq網(wǎng)頁中normal_queue和dead_queue隊列都被刪除干凈,然后先啟動Consumer01將交換機(jī)和隊列聲明,然后關(guān)閉Consumer01模擬無法接收的場景,然后啟動Producer會發(fā)送10條消息,此時啟動Consumer02,最終發(fā)送的消息會在10秒后變成死信,Consumer02會接收到這些信息。
**易錯點:假如隊列已存在,會報錯,我們可以點進(jìn)隊列,
然后點擊Delete Queue即可:?
隊列達(dá)到最大長度:
更改Producer代碼:
更改Consumer01代碼:
實驗步驟:首先確保rabbitMq網(wǎng)頁中normal_queue和dead_queue隊列都被刪除干凈,然后先啟動Consumer01將交換機(jī)和隊列聲明,然后關(guān)閉Consumer01模擬假死,此時啟動Consumer02和Producer,因為隊列的最大容量為6,所以有4條消息會被發(fā)送到死信隊列。
拒絕應(yīng)答:
在Producer中確保設(shè)置TTL時間注釋調(diào),然后basicProperties改為null,
在Consumer01中把長度限制注釋掉,更改DeliverCallback后的代碼,如下:
DeliverCallback deliverCallback = (consumerTag,message)->{ //接收消息
String msg = new String(message.getBody(), "UTF-8");
if(msg.equals("info5")){
System.out.println("此消息是被Consumer01拒絕的消息是:" +msg);
channel1.basicReject(message.getEnvelope().getDeliveryTag(),false);
} else {
System.out.println("Consumer01接收的消息是:" +msg);
channel1.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
};
//開啟手動應(yīng)答,如果自動應(yīng)答了根本不存在拒絕的問題
channel1.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag->{});
注意一定要開啟手動應(yīng)答,手動拒絕消息info5,效果如下:文章來源:http://www.zghlxwxcb.cn/news/detail-728024.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-728024.html
到了這里,關(guān)于RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊列,集群,交換機(jī),持久化,生產(chǎn)者、消費者)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!