目錄
一、同步通信 VS 異步通信
二、MQ——消息隊列
RabbitMQ?
RabbitMQ安裝?
RabbitMQ的整體架構
常見消息模型?
?基本消息隊列(BasicQueue)
工作消息隊列(WorkQueue)
?發(fā)布、訂閱(Publish、Subscribe)
?Fanout Exchange
Direct Exchange?
Topic Exchange?
SpringAMQP-消息轉換器?
一、同步通信 VS 異步通信
同步通信:雙方在同一個時鐘信號的控制下,進行數(shù)據(jù)的接收和發(fā)送,來一個時鐘,發(fā)送端發(fā)送,接收端接收,他們彼此之間的工作狀態(tài)是一致的,例如直播、打電話。
優(yōu)點:
- 時效性強,能夠立即得到結果
缺點:
- 耦合性較高:每次加入新的需求,都需要修改原有代碼
- 性能下降:調用者需要等待服務提供者響應,若調用鏈過長則響應時間等于每次調用時間之和
- 資源利用率低:調用鏈中的每個服務在等待響應的過程中,不能釋放請求占用的資源,高并發(fā)的情況下會造成資源的極度浪費
- 級聯(lián)失?。喝绻仗峁┱叱霈F(xiàn)問題,所有的調用方也會跟著出問題
適用場景:業(yè)務要求時效性高
異步通信:異步通信在發(fā)送字符時,所發(fā)送的字符之間的時間間隔可以是任意的。例如微信聊天。
在異步調用過程常見的實現(xiàn)就是事件驅動模式,系統(tǒng)中發(fā)生的事件會觸發(fā)相應的事件處理器或監(jiān)聽器
,從而實現(xiàn)特定的業(yè)務邏輯或功能。例如在如下的支付場景中,當有請求發(fā)送給支付服務時,支付服務就會通知Broker,接著后續(xù)的訂閱事件就會接收到請求,開始同時處理業(yè)務,但是支付服務不用等到后續(xù)訂閱事件完成后再返回,而是將請求通知給Broker之后支付服務就會返回結果。
優(yōu)點:
- 服務解耦
- 性能提升,吞吐量提高
- 服務之間沒有強依賴,不用擔心級聯(lián)失敗問題(故障隔離)
- 流量削峰
缺點:
- 依賴于Broker的可靠性、安全性和吞吐能力
- 結構復雜后,業(yè)務沒有了明顯的流水線,難以追蹤管理
適用場景:對于并發(fā)和吞吐量的要求高,時效性的要求低
二、MQ——消息隊列
MQ(消息隊列):存放消息的隊列,也是事件驅動架構的Broker。
常見的消息隊列實現(xiàn)對比:
RabbitMQ?
RabbitMQ是基于Erlang語言開發(fā)的消息通信中間件,RabbitMQ的性能以及可用性較好,國內應用較為廣泛,所以對RabbitMQ進行重點學習。
RabbitMQ的官網(wǎng)地址:https://www.rabbitmq.com
RabbitMQ安裝?
可以根據(jù)自己的需求在RabbitMQ的官網(wǎng)進行查看:下載和安裝 RabbitMQ — 兔子MQ
RabbitMQ的整體架構
??首先,Publisher會把消息發(fā)送給exchange(交換機),exchange負責路由再把消息投遞到queue(隊列),queue負責暫存消息,Consumer會從隊列中獲取消息并處理消息。
RabbitMQ中的幾個概念:
? channel :操作 MQ 的工具? exchange :路由消息到隊列中? queue :緩存消息? virtual host :虛擬主機,是對 queue 、 exchange 等資源的邏輯分組
常見消息模型?
RabbitMQ的官方文檔中給出了5個MQ的Demo實例,可以分為如下:
- 基本消息隊列(BasicQueue)
- 工作消息隊列(WorkQueue)
- 發(fā)布訂閱(Publish、Subscribe),又根據(jù)交換機類型不同分為三種:
? ? ? ? ? ? ? ??Fanout Exchange:廣播
? ? ? ? ? ? ? ? Direct?Exchange:路由
? ? ? ? ? ? ? ? Topic Exchange:主題
?基本消息隊列(BasicQueue)
官方的HelloWorld是基于最基礎的消息隊列模型來實現(xiàn)的,只包括三個角色:
- publisher:消息發(fā)布者,將消息發(fā)送到隊列queue
- queue:消息隊列,負責接受并緩存消息
- consumer:訂閱隊列,處理隊列中的消息
?
?

