?????歡迎光臨????
??我是蘇澤,一位對(duì)技術(shù)充滿熱情的探索者和分享者。????
??特別推薦給大家我的最新專欄《Redis實(shí)戰(zhàn)與進(jìn)階》
本專欄純屬為愛發(fā)電永久免費(fèi)?。。?/span>
這是蘇澤的個(gè)人主頁可以看到我其他的內(nèi)容哦????
努力的蘇澤http://suzee.blog.csdn.net/
最近工作室的一個(gè)業(yè)務(wù)跟另一個(gè)業(yè)務(wù)合并 自然要用到MQ(消息隊(duì)列Message Queue)那么很顯然 就要部署個(gè)RabbitMQ到服務(wù)器上了??
我們用的是云托管的的服務(wù) 那自然是部署中間件到云服務(wù)上去了 服務(wù)是一路開通 結(jié)果到了需要調(diào)試的時(shí)候 怎么也連不上 (說是內(nèi)網(wǎng)直連,但關(guān)鍵是 同事們都在線下做本地測(cè)試的呀)
直接無語了 面對(duì)這一場(chǎng)景 怎么辦?業(yè)務(wù)還要繼續(xù) 等著交貨的? 于是我想起了之前學(xué)過的技術(shù)棧?
Redis 也能作為消息隊(duì)列的(不過用的比較少所以不大容易記起來 或者也沒啥人知道)?于是一頓卡卡操作? 步驟還比MQ簡單? 下面就來看是如何實(shí)現(xiàn)的
正片
目錄
最近工作室的一個(gè)業(yè)務(wù)跟另一個(gè)業(yè)務(wù)合并 自然要用到MQ(消息隊(duì)列Message Queue)那么很顯然 就要部署個(gè)RabbitMQ到服務(wù)器上了??
Redis 也能作為消息隊(duì)列的(不過用的比較少所以不大容易記起來 或者也沒啥人知道)?于是一頓卡卡操作? 步驟還比MQ簡單? 下面就來看是如何實(shí)現(xiàn)的
正片
Redis作為消息隊(duì)列的優(yōu)缺點(diǎn):
使用Redis作為消息隊(duì)列的選擇相對(duì)于使用專門的消息隊(duì)列系統(tǒng)(如RabbitMQ、Kafka等)有以下優(yōu)點(diǎn)和:
缺點(diǎn)也很明顯:
應(yīng)用場(chǎng)景:
Redis實(shí)現(xiàn)消息隊(duì)列系統(tǒng) 實(shí)現(xiàn)步驟:
配置Redis:
設(shè)置RedisTemplate的序列化器。在消息隊(duì)列中,你可以使用默認(rèn)的序列化器,即StringRedisSerializer,它會(huì)將消息以字符串的形式進(jìn)行存儲(chǔ)和傳輸??梢酝ㄟ^以下代碼設(shè)置默認(rèn)的序列化器:
實(shí)現(xiàn)消息的發(fā)布和訂閱功能。
實(shí)戰(zhàn)與改良
代碼解釋
我把消息處理的系統(tǒng)中心化處理,也就是說是這個(gè)監(jiān)聽系統(tǒng)他可以監(jiān)聽reserved通道的所有業(yè)務(wù)類型,我這里列舉了四種wait,agree,refuse,over四種 但如果是更大的業(yè)務(wù)體系 同一個(gè)通道可能面臨著更多可能性分支? 那如果按照第一套的方案 需要對(duì)每一個(gè)具體業(yè)務(wù)實(shí)現(xiàn)一個(gè)監(jiān)聽者 工作量就很大(可能這樣耦合會(huì)低一些吧)
但是我這樣把消息集中處理 然后短信發(fā)送系統(tǒng)就專門只做短信發(fā)送的事情 xx系統(tǒng)就只做對(duì)應(yīng)的工作 就能把工作上的耦合度大大降低
那么大家應(yīng)該也注意到我的兩個(gè)負(fù)責(zé)序列化和反序列化的方法了吧 這是因?yàn)闃I(yè)務(wù)需求 要把對(duì)象封裝成一個(gè)類 所以這里的方案就是在信息中心處理器上 自定義一個(gè)序列化方案(如果再做得好一點(diǎn)其實(shí)可以把這個(gè)序列器抽理出來封裝為一個(gè)抽象方法 用泛型來定義返回結(jié)果和參數(shù) 這樣就能序列化所有引用的類型了)??
遇到的問題:
對(duì)了 中途遇到了這樣一個(gè)錯(cuò)誤
原因與分析:
實(shí)際業(yè)務(wù)中的測(cè)試
發(fā)布服務(wù)
訂閱服務(wù)(監(jiān)聽服務(wù))
測(cè)試結(jié)果
Redis作為消息隊(duì)列的優(yōu)缺點(diǎn):
使用Redis作為消息隊(duì)列的選擇相對(duì)于使用專門的消息隊(duì)列系統(tǒng)(如RabbitMQ、Kafka等)有以下優(yōu)點(diǎn)和:
-
簡單輕量:Redis是一個(gè)內(nèi)存中的數(shù)據(jù)存儲(chǔ)系統(tǒng),具有輕量級(jí)和簡單的特點(diǎn)。相比較專門的消息隊(duì)列系統(tǒng),使用Redis作為消息隊(duì)列不需要引入額外的組件和依賴,可以減少系統(tǒng)的復(fù)雜性。
-
速度快:由于Redis存儲(chǔ)在內(nèi)存中,它具有非常高的讀寫性能。這對(duì)于需要低延遲的應(yīng)用程序非常有優(yōu)勢(shì)。
-
多種數(shù)據(jù)結(jié)構(gòu)支持:Redis提供了豐富的數(shù)據(jù)結(jié)構(gòu),如列表、發(fā)布/訂閱、有序集合等。這使得Redis在處理不同類型的消息和任務(wù)時(shí)更加靈活。
-
數(shù)據(jù)持久化:Redis可以通過將數(shù)據(jù)持久化到磁盤來提供數(shù)據(jù)的持久性。這意味著即使Redis重啟,之前的消息也不會(huì)丟失。
-
廣泛的應(yīng)用場(chǎng)景:Redis不僅可以用作消息隊(duì)列,還可以用作緩存、數(shù)據(jù)庫、分布式鎖等多種用途。如果你的應(yīng)用程序已經(jīng)使用了Redis,那么使用Redis作為消息隊(duì)列可以減少技術(shù)棧的復(fù)雜性。
缺點(diǎn)也很明顯:
-
缺少一些高級(jí)特性:相對(duì)于專門的消息隊(duì)列系統(tǒng),Redis在消息隊(duì)列方面的功能可能相對(duì)簡單。例如,它可能缺乏一些高級(jí)消息傳遞功能,如消息重試、消息路由、持久化消息等。
-
可靠性和一致性:Redis的主要設(shè)計(jì)目標(biāo)是提供高性能和低延遲,而不是強(qiáng)一致性和高可靠性。在某些情況下,Redis可能會(huì)丟失消息,或者在出現(xiàn)故障時(shí)可能無法提供持久性保證。
應(yīng)用場(chǎng)景:
適用于簡單的中小型項(xiàng)目 如果功能簡單,訪問量并不大可以考慮
如果你的應(yīng)用程序?qū)?span style="background-color:#a2e043;">可靠性和高級(jí)功能有嚴(yán)格要求,并且需要處理大量的消息和復(fù)雜的消息路由,那么使用專門的消息隊(duì)列系統(tǒng)可能更合適。
Redis實(shí)現(xiàn)消息隊(duì)列系統(tǒng) 實(shí)現(xiàn)步驟:
配置Redis:
-
首先,確保你已經(jīng)正確地配置了Redis和Lettuce依賴項(xiàng),并創(chuàng)建了LettuceConnectionFactory對(duì)象。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
redis: host: port: 6379 password: lettuce: pool: max-active: 1000 max-idle: 1000 min-idle: 0 time-between-eviction-runs: 10s max-wait: 10000
-
創(chuàng)建一個(gè)RedisTemplate對(duì)象,并將LettuceConnectionFactory設(shè)置為其連接工廠:
@Bean
public RedisTemplate<String, String> redisTemplate(LettuceConnectionFactory connectionFactory) {
RedisTemplate<String, String> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
template.setDefaultSerializer(new StringRedisSerializer());
return template;
}
設(shè)置RedisTemplate的序列化器。在消息隊(duì)列中,你可以使用默認(rèn)的序列化器,即StringRedisSerializer,它會(huì)將消息以字符串的形式進(jìn)行存儲(chǔ)和傳輸??梢酝ㄟ^以下代碼設(shè)置默認(rèn)的序列化器:
redisTemplate.setDefaultSerializer(new StringRedisSerializer());
實(shí)現(xiàn)消息的發(fā)布和訂閱功能。
- 發(fā)布消息:
redisTemplate.convertAndSend("channel_name", "message_payload");
在上述代碼中,"channel_name"是消息的通道名稱,"message_payload"是要發(fā)布的消息內(nèi)容。
- 訂閱消息:
-
首先,創(chuàng)建一個(gè)MessageListener實(shí)現(xiàn)類來處理接收到的消息:
public class MessageListenerImpl implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
// 處理接收到的消息
String channel = new String(message.getChannel());
String payload = new String(message.getBody());
// 執(zhí)行自定義的邏輯
}
}
創(chuàng)建一個(gè)LettuceMessageListenerAdapter對(duì)象,并提供MessageListener實(shí)現(xiàn)類:
LettuceMessageListenerAdapter listenerAdapter = new LettuceMessageListenerAdapter(new MessageListenerImpl());
listenerAdapter.afterPropertiesSet();
創(chuàng)建一個(gè)RedisMessageListenerContainer對(duì)象,并配置它的LettuceConnectionFactory和監(jiān)聽適配器:
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
listenerContainer.setConnectionFactory(lettuceConnectionFactory);
listenerContainer.addMessageListener(listenerAdapter, new ChannelTopic("通道名稱"));
listenerContainer.start();
通過以上步驟,我們創(chuàng)建了一個(gè)LettuceConnectionFactory對(duì)象來與Redis服務(wù)器建立連接。然后,我們創(chuàng)建了一個(gè)MessageListener實(shí)現(xiàn)類來處理接收到的消息。接下來,我們創(chuàng)建了一個(gè)LettuceMessageListenerAdapter對(duì)象,并提供MessageListener實(shí)現(xiàn)類。最后,我們創(chuàng)建了一個(gè)RedisMessageListenerContainer對(duì)象,并配置它的LettuceConnectionFactory和監(jiān)聽適配器,然后啟動(dòng)容器以開始監(jiān)聽指定通道上的消息。
以上的方案 好處就是 可以很明顯的知道監(jiān)聽者在哪個(gè)部分 監(jiān)聽對(duì)應(yīng)通道的信息 然而 業(yè)務(wù)當(dāng)中 如果每一個(gè)對(duì)應(yīng)模塊的業(yè)務(wù)和通道都建立一個(gè)監(jiān)聽者來進(jìn)行監(jiān)聽(我們假設(shè)每一個(gè)就業(yè)務(wù)所要得到消息以后所執(zhí)行的邏輯都不相同) 那這個(gè)工作量就會(huì)暴增?
于是就有了第二種寫法 :
實(shí)戰(zhàn)與改良
/***
* @title MessageManager
* @author SUZE
* @Date 2-17
**/
@Component
public class ReservedMessageManager {
private String ListenerId;
private String UserId;
private String message;
private final RedisTemplate<String, String> redisTemplate;
@Autowired
public ReservedMessageManager(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
subscribeToChannel("reserved");
}
@Resource
private SmsServer smsServer;
public void publishMessage(String channel, reserveMessage message) {
String Message=serialize(message);
redisTemplate.convertAndSend("channel_name", "message_payload");
redisTemplate.convertAndSend(channel, Message);
}
// 接收到消息時(shí)觸發(fā)的事件
private void handleReserveMessage(String channel, reserveMessage reserveMessage) {
if (reserveMessage != null) {
String userId = reserveMessage.getUserId();
String ListenerId=reserveMessage.getListenerId();
String message = reserveMessage.getMessage();
//TODO 處理接收到的消息邏輯 這里后續(xù)要對(duì)Message進(jìn)行一個(gè)檢測(cè)他有wait agree和refused和over四種狀態(tài) 思種狀態(tài)就是不一樣的發(fā)送內(nèi)容
switch (message){
//TODO 消息要給兩邊都發(fā) 所以要發(fā)兩份 發(fā)信息的文案
case "wait":
smsServer.sendSms(userId,ListenerId,message);
break;
case "agree":
smsServer.sendSms(userId,ListenerId,message);
break;
case "refuse":
smsServer.sendSms(userId,ListenerId,message);
break;
case "over":
//這里要操作文檔系統(tǒng)了
//拒絕的話 那就要監(jiān)聽一下
smsServer.sendSms(userId,ListenerId,message);
break;
}
//smsServer.sendSms(userId,ListenerId,message);
// 其他處理邏輯...
}
}
public void subscribeToChannel(String channel) {
redisTemplate.execute((RedisCallback<Object>) (connection) -> {
connection.subscribe((message, pattern) -> {
String channelName = new String(message.getChannel());
byte[] body = message.getBody();
// 解析接收到的消息
switch (channelName){
case "reserved":
reserveMessage reserveMessage = deserializeMessage(new String(body));
handleReserveMessage(channelName, reserveMessage);
break;
//還有其他的通道 例如refuse就是一個(gè) 拒絕通道 專門監(jiān)聽拒絕的理由
}
}, channel.getBytes());
return null;
});
}
// 反序列化
private reserveMessage deserializeMessage(String body) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.readValue(body, reserveMessage.class);
} catch (IOException e) {
// 處理反序列化異常
e.printStackTrace();
return null;
}
}
// 序列化
public String serialize(reserveMessage reserveMessage) throws SerializationException {
if (reserveMessage == null) {
return null;
}
try {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(reserveMessage);
} catch (JsonProcessingException e) {
throw new SerializationException("Error serializing object", e);
}
}
}
代碼解釋
subscribeToChannel
方法接受一個(gè)channel
參數(shù),用于指定要訂閱的通道名稱。redisTemplate.execute
方法用于執(zhí)行Redis操作,并傳入一個(gè)RedisCallback
回調(diào)函數(shù)。- 回調(diào)函數(shù)使用lambda表達(dá)式的形式實(shí)現(xiàn),接受一個(gè)
connection
參數(shù),表示與Redis的連接。- 在回調(diào)函數(shù)中,調(diào)用
connection.subscribe
方法來訂閱通道。該方法接受一個(gè)回調(diào)函數(shù)作為參數(shù),用于處理接收到的消息。- 在消息回調(diào)函數(shù)中,首先從
message
對(duì)象中獲取通道名稱和消息體。- 使用
new String(message.getChannel())
將通道名稱轉(zhuǎn)換為字符串類型,并存儲(chǔ)在channelName
變量中。- 使用
message.getBody()
獲取消息體的字節(jié)數(shù)組表示,并存儲(chǔ)在body
變量中。- 在
switch
語句中,根據(jù)通道名稱進(jìn)行不同的處理。在這個(gè)例子中,僅處理"reserved"通道。- 對(duì)于"reserved"通道的處理,調(diào)用
deserializeMessage
方法將消息體反序列化為reserveMessage
對(duì)象,并將其存儲(chǔ)在名為reserveMessage
的局部變量中。- 調(diào)用
handleReserveMessage
方法,將通道名稱和反序列化后的reserveMessage
對(duì)象作為參數(shù)進(jìn)行處理。handleReserveMessage
方法用于處理接收到的保留消息的邏輯。它檢查消息類型,并根據(jù)類型執(zhí)行不同的操作。根據(jù)消息類型,它調(diào)用smsServer.sendSms
方法向指定的userId
和listenerId
發(fā)送短信。
我把消息處理的系統(tǒng)中心化處理,也就是說是這個(gè)監(jiān)聽系統(tǒng)他可以監(jiān)聽reserved通道的所有業(yè)務(wù)類型,我這里列舉了四種wait,agree,refuse,over四種 但如果是更大的業(yè)務(wù)體系 同一個(gè)通道可能面臨著更多可能性分支? 那如果按照第一套的方案 需要對(duì)每一個(gè)具體業(yè)務(wù)實(shí)現(xiàn)一個(gè)監(jiān)聽者 工作量就很大(可能這樣耦合會(huì)低一些吧)
但是我這樣把消息集中處理 然后短信發(fā)送系統(tǒng)就專門只做短信發(fā)送的事情 xx系統(tǒng)就只做對(duì)應(yīng)的工作 就能把工作上的耦合度大大降低
那么大家應(yīng)該也注意到我的兩個(gè)負(fù)責(zé)序列化和反序列化的方法了吧 這是因?yàn)闃I(yè)務(wù)需求 要把對(duì)象封裝成一個(gè)類 所以這里的方案就是在信息中心處理器上 自定義一個(gè)序列化方案(如果再做得好一點(diǎn)其實(shí)可以把這個(gè)序列器抽理出來封裝為一個(gè)抽象方法 用泛型來定義返回結(jié)果和參數(shù) 這樣就能序列化所有引用的類型了)??
遇到的問題:
對(duì)了 中途遇到了這樣一個(gè)錯(cuò)誤
錯(cuò)誤信息:com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `TopOne.MessageSystem.entity.reserveMessage` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
原因與分析:
reserveMessage類缺少默認(rèn)構(gòu)造函數(shù),這導(dǎo)致Jackson庫無法構(gòu)造該類的實(shí)例。錯(cuò)誤消息中提到了以下內(nèi)容:"Cannot construct instance of?TopOne.MessageSystem.entity.reserveMessage?(no Creators, like default constructor, exist)"。
為了使Jackson能夠正確地反序列化對(duì)象,需要在reserveMessage類中添加一個(gè)默認(rèn)構(gòu)造函數(shù)。默認(rèn)構(gòu)造函數(shù)是一個(gè)無參數(shù)的構(gòu)造函數(shù),它不需要任何參數(shù)來創(chuàng)建對(duì)象。
在你的reserveMessage類中
這個(gè)是改好的封裝類:
?
@Data
public class reserveMessage {
private String UserId;
private String ListenerId;
private String message;
public reserveMessage() {
// 默認(rèn)構(gòu)造函數(shù)
}
public reserveMessage(String userId, String ListenerId,String message) {
this.UserId = userId;
this.ListenerId = ListenerId;
this.message=message;
}
}
實(shí)際業(yè)務(wù)中的測(cè)試
發(fā)布服務(wù)
訂閱服務(wù)(監(jiān)聽服務(wù))
測(cè)試結(jié)果
成功
這里面的打印是代替了原本業(yè)務(wù)中的短信發(fā)送 也算是成了文章來源:http://www.zghlxwxcb.cn/news/detail-829495.html
這一期就到這 感謝觀看文章來源地址http://www.zghlxwxcb.cn/news/detail-829495.html
到了這里,關(guān)于【Redis實(shí)戰(zhàn)】有MQ為啥不用?用Redis作消息隊(duì)列!?Redis作消息隊(duì)列使用方法及底層原理高級(jí)進(jìn)階的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!