前言:本文通過(guò)springBoot -maven 框架,對(duì)Rabbitmq 進(jìn)行整合,完成客戶端消息的發(fā)送和消費(fèi);
1 為什么要使用Rabbitmq:
RabbitMQ 是一個(gè)可靠的、靈活的、開(kāi)源的消息中間件,具有以下優(yōu)點(diǎn):
-
異步通信:RabbitMQ 支持異步通信,使得消息發(fā)送者和接收者能夠異步處理,提高了系統(tǒng)性能和吞吐量。
-
解耦合:RabbitMQ 的消息隊(duì)列機(jī)制可以將發(fā)送者和接收者解耦合,減少了應(yīng)用程序之間的耦合度。
-
可靠性高:RabbitMQ 支持事務(wù)和持久化,能夠確保消息不會(huì)丟失。
-
高吞吐量:RabbitMQ 支持多種吞吐量調(diào)優(yōu)方法,能夠處理高并發(fā)的消息通訊。
-
可擴(kuò)展性:RabbitMQ 支持集群和分布式部署,可以擴(kuò)展到大規(guī)模的消息通訊場(chǎng)景。
RabbitMQ 提供了易用、高效、靈活、可靠的消息傳遞機(jī)制,可以幫助開(kāi)發(fā)者更快地構(gòu)建系統(tǒng)并實(shí)現(xiàn)各種復(fù)雜的業(yè)務(wù)場(chǎng)景。
2 springboot 整合:
2.1 pom 引入依賴:
<!-- rabbitmq 自動(dòng)裝配 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 提供web訪問(wèn) 默認(rèn)端口8080 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- loomback 用于生成get set 方法 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 阿里的json 數(shù)據(jù)轉(zhuǎn)換 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.31</version>
</dependency>
2.2 連接參數(shù)配置:
2.2.1 基礎(chǔ)配置:
基礎(chǔ)配置后springboot 的自動(dòng)裝載機(jī)制會(huì)注冊(cè)一個(gè)RabbitTemplate rabbitTemplate 對(duì)象用于消息的接收和發(fā)送;
############# 基礎(chǔ)配置
# mq 服務(wù)器的地址
spring.rabbitmq.host=localhost
# mq 服務(wù)器的端口
spring.rabbitmq.port=5672
# mq 服務(wù)器的連接使用的用戶名
spring.rabbitmq.username=admin
# mq 服務(wù)器的連接使用的密碼
spring.rabbitmq.password=rabbitmq
# mq 服務(wù)器的連接使用的虛擬機(jī)
spring.rabbitmq.virtual-host=my_vhost
注意: 其中 spring.rabbitmq.virtual-host 為隔離的虛擬機(jī),需要根據(jù)自己業(yè)務(wù)進(jìn)行配置,如果rabbitmq 有web 端可以在web端創(chuàng)建需要的v_host:
2.2.2 可擴(kuò)展的連接參數(shù)配置:
############# 連接和管道配置
# When the cache mode is 'CHANNEL', the connection cache size cannot be configured.
# spring.rabbitmq.cache.connection.mode 為connection 生效 ,connection 連接池的大小
#spring.rabbitmq.cache.connection.size=3
# 與broker 連接的 模式 channel 或者 connection 默認(rèn)channel
spring.rabbitmq.cache.connection.mode=channel
# 與broker 連接的默認(rèn)時(shí)間,默認(rèn)為 60000即 60 秒,超時(shí)會(huì)會(huì)中斷并拋出異常,單位毫秒
spring.rabbitmq.connection-timeout=1000
# 每個(gè)連接中可以建立的channel 數(shù)量,默認(rèn)值25
spring.rabbitmq.cache.channel.size=50
# 如果已達(dá)到channel緩存大小,等待獲取channel的時(shí)間。 如果為0,則始終創(chuàng)建一個(gè)新channel
# 默認(rèn)值為 -1,表示不限制等待時(shí)間,即一直等待直到獲取到可用的 Channel,單位毫秒
spring.rabbitmq.cache.channel.checkout-timeout=2000
# 指定心跳超時(shí),單位秒,0為不指定;默認(rèn)60s
spring.rabbitmq.requested-heartbeat=60
# 客戶端總共可以創(chuàng)建總的channel 數(shù)量
spring.rabbitmq.requested-channel-max=1024
默認(rèn)與rabbitmq 的連接為channel,多個(gè)channel 公用一個(gè)connection , 每個(gè)線程都從緩存池中獲取channel ,每個(gè)線程中持有的channel 是互相隔離的;
2.3 生產(chǎn)者發(fā)送消息:
生產(chǎn)者發(fā)送消息主要是通過(guò) 引入 RabbitTemplate 模版對(duì)象來(lái)完成;這里按照發(fā)送消息發(fā)送的場(chǎng)景分別進(jìn)行介紹:
2.3.1 交換機(jī)和隊(duì)列的綁定:
因?yàn)橄⒆铋_(kāi)始是要發(fā)送到交換機(jī)上的,然后在通過(guò)交換機(jī)通過(guò)routkey 路由鍵到匹配的隊(duì)列中;所以我們需要先在項(xiàng)目中使用的
virtual-host 中去分別創(chuàng)建交換機(jī)和隊(duì)列,然后進(jìn)行綁定;一幫情況下,我們應(yīng)該向運(yùn)維去申請(qǐng)自己的虛擬機(jī),交換機(jī),隊(duì)列,然后通過(guò)后,項(xiàng)目中直接使用即可;當(dāng)然通過(guò)代碼也完全可以進(jìn)行交換機(jī)和隊(duì)列的創(chuàng)建和綁定,這里我們通過(guò)web 頁(yè)面來(lái)進(jìn)行處理:
2.3.1.1交換機(jī)的創(chuàng)建:
-
Virtual host : 對(duì)應(yīng)隔離的虛擬機(jī),所以需要選擇項(xiàng)目中 通過(guò)spring.rabbitmq.virtual-host 參數(shù)連接的虛擬機(jī);
-
Name: 虛擬機(jī)的名稱,見(jiàn)名知意即可;
-
Type: 虛擬機(jī)的類型:比較常用的有直連 direct; 主題topic,廣播fanout;
這里對(duì)交換機(jī)的類型進(jìn)行簡(jiǎn)單的介紹: -
直連direct的交換機(jī),交換機(jī)直接與隊(duì)列完成綁定,通過(guò)發(fā)送消息是攜帶的Routing Key 和隊(duì)列與 Exchange 綁定時(shí)指定的 Routing Key 精準(zhǔn)匹配,然后路由消息到指定隊(duì)列中:
-
Direct Exchange
Direct Exchange 是最簡(jiǎn)單的交換機(jī)類型,交換機(jī)直接與隊(duì)列完成綁定,它根據(jù)消息攜帶的 Routing Key 和隊(duì)列與 Exchange 綁定時(shí)指定的 Routing Key 精準(zhǔn)匹配,然后路由消息到指定隊(duì)列中。 Direct Exchange 可以理解為一張路由表,交換機(jī)通過(guò) Routing Key 在路由表中查找匹配隊(duì)列,將消息從生產(chǎn)者處發(fā)送到匹配隊(duì)列。 -
Topic Exchange
Topic Exchange 根據(jù) Routing Key 的匹配規(guī)則將消息路由到對(duì)應(yīng)的隊(duì)列中。Topic Exchange 支持兩種匹配規(guī)則:* 代表通配符,表示可以匹配一個(gè)單詞,# 代表通配符,表示可以匹配多個(gè)單詞。例如,Routing Key 為 com.XXX.# 的消息會(huì)被路由到匹配 com.XXX. 開(kāi)頭的所有隊(duì)列中,Routing Key 為 # ,會(huì)匹配到所有的消息;列如 user.* 匹配 user. 后跟一個(gè)單詞的消息,可以匹配到user.a 但是匹配不到user.a.b 。 -
Fanout Exchange
Fanout Exchange 會(huì)將消息路由到所有綁定到它上面的隊(duì)列中。Fanout Exchange 的路由方式與路由表無(wú)關(guān),會(huì)忽略 Routing Key,與 Direct Exchange 和 Topic Exchange 相比,它具有更高的傳輸效率和更低的消耗。 -
Headers Exchange
Headers Exchange 根據(jù)消息頭中的鍵值對(duì)匹配規(guī)則將消息路由到對(duì)應(yīng)的隊(duì)列中。Headers Exchange 的匹配規(guī)則相對(duì)較復(fù)雜,需要在綁定時(shí)指定鍵值對(duì)的匹配方式。 -
Durability : 交換機(jī)是否持久化到磁盤的屬性值設(shè)置
-
如果將 Durability 屬性設(shè)置為 durable ,表示交換器會(huì)被持久化到磁盤上,即使 RabbitMQ 服務(wù)器在交換機(jī)定義被創(chuàng)建之后終止,交換機(jī)定義仍然能夠在服務(wù)器重新啟動(dòng)時(shí)得到恢復(fù),從而保證交換機(jī)在重啟后仍然存在。
-
如果將 Durability 屬性設(shè)置為 transient ,表示交換器不會(huì)被持久化到磁盤上,如果 RabbitMQ 服務(wù)器重啟,則該交換器定義將會(huì)丟失。
-
Auto delete 用于指定該交換機(jī)是否自動(dòng)刪除。當(dāng)一個(gè)交換機(jī)關(guān)聯(lián)的所有隊(duì)列都被刪除時(shí),如果交換機(jī)的 Auto Delete 屬性為 true,則該交換機(jī)也會(huì)被自動(dòng)刪除
-
Internal 是否為內(nèi)部交換機(jī):
內(nèi)部交換機(jī)的 internal 屬性設(shè)置為 true,使其只能被通過(guò) AMQP 協(xié)議連接到相同 Virtual Host 的客戶端使用,不能被直連類型的 Exchange 或 Headers 類型的 Exchange 所使用。
內(nèi)部交換機(jī)只能用于消費(fèi)者和生產(chǎn)者在同一個(gè) RabbitMQ 實(shí)例中的場(chǎng)景,而不能用于服務(wù)器和客戶端之間傳遞消息。
內(nèi)部交換機(jī)主要用于應(yīng)用程序之間傳遞消息,而不是用于服務(wù)器和客戶端之間傳遞消息。 -
Arguments:交換機(jī)的額外屬性,比較常用的屬性如alternate-exchange:指定備用交換機(jī)。如果一條消息無(wú)法被路由到任何隊(duì)列中,那么它將被發(fā)送到備用交換機(jī)中;
一般我們創(chuàng)建交換機(jī)時(shí)只需要選擇Virtual host:,填入交換機(jī)的名稱,選擇交互機(jī)的類型這3項(xiàng),其它都默認(rèn)即可:
2.3.1.2 隊(duì)列的創(chuàng)建:- type 隊(duì)列的類型:
在 RabbitMQ 中,隊(duì)列的 type 參數(shù)共有三種,分別是 classic、quorum 和 stream。它們的區(qū)別可以簡(jiǎn)單概括如下:
classic 隊(duì)列:
最早的、經(jīng)典的隊(duì)列類型,支持多個(gè)消費(fèi)者競(jìng)爭(zhēng)消費(fèi)消息,但是在節(jié)點(diǎn)宕機(jī)時(shí)可能會(huì)出現(xiàn)消息丟失的情況。適用于簡(jiǎn)單的消息處理場(chǎng)景。
quorum 隊(duì)列:
支持高可用性、多個(gè)消費(fèi)者競(jìng)爭(zhēng)消費(fèi)的隊(duì)列類型。它通過(guò)復(fù)制機(jī)制保證消息的可靠性,可以在節(jié)點(diǎn)宕機(jī)時(shí)自動(dòng)進(jìn)行故障轉(zhuǎn)移,避免消息丟失。適用于需要高可用特性的分布式環(huán)境中使用,但相對(duì)來(lái)說(shuō),quorum 隊(duì)列性能較 classic 隊(duì)列有所下降。
stream 隊(duì)列:
支持無(wú)限緩存的消息流隊(duì)列,可以通過(guò)隊(duì)列中的緩存來(lái)處理各種等待中的問(wèn)題。傳統(tǒng)隊(duì)列中當(dāng)消息進(jìn)入隊(duì)列時(shí),它就被立即寫(xiě)入了內(nèi)存中,并等待處理。這樣做的問(wèn)題是,當(dāng)生產(chǎn)者不斷地發(fā)送消息時(shí),很容易將內(nèi)存撐滿。 stream 隊(duì)列則允許隊(duì)列的緩存區(qū)域隨著時(shí)間和隊(duì)列大小的增長(zhǎng)而擴(kuò)展,使得待處理的消息可以在緩存區(qū)域中有所體現(xiàn)。適用于需要處理海量時(shí)間序列數(shù)據(jù)的場(chǎng)景。
需要注意的是,stream 隊(duì)列是從 RabbitMQ 3.8 開(kāi)始引入的新類型,目前還不是很成熟,可能在穩(wěn)定性和性能方面還需要更多的優(yōu)化和改進(jìn)。因此,在選擇隊(duì)列類型時(shí),需要結(jié)合具體的業(yè)務(wù)情況和系統(tǒng)限制,選擇采用 classic、quorum 還是 stream 隊(duì)列,以達(dá)到最優(yōu)的性能和可用性。
- Name 隊(duì)列的名稱;
- Durability 隊(duì)列是否持久化,參數(shù)意義同交換機(jī);
- Auto delete:
在 RabbitMQ 中,隊(duì)列的 auto-delete 參數(shù)用于控制隊(duì)列的自動(dòng)刪除行為。如果將 auto-delete 參數(shù)設(shè)置為 true,則在最后一個(gè)消費(fèi)者斷開(kāi)連接時(shí),隊(duì)列會(huì)自動(dòng)被刪除。 - Arguments 隊(duì)列參數(shù)的額外選擇;
通常創(chuàng)建隊(duì)列時(shí)只需要選擇Virtual host,填入隊(duì)列的名稱,其它項(xiàng)默認(rèn)即可:
2.3.1.3 交換機(jī)和隊(duì)列的綁定:完成交換機(jī)和隊(duì)列關(guān)系的綁定
2.3.2 發(fā)送消息:
2.3.2.2 生產(chǎn)者參數(shù)的配置:
########## 生產(chǎn)者配置
spring.rabbitmq.template.exchange=my_exchange
# 啟用消息投遞結(jié)果確認(rèn)
spring.rabbitmq.publisher-returns=true
# 啟用強(qiáng)制消息投遞,即生產(chǎn)者發(fā)送消息成功或者失敗,需要返回確認(rèn)消息
spring.rabbitmq.template.mandatory=true
# 消息發(fā)布者確認(rèn)模式
spring.rabbitmq.publisher-confirm-type=correlated
# 發(fā)送重試是否可用
spring.rabbitmq.template.retry.enabled= true
# 最大重試次數(shù),默認(rèn)值為 3
spring.rabbitmq.template.retry.max-attempts=3
# 第一次和第二次嘗試發(fā)布或傳遞消息之間的間隔,默認(rèn)值為 1000 毫秒
spring.rabbitmq.template.retry.initial-interval=1000
#表示時(shí)間間隔的倍數(shù)系數(shù),默認(rèn)值為 1 當(dāng)進(jìn)行第 n 次重試時(shí),
# 會(huì)將時(shí)間間隔設(shè)置為 initial-interval * multiplier^(n-1) ,用于控制重試時(shí)間間隔逐漸增加
spring.rabbitmq.template.retry.multiplier=1
# 表示時(shí)間間隔的最大值,默認(rèn)值為 10000 毫秒
spring.rabbitmq.template.retry.max-interval= 1000
2.3.2.3 使用RabbitTemplate 模版發(fā)送單條消息,發(fā)送多條消息,發(fā)送延遲消息,使用自定義的RabbitTemplate 發(fā)送事務(wù)消息:
1) 定義一個(gè)類來(lái)封裝我們要發(fā)送的消息結(jié)構(gòu):
package com.example.rabbitmqdemo.rabbitmq.msgDto;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
@Data
@AllArgsConstructor
public class MsgDto implements Serializable {
// 消息類型
private String msgType;
// 消息體
private Object body;
}
2) 對(duì)RabbitTemplate 模版對(duì)象配置消息確認(rèn):
如果消息投遞失敗,我們需要對(duì)此類消息進(jìn)行記錄,方便后續(xù)進(jìn)行數(shù)據(jù)補(bǔ)償;
package com.example.rabbitmqdemo.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component("rabbitMqCustomerConfig")
public class BatchConfig {
@Value("${env:prod}")
private String env;
@Autowired
SimpleRabbitListenerContainerFactory containerFactory;
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void simpleListenerBatchInit() {
log.info("設(shè)置批量-----");
containerFactory.setBatchListener(true);
if ("prod".equals(env)) {
// 依照不同的環(huán)境進(jìn)行開(kāi)啟
containerFactory.setAutoStartup(true);
}
// 設(shè)置 ConfirmCallback 回調(diào)函數(shù) 確認(rèn)消息是否成功發(fā)送到 Exchang
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
if (null == correlationData) {
// 延遲消息 correlationData 為null
return;
}
log.debug("Message sent successfully:{} ", correlationData.getId());
} else {
if (null == correlationData && null == cause) {
// 延遲消息 correlationData 為null
return;
}
log.error("Message sent failed: {}", correlationData.getId() + ", cause: " + cause);
}
});
// ReturnCallback 處理的是未路由的消息返回的情況
rabbitTemplate.setReturnCallback((oneMessage, replyCode, replyText, exchange, routingKey) -> {
// 判斷是否是延遲消息
if (routingKey.indexOf("delay") != -1) {
// 是一個(gè)延遲消息,忽略這個(gè)錯(cuò)誤提示
return;
}
log.debug("Message returned: {}", new String(oneMessage.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
});
}
}
3) 因?yàn)榘l(fā)送事務(wù)需要關(guān)閉消息的確認(rèn),所以這里重新定義一個(gè)RabbitTemplate 模版用來(lái)發(fā)送事務(wù)消息:
package com.example.rabbitmqdemo.rabbitmq.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TxRabbitTemplate {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.port}")
private String port;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Bean(value = "txRabbitTemplat")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
private ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
connectionFactory.setChannelCacheSize(10);
// 關(guān)閉消息的ack 確認(rèn)
connectionFactory.setPublisherConfirms(false);
connectionFactory.setPublisherReturns(false);
return connectionFactory;
}
}
4)使用自動(dòng)裝配的RabbitTemplate 模版來(lái)進(jìn)行 消息發(fā)送 :
package com.example.rabbitmqdemo.rabbitmq.producer;
import com.alibaba.fastjson2.JSONObject;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.UUID;
@Slf4j
@Component
public class MessageProducer {
// 這里可以指定一個(gè)默認(rèn)發(fā)送使用的交換機(jī)
@Value("${amqp-binding.exchange-name:my_exchange}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
@Qualifier("txRabbitTemplat")
private RabbitTemplate txRabbitTemplate;
/**
* 指定的routKey 發(fā)送信息
*
* @param message
*/
public void sendMessage(String routKey, Object message) {
this.sendMessage(exchangeName, routKey, JSONObject.toJSONString(message));
}
/**
* 通過(guò)交換機(jī),路由key 發(fā)送消息
*
* @param exchangeName
* @param routKey
* @param message
*/
public void sendMessage(String exchangeName, String routKey, Object message) {
// 設(shè)置消息的唯一標(biāo)識(shí)符
long deliveryTag = System.currentTimeMillis();
rabbitTemplate.convertAndSend(exchangeName, routKey, message, messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setMessageId(String.valueOf("messageId_" + deliveryTag));
return messagePostProcessor;
}, new CorrelationData(UUID.randomUUID().toString()));
}
/**
* 指定的routKey 發(fā)送批量信息
*
* @param messages
*/
public void sendMessageBatch(String routKey, MsgDto messages) {
this.sendMessageBatch(exchangeName, routKey, JSONObject.toJSONString(messages));
}
/**
* 通過(guò)交換機(jī),路由key 發(fā)送批量信息
*
* @param exchangeName
* @param routKey
* @param messages
*/
public void sendMessageBatch(String exchangeName, String routKey, Object messages) {
rabbitTemplate.convertSendAndReceive(exchangeName, routKey, messages, messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setMessageId(String.valueOf("messageId_" + 1));
return messagePostProcessor;
}, new CorrelationData(UUID.randomUUID().toString()));
}
/**
* 指定的routKey 發(fā)送信息
*
* @param message
*/
public void sendDelayMessage(String routKey, Object message, long delayTime) {
this.sendDelayMessage(exchangeName, routKey, message, delayTime);
}
/**
* 指定的routKey 發(fā)送延遲信息
*
* @param message
*/
public void sendDelayMessage(String exchangeName, String routKey, Object message, long delayTime) {
log.debug("producer send delay message:{}", message);
rabbitTemplate.convertAndSend(exchangeName, routKey, message, header -> {
header.getMessageProperties().setHeader("x-delay", delayTime);
return header;
});
}
/**
* 指定的routKey 發(fā)送事務(wù)信息
*
* @param message
*/
@SneakyThrows
public void sendTxMessage(String exchangeName, String routKey, Object message) {
log.debug("producer send delay message:{}", message);
String messageStr = JSONObject.toJSONString(message);
// method 1:
// sendTransactedMsgByNewChannel(exchangeName,routKey,message);
// method2:
sendTransactedMsgByNTemplate(exchangeName, routKey, messageStr);
}
private void sendTransactedMsgByNTemplate(String exchangeName, String routKey, String message) {
txRabbitTemplate.execute(channel -> {
try {
String messageId = UUID.randomUUID().toString() + "_messageId";
String correlationId = UUID.randomUUID().toString() + "_correId";
// 創(chuàng)建 BasicProperties 對(duì)象并設(shè)置屬性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.messageId(messageId)
.correlationId(correlationId)
.build();
channel.txSelect(); // 開(kāi)啟事務(wù)
channel.basicPublish(exchangeName, routKey, properties, message.getBytes(Charset.forName("UTF-8"))); // 發(fā)送消息
// "124".substring(7);
channel.txCommit(); // 提交事務(wù)
} catch (Exception e) {
channel.txRollback(); // 回滾事務(wù)
}
return true;
});
}
@SneakyThrows
private void sendTransactedMsgByNewChannel(String exchangeName, String routKey, String message) {
// 獲取新的channel 對(duì)象
Channel channel = txRabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
// 開(kāi)啟事務(wù)
channel.txSelect();
try {
// 消息格式化
channel.basicPublish(exchangeName, routKey, null, message.getBytes(Charset.forName("UTF-8")));
// 消息提交
channel.txCommit();
} catch (IOException e) {
channel.txRollback();
throw e;
}
}
}
5)測(cè)試代碼:
package com.example.rabbitmqdemo.rabbitmq.controller;
import com.example.rabbitmqdemo.rabbitmq.enums.RabbitRoutKeyEnum;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.example.rabbitmqdemo.rabbitmq.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
@RestController
public class TestSendMsgController {
@Autowired
private MessageProducer messageProducer;
@GetMapping("/sendMsg")
public boolean sendMsg(@RequestParam String content,@RequestParam String routKey) {
List<Object> msgs = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
msgs.add(content+"_"+i);
}
msgs.stream().forEach(e->{
MsgDto msgDto = new MsgDto("user",e);
messageProducer.sendMessage(RabbitRoutKeyEnum.業(yè)務(wù)_單條消息.getRoutKey(),msgDto);
});
return true;
}
@GetMapping("/sendBatchMsg")
public boolean sendBatchMsg(@RequestParam String content,@RequestParam String routKey) {
List<Object> msgs = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
msgs.add(content+"_"+i);
}
MsgDto msgDto = new MsgDto("test",msgs);
messageProducer.sendMessageBatch(RabbitRoutKeyEnum.業(yè)務(wù)_多條消息.getRoutKey(), msgDto);
return true;
}
@GetMapping("/sendDelayMsg")
public boolean sendDelayMsg(@RequestParam String content,@RequestParam long delayTime) {
List<Object> msgs = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
msgs.add(content+"_"+i);
}
msgs.stream().forEach(e->{
messageProducer.sendDelayMessage("my_delay_exchange",RabbitRoutKeyEnum.業(yè)務(wù)_延遲.getRoutKey(),e,delayTime);
});
return true;
}
@GetMapping("/sendTxMsg")
public boolean sendTxMsg(@RequestParam String content) {
List<Object> msgs = new ArrayList<>(10);
for (int i = 0; i < 2; i++) {
msgs.add(content+"_"+i);
}
msgs.stream().forEach(e->{
MsgDto msgDto = new MsgDto("tx",e);
messageProducer.sendTxMessage("my_tx_exchange",RabbitRoutKeyEnum.業(yè)務(wù)_事務(wù).getRoutKey(),msgDto);
// messageProducer.sendMessage(RabbitRoutKeyEnum.業(yè)務(wù)_單條消息.getRoutKey(),msgDto);
});
return true;
}
}
這里分別測(cè)試了單條消息,多條消息,延遲消息,事務(wù)消息的發(fā)送,將其封裝為MsgDto對(duì)象,在發(fā)送時(shí)將其轉(zhuǎn)為json 字符串;基本上滿足了大部分的業(yè)務(wù)場(chǎng)景;需要注意的是rabbitmq 中所謂批量發(fā)送的消息實(shí)際上會(huì)被消息壓縮為1條消息進(jìn)行發(fā)送,到達(dá)隊(duì)列是也是1條消息;
6 )routKey 的枚舉類:
package com.example.rabbitmqdemo.rabbitmq.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum RabbitRoutKeyEnum {
業(yè)務(wù)_單條消息("my_routKey"),
業(yè)務(wù)_多條消息("my_batch_routKey"),
業(yè)務(wù)_1("my_one_routKey"),
業(yè)務(wù)_延遲("my_delay_routKey"),
業(yè)務(wù)_事務(wù)("my_tx_routKey"),
;
private String routKey;
}
至此我們已基本完成生產(chǎn)端消息的發(fā)送以及發(fā)送結(jié)果的監(jiān)聽(tīng)處理;需要注意的是對(duì)于延遲消息,返回的確認(rèn)消息correlationData 是一個(gè)null 值,所以這里對(duì)其消息的確認(rèn)進(jìn)行了一次特殊的判斷;
3 消費(fèi)者接收消息:
3.1 消費(fèi)者參數(shù)的配置:
########## 消費(fèi)者配置
# 是否自動(dòng)啟動(dòng)消息的監(jiān)聽(tīng)
spring.rabbitmq.listener.simple.auto-startup=false
# 消費(fèi)消息確認(rèn)模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 批量預(yù)取條數(shù) 默認(rèn)值250
spring.rabbitmq.listener.simple.prefetch=50
# 開(kāi)啟批量消費(fèi)
spring.rabbitmq.listener.simple.consumer-batch-enabled=true
# 批量消費(fèi)的條數(shù)
spring.rabbitmq.listener.simple.batch-size=2
# 并發(fā)消費(fèi)最小線程數(shù)
spring.rabbitmq.listener.simple.concurrency=1
# 并發(fā)消費(fèi)最大線程數(shù)
spring.rabbitmq.listener.simple.max-concurrency=1
### 消費(fèi)失敗 重試參數(shù)
# 開(kāi)啟重試
spring.rabbitmq.listener.simple.retry.enabled=true
# 表示最大重試次數(shù),默認(rèn)值為 3
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 表示第一次重試的時(shí)間間隔,默認(rèn)值為 1000 毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=1000
#表示時(shí)間間隔的倍數(shù)系數(shù),默認(rèn)值為 1 當(dāng)進(jìn)行第 n 次重試時(shí),
# 會(huì)將時(shí)間間隔設(shè)置為 initial-interval * multiplier^(n-1) ,用于控制重試時(shí)間間隔逐漸增加
spring.rabbitmq.listener.simple.retry.multiplier=1
# 表示時(shí)間間隔的最大值,默認(rèn)值為 10000 毫秒
spring.rabbitmq.listener.simple.retry.max-interval=1000
# 消息監(jiān)聽(tīng)器是否啟用無(wú)狀態(tài)(stateless)重試 默認(rèn)true
spring.rabbitmq.listener.simple.retry.stateless=false
# 控制當(dāng)消息消費(fèi)失敗后,RabbitMQ 是否需要將消息重新入隊(duì)。該參數(shù)的默認(rèn)值為 true,即消息將被重新入隊(duì)
spring.rabbitmq.listener.simple.default-requeue-rejected=true
以上參數(shù),配置了消費(fèi)端消費(fèi)消息后的ack 機(jī)制為手動(dòng)提交,并且設(shè)定了 批量預(yù)取條數(shù) 和每次批量消費(fèi)的條數(shù),以及消費(fèi)失敗的重試機(jī)制配置;
3.2 消費(fèi)消息:
消費(fèi)者監(jiān)聽(tīng)某個(gè)或者幾個(gè)隊(duì)列,然后通過(guò)channel 獲取要消費(fèi)的消息:
package com.example.rabbitmqdemo.rabbitmq.consumer;
import com.alibaba.fastjson2.JSONObject;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
@Slf4j
@Component
public class MessageConsumer {
/**
* 逐條/批量 消費(fèi)
*
* @param messages
*/
// @RabbitListener(queues = "my_queue_one")
public void receiveMessage(List<Message> messages, Channel channel) throws IOException {
log.debug("逐條消費(fèi)消息:{}", messages);
for (Message message : messages) {
try {
// // 處理消息
log.debug("Received message: {}", message);
String jsonMessage = new String(message.getBody(), "UTF-8");
MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);
// 數(shù)據(jù)處理
// 手動(dòng)發(fā)送 ack 消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception ex) {
// 發(fā)生異常,手動(dòng)發(fā)送 nack 消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
/**
* 逐條消費(fèi)--延時(shí)消息
*
* @param messages
*/
@RabbitListener(queues = "my_deay_queue")
public void receiveDelayMessage(List<Message> messages, Channel channel) throws IOException {
for (Message message : messages) {
try {
// 處理消息
log.debug("Received delay message: {}", message);
String jsonMessage = new String(message.getBody(), "UTF-8");
MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);
// 手動(dòng)發(fā)送 ack 消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception ex) {
// 發(fā)生異常,手動(dòng)發(fā)送 nack 消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
/**
* 逐條消費(fèi)--事務(wù)消息
*
* @param messages
*/
@RabbitListener(queues = "my_tx_queue")
public void receiveTxMessage(List<Message> messages, Channel channel) throws IOException {
for (Message message : messages) {
try {
// 處理消息
log.debug("Received delay message: {}", message);
String jsonMessage = new String(message.getBody(), "UTF-8");
MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);
// 手動(dòng)發(fā)送 ack 消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception ex) {
// 發(fā)生異常,手動(dòng)發(fā)送 nack 消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
這里我們接收到消息后,然后通過(guò)"UTF-8"編碼(生產(chǎn)者默認(rèn)按照UTF-8 對(duì)數(shù)據(jù)編碼后進(jìn)行發(fā)送)將字節(jié)數(shù)據(jù)轉(zhuǎn)換為字符串,然后通過(guò)阿里的json jar 完成java 對(duì)象的轉(zhuǎn)換,進(jìn)行業(yè)務(wù)處理,最后手動(dòng)提交消息;
4 總結(jié):
- Rabbitmq 對(duì)于消息的發(fā)送依賴于交換機(jī),通過(guò)routKey 綁定不同的queue 完成消息的路由工作;
- Rabbitmq 發(fā)送消息可以為其配置ack確認(rèn)機(jī)制,以及發(fā)送失敗重試機(jī)制參數(shù)可以配合完成消息的發(fā)送;
- Rabbitmq 發(fā)送消息可以進(jìn)行批量發(fā)送,但是本質(zhì)上會(huì)被合并到一條消息進(jìn)行發(fā)送;
- Rabbitmq 對(duì)于消息的消費(fèi),依賴于構(gòu)建channel 管道 ,綁定queue 完成消息的消費(fèi);
- Rabbitmq 消費(fèi)消息,可以進(jìn)行手動(dòng)的ack 確認(rèn),并且可以設(shè)置消費(fèi)重試參數(shù),應(yīng)便于消費(fèi)失敗的場(chǎng)景;
5 擴(kuò)展:
5.1 rabbitmq 發(fā)送事務(wù)消息為什么要關(guān)閉 消息的確認(rèn)回調(diào)?
在RabbitMQ中,如果發(fā)送事務(wù)消息,并且開(kāi)啟了確認(rèn)模式,那么需要特別注意關(guān)閉消息的確認(rèn)回調(diào),以避免一些潛在的問(wèn)題。
在RabbitMQ中,開(kāi)啟事務(wù)模式后,生產(chǎn)者發(fā)送消息時(shí),RabbitMQ會(huì)將消息緩存在生產(chǎn)者端。在事務(wù)提交之前,不會(huì)直接將消息發(fā)送到隊(duì)列。如果在事務(wù)未提交的情況下,RabbitMQ服務(wù)器異常中斷或者連接被關(guān)閉,那么消息將會(huì)丟失。為了避免這種情況的發(fā)生,可以采用事務(wù)提交確認(rèn)和確認(rèn)模式,在確認(rèn)之后才將消息發(fā)送到隊(duì)列中。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-724887.html
然而,在發(fā)送事務(wù)消息時(shí),開(kāi)啟確認(rèn)模式后,需要關(guān)閉消息的確認(rèn)回調(diào)。這是因?yàn)樵谑聞?wù)提交之前,消息并沒(méi)有發(fā)送到隊(duì)列中,確認(rèn)回調(diào)將在消息發(fā)送到隊(duì)列后才觸發(fā)。而在事務(wù)模式下,消息已經(jīng)被緩存到生產(chǎn)者端,沒(méi)有被發(fā)送到隊(duì)列中,所以確認(rèn)回調(diào)不應(yīng)該被觸發(fā)。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-724887.html
到了這里,關(guān)于Idea+maven+springboot項(xiàng)目搭建系列--2 整合Rabbitmq完成客戶端&服務(wù)器端消息收發(fā)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!