MQ
同步調(diào)用的問題
微服務(wù)間基于Feign的調(diào)用就屬于同步方式,存在一些問題。
- 耦合度高:每次加入新的需求,都要修改原來的代碼
- 性能下降:調(diào)用者需要等待服務(wù)提供者響應(yīng),如果調(diào)用鏈過長則響應(yīng)時(shí)間等于每次調(diào)用的時(shí)間之和。
- 資源浪費(fèi):調(diào)用鏈中的每個(gè)服務(wù)在等待響應(yīng)過程中,不能釋放請求占用的資源,高并發(fā)場景下會極度浪費(fèi)系統(tǒng)資源
- 級聯(lián)失敗:如果服務(wù)提供者出現(xiàn)問題,所有調(diào)用方都會跟著出問題,如同多米諾骨牌一樣,迅速導(dǎo)致整個(gè)微服務(wù)群故障
異步調(diào)用方案
異步調(diào)用常見實(shí)現(xiàn)就是事件驅(qū)動(dòng)模式
- 耦合度低
- 吞吐量提升
- 故障隔離
- 流量削峰
異步通信的缺點(diǎn):
- 依賴于Broker的可靠性、安全性、吞吐能力
- 架構(gòu)復(fù)雜了,業(yè)務(wù)沒有明顯的流程線,不好追蹤管理
什么是MQ
MQ (MessageQueue),中文是消息隊(duì)列,字面來看就是存放消息的隊(duì)列。也就是事件驅(qū)動(dòng)架構(gòu)中的Broker。
a.安裝RabbitMQ
1.在線拉取
docker pull rabbitmq:3-management
2.執(zhí)行下面的命令來運(yùn)行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
15672 控制臺端口號
5672 消息通信端口
RabbitMQ中的幾個(gè)概念:
- channel:操作MQ的工具
- exchange:路由消息到隊(duì)列中
- queue:緩存消息
- virtual host:虛擬主機(jī),是對queue、exchange等資源的邏輯分組
b.消息模型
MQ的官方文檔中給出了5個(gè)MQ的Demo示例,對應(yīng)了幾種不同的用法:
- 基本消息隊(duì)列(BasicQueue)
- 工作消息隊(duì)列(WorkQueue)
- 發(fā)布訂閱(Publish、Subscribe),又根據(jù)交換機(jī)類型不同分為三種:
- Fanout Exchange:廣播
- Direct Exchange:路由
- Topic Exchange:主題
官方的HelloWorld是基于最基礎(chǔ)的消息隊(duì)列模型來實(shí)現(xiàn)的,只包括三個(gè)角色:
- publisher:消息發(fā)布者,將消息發(fā)送到隊(duì)列
- queuequeue:消息隊(duì)列,負(fù)責(zé)接受并緩存消息
- consumer:訂閱隊(duì)列,處理隊(duì)列中的消息
基本消息隊(duì)列的消息發(fā)送流程:
1.建立connection
2.創(chuàng)建channel
3.利用channel聲明隊(duì)列
4.利用channel向隊(duì)列發(fā)送消息
基本消息隊(duì)列的消息接收流程:
1.建立connection
2.創(chuàng)建channel
3.利用channel聲明隊(duì)列 (避免隊(duì)列不存在,所以雙方都聲明)
4.定義consumer的消費(fèi)行為handleDelivery()
5.利用channel將消費(fèi)者與隊(duì)列綁定
c.SpringAMQP發(fā)送和接收
案例:利用SpringAMQP實(shí)現(xiàn)HelloWorld中的基礎(chǔ)消息隊(duì)列功能
流程如下:
1.在父工程中引入spring-amqp的依賴
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.在publisher中編寫測試方法,向simple.queue發(fā)送消息
- 在publisher服務(wù)中編寫application.yml,添加mq連接信息:
spring:
rabbitmq:
host: 192.168.58.128 # rabbitMq的ip地址
port: 5672 # 端口
username: root
password: root
virtual-host: /
- 在publisher服務(wù)中新建一個(gè)測試類,編寫測試方法:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue(){
String queueName = "simple.queue";
String message = "Hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
}
}
3.在consumer中編寫消費(fèi)邏輯,監(jiān)聽simple.queue
- 在consumer服務(wù)中編寫application.yml,添加mq連接信息:
spring:
rabbitmq:
host: 192.168.58.128
port: 5672
username: root
password: root
virtual-host: /
- 在consumer服務(wù)中新建一個(gè)類,編寫消費(fèi)邏輯 (添加監(jiān)聽注解**@RabbitListener**):
@Component
public class SpringRabbitListener {
@RabbitListener(queues = {"simple.queue"})
public void listenSimpleQueue(String msg){
System.out.println("消費(fèi)者接收到simple.queue的消息:【" + msg + "】");
}
}
d.WorkQueue模型
Work queue,工作隊(duì)列,可以提高消息處理速度,避免隊(duì)列消息堆積
案例:模擬WorkQueue,實(shí)現(xiàn)一個(gè)隊(duì)列綁定多個(gè)消費(fèi)者
1.在publisher服務(wù)中定義測試方法,每秒產(chǎn)生50條消息,發(fā)送到simple.queue
@Test
public void testSendMessage2WorkQueue() throws Exception {
String queueName = "simple.queue";
String message = "Hello, message__";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
2.在consumer服務(wù)中定義兩個(gè)消息監(jiān)聽者,都監(jiān)聽simple.queue隊(duì)列
3.消費(fèi)者1每秒處理50條消息,消費(fèi)者2每秒處理10條消息
@Component
public class SpringRabbitListener {
@RabbitListener(queues = {"simple.queue"})
public void listenWorkQueue1(String msg) throws Exception {
System.out.println("消費(fèi)者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = {"simple.queue"})
public void listenWorkQueue2(String msg) throws Exception {
System.out.println("消費(fèi)者2...接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
}
消費(fèi)預(yù)取限制
修改application.yml文件,設(shè)置preFetch這個(gè)值,可以控制預(yù)取消息的上限:
spring:
rabbitmq:
host: 192.168.58.128
port: 5672
username: root
password: root
virtual-host: /
listener:
simple:
prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個(gè)消息
可以提高整個(gè)消息隊(duì)列的速度,避免消息堆積
e.發(fā)布訂閱模型
發(fā)布( Publish )、訂閱( Subscribe )
發(fā)布訂閱模式與之前案例的區(qū)別就是允許將同一消息發(fā)送給多個(gè)消費(fèi)者。實(shí)現(xiàn)方式是加入了exchange(交換機(jī))。
常見exchange類型包括:
- Fanout:廣播
- Direct:路由
- Topic:話題
注意:exchange負(fù)責(zé)消息路由,而不是存儲,路由失敗則消息丟失
1) FanoutExchange
Fanout Exchange 會將接收到的消息路由給每一個(gè)跟其綁定的queue
案例:利用SpringAMQP演示FanoutExchange的使用
1.在consumer服務(wù)中,利用代碼聲明隊(duì)列、交換機(jī),并將兩者綁定
在consumer服務(wù)常見一個(gè)類,添加@Configuration注解,并聲明FanoutExchange、Queue和綁定關(guān)系對象Binding
@Configuration
public class FanoutConfig {
// abcd.fanout
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("abcd.fanout");
}
// fanout.queue1
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
// 綁定隊(duì)列1到交換機(jī)
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
// fanout.queue2
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
// 綁定隊(duì)列2到交換機(jī)
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
}
2.在consumer服務(wù)中,編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽fanout.queue1和fanout.queue2
@Component
public class SpringRabbitListener {
@RabbitListener(queues = {"fanout.queue1"})
public void listenFanoutQueue1(String msg){
System.out.println("消費(fèi)者接收到fanout.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = {"fanout.queue2"})
public void listenFanoutQueue2(String msg){
System.out.println("消費(fèi)者接收到fanout.queue2的消息:【" + msg + "】");
}
}
3.在publisher中編寫測試方法,向abcd.fanout發(fā)送消息
@Test
public void testSendFanoutExchange() {
// 交換機(jī)名稱
String exchangeName = "abcd.fanout";
//消息
String message = "Hello, every one!";
// 發(fā)送消息: 參數(shù)依次為:交換機(jī)名稱,RoutingKey,消息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
2) DirectExchange
Direct Exchange 會將接收到的消息根據(jù)規(guī)則路由到指定的Queue,因此稱為路由模式(routes)。
- 每一個(gè)Queue都與Exchange設(shè)置一個(gè)BindingKey
- 發(fā)布者發(fā)送消息時(shí),指定消息的RoutingKey
- Exchange將消息路由到BindingKey與消息RoutingKey一致的隊(duì)列
案例:利用SpringAMQP演示DirectExchange的使用
1.利用@RabbitListener聲明Exchange、Queue、RoutingKey
2.在consumer服務(wù)中,編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽direct.queue1和direct.queue2
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "abcd.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消費(fèi)者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "abcd.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消費(fèi)者接收到direct.queue2的消息:【" + msg + "】");
}
}
3.在publisher中編寫測試方法,向abcd. direct發(fā)送消息
@Test
public void testSendDirectExchange(){
// 交換機(jī)名稱
String exchangeName = "abcd.direct";
// 消息
String message = "Hello, blue!";
// 發(fā)送消息,參數(shù)依次為:交換機(jī)名稱,RoutingKey,消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
3) TopicExchange
TopicExchange與DirectExchange類似,區(qū)別在于routingKey必須是多個(gè)單詞的列表,并且以. 分割。Queue與Exchange指定BindingKey時(shí)可以使用通配符:
- #:代指0個(gè)或多個(gè)單詞
- *:代指一個(gè)單詞
案例:利用SpringAMQP演示TopicExchange的使用
1.利用@RabbitListener聲明Exchange、Queue、RoutingKey
2.在consumer服務(wù)中,編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽topic.queue1和topic.queue2
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "abcd.topic", type = ExchangeTypes.TOPIC),
key = {"china.#"}
))
public void listenTopicQueue1(String msg){
System.out.println("消費(fèi)者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "abcd.topic", type = ExchangeTypes.TOPIC),
key = {"*.news"}
))
public void listenTopicQueue2(String msg){
System.out.println("消費(fèi)者接收到topic.queue2的消息:【" + msg + "】");
}
}
3.在publisher中編寫測試方法,向abcd. topic發(fā)送消息
@Test
public void testSendTopicExchange(){
// 交換機(jī)名稱
String exchangeName = "abcd.topic";
// 消息
String message = "Hello, new!";
// 發(fā)送消息,參數(shù)依次為:交換機(jī)名稱,RoutingKey,消息
rabbitTemplate.convertAndSend(exchangeName, "hello.news", message);
}
f.消息轉(zhuǎn)換器
在SpringAMQP的發(fā)送方法中,接收消息的類型是Object,也就是說我們可以發(fā)送任意對象類型的消息,SpringAMQP會幫我們序列化為字節(jié)后發(fā)送。
案列:測試發(fā)送Object類型消息
1.在consumer中利用@Bean聲明一個(gè)隊(duì)列:
@Configuration
public class FanoutConfig {
@Bean
public Queue objectQueue() {
return new Queue("object.queue");
}
}
2.在publisher中發(fā)送消息以測試:
@Test
public void testSendObjectQueue(){
Map<String, Object> msg = new HashMap<>();
msg.put("name", "張三");
msg.put("age", 21);
rabbitTemplate.convertAndSend("object.queue", msg);
}
消息轉(zhuǎn)換器
Spring的對消息對象的處理是由org.springframework.amqp.support.converter.MessageConverter來處理的。而默認(rèn)實(shí)現(xiàn)是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。如果要修改只需要定義一個(gè)MessageConverter 類型的Bean即可。推薦用JSON方式序列化
1.在publisher服務(wù)引入依賴
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2.在publisher服務(wù)聲明MessageConverter:
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
3.在consumer服務(wù)引入Jackson依賴:
- 與步驟1一樣
4.在consumer服務(wù)定義MessageConverter:文章來源:http://www.zghlxwxcb.cn/news/detail-672603.html
- 與步驟2一樣
5.定義一個(gè)消費(fèi)者,監(jiān)聽object.queue隊(duì)列并消費(fèi)消息:文章來源地址http://www.zghlxwxcb.cn/news/detail-672603.html
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> msg){
System.out.println("接收到object.queue的消息:" + msg);
}
到了這里,關(guān)于微服務(wù)中間件--MQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!