目錄
前言
RabbitMQ
什么是RabbitMQ
RabbitMQ特點(diǎn)
安裝啟動
RabbitMQ和Kafka的消息收發(fā)區(qū)別
RabbitMQ使用案例
添加依賴
添加配置
創(chuàng)建RabbitMQ配置類
RabbitMQ消息的發(fā)送
RabbitMQ消息的接收
測試
結(jié)語
前言
前一篇,我們學(xué)習(xí)了Kafka的基本使用,這一篇,我們來學(xué)習(xí)RabbitMQ。他們作為消息隊(duì)列本身都具有很強(qiáng)大的功能,博主寫完后的感受是:這也只能算是初體驗(yàn)了。畢竟師父領(lǐng)進(jìn)門,修行靠個人,但是博主怎么會做這種師傅呢?一定要手把手教會各位童鞋,暫時不去深入的講解了,這一塊內(nèi)容,等博主研究準(zhǔn)備好之后,會在后續(xù)的系列中對消息隊(duì)列來一個進(jìn)階的教程,大家先入個門,慢慢學(xué)習(xí),微服務(wù)的東西很多,先學(xué)會才是正道。
RabbitMQ
RabbitMQ多用于Java,和Kafka多用于大數(shù)據(jù)不同,RabbitMQ更偏向于功能,性能一般,所以在Java開發(fā)中,比較受歡迎。那么下面,我們就來了解一下RabbitMQ及其特點(diǎn)。
什么是RabbitMQ
RabbitMQ是使用Erlang語言開發(fā)的開源消息隊(duì)列系統(tǒng),基于AMQP協(xié)議來實(shí)現(xiàn)。AMQP的主要特征是面向消息、隊(duì)列、路由(包括點(diǎn)對點(diǎn)和發(fā)布/訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。所以在Java業(yè)務(wù)中使用很多,目前我所知道的很多都是用的此消息隊(duì)列,只不過大家評價都是:功能強(qiáng),性能一般。與其相反的,就是Kafka了:性能強(qiáng),功能弱。
這是因?yàn)樗麄冊O(shè)計(jì)的目的不一樣,RabbitMQ 在有大量消息堆積的情況下性能會下降,其優(yōu)勢體現(xiàn)在功能上。而Kafka一開始是用來處理海量日志的,所以體現(xiàn)出來就是性能強(qiáng),功能弱。
所謂,尺有所短,寸有所長,正是這個道理。全都兼顧的幾乎是不存在的。
RabbitMQ特點(diǎn)
本想去百科里復(fù)制出來,哈哈,結(jié)果百科寫的太簡單了:
- 可伸縮性:集群服務(wù)
- 消息持久化:從內(nèi)存持久化消息到硬盤,再從硬盤加載到內(nèi)存
真是想偷懶都沒辦法。
RabbitMQ擁有數(shù)以萬計(jì)的用戶,是最受歡迎的開源消息代理之一。從T-Mobile到Runtastic,RabbitMQ在全球范圍內(nèi)用于小型初創(chuàng)企業(yè)和大型企業(yè)。
RabbitMQ體量輕,易于在本地和云端部署。它支持多種消息協(xié)議。RabbitMQ可以部署在分布式和聯(lián)合配置中,以滿足大規(guī)模、高可用性的要求。
RabbitMQ可在許多操作系統(tǒng)和云環(huán)境中運(yùn)行,并為大多數(shù)流行的語言提供大量的開發(fā)工具。
官網(wǎng)地址:RabbitMQ Tutorials — RabbitMQ
RabbitMQ的可靠性:體現(xiàn)在持久化、傳輸確認(rèn)和發(fā)布確認(rèn)上。
Flexible Routing:靈活路由,消息進(jìn)入隊(duì)列之前,通過RabbitMQ內(nèi)置的Exchange來路由消息?,針對一些復(fù)雜路由,還能將多個Exchange綁定在一起,通過插件實(shí)現(xiàn)自己的Exchange。
集群:這個是很多微服務(wù)軟件都有的功能,比如Redis,ES等,通過此功能,可以實(shí)現(xiàn)高可用性的能力,在一個節(jié)點(diǎn)發(fā)生故障時,其他節(jié)點(diǎn)能快速使用。
多協(xié)議:RabbitMQ支持多種協(xié)議,具體看這里:Which protocols does RabbitMQ support? — RabbitMQ
多語言客戶端:RabbitMQ 幾乎支持所有常用語言,具體可查看這里:Clients Libraries and Developer Tools — RabbitMQ?
管理監(jiān)控:這個我們在下面安裝的時候會提到此后臺頁面,還會登錄上去給大家看,類似于nacos的管理頁面,通過此界面,可以管理和監(jiān)控RabbitMQ。
插件:RabbitMQ 提供了許多插件,可多方面擴(kuò)展,也可以自定義插件,詳情可看這里:Plugins — RabbitMQ
以上,可通過訪問此鏈接查看詳細(xì)內(nèi)容:Messaging that just works — RabbitMQ?
安裝啟動
這也是本篇內(nèi)容最痛苦的一個地方,因?yàn)镽abbitMQ是Erlang語言開發(fā)的,所以要先安裝Erlang語言的運(yùn)行環(huán)境,為了不去分別講解Mac和Windows的安裝方式,博主準(zhǔn)備偷個懶,讓大家也偷個懶。
首先,看到這里,足以說明是至少是一名初級Java工程師,你至少開發(fā)過一個項(xiàng)目,那么你一定知道Docker,所以,下面,讓我們愉快的使用Docker來安裝RabbitMQ吧。
在Docker中搜索RabbitMQ,下載rabbitmq:management:
?你下載上面的那個也是可以的,都沒關(guān)系。
接著打開終端,在里面直接輸入,記住,是直接輸入,不用進(jìn)任何目錄:
docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management
?
這個命令輸入完成,你的?rabbitmq容器就已經(jīng)跑起來了:
此時,為了測試rabbitmq是否啟動成功,我們需要在瀏覽器輸入:http://localhost:15672
打開后是一個登錄頁面:
因?yàn)闆]有設(shè)置登錄的賬戶名和密碼,所以我們采用默認(rèn)的賬戶密碼guest來登錄,輸入賬戶密碼都為guest即可登錄:
?
如果你想添加用戶密碼,那就來這里:
?
如果你想修改當(dāng)前用戶密碼,你可點(diǎn)擊此處:
?
有個Update this user,輸入新密碼即可:?
?
?
如果你不想使用當(dāng)前用戶名,那就新增一個用戶,再刪除原來的用戶。到這里,安裝啟動這一環(huán)節(jié)就結(jié)束了,你說,博主是不是偷了個大懶呢??
RabbitMQ和Kafka的消息收發(fā)區(qū)別
Kafka使用話題名稱來收發(fā)信息,同一個話題,一個發(fā),可多個收,使用起來結(jié)構(gòu)簡單易懂。
RabbitMQ則要略復(fù)雜些,它是通過交換機(jī)和路由key來指定要發(fā)送的消息隊(duì)列,生產(chǎn)者發(fā)送消息時指定交換機(jī)和路由key,這樣一個確定的消息隊(duì)列就產(chǎn)生了,而消費(fèi)者只需要指定這個產(chǎn)生的消息隊(duì)列就可以接收到消息。
所以在編寫代碼時,RabbitMQ會比Kafka多一個配置類,確切的說,是每個業(yè)務(wù)都要有一個配置類,這個配置類坐的就是指定交換機(jī)和路由key,再由路由key指定隊(duì)列的工作。
RabbitMQ使用案例
此處,我們依然采用前面的微服務(wù)項(xiàng)目作為基礎(chǔ),在stock模塊來完成RabbitMQ的簡單使用。
添加依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
添加配置
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /
用戶名和密碼要用你上面管理頁面添加的?有效用戶名密碼,不添加,默認(rèn)guest。
創(chuàng)建RabbitMQ配置類
上面提到過,交換機(jī),路由key,隊(duì)列,這三者的指定需要在配置類中完成,具體要怎么做,我們來看看,stock模塊之前寫過quartz,我們就把config類建在quartz包下:
package com.codingfire.cloud.stock.quartz;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// SpringBoot整合RabbitMQ之后
// 這些配置信息要保存在Spring容器中,所以這些配置也要交給SpringBoot管理
@Configuration
public class RabbitMQConfig {
// 聲明需要使用的交換機(jī)\路由Key\隊(duì)列的名稱
public static final String STOCK_EX="stock_ex";
public static final String STOCK_ROUT="stock_rout";
public static final String STOCK_QUEUE="stock_queue";
// 聲明交換機(jī),需要幾個聲明幾個,這里就一個
// 方法中實(shí)例化交換機(jī)對象,確定名稱,保存到Spring容器
@Bean
public DirectExchange stockDirectExchange(){
return new DirectExchange(STOCK_EX);
}
// 聲明隊(duì)列,需要幾個聲明幾個,這里就一個
// 方法中實(shí)例化隊(duì)列對象,確定名稱,保存到Spring容器
@Bean
public Queue stockQueue(){
return new Queue(STOCK_QUEUE);
}
// 聲明路由Key(交換機(jī)和隊(duì)列的關(guān)系),需要幾個聲明幾個,這里就一個
// 方法中實(shí)例化路由Key對象,確定名稱,保存到Spring容器
@Bean
public Binding stockBinding(){
return BindingBuilder.bind(stockQueue()).to(stockDirectExchange())
.with(STOCK_ROUT);
}
}
都是比較固定的模版,直接用就行。
RabbitMQ消息的發(fā)送
發(fā)送消息,我們可以在QuartzJob類中進(jìn)行:
package com.codingfire.cloud.stock.quartz;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.LocalDateTime;
public class QuartzJob implements Job {
// RabbitTemplate就是amqp框架提供的發(fā)送消息的對象
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//輸出當(dāng)前時間
System.out.println("--------------"+ LocalDateTime.now() +"---------------");
// 先簡單的發(fā)送一串文字
rabbitTemplate.convertAndSend(RabbitMQConfig.STOCK_EX,
RabbitMQConfig.STOCK_ROUT,"黃河之水天上來,奔流到海不復(fù)還。");
}
}
我們在講解Quartz時只用這個類每隔10s打印出當(dāng)前時間,現(xiàn)在我們在打印時間的同時,再用RabbitMQ發(fā)送一串文字,rabbitTemplate和使用redi很像,不同的是,這里是通過template指定發(fā)送的交換機(jī)和路由。?
RabbitMQ消息的接收
我們在quartz包下再建一個用于接收消息的類:
package com.codingfire.cloud.stock.quartz;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// 這個對象也是需要交由Spring容器管理的,才能實(shí)現(xiàn)監(jiān)聽Spring容器中保存的隊(duì)列的效果
@Component
// 和Kafka不同的是Kafka在一個方法上聲明監(jiān)聽器
// 而RabbitMQ是在類上聲明,監(jiān)聽具體的隊(duì)列名稱
@RabbitListener(queues = {RabbitMQConfig.STOCK_QUEUE})
public class RabbitMQConsumer {
// 監(jiān)聽了類,但是運(yùn)行代碼的一定是個方法
// 框架要求這個類中只允許一個方法包含下面這個注解
// 表示這個方法是處理消息的方法
// 方法的參數(shù)就是消息的值
@RabbitHandler
public void process(String str){
System.out.println("接收到的消息為:"+str);
}
}
里面的方法名可以是自定義的,只要類上指定隊(duì)列名稱就可以接收,類中注釋認(rèn)真看,要記住。?
測試
做完這些,下面就是啟動并測試的環(huán)節(jié),由于此微服務(wù)模塊配置了nacos和seata,所以這兩個服務(wù)是要起的,接著就是RabbitMQ,上面已經(jīng)教大家啟動過了,保證是運(yùn)行狀態(tài):然后運(yùn)行stock的啟動文件,在控制臺查看消息發(fā)送接收的情況,由于博主還依賴了數(shù)據(jù)庫,此處還需要啟動mysql服務(wù),大家根據(jù)自身情況啟動服務(wù),然后再次運(yùn)行:
?
可以看到我們的消息收發(fā)是正常的,測試成功,同時能看到我們做Quartz時,減少庫存的操作也在運(yùn)行。這就是?RabbitMQ的基本使用。
在RabbitMQ后臺,也能看到我們注冊的交換機(jī)和消息隊(duì)列等信息:
感興趣的可以去看看。?
結(jié)語
又到了說再見的時候,RabbitMQ和Kafka一樣博大精深,這里只是基本的使用,你只要知道,他們是用來收發(fā)消息的,且不需要關(guān)心什么時候完成,你可以認(rèn)為他們是在系統(tǒng)不忙的時候才去做,這樣可以降低服務(wù)器壓力,另外還能實(shí)時監(jiān)控:
文章來源:http://www.zghlxwxcb.cn/news/detail-778450.html
是不是很方便呢?趕快到自己的項(xiàng)目中使用吧。?文章來源地址http://www.zghlxwxcb.cn/news/detail-778450.html
到了這里,關(guān)于Java開發(fā) - 消息隊(duì)列之RabbitMQ初體驗(yàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!