今天介紹一下使用RabbitMQ的延遲插件方便實現(xiàn)延遲消息的方案。
RabbitMQ 是一個由 Erlang 語言開發(fā)的?AMQP(高級消息隊列協(xié)議) 的開源實現(xiàn)。
RabbitMQ 是輕量級且易于部署的,能支持多種消息協(xié)議。
RabbitMQ 可以部署在分布式和聯(lián)合配置中,以滿足高規(guī)模、高可用性的需求。
具體特點包括:
-
可靠性(Reliability):RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發(fā)布 確認。
-
靈活的路由(Flexible Routing):在消息進入隊列之前,通過 Exchange 來路由消息的。對 于典型的路由功能,RabbitMQ 已經(jīng)提供了一些內(nèi)置的 Exchange 來實現(xiàn)。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現(xiàn)自己的 Exchange 。
-
消息集群(Clustering):多個 RabbitMQ 服務(wù)器可以組成一個集群,形成一個邏輯 Broker。
-
高可用(Highly Available Queues):隊列可以在集群中的機器上進行鏡像,使得在部分節(jié) 點出問題的情況下隊列仍然可用。
-
多種協(xié)議(Multi-protocol):RabbitMQ 支持多種消息隊列協(xié)議,比如 STOMP、MQTT等等。
-
多語言客戶端(Many Clients):RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、 Ruby 等等。
-
管理界面(Management UI):RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監(jiān)控 和管理消息 Broker 的許多方面。
-
跟蹤機制(Tracing):如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發(fā)生 了什么。
-
插件機制(Plugin System):RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編 寫自己的插件。
RabbitMQ的消息模型
Why use rabbitMQ ?
下面,我以一個(花店)商家的角色來向大家形象地舉例:
異步
之前顧客來店里下單,我會叫顧客等一下,同時叫店員準備訂單,準備好送到顧客手上了顧客才能離開
現(xiàn)在顧客打電話給我:"我要買xxx,地址是:xxx,你幫我送一下"
我拿個小本子記下:顧客a,電話:xxx,地址:xxx
店員有空后就會準備訂單并配送
解耦
以前有新訂單時,我會親自找每一個店員(負責準備花束的,負責記賬的,負責送花的等),告訴他們有新訂單了,有空了處理一下
如果有店員入職,我通知的時候會多通知一個人;離職時,少通知一個人(維護一個需要通知的人員列表)
現(xiàn)在,有新訂單的時候,我只需要記到小本子上,店員有空了自己來看
削峰
去年七夕,很多電話打給我,我把每一個訂單告訴店員,但是店員忙不過來,客戶又一直打電話來催,最后店員累成狗直接罷工
今年七夕節(jié)我學乖了,電話打進來我會告訴顧客:"我知道了,會盡快安排處理",然后記到小本子上就行,店員有空時按順序來處理訂單就好
另外還有一種思路,引導客戶不一定要在七夕才開始下單,可以提前先買(淘寶的雙十一預售就是出于這樣的削峰思路)
以上是rabbitMQ解決的核心問題。
How to use rabbitMQ ?
基操安裝方式
MAC端
brew?install?rabbitmq
Windows端
-
安裝Erlang,下載地址:erlang.org/download/ot…
-
安裝RabbitMQ,下載地址:dl.bintray.com/rabbitmq/al…
-
安裝完成后,進入RabbitMQ安裝目錄下的sbin目錄
-
在地址欄輸入cmd并回車啟動命令行,然后輸入以下命令啟動管理功能:
rabbitmq-plugins?enable?rabbitmq_management
-
訪問地址查看是否安裝成功:http://localhost:15672/
CentOS端
安裝erlang
#?rabbitmq依賴erlang?需要自己去自行下載
cd?/path/to/erlang-sound-code?&&?./configure?--prefix=/usr/local/erlang
make?&&?make?install?
vim?/etc/profile
#?添加
export?PATH=$PATH:/usr/local/erlang/bin
source?/etc/profile
#?輸入erl,會出現(xiàn)版本信息,即安裝成功
安裝rabbitmq
?#下載?abbitmq_server-3.8.16?并移動到/usr/local/下
vim?/etc/profile
?#?添加
export?PATH=$PATH:/usr/local/rabbitmq_server-3.8.16/sbin
source?/etc/profile
cd?/usr/local/rabbitmq_server-3.8.16/sbin?
#?啟動
./rabbitmq-server?start
功能實現(xiàn)
RabbitMQ實現(xiàn)延遲消息的方式有兩種,一種是使用
死信隊列
實現(xiàn),另一種是使用延遲插件
實現(xiàn)。死信隊列的實現(xiàn)網(wǎng)上較多,本文介紹更簡單的,使用
延遲插件
實現(xiàn)(mac環(huán)境,java版本)。
另外的安裝方式(建議使用這種)
首先準備需要用到的安裝文件及插件(rabbitmq_delayed_message_exchange),版本需要匹配,不匹配的版本可能裝不上或?qū)е录嫒輪栴}。
本人使用的erl_25.0和rabbitMQ-3.10.0(可以到官網(wǎng)下載或者私信作者獲?。J褂眠@種方式安裝的優(yōu)點在于本地安裝和服務(wù)器安裝流程完全一致,不過服務(wù)器需要開放安全端口5672,15672視情況,一般建議測試環(huán)境開放,生產(chǎn)環(huán)境關(guān)閉。關(guān)注公眾號:碼猿技術(shù)專欄,回復關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊
安裝erl和rabbitMQ,具體步驟略(這個應(yīng)該沒人不會吧,逃~)。
將插件文件復制到RabbitMQ安裝目錄的plugins
目錄下,執(zhí)行以下命令后重啟rabbitMQ:
rabbitmq-plugins?enable?rabbitmq_delayed_message_exchange
實現(xiàn)延遲消息
以一個實際業(yè)務(wù)場景舉例:當客服狀態(tài)為在線且3分鐘未回復客戶消息時,自動重啟im會話機器人接管會話。這是一個常見的延遲消息使用場景。
首先在pom.xml
文件中添加AMQP
相關(guān)依賴
<!--消息隊列相關(guān)依賴-->
<dependency>
????<groupId>org.springframework.boot</groupId>
????<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yml
添加RabbitMQ的相關(guān)配置
spring:
??rabbitmq:
????host:?localhost?#?rabbitmq的連接地址
????port:?5672?#?rabbitmq的連接端口號
????virtual-host:?/mall?#?rabbitmq的虛擬host
????username:?im?#?rabbitmq的用戶名
????password:?xxxxxx?#?rabbitmq的密碼
????publisher-confirms:?true?#如果對異步消息需要回調(diào)必須設(shè)置為true
接下來創(chuàng)建RabbitMQ的java配置,主要用于配置交換機、隊列和綁定關(guān)系
/**
?*?消息隊列配置
?*/
@Configuration
public?class?RabbitMqConfig?{
????/**
?????*?機器人消息重啟插件消息隊列所綁定的交換機
?????*/
????@Bean
????CustomExchange?chatPluginDirect()?{
????????//創(chuàng)建一個自定義交換機,可以發(fā)送延遲消息
????????Map<String,?Object>?args?=?new?HashMap<>();
????????args.put("x-delayed-type",?"direct");
????????return?new?CustomExchange(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getExchange(),?"x-delayed-message",?true,?false,?args);
????}
????/**
?????*?機器人消息重啟插件隊列
?????*/
????@Bean
????public?Queue?chatPluginQueue()?{
????????return?new?Queue(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getName());
????}
????/**
?????*?將機器人消息重啟插件隊列綁定到交換機
?????*/
????@Bean
????public?Binding?chatPluginBinding(CustomExchange?chatPluginDirect,?Queue?chatPluginQueue)?{
????????return?BindingBuilder
????????????????.bind(chatPluginQueue)
????????????????.to(chatPluginDirect)
????????????????.with(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getRouteKey())
????????????????.noargs();
????}
}
創(chuàng)建一個消息的發(fā)出者,通過給消息設(shè)置x-delay
頭來設(shè)置消息從交換機發(fā)送到隊列的延遲時間
/**
?*?機器人重啟隊列發(fā)出者
?*/
@Component
@Slf4j
public?class?ChatQueueSender?{
????private?static?Logger?LOGGER?=?LoggerFactory.getLogger(ChatQueueSender.class);
????@Autowired
????private?AmqpTemplate?amqpTemplate;
????public?void?sendMessageToChat(Long?cmid,?final?long?delayTimes)?{
????????//給延遲隊列發(fā)送消息
????????amqpTemplate.convertAndSend(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getExchange(),?QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getRouteKey(),?cmid,?new?MessagePostProcessor()?{
????????????@Override
????????????public?Message?postProcessMessage(Message?message)?throws?AmqpException?{
????????????????//給消息設(shè)置延遲毫秒值
????????????????message.getMessageProperties().setHeader("x-delay",?delayTimes);
????????????????return?message;
????????????}
????????});
????}
}
創(chuàng)建一個消息的接收者,用于處理延遲插件隊列中的消息。
/**
?*?機器人重啟隊列處理者
?*/
@Component
@Slf4j
@RabbitListener(queues?=?"im.chat.cancel")
public?class?ChatQueueReceiver?{
????@Autowired
????private?ChatRestartRobotService?chatRestartRobotService;
????@RabbitHandler
????public?void?handleOnChat(Long?cmid)?{
//????????log.info("機器人會話重啟");
????????chatRestartRobotService.restartRobot(cmid);
????}
}
最后,在對應(yīng)的地方調(diào)用即可:文章來源:http://www.zghlxwxcb.cn/news/detail-706037.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-706037.html
到了這里,關(guān)于RabbitMQ使用延遲插件,代碼量直接減少一半!的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!