一、初見MQ
(一)什么是MQ?
MQ(MessageQueue),意思是消息隊(duì)列,也就是事件驅(qū)動(dòng)架構(gòu)中的Broker。
(二)同步調(diào)用
1、概念: 同步調(diào)用是指,某一服務(wù)需要多個(gè)服務(wù)共同參與,但多個(gè)服務(wù)之間有一定的執(zhí)行順序,當(dāng)每一個(gè)服務(wù)都需要等待前面一個(gè)服務(wù)完成才能繼續(xù)執(zhí)行。
2、存在的問題
- 耦合度高: 新需求需要改動(dòng)原代碼
- 性能下降: 調(diào)用者需要等待服務(wù)提供者相應(yīng),如果調(diào)用鏈過長(zhǎng)則響應(yīng)時(shí)間等于每次調(diào)用的時(shí)間之和。
- 資源浪費(fèi): 調(diào)用鏈的每個(gè)服務(wù)在等待響應(yīng)過程中,不會(huì)釋放請(qǐng)求資源,高并發(fā)場(chǎng)景下會(huì)浪費(fèi)系統(tǒng)資源。
- 級(jí)聯(lián)失?。?/strong> 若服務(wù)提供者出現(xiàn)宕機(jī),所有調(diào)用者都會(huì)因故障而導(dǎo)致整個(gè)服務(wù)集群故障。
(三)異步調(diào)用
1、實(shí)現(xiàn)模式: 異步調(diào)用常見實(shí)現(xiàn)的就是事件驅(qū)動(dòng)模式。
2、事件驅(qū)動(dòng)的優(yōu)勢(shì)
- 服務(wù)解耦: 只需要將請(qǐng)求交付給事件管理器進(jìn)行管理即可完成服務(wù)。
- 性能提升: 與客戶交互的服務(wù)短時(shí)間就能完成,并不需要等待后續(xù)服務(wù)完成。
- 服務(wù)弱依賴: 其它服務(wù)宕機(jī)不影響服務(wù)集群的使用
- 流量緩沖: 事件管理器通過任務(wù)隊(duì)列的方式,使得訂閱的服務(wù)按照自身速度進(jìn)行執(zhí)行。
3、事件驅(qū)動(dòng)的缺點(diǎn)
- 高度依賴Broker的可靠性、安全性、吞吐能力
- 架構(gòu)復(fù)雜時(shí),業(yè)務(wù)沒有明顯的流程線,不便于跟蹤管理
(四)MQ常見框架
RabbitMQ(中小企業(yè)) | ActiveMQ | RocketMQ(大型企業(yè)) | Kafka | |
---|---|---|---|---|
公司/社區(qū) | Rabbit | Apache | Alibaba | Apache |
開發(fā)語言 | Erlang | Java | Java | Java |
協(xié)議支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定義協(xié)議 | 自定義協(xié)議 |
可用性 | 高 | 一般 | 高 | 高 |
單機(jī)吞吐量 | 一般 | 差 | 高 | 極高 |
消息延遲 | 微妙級(jí) | 毫秒級(jí) | 毫秒級(jí) | 毫秒以內(nèi) |
消息可靠 | 高 | 一般 | 高 | 一般 |
二、使用MQ
(一)RabbitMQ概述
RqbbitMQ是基于Erlang語言開發(fā)的開源消息通訊中間件,官方地址:https://rabbitmq.com/
(二)安裝MQ
docker pull rabbitmq:3-management
(三)運(yùn)行RabbitMQ
#配置 MQ的用戶名和密碼,容器名和主機(jī)名,端口,鏡像名 ,注意:15672端口是MQ的控制臺(tái)訪問端口,5672是對(duì)外暴露的消息通信端口
docker run -e RABBITMQ_DEFAULT_USER=xxx -e RABBITMQ_DEFAULT_PASS=xxxx --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
訪問MQ的控制臺(tái)
(4)RabbitMQ的整體結(jié)構(gòu)
(5)RabbitMQ中的幾個(gè)概念
- channel: 操作MQ的工具
- exchange: 路由消息到隊(duì)列
- queue: 緩存消息
- Virtual Host: 虛擬主機(jī),是對(duì)queue,exchange等資源進(jìn)行邏輯分組
(6)常見的MQ模型
- 基本消息隊(duì)列(BasicQueue): Publisher —1:1— Queue —1:1— Customer
- 工作消息隊(duì)列(WorkQueue): Publisher —1:1— Queue —1:n— Customer
-
發(fā)布/訂閱(Publish、Subscribe): 根據(jù)交換機(jī)類型又有三種模型
- Fanout Exchange: 廣播,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer
- Direct Exchange: 路由,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer
- Topic Exchange: 主題,
- RPC
- 發(fā)布者確認(rèn)
第一種:基本消息隊(duì)列的基本使用
包含三種角色:publisher、queue、consumer
- publisher: 消費(fèi)發(fā)布者,將消息發(fā)布到隊(duì)列queue
- queue: 消息隊(duì)列,負(fù)責(zé)接受并緩存消息
- consumer: 訂閱隊(duì)列,處理隊(duì)列中的消息
收發(fā)消息的過程: 獲取連接 》 建立通信通道 》 創(chuàng)建消息隊(duì)列 》 收發(fā)消息 》 釋放資源
1、publisher和consumer引入依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
2、Publisher創(chuàng)建發(fā)送消息通道
@SpringBootTest
class PublisherApplicationTests {
@Test
void testSendMessage() throws IOException, TimeoutException {
// 1、建立連接
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設(shè)置連接參數(shù)
connectionFactory.setHost("192.168.92.131");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
// 3、建立連接
Connection connection = connectionFactory.newConnection();
// 4、建立通信通道Channel
Channel channel = connection.createChannel();
// 5、創(chuàng)建隊(duì)列
String queueName = "simple.queue";
channel.queueDeclare(queueName,false,false,false,null);
// 6、發(fā)送信息
String message = "hello,rabbitmq!";
channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("發(fā)送消息成功:【"+message+"】");
// 7、關(guān)閉通道和連接
channel.close();
connection.close();
}
}
2、Consumer創(chuàng)建訂閱通道
class ConsumerApplicationTests {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、建立連接
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設(shè)置連接參數(shù)
connectionFactory.setHost("192.168.92.131");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
// 3、建立連接
Connection connection = connectionFactory.newConnection();
// 4、建立通信通道Channel
Channel channel = connection.createChannel();
// 5、創(chuàng)建隊(duì)列
String queueName = "simple.queue";
channel.queueDeclare(queueName,false,false,false,null);
// 6、訂閱消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 7、處理消息
String message = new String(body);
System.out.println("接收到消息:【"+message+"】");
}
});
System.out.println("等待接收消息....");
}
}
第二種:Work Queue 工作隊(duì)列
與基本隊(duì)列的區(qū)別在于,它能使用多個(gè)訂閱隊(duì)列進(jìn)行高效的處理請(qǐng)求。(因?yàn)橐粋€(gè)訂閱隊(duì)列的處理速度是有限的)
使用過程與基本隊(duì)列幾乎一致,只是開啟了多個(gè)訂閱隊(duì)列。
在使用過程中我們會(huì)發(fā)現(xiàn),多個(gè)訂閱隊(duì)列對(duì)任務(wù)的分配是平均的,這就是預(yù)取機(jī)制。
我們需要的是快速處理的訂閱隊(duì)列獲取更多的請(qǐng)求,慢速處理的訂閱隊(duì)列獲取少量的請(qǐng)求,它如何實(shí)現(xiàn)呢?
通過修改配置文件,設(shè)置一個(gè) preFetch 值。
spring:
rabbitmq:
host: 192.168.92.131 #IP
port: 5672 #端口
virtual-host: / #虛擬主機(jī)
username: root #用戶名
password: root #密碼
listener:
simple:
prefetch: 1 # 每次取 1 個(gè)請(qǐng)求,處理完才能取下一個(gè)。
第三種:FanoutQueue 廣播消息隊(duì)列
SpringAMQP提供聲明交換機(jī)、隊(duì)列、綁定關(guān)系的API
主要使用的是Exchange.FanoutExchange類。
實(shí)現(xiàn)思路:
1、在consumer服務(wù),聲明隊(duì)列,交換機(jī),并將兩者綁定。
@Configuration
public class FanoutConfig{
//交換機(jī)
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("com.fanout");
}
//隊(duì)列
@Bean
public Queue fanoutQueue1(){
return new Queue("com.queue1");
}
//綁定關(guān)系
@Bean
public Binding bindingQueue(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
}
//...以相同方式聲明第2個(gè)隊(duì)列,并完成綁定
}
2、在consumer服務(wù),編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽fanout.queue1和fanout.queue2
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "com.queue1")
public void listenFanoutQueue1(String msg) throws InterruptedException {
//...處理結(jié)果
}
@RabbitListener(queues = "com.queue2")
public void listenFanoutQueue2(String msg) throws InterruptedException {
//...處理結(jié)果
}
}
3、在publisher編寫測(cè)試方法,向交換機(jī)發(fā)送信息
@Test
public void sendFanoutExchange() {
//1、交換機(jī)
String exchangeName = "com.fanout";
//2、消息
String message = "Hello Fanout";
//3、發(fā)送消息
rabbitTemplate.covertAndSend(exchangeName, "", message);
}
第四種:路由信息隊(duì)列
路由模式的流程: 即設(shè)置密鑰的綁定關(guān)系,只有攜帶相應(yīng)的密鑰才能進(jìn)入相應(yīng)的隊(duì)列
- 每一個(gè) Queue 與 Exchange 設(shè)置一個(gè) BindingKey
- 發(fā)布者發(fā)送消息時(shí),需要指定消息的 RoutingKey
- Exchange根據(jù)消息路由到 BindingKey 與 RoutingKey 一致的隊(duì)列
實(shí)現(xiàn)思路:
1、利用 @RabbitListener 聲明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "com.exchange", type = ExcahngeTypes.DIRECT), key = {"red","blue"}))
public void listenRoutingQueue1(String msg) throws InterruptedException {
//...處理結(jié)果
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "com.exchange", type = ExcahngeTypes.DIRECT), key = {"red","green"}))
public void listenRoutingQueue2(String msg) throws InterruptedException {
//...處理結(jié)果
2、發(fā)送消息實(shí)現(xiàn)
//指定隊(duì)列處理
@Test
public void sendRoutingExchange1(){
//交換機(jī),消息
String exchangeName = "com.exchange";
String message = "Hello,RoutingMQ";
//發(fā)送消息
rabbitTemplate.covertAndSend(exchangeName, "blue", message);
}
//多隊(duì)列處理
@Test
public void sendRoutingExchange2(){
//交換機(jī),消息
String exchangeName = "com.exchange";
String message = "Hello,RoutingMQ";
//發(fā)送消息
rabbitTemplate.covertAndSend(exchangeName, "red", message);
}
第五種:主題信息隊(duì)列(通配key)
TopicExchange 與 DirectExchange 的區(qū)別: routingkey必須是多個(gè)單詞的列表,并且以,
分割。并且Queue與Exchange指定的BindingKey時(shí)可使用通配符:
- **#:**代指 0 / n 個(gè)單詞
- *: 代指一個(gè)單詞
實(shí)現(xiàn)思路:
1、通過 @RabbitListener 聲明Exchange、Queue、RoutingKey
@RabbitListener(bingdings = @QueueBinding(exchange = @Exchange(name = "com.exchange", type = ExchangeTypes.TOPIC), queue = @Queue(name = "com.queue1"), key = {"china.#"}))
public void listenTopicQueue1(String msg) {
//處理代碼....
}
@RabbitListener(bingdings = @QueueBinding(exchange = @Exchange(name = "com.exchange", type = ExchangeTypes.TOPIC), queue = @Queue(name = "com.queue2"), key = {"#.news"}))
public void listenTopicQueue2(String msg) {
//處理代碼....
}
2、在publisher服務(wù)中,向交換機(jī)發(fā)送消息
@Test
public void sendTopicMessage(){
//交換機(jī),消息
String exchangeName = "com.exchange";
String message = "Hello,Topic";
rabbitTemplate.convertAndSend(exchangeName,"china.call",message);
}
四、SpringAMQP
(一)概念
- AMQP: Advanced Message Queuing Protocol 傳遞消息隊(duì)列協(xié)議,是用于在應(yīng)用程序或之間傳遞業(yè)務(wù)消息的開放標(biāo)準(zhǔn)。該協(xié)議與語言及平臺(tái)無關(guān),更符合為服務(wù)中獨(dú)立性的要求。
- Spring AMQP: Spring AMQP是基于AMQP協(xié)議定義的一套API規(guī)范,提供了模板來發(fā)送和接收消息。其中 spring-amqp是基礎(chǔ)抽象,spring-rabbit是底層的默認(rèn)實(shí)現(xiàn)。
(二)實(shí)現(xiàn)基礎(chǔ)消息隊(duì)列
1、引入spring-amqp依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、publisher服務(wù)中利用RabbitTemplate發(fā)送消息到任務(wù)隊(duì)列
- 配置mq連接信息
spring:
rabbitmq:
host: 192.168.92.131 #IP
port: 5672
virtual-host: /
username: root
password: root
- 編寫發(fā)送方法
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessage(){
String queueName = "simple.queue";
String message = "Hello World";
rabbitTemplate.convertAndSend(queueName,message);
}
3、在consumer服務(wù)中編寫消費(fèi)邏輯,綁定simple.queue隊(duì)列
- 配置mq連接信息
spring:
rabbitmq:
host: 192.168.92.131 #IP
port: 5672
virtual-host: /
username: root
password: root
- 編寫發(fā)送方法1
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void getMessage(){
String queueName = "simple.queue";
// receive 表示接收方法,接收到的信息會(huì)封裝到Message,可以看receive的返回值
Message message = rabbitTemplate.receive(queueName);
// Message.getBody 是 byte[]
System.out.println(new String(message.getBody()));
}
-
編寫發(fā)送方法2
- 創(chuàng)建一個(gè)監(jiān)聽類
// 注冊(cè)成 Bean 對(duì)象
@Component
public class SpringRabbitListener {
// 監(jiān)聽器注釋,queues = 訂閱隊(duì)列,并將返回值注入?yún)?shù)列表中
@RabbitListener(queues = "simple.queue")
public void ListenSimpleQueueMessage(String msg){
System.out.println("Spring 消費(fèi)者接收到消息:【" + msg + "】");
}
}
(三)消息轉(zhuǎn)換器
為了讓我們能夠自由識(shí)別consumer發(fā)送的消息,則需要使用的是消息轉(zhuǎn)換器。
消息轉(zhuǎn)換器如何使用?
Spring對(duì)消息對(duì)象的處理是由org.springframework.amqp.support.converter.MessageConverter來處理,默認(rèn)實(shí)現(xiàn)的是SimpleMessageConverter,基于ObjectObjectOutputStream完成序列化。
我們只需要定義一個(gè) MessageConverter 類型的Bean即可,推薦使用JSON序列化
1、publisher引入依賴
<!-- 接收消息需要使用jackson的轉(zhuǎn)換依賴 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
<!-- 發(fā)送消息需要使用jackson的核心依賴 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2、publisher啟動(dòng)類,聲明MessageConverter
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
3、consumer啟動(dòng)類,聲明MessageConverter文章來源:http://www.zghlxwxcb.cn/news/detail-607148.html
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
4、監(jiān)聽隊(duì)列消息文章來源地址http://www.zghlxwxcb.cn/news/detail-607148.html
@RabbitListener(queues = "object.queue")
public void listenObjectMessage(Object msg) {
//處理數(shù)據(jù)....
}
到了這里,關(guān)于SpringCloud學(xué)習(xí)路線(9)——服務(wù)異步通訊RabbitMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!