在使用端口時,需要在云服務器上開放所用的端口?
基本消息隊列的消息發(fā)送流程:
- 建立Connection
- 創(chuàng)建Channel
- 利用Channel聲明隊列
- 利用Channel向隊列中發(fā)送消息
代碼實現(xiàn):
public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立連接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.設置連接參數(shù),分別是:主機名、端口號、vhost、用戶名、密碼 factory.setHost("x.x.x.x"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("xx"); factory.setPassword("xx"); // 1.2.建立連接 Connection connection = factory.newConnection(); // 2.創(chuàng)建通道Channel Channel channel = connection.createChannel(); // 3.創(chuàng)建隊列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.發(fā)送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("發(fā)送消息成功:【" + message + "】"); // 5.關閉通道和連接 channel.close(); connection.close(); } }
運行結果:
?
基本消息隊列的消息接收流程:?
- 建立Connection
- 創(chuàng)建Channel
- 利用Channel聲明隊列
- 定義Consumer的消費行為handleDelivery()
- 利用Channel將消費者與隊列進行綁定
代碼實現(xiàn):
public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立連接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.設置連接參數(shù),分別是:主機名、端口號、vhost、用戶名、密碼 factory.setHost("x.x.x.x"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("xx"); factory.setPassword("xx"); // 1.2.建立連接 Connection connection = factory.newConnection(); // 2.創(chuàng)建通道Channel Channel channel = connection.createChannel(); // 3.創(chuàng)建隊列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.訂閱消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.處理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } }
運行結果:
上述實現(xiàn)方式相對比較復雜,就引入了SpringAMQP來實現(xiàn)。
AMQP:是用于在應用程序之間傳遞業(yè)務消息的開放標準。該協(xié)議與語言和平臺無關,更符合微服務中獨立性的要求。?
SpringAMQP:SpringAMQP是基于AMQP協(xié)議定義的一套API規(guī)范,提供了模板來發(fā)送和接收消息。包含兩部分,其中spring-amqp是基礎抽象,spring-rabbit是底層的默認實現(xiàn)。
SpringAMQP的官方地址
那么利用SpringAMQP來實現(xiàn)基本消息隊列的流程如下:
- 在父工程中引入spring-amqp的依賴
- 在publisher服務中利用RabbitTemplate發(fā)送消息到simple.queue這個隊列
- 在consumer服務中編寫消費邏輯,綁定simple.queue這個隊列
具體實現(xiàn):
1、在父工程中引入spring-amqp的依賴:
<!--AMQP依賴,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2、在publisher中編寫測試方法,向simple.queue發(fā)送消息:
在publisher服務的配置文件中添加mq的連接信息:
spring: rabbitmq: host: # rabbitMQ的ip地址 port: 5672 # 端口 username: # 用戶名 password: # 密碼 virtual-host: # 虛擬主機
在publisher服務中新建一個測試類,編寫測試方法:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage2SimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); } }
在RabbitMQ中的simple隊列中查詢信息:
3、在consumer服務中編寫消費邏輯,監(jiān)聽simple.queue
在consumer服務的配置文件中添加mq連接信息:
spring: rabbitmq: host: # rabbitMQ的ip地址 port: 5672 # 端口 username: # 用戶名 password: # 密碼 virtual-host: # 虛擬主機
在consumer服務中新建一個類,編寫具體的消費邏輯:
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg) throws InterruptedException { System.out.println("消費者接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } }
運行啟動類:
工作消息隊列(WorkQueue)
下面場景中如果queue中有50條請求消息,但是consumer1只能處理40條,剩余的10條就可以由consumer進行處理,所以說工作消息隊列可以提高消息的處理速度,避免隊列消息堆積
模擬Workqueue,實現(xiàn)一個隊列綁定多個消費者,基本實現(xiàn)思路如下:
- 在publisher服務中定義測試方法,每秒產(chǎn)生50條消息,發(fā)送到simple.queue中
- 在consumer服務中定義兩個消息監(jiān)聽者,都監(jiān)聽simple.queue隊列
- 消費者1每秒處理50條消息,消費者2每秒處理10條消息
代碼實現(xiàn):
在publisher服務中定義測試方法,每秒產(chǎn)生50條消息,發(fā)送到simple.queue中
在consumer服務中定義兩個消息監(jiān)聽者,都監(jiān)聽simple.queue隊列,設置消費者1每秒處理50條消息,消費者2每秒處理10條消息public void testSendMessage2WorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, message__"; for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消費者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消費者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }
修改application.yml文件,設置preFetch這個值,可以控制預取消息的上限,確保消費者2取消息時只能取一條,提高效率(“能者多勞”):
spring: rabbitmq: listener: simple: prefetch: 1
運行結果:
?發(fā)布、訂閱(Publish、Subscribe)
發(fā)布訂閱模式與之前案例的區(qū)別就是允許將同一消息發(fā)送給多個消費者。實現(xiàn)方式是加入了exchange(交換機)。
常見exchange類型包括:
- Fanout:廣播
- Direct:路由
- Topic:話題
exchange負責消息路由,而不是存儲,路由失敗則消息丟失?
?Fanout Exchange
?Fanout Exchange會將接收到的消息路由到每一個跟其綁定的queue中,如下:
基本實現(xiàn)思路如下:
- 在consumer中,利用代碼聲明隊列、交換機,將二者進行綁定
- 在consumer中,編寫兩個消費方法,分別監(jiān)聽fanout.queue1和fanout.queue2
- 在publisher中編寫測試方法,向fanout發(fā)送消息
代碼實現(xiàn):
在consumer中,利用代碼聲明隊列、交換機,將二者進行綁定
@Configuration public class FanoutConfig { // itcast.fanout @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } // fanout.queue1 @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } // 綁定隊列1到交換機 @Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } // fanout.queue2 @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } // 綁定隊列2到交換機 @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
在consumer中,編寫兩個消費方法,分別監(jiān)聽fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消費者接收到fanout.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消費者接收到fanout.queue2的消息:【" + msg + "】"); }
在publisher中編寫測試方法,向fanout發(fā)送消息
@Test public void testSendFanoutExchange() { // 交換機名稱 String exchangeName = "itcast.fanout"; // 消息 String message = "hello, every one!"; // 發(fā)送消息 rabbitTemplate.convertAndSend(exchangeName, "", message); }
運行結果:
Direct Exchange?
Direct Exchange會將接收到的消息根據(jù)規(guī)則路由到指定的Queue,因此被稱為路由模式
- l每一個Queue都與Exchange設置一個BindingKey
- l發(fā)布者發(fā)送消息時,指定消息的RoutingKey
- lExchange將消息路由到BindingKey與消息RoutingKey一致的隊列
基本實現(xiàn)思路如下:
- 利用@RabbitListener聲明Exchange、Queue、RoutingKey
- 在consumer服務中,編寫兩個消費者方法,分別監(jiān)聽direct.queue1和direct.queue2
- 在publisher中編寫測試方法,向itcast. direct發(fā)送消息
代碼實現(xiàn):
在consumer服務中,編寫兩個消費者方法,分別監(jiān)聽direct.queue1和direct.queue2,并利用@RabbitListener聲明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消費者接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消費者接收到direct.queue2的消息:【" + msg + "】"); }
在publisher服務發(fā)送消息到DirectExchange
@Test public void testSendDirectExchange() { // 交換機名稱 String exchangeName = "itcast.direct"; // 消息 String message = "hello, red!"; // 發(fā)送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); }
運行結果:
Topic Exchange?
Topic Exchange與Direct Exchange類似,區(qū)別在于Topic Exchange的routingKey必須是多個單詞的列表,并且以.分割
Queue與Exchange指定BindingKey時可以使用通配符:
#:代指0個或多個單詞
*:代指一個單詞
基本實現(xiàn)思路如下:
- 利用@RabbitListener聲明Exchange、Queue、RoutingKey
- 在consumer服務中,編寫兩個消費者方法,分別監(jiān)聽topic.queue1和topic.queue2
- 在publisher中編寫測試方法,向itcast. topic發(fā)送消息
代碼實現(xiàn):
利用@RabbitListener聲明Exchange、Queue、RoutingKey,在consumer服務中,編寫兩個消費者方法,分別監(jiān)聽topic.queue1和topic.queue2
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消費者接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消費者接收到topic.queue2的消息:【" + msg + "】"); }
在publisher中編寫測試方法,向itcast. topic發(fā)送消息
@Test public void testSendTopicExchange() { // 交換機名稱 String exchangeName = "itcast.topic"; // 消息 String message = "今天天氣不錯,我的心情好極了!"; // 發(fā)送消息 rabbitTemplate.convertAndSend(exchangeName, "china.weather", message); }
運行結果:
SpringAMQP-消息轉換器?
在SpringAMQP的發(fā)送方法中,接收消息的類型是Object,也就是說我們可以發(fā)送任意對象類型的消息,SpringAMQP會幫我們序列化為字節(jié)后發(fā)送。
Spring的對消息對象的處理是由org.springframework.amqp.support.converter.MessageConverter來處理的。而默認實現(xiàn)是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定義一個MessageConverter 類型的Bean即可。
推薦用JSON方式序列化,實現(xiàn)步驟如下:
在父工程中引入依賴文章來源:http://www.zghlxwxcb.cn/news/detail-820917.html
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
在publisher和consumer服務中聲明MessageConverter:文章來源地址http://www.zghlxwxcb.cn/news/detail-820917.html
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }
到了這里,關于服務器的異步通信——RabbitMQ的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!