目錄
一:中間件
二:分布式消息隊列?
2.1:是消息隊列
2.1.1:消息隊列的優(yōu)勢
2.1.1.1:異步處理化
2.1.1.2:削峰填谷
2.2:分布式消息隊列
2.2.1:分布式消息隊列的優(yōu)勢
2.2.1.1:數(shù)據(jù)的持久化
2.2.1.2:可擴展性
2.2.1.3:應(yīng)用解耦
2.2.1.4:發(fā)送訂閱?
2.2.2:分布式消息隊列的應(yīng)用場景?
三:Rabbitmq
3.1:基本概念
3.2:快速入門?
3.2.1:引入消息隊列Java客戶端
3.2.2:單消費開發(fā)生產(chǎn)者和消費者
?3.2.3:多消費開發(fā)生產(chǎn)者和消費者
3.3.3:交換機
3.3.3.1:交換機的類別
a):fanout
一:中間件
連接多個系統(tǒng),幫助多個系統(tǒng)緊密協(xié)作的技術(shù)(組件)
二:分布式消息隊列?
2.1:是消息隊列
概念:存儲消息的隊列
關(guān)鍵詞:存儲,消息,隊列
存儲:存儲數(shù)據(jù)
消息:某種數(shù)據(jù)結(jié)構(gòu),比如l字符串,對象,二進制數(shù)據(jù),json等
隊列:先進先出的數(shù)據(jù)結(jié)構(gòu)
作用:在不同的系統(tǒng)下,應(yīng)用之間實現(xiàn)消息的傳輸,不需要考慮傳輸應(yīng)用的編程語言,系統(tǒng)和,框架等等,實現(xiàn)應(yīng)用解耦的作用。
? ? ? ? eg:可以讓Java開發(fā)的應(yīng)用發(fā)消息,讓php開發(fā)的應(yīng)用收消息。
?針對生產(chǎn)者來說:不需要關(guān)心消費者什么時候接受消息,什么時候消費,我只需要把我的工作完成就好了。生產(chǎn)者和消費者之間實現(xiàn)了解耦。
針對上圖,同樣我們會發(fā)現(xiàn),當(dāng)小李要別的書籍的時候,小王也可以將別的書籍放到消息隊列中。生產(chǎn)者和消費者從某一種程度上實現(xiàn)了解耦合。
2.1.1:消息隊列的優(yōu)勢
2.1.1.1:異步處理化
生產(chǎn)者發(fā)送消息之后,可以繼續(xù)去忙別的,消費者什么時候消費都可以,不產(chǎn)生阻塞。
2.1.1.2:削峰填谷
先把用戶的請求放到消息隊列種,消費者(實際執(zhí)行操作的應(yīng)用)可以按照自己的需求,慢慢去取。
舉個栗子:
原本:
12點時來了10萬個請求,原本情況下,10萬個請求都在系統(tǒng)內(nèi)部立刻處理,很快系統(tǒng)壓力過大宕機。
現(xiàn)在:
把10萬個請求放到消息隊列中,處理系統(tǒng)以自己的恒定速率(比如每秒1個)慢慢執(zhí)行,穩(wěn)定處理。
2.2:分布式消息隊列
2.2.1:分布式消息隊列的優(yōu)勢
分布式消息隊列繼承于消息隊列的優(yōu)勢,并進行了一部分的拓展。
2.2.1.1:數(shù)據(jù)的持久化
把消息集中存儲在硬盤當(dāng)中,服務(wù)器重啟就不會丟失。
2.2.1.2:可擴展性
可以根據(jù)需求,隨時增加(或減少)節(jié)點,繼續(xù)保持穩(wěn)定的服務(wù)。
2.2.1.3:應(yīng)用解耦
可以連接不同語言(Java,PHP),框架開發(fā)的系統(tǒng),讓這些系統(tǒng)讀取數(shù)據(jù)。
示例:
以前的項目:
加了分布式消息隊列之后的項目:?
1:一個系統(tǒng)掛了,不影響另一個系統(tǒng)。
2:系統(tǒng)掛了之后并恢復(fù),仍然可以從消息隊列中取消息
3:只要發(fā)送消息到隊列,就可以立即進行返回,不用同步調(diào)用所有系統(tǒng),性能更高
2.2.1.4:發(fā)送訂閱?
假設(shè)情景:當(dāng)QQ進行了一部分改革之后,其他使用QQ的APP也應(yīng)該處理
這部分改革。
QQ做了一個情景,要讓其他系統(tǒng)知道,比如公告消息。如果QQ一次性給這些應(yīng)用發(fā)消息,所引出的問題如下:
1.每次發(fā)通知都要調(diào)用很多系統(tǒng),很麻煩,很可能失敗
2.不知道哪個系統(tǒng)需要這些QQ的改革。
解決方案:大的核心系統(tǒng)始終往消息隊列發(fā)消息,其他的系統(tǒng)都去訂閱這個消息隊列的消息,用的時候進行取就OK。
2.2.2:分布式消息隊列的應(yīng)用場景?
1:耗時場景。
2:高并發(fā)場景。
3:分布式系統(tǒng)的協(xié)作。(跨團隊,跨業(yè)務(wù)合作,應(yīng)用解耦)
4:強穩(wěn)定的場景(金融業(yè)務(wù),持久化,可靠性,削鋒填谷)??
三:Rabbitmq
特點:生態(tài)好,易學(xué)習(xí),易于理解,時效性強,支持不同語言的客戶端,擴展性,可用性都很不錯。
3.1:基本概念
AMPQ協(xié)議:Rabbitmq是遵循AMPQ協(xié)議的一種消息中間件。
生產(chǎn)者:發(fā)消息到交換機
消費者:收消息的,從某個隊列中取消息
交換機(exchange):負(fù)責(zé)把消息轉(zhuǎn)發(fā)到對應(yīng)的隊列
隊列(Queue):存儲消息的
路由(Rountes):轉(zhuǎn)發(fā),怎么把一個消息從一個地方轉(zhuǎn)發(fā)到另一個地方(比如生產(chǎn)者轉(zhuǎn)發(fā)到某個隊列)
Rabbitmq:端口占用? ?5672:程序連接的端口 15672:管理界面端口
Rabbitmq的安裝:https://blog.csdn.net/qq_25919879/article/details/113055350
管理器頁面打不開:
3.2:快速入門?
3.2.1:引入消息隊列Java客戶端
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.17.0</version>
</dependency>
3.2.2:單消費開發(fā)生產(chǎn)者和消費者
生產(chǎn)者端代碼:
public class SingeProducer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
//頻道相當(dāng)于客戶端(jdbcClient,redisClient),提供了和消隊列server建立通信,程序通過channel進行發(fā)送消息
Channel channel = connection.createChannel()) {
//創(chuàng)建消息隊列,第二個參數(shù)(durable):是否開啟持久化,第三個參數(shù)exclusiove:是否允許當(dāng)前這個創(chuàng)建消息隊列的
//連接操作消息隊列 第四個參數(shù):沒有人使用隊列,是否需要刪除
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//發(fā)送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消費者代碼:
public class SingeConsumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//創(chuàng)建頻道,提供通信
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//如何處理消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
?3.2.3:多消費開發(fā)生產(chǎn)者和消費者
場景:一個生產(chǎn)者給隊列里面發(fā)了一條消息,多個消費者進行消費。適用于多個機器同時去接收并處理任務(wù)(每個機器處理任務(wù)有限)
隊列持久化:
durable:
參數(shù)設(shè)置為true,服務(wù)器隊列不丟失
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
?消息持久化:
?指定MessageProperties.PERSISTENY_TEXT_PLAIN參數(shù)
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
生產(chǎn)者端代碼:
public class MultiProducer {
private static final String TASK_QUEUE_NAME = "multi_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
Scanner scanner=new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.nextLine();
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
消費者代碼:?
在消費者代碼中,如何測驗一個消費者只能取一個任務(wù),我們利用for循環(huán)來進行解決。
指定確認(rèn)某條消息:
第一個參數(shù):獲取消息的信息
第二個參數(shù):如果是true,把所有的歷史消息全都確認(rèn)了。如果為false,取出當(dāng)前的消息。
//第二個參數(shù):是否一次性取所有的消息。如果為true,則要取所有的擠壓在消息隊列中的消息
//如果為false,則為一次性取一個消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
指定拒絕某條消息
第一個參數(shù):獲取消息的信息
第二個參數(shù):如果是true,則代表是否要拒絕所有的歷史消息。
第三個參數(shù):如果是false, 則代表失敗的任務(wù)是否要重新入隊。
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
public class MultiConsumer {
private static final String TASK_QUEUE_NAME = "multi_queue";
public static void main(String[] argv) throws Exception {
//建立連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
for (int i = 0; i <= 2; i++) {
final Channel channel = connection.createChannel();
int finalI=i;
//聲明隊列
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//控制單個消費者的任務(wù)積壓數(shù):每個消費者最多處理一個任務(wù),每個消費者智能處理一個任務(wù)
channel.basicQos(1);
//處理從隊列中取的的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
//處理工作
System.out.println(" [x] Received '" +"編號:"+finalI+ message + "'");
//停20秒模擬一個機器處理工作能力有限
Thread.sleep(20000);
//第二個參數(shù):是否一次性取所有的消息。如果為true,則要取所有的擠壓在消息隊列中的消息
//如果為false,則為一次性取一個消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
//開啟消費監(jiān)聽
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
}
3.3.3:交換機
一個生產(chǎn)者給多個隊列發(fā)消息,一個生產(chǎn)者對多個隊列。交換機:轉(zhuǎn)發(fā)功能,怎么把消息轉(zhuǎn)發(fā)到不同的隊列上。
3.3.3.1:交換機的類別
a):fanout
場景:很適用于發(fā)布訂閱的場景。
特點:消息會被轉(zhuǎn)發(fā)到所有綁定到交換機的隊列。
生產(chǎn)者代碼:當(dāng)生產(chǎn)者發(fā)送消息后,由交換機放到消息隊列中,消費者從消息隊列中取。
public class FonoutProducer {
private static final String EXCHANGE_NAME = "1";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//創(chuàng)建交換機
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner=new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
消費者代碼:
public class FonoutConsumer {
private static final String EXCHANGE_NAME = "1";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Channel channel2= connection.createChannel();
//聲明交換機
//創(chuàng)建隊列,隨機分配一個隊列名稱
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName="xiaowang";
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName2="xiaoli";
channel2.queueDeclare(queueName2,true,false,false,null);
channel2.queueBind(queueName2,EXCHANGE_NAME,"");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [小王] Received '" + message + "'");
};
DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [小李] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
}
}
運行結(jié)果:
文章來源:http://www.zghlxwxcb.cn/news/detail-715747.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-715747.html
到了這里,關(guān)于分布式消息隊列:RabbitMQ(1)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!