一.概念:
- MQ(Message Queue)消息隊(duì)列,是基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)中“先進(jìn)先出”的一種數(shù)據(jù)結(jié)構(gòu)。指把要傳輸?shù)臄?shù)據(jù)(消息)放在隊(duì)列中,用隊(duì)列機(jī)制來實(shí)現(xiàn)消息傳遞——生產(chǎn)者產(chǎn)生消息并把消息放入隊(duì)列,然后由消費(fèi)者去處理。消費(fèi)者可以到指定隊(duì)列拉取消息,或者訂閱相應(yīng)的隊(duì)列,由MQ服務(wù)端給其推送消息
-
MQ的作用
- 消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用解耦,異步消息,流量削鋒等問題,實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。?
- 解耦:一個(gè)業(yè)務(wù)需要多個(gè)模塊共同實(shí)現(xiàn),或者一條消息有多個(gè)系統(tǒng)需要對(duì)應(yīng)處理,只需要主業(yè)務(wù)完成以后,發(fā)送一條MQ,其余模塊消費(fèi)MQ消息,即可實(shí)現(xiàn)業(yè)務(wù),降低模塊之間的耦合。
- 異步:主業(yè)務(wù)執(zhí)行結(jié)束后從屬業(yè)務(wù)通過MQ,異步執(zhí)行,減低業(yè)務(wù)的響應(yīng)時(shí)間,提高用戶體驗(yàn)。
- 削峰:高并發(fā)情況下,業(yè)務(wù)異步處理,提供高峰期業(yè)務(wù)處理能力,避免系統(tǒng)癱瘓
-
MQ的缺點(diǎn)
- 系統(tǒng)可用性降低。依賴服務(wù)越多,服務(wù)越容易掛掉。需要考慮MQ癱瘓的情況
- 系統(tǒng)復(fù)雜性提高。需要考慮消息丟失、消息重復(fù)消費(fèi)、消息傳遞的順序性
- 業(yè)務(wù)一致性。主業(yè)務(wù)和從屬業(yè)務(wù)一致性的處理
-
主要的MQ產(chǎn)品
- 主要的MQ產(chǎn)品包括:
- ??????RabbitMQ、? ?erlang的? ? ? ? ? 可用性很高? ?? 單擊吞吐量一般? ?微秒級(jí)? 可靠性高? ??
- ActiveMQ、? ?apache?,java? ? ?可用性一般? ? ??單擊吞吐量差? ? ? ? ?毫秒級(jí)? ? ? ?一般
- RocketMQ、??阿里的,Java? ? ? ?高? ? ? ? ? ? ? ? ? ?單擊吞吐量高? ? ? ? ? 毫秒級(jí)? ? ? ?高
- ZeroMQ、
- Kafka、? ? ? Scala? ,Java? ? ? ? 高? ? ? ? ? ? ?? ?單擊吞吐量非常高? ? ? ?毫秒級(jí)? ? ?一般
- IBM?WebSphere?等
- 主要的MQ產(chǎn)品包括:
-
RabbitMQ:
- 概念:RabbitMQ是一套開源(MPL)的消息隊(duì)列服務(wù)軟件,是由 LShift 提供的一個(gè) Advanced Message Queuing Protocol (AMQP) 的開源實(shí)現(xiàn),由以高性能、健壯以及可伸縮性出名的 Erlang 寫成
-
docker下載和使用
-
拉取
-
?docker pull rabbitmq:3.8-management
-
-
查看鏡像:
-
docker images
-
-
給rabbitmq設(shè)置容器
- ?
docker run \ -e RABBITMQ_DEFAULT_USER=rabbit \ -e RABBITMQ_DEFAULT_PASS=rabbit \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3.8-management
- ?
- 然后直接在地址欄訪問rabbitmq的后臺(tái)管理界面
- 192.168.8.171:15672
- 登錄密碼是剛剛設(shè)置的rabbit,rabbit
- 用idea使用MQ
- 新建maven項(xiàng)目,改成2.3.9
- 導(dǎo)入需要用到的依賴
-
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
然后我們?cè)谶@個(gè)mq1父項(xiàng)目當(dāng)中建兩個(gè)子模塊,一個(gè)充當(dāng)消息的發(fā)送者,另一個(gè)充當(dāng)接收者(消費(fèi)者):pub,consumer
-
先建發(fā)送者,我們選擇的是手動(dòng)配置模塊,建maven模塊
-
-
在模塊當(dāng)中,因?yàn)槲覀?strong>未選擇自動(dòng)配置springboot項(xiàng)目,我們需要將父項(xiàng)目的主配置文件放入pub這個(gè)項(xiàng)目的resource目錄下,再建一個(gè)啟動(dòng)類,和一個(gè)測(cè)試類
?package com.pro; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class PubApp { public static void main(String[] args) { SpringApplication.run(PubApp.class,args); } }
?發(fā)送者測(cè)試類:
-
建立連接
-
建立通道
-
創(chuàng)建隊(duì)列
-
發(fā)送消息
-
關(guān)閉資源
-
package com.pro.test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class PubTest { @Test public void testSendMeg() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.8.171"); factory.setPort(5672); factory.setUsername("rabbit"); factory.setPassword("rabbit"); //建立連接 Connection connection = factory.newConnection(); //建立通道 Channel channel = connection.createChannel(); //創(chuàng)建隊(duì)列 String queueName="xjj"; channel.queueDeclare(queueName,false,false,false,null); for (int i = 0; i < 5; i++) { //發(fā)送消息 String message = "楊星大帥哥"; channel.basicPublish("",queueName,null,message.getBytes()); System.out.println("消息發(fā)送成功"+message); } //關(guān)閉資源 channel.close(); connection.close(); //通道的聲明 } }
測(cè)試發(fā)送消息之后,我們就可以在管理平臺(tái)上很清晰的看到有待消費(fèi)的消息
- 在第一次測(cè)試的時(shí)候,報(bào)了這樣的錯(cuò)
- 但是正常是不會(huì)出現(xiàn)這種情況的,發(fā)現(xiàn)配置文件都不正常,所以就重新建立一個(gè)項(xiàng)目mq2
- 步驟就是上面一樣的
-
-
拉取
- 現(xiàn)在,我們來建另一個(gè)消費(fèi)者模塊consumer,需要將父項(xiàng)目的主配置文件放入pub這個(gè)項(xiàng)目的resource目錄下,再建一個(gè)啟動(dòng)類,和一個(gè)測(cè)試類,
-
package com.pro.test; import com.rabbitmq.client.*; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.io.IOException; import java.util.concurrent.TimeoutException; @RunWith(SpringRunner.class) @SpringBootTest public class ConsumerTest { @Test public void testConsumerMsg() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.8.171"); factory.setPort(5672); factory.setUsername("rabbit"); factory.setPassword("rabbit"); //建立連接 Connection connection = factory.newConnection(); //建立通道 Channel channel = connection.createChannel(); //創(chuàng)建隊(duì)列 String queueName="xjj"; channel.queueDeclare(queueName,false,false,false,null); //訂閱消息:從隊(duì)列當(dāng)中取數(shù)據(jù) channel.basicConsume(queueName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body :就是發(fā)過來的消息 //處理消息 String msg = new String(body); System.out.println("收到"+queueName+"的消息:"+msg); } }); System.out.println("消費(fèi)者,一直在這里等消息"); } }
- 如果想實(shí)現(xiàn)發(fā)送者已發(fā)送消息,消費(fèi)者立馬可以看到的效果,就需要寫一個(gè)測(cè)試類ConnTest,來運(yùn)行剛剛測(cè)試類當(dāng)中的代碼,但是需要用到main方法,作為程序運(yùn)行的主入口,讓獲取消息隊(duì)列的消息這個(gè)方法一直在后臺(tái)運(yùn)行,然后,我們?cè)偃グl(fā)送者模塊,發(fā)送消息,立馬就能獲取并打印出來
-
package com.pro.com.pro.test; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnTest { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.8.171"); factory.setPort(5672); factory.setUsername("rabbit"); factory.setPassword("rabbit"); //建立連接 Connection connection = factory.newConnection(); //建立通道 Channel channel = connection.createChannel(); //創(chuàng)建隊(duì)列 String queueName="xjj"; channel.queueDeclare(queueName,false,false,false,null); //訂閱消息:從隊(duì)列當(dāng)中取數(shù)據(jù) channel.basicConsume(queueName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body :就是發(fā)過來的消息 //處理消息 String msg = new String(body); System.out.println("收到"+queueName+"的消息:"+msg); } }); System.out.println("消費(fèi)者,一直在這里等消息"); } }
-
通過這個(gè)案例的練習(xí),我們可以知道,消息中間件可以實(shí)現(xiàn)
- 發(fā)送者發(fā)送完消息,消費(fèi)者不去取,中間件會(huì)一直保存(快遞驛站)
- 取完之后,驛站里面的快遞(消息)就不存在消息隊(duì)列當(dāng)中了
- 可以應(yīng)用到發(fā)短信上面
-
10月17日使用AMQP簡(jiǎn)化冗長(zhǎng)的代碼
- amqp:
- 首先在配置文件當(dāng)中配置好連接消息隊(duì)列
-
- 然后先來簡(jiǎn)化發(fā)送者的代碼,先寫一個(gè)配置類來配置隊(duì)列queue(也可以在pubApp啟動(dòng)類當(dāng)中來配置),不然后面你發(fā)送的時(shí)候,還沒有創(chuàng)建隊(duì)列,就無法發(fā)送到隊(duì)列當(dāng)中
- 然后我們?cè)賵?zhí)行這個(gè)測(cè)試類來發(fā)消息(很清晰的發(fā)現(xiàn)代碼量相比昨天少了很多)
-
有了發(fā)送者,現(xiàn)在我們需要修改接收者
- 修改這也是一樣需要在配置文件當(dāng)中配置連接地址
- 然后寫一個(gè)監(jiān)聽類來監(jiān)聽隊(duì)列名為“Java”的隊(duì)列,然后讀取里面的消息
- 然后啟動(dòng)啟動(dòng)類就可以發(fā)現(xiàn)效果了
??文章來源:http://www.zghlxwxcb.cn/news/detail-494138.html
-
work工作隊(duì)列
- 可以共享隊(duì)列,實(shí)現(xiàn)消息分?jǐn)?/li>
- 假設(shè)發(fā)送者發(fā)了50條消息
- 現(xiàn)在寫兩個(gè)消費(fèi)者來消費(fèi),監(jiān)聽器里面配置兩個(gè)消費(fèi)者,通過啟動(dòng)類啟動(dòng)后觀察消費(fèi)情況
- 下圖中搞錯(cuò)了,是兩個(gè)消費(fèi)者平攤了50條數(shù)據(jù),可以看到按單雙消費(fèi)
- 但是我們現(xiàn)在想要實(shí)現(xiàn)按需分配,能者多勞,速度快的做的多
- 那么我們需要在消費(fèi)者的配置文件當(dāng)中加上一個(gè)配置prefetch
-
#表示處理完一個(gè)才再分配一個(gè) spring.rabbitmq.listener.simple.prefetch=1
-
廣播
- 我們先把交換機(jī)綁定兩個(gè)隊(duì)列
-
然后我們要在監(jiān)聽器里面監(jiān)聽這兩個(gè)隊(duì)列,來獲取消息
-
@RabbitListener(queues = "fanout.queue1") public void listenerFanoutQueue1(String msg)throws Exception{ System.out.println("消費(fèi)者接受到了fanout.queue1的消息:"+msg); } @RabbitListener(queues = "fanout.queue2") public void listenerFanoutQueue2(String msg)throws Exception{ System.out.println("消費(fèi)者接受到了fanout.queue2的消息:"+msg); }
-
然后,我們現(xiàn)在要到生產(chǎn)者這邊生產(chǎn)并發(fā)送消息:指定交換機(jī)和消息
-
@Test //發(fā)送50條消息到Java隊(duì)列中 public void testSendFanoutExchange() throws InterruptedException { String exchange = "ycznz.fanout"; String msg = "hello,fanout 測(cè)試交換機(jī)發(fā)到兩個(gè)隊(duì)列中"; //指定交換機(jī),和指定消息 rabbitTemplate.convertAndSend(exchange, "", msg); }
- 可以發(fā)現(xiàn)兩個(gè)消費(fèi)者都收到了這條消息,這樣測(cè)試的目的在于知道,通過交換機(jī)指定隊(duì)列,消費(fèi)者再?gòu)年?duì)列當(dāng)中得到消息,兩個(gè)消費(fèi)者都是均衡的得到發(fā)送者發(fā)送的消息
- 好
?文章來源地址http://www.zghlxwxcb.cn/news/detail-494138.html
到了這里,關(guān)于MQ-消息隊(duì)列-RabbitMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!