微服務(wù)的遠(yuǎn)程異步調(diào)用
為什么需要異步調(diào)用?
- 故障隔離:支付服務(wù)不負(fù)責(zé)調(diào)用其他三個(gè)服務(wù),只負(fù)責(zé)通知Broker支付成功這個(gè)事件,然后就返回結(jié)果,后面的服務(wù)故障了和前面發(fā)布事件的服務(wù)無關(guān),前面的服務(wù)發(fā)布完事件就結(jié)束了
- 吞吐量提升:Broker將支付成功的事件廣播給訂閱了這個(gè)事件的那些服務(wù),服務(wù)們各自并發(fā)進(jìn)行接下來的工作,吞吐量變高,性能提升
- 耦合度低:有新服務(wù)加入只要讓它訂閱就行,耦合度低
- 流量削峰:broker可以起到緩沖作用,把大量事件存著給后面慢慢處理
異步調(diào)用的缺點(diǎn):
- 依賴于Broker的可靠性、安全性、吞吐能力
- 架構(gòu)復(fù)雜,業(yè)務(wù)流程不清晰,難以追蹤
MQ介紹
MQ(MessageQueue)消息隊(duì)列,就是上文事件驅(qū)動(dòng)架構(gòu)中的Broker
RabbitMQ
RabbitMQ結(jié)構(gòu)
- VirtualHost: 一般屬于不同用戶,互相隔離,是對(duì)exchange、queue等資源的邏輯分組
- channel: 操作MQ的工具
- queue: 緩存消息隊(duì)列
- exchange: 路由消息到隊(duì)列中
RabbitMQ的單機(jī)部署
基于Erlang語言設(shè)計(jì)
RabbitMQ文檔
在Centos7虛擬機(jī)中使用Docker來安裝。
1.下載鏡像
方式一:在線拉取
docker pull rabbitmq:3-management
方式二:從本地加載
將.tar
鏡像包上傳到虛擬機(jī)中后,使用命令加載鏡像即可:
docker load -i mq.tar
2.安裝MQ
執(zhí)行下面的命令來運(yùn)行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \ # 15672是管理平臺(tái)的端口
-p 5672:5672 \ # 5672消息通信的端口
-d \
rabbitmq:3-management
在瀏覽器使用虛擬機(jī)地址:15672
就能看到管理平臺(tái)
ps.一般情況下每個(gè)用戶要獨(dú)享一個(gè)虛擬主機(jī)
管理后臺(tái)介紹
RabbitMQ入門
官方入門示例
常見消息模型
- 基本消息隊(duì)列basic queue
- 工作消息隊(duì)列work queue
- 發(fā)布訂閱:廣播fanout
- 發(fā)布訂閱:路由direct
- 發(fā)布訂閱:主題topic
SpringAMQP
- 監(jiān)聽器容器,異步處理入棧消息
- 發(fā)送和接收消息的RabbitTemplate
- RabbitAdmin用于自動(dòng)聲明隊(duì)列,交換和綁定
exchange、queue這種東西,如果沒有提前創(chuàng)建好,在使用的時(shí)候也會(huì)自動(dòng)創(chuàng)建
SpringAMQP實(shí)現(xiàn)基礎(chǔ)消息隊(duì)列
- 父工程引入spring-amqp依賴
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 在publisher服務(wù)中利用RabbitTemplate發(fā)送消息到simple.queue隊(duì)列
在publisher的application.yml中添加mq的連接信息
spring:
rabbitmq:
host: 192.168.36.128 # 主機(jī)名
port: 5672
virtual-host: / # 虛擬主機(jī)
username: itcast
password: 123456
在測試類中編寫一個(gè)測試方法,注入RabbitTemplate對(duì)象
別忘了加注解讓spring boot啟動(dòng),要不然沒東西注入報(bào)空指針
//@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
}
}
運(yùn)行測試方法,在管理平臺(tái)中就能看到隊(duì)列中有一個(gè)消息了
- 在consumer服務(wù)中編寫消費(fèi)邏輯,綁定simple.queue隊(duì)列
同樣,添加依賴,然后在配置文件中添加AMQP信息
消費(fèi)者只需要新建一個(gè)類(為了被Springboot找到并內(nèi)部注入需要添加@Component),定義一個(gè)監(jiān)聽方法(用@RabbitListener修飾)即可:
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String message){
System.out.println("spring消費(fèi)者接收到消息:"+message);
}
}
啟動(dòng)consumer微服務(wù)以后,它會(huì)自動(dòng)監(jiān)聽simple.listener隊(duì)列中有沒有消息,如果有就直接拿過來
SpringAMQP實(shí)現(xiàn)工作隊(duì)列
綁定兩個(gè)consumer可以提高消息處理的速度,避免消息堆積
假設(shè)publisher共發(fā)送了50條消息,那設(shè)置兩個(gè)consumer的監(jiān)聽者:
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenWorkQueueMessage1(String msg) throws InterruptedException {
System.out.println("消費(fèi)者1接收到消息:"+msg);
Thread.sleep(50);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
System.err.println("消費(fèi)者2接收到消息:"+msg);
Thread.sleep(10);
}
}
事實(shí)上,50條消息被平均分配給了兩個(gè)consumer監(jiān)聽器,消費(fèi)者1接收完25條以后還要慢慢等消費(fèi)者2接收完它的25條,并不會(huì)搶消息
這是消息預(yù)取機(jī)制造成的問題,兩個(gè)消費(fèi)者是在消費(fèi)前就把消息分配好了
在配置文件中,可以設(shè)置消息預(yù)取的上限simple.prefetch(默認(rèn)為無限),設(shè)置為1的時(shí)候就是一條一條取,以達(dá)到能者多勞,總體速度變快的效果。
spring:
rabbitmq:
host: 192.168.36.128
port: 5672
virtual-host: /
username: itcast
password: 123456
listener:
simple:
prefetch: 1 # 每次只能獲取一條消息,處理完成才拿下一條
SpringAMQP實(shí)現(xiàn)發(fā)布訂閱
發(fā)布訂閱模式與先前案例的區(qū)別是,允許同一消息被群發(fā)給多個(gè)消費(fèi)者,而不是一個(gè)消費(fèi)者消費(fèi)完就刪除。實(shí)現(xiàn)方法靠exchange(交換機(jī))
常見的場景也是一個(gè)事件的完成會(huì)調(diào)動(dòng)很多后續(xù)的服務(wù)
消息發(fā)布者現(xiàn)在只需要把消息交給交換機(jī),不需要知道給哪些隊(duì)列,交換機(jī)會(huì)幫助轉(zhuǎn)發(fā)
交換機(jī)三種類型:
- Fanout廣播
- Direct路由
- Topic話題
1. Fanout Exchange 廣播模式
- 在consumer中聲明隊(duì)列、交換機(jī),并且把隊(duì)列綁定到交換機(jī)(下面還有使用@RabbitListener注解聲明和綁定的方法)
@Configuration
public class FanoutConfig {
@Bean // 聲明交換機(jī)
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
@Bean // 聲明隊(duì)列
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
@Bean // 綁定
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}//以相同方式聲明第2個(gè)隊(duì)列并綁定
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
這些Bean會(huì)被springboot自動(dòng)裝配,被AMQP使用
2. 在consumer中編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽fanout.queue1和fanout.queue2
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueueMessage1(String msg) throws InterruptedException {
System.out.println("消費(fèi)者1接收到消息:"+msg);
Thread.sleep(50);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueueMessage2(String msg) throws InterruptedException {
System.err.println("消費(fèi)者2接收到消息:"+msg);
Thread.sleep(10);
}
}
- 在publisher中編寫測試方法,向itcast.fanout發(fā)送消息
@Test
public void testSendFanoutExchange(){
// 交換機(jī)名稱
String exchangeName = "itcast.fanout";
// 消息
String msg = "it's a broadcast";
// 發(fā)送
rabbitTemplate.convertAndSend(exchangeName, "", msg);
}
2. Direct Exchange 路由模式
DIrect Exchange會(huì)根據(jù)規(guī)則把消息路由(routes)到指定隊(duì)列
實(shí)現(xiàn)思路如下:
- 利用@RabbitListener注解聲明Exchange、Queue、RoutingKey,之前那種自定義Bean的方式比較繁瑣
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = "direct"),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消費(fèi)者接收到來自direct.queue1的消息:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = "direct"),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消費(fèi)者接收到來自direct.queue2的消息:"+msg);
}
}
- 發(fā)送消息,攜帶routingKey
@SpringBootTest
public class PublisherTest {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void testSendDirectExchange(){
// 交換機(jī)名稱
String exchangeName = "itcast.direct";
// 消息
String msg = "Hello Blue";
// 發(fā)送
rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
}
}
3. Topic Exchange 話題
Topic中的BindingKey支持通配符(注意是BindingKey):
#: 0或多個(gè)單詞
*: 1個(gè)單詞
實(shí)現(xiàn)思路:
在@RabbitListener中
- 聲明bindingKey的時(shí)候使用通配符
- Exchange的type指定為
"topic"
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = "topic"),
key = {"china.#"}
))
這樣routingKey只要符合bingdingKey的模式,就會(huì)把消息分發(fā)給它
消息轉(zhuǎn)換器
rabbitTemplate.convertAndSend發(fā)送的信息是Object類型的,所以可以傳任意對(duì)象,會(huì)自動(dòng)序列化
默認(rèn)使用的序列化方式是java提供的序列化,類會(huì)被序列化成字節(jié)串,有許多缺點(diǎn)
我們可以采用別的序列化方式,比如JSON序列化方式,把MessageConverter類型的容器中的對(duì)象頂?shù)艟托?mark hidden color="red">文章來源:http://www.zghlxwxcb.cn/news/detail-637069.html
記得發(fā)送端和接收端的對(duì)象類型要一致或者兼容,用的序列化MessageConverter也要一致文章來源地址http://www.zghlxwxcb.cn/news/detail-637069.html
到了這里,關(guān)于Java分布式微服務(wù)4——異步服務(wù)通訊(RabbitMQ)中間件的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!