4、Work Queues
Work Queues— 工作隊列 (又稱任務隊列) 的主要思想是避免立即執(zhí)行資源密集型任務,而不得不等待它完成。我們把任務封裝為消息并將其發(fā)送到隊列,在后臺運行的工作進程將彈出任務并最終執(zhí)行作業(yè)。當有多個工作線程時,這些工作線程將一起處理這些任務。
輪訓分發(fā)消息
在這個案例中我們會啟動兩個工作線程,一個消息發(fā)送線程,我們來看看他們兩個工作線程是如何工作的。
1、抽取工具類
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
//得到一個連接的 channel
public static Channel getChannel() throws Exception {
//創(chuàng)建一個連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("42.192.149.23");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
2、啟動兩個工作線程來接受消息
import com.oddfar.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class Worker01 {
//定義隊列
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//利用工具類創(chuàng)建信道
Channel channel = RabbitMqUtils.getChannel();
//消息接受,實現(xiàn)接口函數(shù)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:" + receivedMessage);
};
//消息被取消
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消費者取消消費接口回調(diào)邏輯");
};
//消費者消費
System.out.println("C1 消費者啟動等待消費.................. ");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
(1)開啟多線程
選中 Allow multiple instances:
(2)啟動后
3、啟動一個發(fā)送消息線程
public class Task01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) { //hasNext,有下一個,就發(fā)送
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //生產(chǎn)者發(fā)送消息,理解各參數(shù)意思
System.out.println("消息發(fā)送完成:" + message);
}
}
}
啟動后
C1、C2,輪詢接受到消息
5、消息應答
消費者完成一個任務可能需要一段時間,如果其中一個消費者處理一個長的任務并僅只完成了部分突然它掛掉了,會發(fā)生什么情況。
**RabbitMQ 一旦向消費者傳遞了一條消息,便立即將該消息標記為刪除。**在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的消息,以及后續(xù)發(fā)送給該消費者的消息,因為它無法接收到。
為了保證消息在發(fā)送過程中不丟失,引入消息應答機制,消息應答就是:消費者在接收到消息并且處理該消息之后,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。
1、自動應答
消息發(fā)送后立即被認為已經(jīng)傳送成功,這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán)衡,因為這種模式如果消息在接收到之前,消費者那邊出現(xiàn)連接或者 channel 關(guān)閉,那么消息就丟失了。當然另一方面這種模式消費者那邊可以傳遞過載的消息,沒有對傳遞的消息數(shù)量進行限制,當然這樣有可能使得消費者這邊由于接收太多還來不及處理的消息,導致這些消息的積壓,使得內(nèi)存耗盡,最終這些消費者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費者可以高效并以某種速率能夠處理這些消息的情況下使用。
2、手動消息應答的方法
Channel.basicAck (用于肯定確認):RabbitMQ 已知道該消息成功被處理,可以將其丟棄了。
Channel.basicNack (用于否定確認)
Channel.basicReject (用于否定確認):與 Channel.basicNack 相比少一個參數(shù),不處理該消息了直接拒絕,可以將其丟棄了。
Multiple 的解釋:
手動應答的好處是可以批量應答并且減少網(wǎng)絡(luò)擁堵 。
true 代表批量應答 channel 上未應答的消息
false 同上面相比只會應答 tag=8 的消息
3、消息自動重新入隊
如果消費者由于某些原因失去連接 (其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導致消息未發(fā)送 ACK 確認,RabbitMQ 將了解到消息未完全處理,并將對其重新排隊。如果此時其他消費者可以處理,它將很快將其重新分發(fā)給另一個消費者。這樣,即使某個消費者偶爾死亡,也可以確保不會丟失任何消息。
4、消息手動應答代碼
默認消息采用的是自動應答,所以我們要想實現(xiàn)消息消費過程中不丟失,需要把自動應答改為手動應答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
(1)消息生產(chǎn)者
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* 消息生產(chǎn)者,消息在手動應答時是不丟失的,放回隊列重新消費。
*
*
*/
public class Task02 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//聲明隊列
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
Scanner sc = new Scanner(System.in);
System.out.println("請輸入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
//發(fā)布消息
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("生產(chǎn)者發(fā)出消息" + message);
}
}
}
(2)睡眠工具類
(3)消費者 01
(4)消費者02:把睡眠時間改成 30 秒
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class Work03 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1 等待接收消息處理時間較 短");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接收到消息:" + message);
/**
* 1.消息標記 tag
* 2.是否批量應答未應答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (s) -> {
System.out.println(s + "消費者取消消費接口回調(diào)邏輯");
};
//采用手動應答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
啟動后:
中途停掉消費者02
消息由消費者01接收
理解:在生產(chǎn)者輪詢發(fā)送消息:cc和dd,給消費者01和消費者02的時候,消費者01處理的消息的時間較短,所以立馬就接受到了,但是消費者02處理消息的時間較長,需要30s。如果在消費者02處理消息的時候,將消費者02關(guān)閉,那么生產(chǎn)者發(fā)送的消息dd,雖然消費者02 無法接收了,但是也不會中途丟失,消息dd會重新返回到隊列中,然后再從隊列中,發(fā)送給消費者01。這就是RabbitMQ的消息應答機制。
6、RabbitMQ 持久化(3步走)
當 RabbitMQ 服務停掉以后,消息生產(chǎn)者發(fā)送過來的消息不丟失要如何保障?默認情況下 RabbitMQ 退出或由于某種原因崩潰時,它忽視隊列和消息,除非告知它不要這樣做。確保消息不會丟失需要做兩件事:我們需要將隊列和消息都標記為持久化。
1、隊列如何實現(xiàn)持久化(第1步)
之前創(chuàng)建的隊列都是非持久化的,rabbitmq 如果重啟的話,該隊列就會被刪除掉,如果要隊列實現(xiàn)持久化需要在聲明隊列的時候把 durable 參數(shù)設(shè)置為持久化。
//讓隊列持久化
boolean durable = true;
//聲明隊列
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
(1)直接修改聲明會報錯
(2)在RabbitMQ的Web界面,手動刪除隊列
(3)將隊列持久化
2、消息持久化(第2步)
消息實現(xiàn)持久化需要在消息生產(chǎn)者修改代碼
MessageProperties.PERSISTENT_TEXT_PLAIN 添加這個屬性。
將消息標記為持久化并不能完全保證不會丟失消息。盡管它告訴 RabbitMQ 將消息保存到磁盤,但是這里依然存在當消息剛準備存儲在磁盤的時候但是還沒有存儲完,消息還在緩存的一個間隔點。此時并沒有真正寫入磁盤。持久性保證并不強,但是對于我們的簡單任務隊列而言,這已經(jīng)綽綽有余了。
7、不公平分發(fā)(能者多勞)
問題
在最開始的時候我們學習到 RabbitMQ 分發(fā)消息采用的輪訓分發(fā),但是在某種場景下這種策略并不是很好,比方說有兩個消費者在處理任務,其中有個消費者 1 處理任務的速度非???/strong>,而另外一個消費者 2 處理速度卻很慢,這個時候我們還是采用輪訓分發(fā)的化就會到這處理速度快的這個消費者很大一部分時間處于空閑狀態(tài),而處理慢的那個消費者一直在干活,這種分配方式在這種情況下其實就不太好,但是 RabbitMQ 并不知道這種情況它依然很公平的進行分發(fā)。
為了避免這種情況,在消費者中消費之前,我們可以設(shè)置參數(shù) channel.basicQos(1)
需要看一下basicQos的源碼,參數(shù)設(shè)置的要求
//不公平分發(fā)
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//采用手動應答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
測試效果:
生產(chǎn)者連續(xù)發(fā)多幾條消息
讓處理消息速度快的消費者01,多接收一點消息
消費者02接受的少
時間長,所以少
理解:意思就是如果這個任務我還沒有處理完或者我還沒有應答你,你先別分配給我,我目前只能處理一個任務,然后 rabbitmq 就會把該任務分配給沒有那么忙的那個空閑消費者,當然如果所有的消費者都沒有完 成手上任務,隊列還在不停的添加新任務,隊列有可能就會遇到隊列被撐滿的情況,這個時候就只能添加新的 worker 或者改變其他存儲任務的策略。文章來源:http://www.zghlxwxcb.cn/news/detail-833810.html
消息隊列-RabbitMQ:workQueues—工作隊列、消息應答機制、RabbitMQ 持久化、不公平分發(fā)(能者多勞) 到此完結(jié),筆者歸納、創(chuàng)作不易,大佬們給個3連再起飛吧文章來源地址http://www.zghlxwxcb.cn/news/detail-833810.html
到了這里,關(guān)于消息隊列-RabbitMQ:workQueues—工作隊列、消息應答機制、RabbitMQ 持久化、不公平分發(fā)(能者多勞)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!