作者主頁:Designer 小鄭
作者簡介:3年JAVA全棧開發(fā)經(jīng)驗,專注JAVA技術、系統(tǒng)定制、遠程指導,致力于企業(yè)數(shù)字化轉(zhuǎn)型,CSDN博客專家,藍橋云課認證講師。
一、前言
1.1 什么是消息隊列
消息隊列是一種在應用程序之間傳遞數(shù)據(jù)的通信機制,它基于 發(fā)布-訂閱
模式,將消息發(fā)送者(發(fā)布者)和消息接收者(訂閱者)解耦,使得它們可以獨立地進行消息的發(fā)送和接收。
在消息隊列中,消息發(fā)送者將消息發(fā)送到隊列中,而消息接收者則從隊列中獲取消息進行處理。消息隊列提供了一種異步的通信方式,即發(fā)送者發(fā)送消息后不需要等待接收者的回復,而可以立即繼續(xù)執(zhí)行其他操作。同時,消息隊列還可以實現(xiàn)消息的持久化存儲,確保消息在發(fā)送和接收過程中的可靠性。
消息隊列的應用場景非常廣泛,例如:
- 在分布式系統(tǒng)中,可以用消息隊列來實現(xiàn)不同模塊之間的解耦;
- 在高并發(fā)系統(tǒng)中,可以利用消息隊列來緩解系統(tǒng)壓力;
- 在實時數(shù)據(jù)處理中,可以將數(shù)據(jù)存儲在消息隊列中,再由數(shù)據(jù)處理模塊進行處理。
1.2 RabbitMQ 是什么
RabbitMQ是一個開源的消息隊列中間件,它實現(xiàn)了高級消息隊列協(xié)議),并提供了可靠的消息傳遞機制。
RabbitMQ使用Erlang語言編寫,具有高度可靠、可擴展、靈活和可插拔的特性,被廣泛應用于分布式系統(tǒng)、微服務架構、異步任務處理等場景。
RabbitMQ基于生產(chǎn)者和消費者模型工作。生產(chǎn)者將消息發(fā)送到RabbitMQ的交換機,然后交換機將消息路由到一個或多個隊列,消費者從隊列中獲取消息并進行處理。
RabbitMQ 支持多種消息傳遞模式,同時還提供了消息的持久化、消息優(yōu)先級、消息確認機制等特性,確保消息的可靠性和可靠傳輸。
RabbitMQ是一個成熟、可靠的消息隊列中間件,提供了強大的消息傳遞機制和豐富的特性,被廣泛應用于分布式系統(tǒng)和異步消息處理中。
1.3 為什么需要用到 RabbitMQ
-
解耦:RabbitMQ通過消息隊列實現(xiàn)了生產(chǎn)者和消費者的解耦。生產(chǎn)者將消息發(fā)送到隊列中,而消費者從隊列中獲取消息并進行處理。這種解耦使得系統(tǒng)中的不同模塊能夠獨立進行開發(fā)和部署,提高了系統(tǒng)的靈活性和可維護性。
-
異步通信:RabbitMQ提供了一種異步通信機制。生產(chǎn)者發(fā)送消息到隊列后,不需要等待消費者立即處理,而可以繼續(xù)執(zhí)行其他操作。這種異步通信能夠提升系統(tǒng)的并發(fā)性能和響應速度。
-
緩沖和削峰:RabbitMQ可以作為一個緩沖區(qū),用于存儲來自生產(chǎn)者的消息。這樣可以避免生產(chǎn)者和消費者之間的直接耦合,同時也能夠應對瞬時的高并發(fā)請求,減輕系統(tǒng)壓力。
-
可靠性和可恢復性:RabbitMQ提供了持久化消息的功能,即使在消息隊列或消費者故障的情況下,消息也可以得到保留和恢復。這種可靠性保證了消息的不丟失和可靠傳遞。
-
擴展性:RabbitMQ是一個可擴展的消息隊列中間件,可以在需要的時候增加更多的消息隊列和消費者節(jié)點,以應對不斷增長的業(yè)務需求。
-
多語言支持:RabbitMQ提供了多種編程語言的客戶端,如Java、Python、C#等,使得開發(fā)者可以選擇合適自己的編程語言與RabbitMQ進行交互。
1.4 RabbitMQ 相比 Kafka 的優(yōu)勢
RabbitMQ 提供了簡單易用的 API 和管理界面,使得開發(fā)者可以快速上手并進行配置和管理,相比之下,Kafka 的配置和管理相對復雜一些。
RabbitMQ 支持多種消息傳遞模式和消息的路由選擇機制,可以根據(jù)需求進行靈活的消息處理,而Kafka更適用于大規(guī)模的高吞吐量流式處理,通常使用發(fā)布-訂閱模式。
RabbitMQ 具備持久化消息、消息確認機制等特性,可以確保消息的可靠傳輸,而 Kafka 通過多副本機制和消息日志的方式,提供了高度可靠性的消息傳遞。
RabbitMQ 可以通過設置隊列的限流策略和消費者的消費速率來進行流量控制和削峰處理,能夠保護消費者免受過多的消息推送,而Kafka則將消費者的消費速率控制交給消費者自身,在高并發(fā)場景下可能需要額外處理。
RabbitMQ 提供了多種編程語言的客戶端,開發(fā)者可以根據(jù)自己的編程需求選擇合適的客戶端進行交互,而Kafka的客戶端主要集中在Java語言上,對其他語言的支持相對較少。
RabbitMQ 擁有龐大的開源社區(qū)和豐富的生態(tài)系統(tǒng),提供了豐富的插件和集成工具,方便開發(fā)者進行擴展和集成,Kafka的生態(tài)系統(tǒng)相對較小,但在大數(shù)據(jù)領域有廣泛的應用和支持。
二、搭建 RabbitMQ 環(huán)境
2.1 安裝 Erlang
Erlang 是 RabbitMQ 消息服務的基礎環(huán)境,就像 Java 的 JDK 一樣,是必須安裝的。
2.1.1 下載
Erlang 官網(wǎng)下載地址:下載地址。
因為我們要把 RabbitMQ 服務裝在服務器上,所以同學們可以在服務器上下載 Erlang 安裝包,或者下載后手動上傳至服務器。
2.1.2 安裝
下載完成后雙擊安裝包,按照提示流程正常安裝即可,截圖如下所示。
2.1.3 環(huán)境變量配置
變量名如下,變量值是安裝的路徑,如下圖所示。
ERLANG_HOME
驗證命令如下:
erl -v
2.2 安裝 RabbitMQ
2.2.1 下載
RabbitMQ 需要在 Github 中下載,下載地址。
RabbitMQ 版本需要和 Erlang 對應,本文安裝的是
3.9.5
版本。
2.2.2 安裝
安裝流程如下圖所示。
2.2.3 初始化
安裝完成后,使用 cmd 窗口,進入 RabbitMQ 的 sbin 目錄,如下圖所示。
接著輸入以下命令,完成初始化安裝。
rabbitmq-plugins enable rabbitmq_management
2.2.4 驗證
打開瀏覽器,輸入:
http://localhost:15672
賬號密碼都是:
guest
2.3 配置外網(wǎng)訪問
2.3.1 添加新用戶
RabbitMQ 默認端口為15672,用戶名和密碼都為guest,是不允許外部訪問的。
所以我們要添加新用戶,實現(xiàn)外網(wǎng)訪問,操作流程如下圖所示。
點擊添加后,輸入新用戶的賬號和密碼,如下圖所示。
2.3.2 Virtual Host 配置
Virtual Host 需要允許添加的用戶訪問,如下圖所示。
進入子界面后,選擇用戶后提交,如下圖所示。
然后,我們就完成了外網(wǎng)訪問的配置。
三、整合 RabbitMQ 消息服務
3.1 創(chuàng)建新的 SpringBoot 項目
打開 IDEA 工具,新建項目,如下圖所示。
新項目創(chuàng)建完成后,如下圖所示。
3.2 引入依賴
首先,請在 pom.xml
中引入依賴,代碼如下。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.3 配置文件
application.yml 配置如下。
spring:
rabbitmq:
host: 118.126.82.167
port: 5672
username: zwz
password: 123456
listener:
simple:
retry:
enabled: true
max-attempts: 5
initial-interval: 2s
3.4 創(chuàng)建消息發(fā)送工具類
請同學們創(chuàng)建
SimpleProducer
工具類,代碼如下。
package cn.zwz.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class SimpleProducer {
public static void main(String[] args) {
//1. 創(chuàng)建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
//1.1 設置連接IP
connectionFactory.setHost("118.126.82.167");
//1.2 設置連接端口
connectionFactory.setPort(5672);
//1.3 設置用戶名
connectionFactory.setUsername("zwz");
//1.4 設置密碼
connectionFactory.setPassword("123456");
//1.5 設置虛擬訪問節(jié)點,就是消息發(fā)送的目標路徑
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2. 創(chuàng)建連接Connection
connection = connectionFactory.newConnection("ZWZ-Connection");
//3. 通過連接獲取通道Channel
channel = connection.createChannel();
//4. 通過通道創(chuàng)建交換機,聲明隊列,綁定關系,路由key,發(fā)送消息,接收消息
String queueName = "ZWZ-TOPIC";
/**
* channel.queueDeclare有5個參數(shù)
* params1: 隊列的名稱
* params2: 是否要持久化, false:非持久化 true:持久化
* params3: 排他性,是否獨占隊列
* params4: 是否自動刪除,如果為true,隊列會隨著最后一個消費消費完后將隊列自動刪除,false:消息全部消費完后,隊列保留
* params5: 攜帶的附加參數(shù)
*/
channel.queueDeclare(queueName, true, false, false, null);
//5. 消息內(nèi)容
String message = "HELLO World!";
//6. 將消息發(fā)送到隊列
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息發(fā)送成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
//7. 關閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8. 關閉連接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
3.5 發(fā)消息功能測試
發(fā)消息很簡單,運行 main
函數(shù)即可。
發(fā)送成功后,后臺可以接收到數(shù)據(jù),如下圖所示。
3.6 創(chuàng)建消息接收工具類
請同學們創(chuàng)建
SimpleConsumer
工具類,代碼如下。
package cn.zwz.send;
import com.rabbitmq.client.*;
import java.io.IOException;
public class SimpleConsumer {
public static void work() {
//1. 創(chuàng)建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
//1.1 設置連接IP
connectionFactory.setHost("118.126.82.167");
//1.2 設置連接端口
connectionFactory.setPort(5672);
//1.3 設置用戶名
connectionFactory.setUsername("zwz");
//1.4 設置密碼
connectionFactory.setPassword("123456");
//1.5 設置虛擬訪問節(jié)點,就是消息發(fā)送的目標路徑
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2. 創(chuàng)建連接Connection
connection = connectionFactory.newConnection("ZWZ-Connection");
//3. 通過連接獲取通道Channel
channel = connection.createChannel();
//4. 通過通道創(chuàng)建交換機,聲明隊列,綁定關系,路由key,發(fā)送消息,接收消息
String queueName = "ZWZ-TOPIC";
//5. 接收消息并消費消息
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收到的消息內(nèi)容是:" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失敗。。。");
}
});
System.out.println("開始接受消息。。。。");
//阻斷程序
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
//7. 關閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8. 關閉連接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
接著在啟動類上配置運行,代碼如下。
package cn.zwz.send;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SendApplication {
public static void main(String[] args) {
new SimpleConsumer().work();
SpringApplication.run(SendApplication.class, args);
}
}
3.7 收消息功能測試
請同學們運行 SpringBoot 啟動類,然后再次發(fā)送消息,就可以看到消息內(nèi)容了,如下圖所示。
四、總結(jié)
本文首先簡單介紹了 RabbitMQ,然后和 Kafka 等熱門消息隊列進行對比,最后演示了 RabbitMQ 的完整安裝配置整合流程,幫助零基礎的小白入門 RabbitMQ 開發(fā)。文章來源:http://www.zghlxwxcb.cn/news/detail-704814.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-704814.html
到了這里,關于搭建RabbitMQ消息服務,整合SpringBoot實現(xiàn)收發(fā)消息的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!