Redis Stream 簡介
Redis Stream 是 Redis 5.0 版本新增加的數(shù)據(jù)結(jié)構(gòu)。
Stream從字面上看是流類型,但其實(shí)從功能上看,應(yīng)該是Redis對消息隊(duì)列(MQ,Message Queue)的完善實(shí)現(xiàn)。下文稱Stream為隊(duì)列
Stream 出現(xiàn)原因
:
Stream的出現(xiàn)是為了給Redis提供完善的消息隊(duì)列功能
基于Reids的消息隊(duì)列實(shí)現(xiàn)有很多種,例如:
- PUB/SUB,訂閱/發(fā)布模式
- 基于List的 LPUSH+BRPOP 的實(shí)現(xiàn)
- 基于有序集合的實(shí)現(xiàn)
類型 | 優(yōu)點(diǎn) | 缺點(diǎn) |
---|---|---|
List | 支持阻塞式的獲取消息 | 沒有消息多播功能,沒有ACK機(jī)制,無法重復(fù)消費(fèi)等等 |
Pub/Sub | 支持消息多播 | 消息無法持久化,只管發(fā)送,如果出現(xiàn)網(wǎng)絡(luò)斷開、Redis宕機(jī)等,消息就直接沒了,自然也沒有ACK機(jī)制。 |
Sorted Set | 支持延時消息 | 不支持阻塞式獲取消息、不允許重復(fù)消費(fèi)、不支持分組。 |
發(fā)布訂閱模式
Redis 發(fā)布訂閱 (pub/sub) 是一種消息通信模式:發(fā)送者 (pub) 發(fā)送消息,訂閱者 (sub) 接收消息。
當(dāng)發(fā)布者向channel中發(fā)布消息時,所有訂閱了channel的客戶端都會收到消息。
訂閱者首先訂閱channel
psubscribe news
發(fā)布者發(fā)布消息
publish news "hello world"
所有的訂閱者都收到了消息。
致命缺點(diǎn)
:
Redis的Pub/Sub為什么被拋棄?
最主要的原因是它無法持久化,沒有實(shí)現(xiàn)持久化機(jī)制的Pub/Sub,無法做到消息的不丟失,在客戶端宕機(jī)或者Redis服務(wù)宕機(jī)的情況下,都會導(dǎo)致消息丟失。
Stream
Stream彌補(bǔ)了Redis作為消息隊(duì)列技術(shù)選型上的不足之處。
Redis 5.0發(fā)布的Stream相比Pub/Sub模塊,Stream支持消息持久化,結(jié)合集群使其成為了一個比較可靠的消息隊(duì)列。
隊(duì)列結(jié)構(gòu)圖:
Stream 實(shí)現(xiàn)的功能包括如下:
-
提供了消息多播的功能,同一個消息可被分發(fā)給多個單消費(fèi)者和消費(fèi)者組
-
提供了消息持久化的功能,可以讓任何消費(fèi)者訪問任何時刻的歷史消息
-
提供了對于消費(fèi)者和消費(fèi)者組的阻塞、非阻塞的獲取消息的功能
-
提供了強(qiáng)大的消費(fèi)者組的功能:
- 消費(fèi)者組實(shí)現(xiàn)同組多個消費(fèi)者并行但不重復(fù)消費(fèi)消息的能力,提升消費(fèi)能力;
- 消費(fèi)者組能夠記住最新消費(fèi)的信息,保證消息連續(xù)消費(fèi);
- 消費(fèi)者組提供了ACK確認(rèn)機(jī)制,保證消息被成功消費(fèi),不丟失;
Stream本質(zhì)上是Redis中的key,相關(guān)指令根據(jù)可以分為兩類,分別是消息隊(duì)列相關(guān)指令,消費(fèi)組相關(guān)指令。
消息隊(duì)列相關(guān)指令:
指令名稱 | 指令作用 |
---|---|
XADD | 添加消息到隊(duì)列末尾 |
XTRIM | 限制Stream的長度,如果已經(jīng)超長會進(jìn)行截取 |
XDEL | 刪除消息 |
XLEN | 獲取Stream中的消息長度 |
XRANGE | 獲取消息列表(可以指定范圍),忽略刪除的消息 |
XREVRANGE | 和XRANGE相比區(qū)別在于反向獲取,ID從大到小 |
XREAD | 獲取消息(阻塞/非阻塞),返回大于指定ID的消息 |
消費(fèi)者相關(guān)指令:
指令名稱 | 指令作用 |
---|---|
XGROUP CREATE | 創(chuàng)建消費(fèi)者組 |
XREADGROUP | 讀取消費(fèi)者組中的消息 |
XACK | ack消息,消息被標(biāo)記為“已處理” |
XGROUP SETID | 設(shè)置消費(fèi)者組最后遞送消息的ID |
XGROUP DELCONSUMER | 刪除消費(fèi)者組 |
XPENDING | 打印待處理消息的詳細(xì)信息 |
XCLAIM | 轉(zhuǎn)移消息的?歸屬權(quán)(長期未被處理/無法處理的消息,轉(zhuǎn)交給其他消費(fèi)者組進(jìn)行處理) |
XINFO | 打印Stream\Consumer\Group的詳細(xì)信息 |
XINFO GROUPS | 打印消費(fèi)者組的詳細(xì)信息 |
XINFO STREAM | 打印Stream的詳細(xì)信息 |
消息隊(duì)列操作
XADD
使用XADD命令添加消息到隊(duì)列末尾,如果指定的 隊(duì)列不存在,則該命令執(zhí)行時會新建一個隊(duì)列。
添加的消息是一個和多個鍵值對。XADD也是唯一可以向隊(duì)列中添加數(shù)據(jù)的 Redis 命令。
語法格式:
XADD key ID field value [field value ...]
- key:隊(duì)列名稱,如果不存在就創(chuàng)建
- ID:消息id,使用*表示由redis生成。可以自定義,但是要自己保證遞增性
- field value:記錄,當(dāng)前消息內(nèi)容,由一個或多個key-value構(gòu)成
命令使用:
創(chuàng)建兩條消息,分別是(name=tom, age=22),(height=180, use=iphone)
127.0.0.1:6379> xadd mystream * name tom age 22
"1674984765438-0"
127.0.0.1:6379> xadd mystream * height 180 use iphone
"1674985213802-0"
創(chuàng)建消息時會生成一個序號,支持自定義序號和自動生成序號。*
表示自動生成序號
XLEN
使用XLEN獲取隊(duì)列包含的元素數(shù)量,即消息長度
語法格式:
XLEN key
命令使用:
127.0.0.1:6379> xlen mystream
(integer) 2
XDEL
使用XDEL刪除消息。語法格式:
XDEL key ID [ID ...]
XDEL刪除消息的指令,并不會從內(nèi)存上刪除消息,它只是給消息打上標(biāo)記位,下次通過XRANGE指令忽略這些消息
XRANGE
使用XRANGE獲取消息列表,會自動過濾已經(jīng)刪除的消息,語法格式:
XRANGE key start end [COUNT count]
- key:隊(duì)列名
- start:開始值,-表示最小值
- end:結(jié)束值,+表示最大值
- count:數(shù)量
命令使用:
不指定count默認(rèn)查詢所有
127.0.0.1:6379> xrange mystream - +
1) 1) "1674984765438-0"
2) 1) "name"
2) "tom"
3) "age"
4) "22"
2) 1) "1674985213802-0"
2) 1) "height"
2) "180"
3) "use"
4) "iphone"
127.0.0.1:6379>
XREAD
XREAD命令提供讀取隊(duì)列消息的能力,返回大于指定ID的消息。
XREAD常用于用于迭代隊(duì)列的消息,所以傳遞給 XREAD 的通常是上一次從該隊(duì)列接收到的最后一個消息的ID。
語法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- count:用于限定獲取的消息數(shù)量
- BLOCK milliseconds:用于設(shè)置XREAD為阻塞模式以及阻塞的時長,單位毫秒,默認(rèn)為非阻塞模式
- ID:設(shè)置開始讀取的消息ID,使用0表示從第一條消息開始。
消息隊(duì)列ID是單調(diào)遞增的,所以通過設(shè)置起點(diǎn),可以向后讀取。
在阻塞模式中,可以使用$
,表示最新的消息ID, block 0表示永久阻塞。(非阻塞模式下$無意義)。
命令使用:
非阻塞讀取
從第一條消息開始
127.0.0.1:6379> xread streams mystream 0
1) 1) "mystream"
2) 1) 1) "1674984765438-0"
2) 1) "name"
2) "tom"
3) "age"
4) "22"
2) 1) "1674985213802-0"
2) 1) "height"
2) "180"
3) "use"
4) "iphone"
127.0.0.1:6379>
阻塞讀取
127.0.0.1:6379> xread block 10000 streams mystream $
(nil)
(10.04s)
127.0.0.1:6379>
阻塞模式讀,阻塞時長為10s。如果10s內(nèi)未讀取到消息則退出阻塞。另開一個終端向隊(duì)列中寫入一條消息,阻塞讀的終端就能接收到消息。
消費(fèi)者操作
XGROUP CREATE
創(chuàng)建消費(fèi)組。消費(fèi)組用于管理消費(fèi)者和隊(duì)列讀取記錄。Stream中的消費(fèi)組有兩個特點(diǎn):
- 從資源結(jié)構(gòu)上說消費(fèi)者從屬于一個消費(fèi)組
- 一個隊(duì)列可以擁有多個消費(fèi)組。不同消費(fèi)組之間讀取隊(duì)列互不干擾
語法格式:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
- key:隊(duì)列名稱,如果不存在就創(chuàng)建
- groupname:組名
- id: $表示從尾部開始消費(fèi),只接受新消息,當(dāng)前Stream消息會全部忽略
命令使用:
為隊(duì)列mystream創(chuàng)建一個消費(fèi)組 mqGroup,從第一個消息開始讀
127.0.0.1:6379> XGROUP CREATE mystream mqGroup 0
OK
XREADGROUP
讀取隊(duì)列的消息。在讀取消息時需要指定消費(fèi)者,只需要指定名字,不用預(yù)先創(chuàng)建。
語法格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]
[NOACK] STREAMS key [key ...] id [id ...]
- group:消費(fèi)組名
- consumer:消費(fèi)者名
- count:讀取數(shù)量
- BLOCK milliseconds:阻塞讀以及阻塞毫秒數(shù)。默認(rèn)非阻塞。和XREAD類似
- key:隊(duì)列名
- id:消息ID。ID可以填寫特殊符號
>
,表示未被組內(nèi)消費(fèi)的起始消息
命令使用:
創(chuàng)建消費(fèi)者consumerA和consumerB,各讀取一條消息
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1674984765438-0"
2) 1) "name"
2) "tom"
3) "age"
4) "22"
127.0.0.1:6379> XREADGROUP group mqGroup consumerB count 1 streams mystream >
1) 1) "mystream"
2) 1) 1) "1674985213802-0"
2) 1) "height"
2) "180"
3) "use"
4) "iphone"
可以進(jìn)行組內(nèi)消費(fèi)的基本原理是,STREAM類型會為每個組記錄一個最后讀取的消息ID(last_delivered_id),這樣在組內(nèi)消費(fèi)時,就可以從這個值后面開始讀取,保證不重復(fù)消費(fèi)。
消費(fèi)組消費(fèi)時,還有一個必須要考慮的問題,就是若某個消費(fèi)者,消費(fèi)了某條消息,但是并沒有處理成功時(例如消費(fèi)者進(jìn)程宕機(jī)),這條消息可能會丟失,因?yàn)榻M內(nèi)其他消費(fèi)者不能再次消費(fèi)到該消息了
XPENDING
為了解決組內(nèi)消息讀取但處理期間消費(fèi)者崩潰帶來的消息丟失問題,Stream 設(shè)計了 Pending 列表,用于記錄讀取但并未確認(rèn)完畢的消息。
語法格式:
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
- key:隊(duì)列名
- group: 消費(fèi)組名
- start:開始值,-表示最小值
- end:結(jié)束值,+表示最大值
- count:數(shù)量
命令使用:
首先查看隊(duì)列中的消息數(shù)量有3個,然后查看已讀取未處理的消息有兩個。
127.0.0.1:6379> xlen mystream
(integer) 3
127.0.0.1:6379> xpending mystream mqGroup
1) (integer) 2 # 2個已讀取但未處理的消息
2) "1674984765438-0" # 起始ID
3) "1674985213802-0" # 結(jié)束ID
4) 1) 1) "consumerA" # 消費(fèi)者A有1個
2) "1"
2) 1) "consumerB" # 消費(fèi)者B有1個
2) "1"
隊(duì)列中一共三條信息,有兩條被消費(fèi)但未處理完畢,也就是上面XREADGROUP消費(fèi)的兩條。一個是消費(fèi)者consumerA,另一個是consumerB。
獲取未確認(rèn)的詳細(xì)信息
127.0.0.1:6379> xpending mystream mqGroup - + 10
1) 1) "1674984765438-0"
2) "consumerA"
3) (integer) 12110001
4) (integer) 1
2) 1) "1674985213802-0"
2) "consumerB"
3) (integer) 89140701
4) (integer) 1
XACK
對于已讀取未處理的消息,使用命令 XACK 完成告知消息處理完成
XACK 命令確認(rèn)消費(fèi)的信息,一旦信息被確認(rèn)處理,就表示信息被完善處理。
語法格式:
XACK key group id [id ...]
- key: stream 名
- group:消費(fèi)組
- id:消息ID
命令使用:
確認(rèn)消息1674985213802-0
127.0.0.1:6379> XACK mystream mqGroup 1674985213802-0
(integer) 1
127.0.0.1:6379>
XCLAIM
某個消費(fèi)者讀取了消息但沒有處理,這時消費(fèi)者宕機(jī)或重啟等就會導(dǎo)致該消息失蹤。那么就需要該消息轉(zhuǎn)移給其他的消費(fèi)者處理,就是消息轉(zhuǎn)移。XCLAIM來實(shí)現(xiàn)消息轉(zhuǎn)移的操作。
語法格式:
XCLAIM key group consumer min-idle-time id [id ...] [IDLE ms]
[TIME unix-time-milliseconds] [RETRYCOUNT count] [FORCE] [JUSTID]
[LASTID id]
- key: 隊(duì)列名稱
- group :消費(fèi)組
- consumer:消費(fèi)組里的消費(fèi)者
- min-idle-time 最小時間。空閑時間大于min-idle-time的消息才會被轉(zhuǎn)移成功
- id:消息的ID
轉(zhuǎn)移除了要指定ID外,還需要指定min-idle-time,min-idle-time是最小空閑時間,該值要小于消息的空閑時間,這個參數(shù)是為了保證是多于多長時間的消息未處理的才被轉(zhuǎn)移。比如超過24小時的處于pending未xack的消息要進(jìn)行轉(zhuǎn)移
同時min-idle-time還有一個功能是能夠避免兩個消費(fèi)者同時轉(zhuǎn)移一條消息。被轉(zhuǎn)移的消息的IDLE會被重置為0。假設(shè)兩個消費(fèi)者都以2min來轉(zhuǎn)移,第一個成功之后IDLE被重置為0,第二個消費(fèi)者就會因?yàn)閙in-idle-time大與空閑時間而是失敗。
命令使用:
目前未確認(rèn)的消息
127.0.0.1:6379> xpending mystream mqGroup - + 10
1) 1) "1674984765438-0"
2) "consumerA"
3) (integer) 12145595
4) (integer) 1
id: 1674984765438-0
空閑時間:12145595,單位ms
讀取次數(shù):1
將cosumerA未處理的消息轉(zhuǎn)移給consumerB。
127.0.0.1:6379> XCLAIM mystream mqGroup consumerB 3600000 1674984765438-0
1) 1) "1674984765438-0"
2) 1) "name"
2) "tom"
3) "age"
4) "22"
查看未確認(rèn)的消息
消息已經(jīng)從consumerA轉(zhuǎn)移給consumerB,IDLE重置,讀取次數(shù)加1。轉(zhuǎn)移之后就可以繼續(xù)處理這條消息。
127.0.0.1:6379> xpending mystream mqGroup - + 10
1) 1) "1674984765438-0"
2) "consumerB"
3) (integer) 5729 # 注意IDLE,被重置了
4) (integer) 2 # 注意,讀取次數(shù)也累加了1次
通常轉(zhuǎn)移操作的完整流程是:
- 先用xpending命令找出所有未確認(rèn)的消息
- 再用xclaim命令轉(zhuǎn)移所有未確認(rèn)消息
在redis6.2.0之后有一個命令XAUTOCLAIM
,可以將xpending查找未確認(rèn)消息和xclaim轉(zhuǎn)移消息合并成一個操作。
XINFO
Stream提供了XINFO來實(shí)現(xiàn)對服務(wù)器信息的監(jiān)控
查看隊(duì)列信息
127.0.0.1:6379> xinfo stream mystream
1) "length"
2) (integer) 3
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 1
9) "last-generated-id"
10) "1674985995856-0"
11) "first-entry"
12) 1) "1674984765438-0"
2) 1) "name"
2) "tom"
3) "age"
4) "22"
13) "last-entry"
14) 1) "1674985995856-0"
2) 1) "name"
2) "jack"
消費(fèi)組信息
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
2) "mqGroup"
3) "consumers"
4) (integer) 2
5) "pending"
6) (integer) 1
7) "last-delivered-id"
8) "1674985213802-0"
消費(fèi)者組成員信息
127.0.0.1:6379> xinfo consumers mystream mqGroup
1) 1) "name"
2) "consumerA"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 12904777
2) 1) "name"
2) "consumerB"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 696457
127.0.0.1:6379>
項(xiàng)目中中Stream的使用
項(xiàng)目中部分web請求的處理是異步處理,web服務(wù)調(diào)用底層模塊異步執(zhí)行。當(dāng)?shù)讓幽K處理完成后需要保存結(jié)果并通知web服務(wù),所以使用Stream作為保存的載體。
Stream 的生產(chǎn)和消費(fèi)
生產(chǎn)
向隊(duì)列中寫消息
def batch_xadd(self, name: str, payloads: List[Dict]) -> None:
pipe = self._redis.pipeline()
for payload in payloads:
pipe.xadd(name, payload)
pipe.execute()
消費(fèi)
定時任務(wù)間隔10s從隊(duì)列中讀消息
while True:
_, payloads = await self._conn.xautoclaim(
self.stream_name, self.group_name, self.consumer_name, min_idle_time
)
id_ = last_id if check_backlog else ">"
for _, messages in await self._conn.xreadgroup(
groupname=self.group_name,
consumername=self.consumer_name,
streams={self.stream_name: id_},
block=block_timeout,
):
...
last_id = messages[-1][0]
payloads += messages
# 處理隊(duì)列中取出的消息,耗時操作
successful_ids = await f_processor(payloads)
if successful_ids:
await self._conn.xack(self.stream_name, self.group_name, *successful_ids)
Stream和專業(yè)消息隊(duì)列對比
專業(yè)的消息隊(duì)列包括:
- RabbitMQ
- RocketMQ
- Kafka
一個專業(yè)的消息隊(duì)列,必須要滿足兩個條件:
- 消息不丟
- 消息可堆積
下面從這兩個方面來對比Stream和專業(yè)消息隊(duì)列。
消息不丟
消息隊(duì)列的使用模型如下:
要保證消息不丟,就需要在生產(chǎn)者、中間件、消費(fèi)者這三個方面來分析。
生產(chǎn)者
:消息發(fā)送失敗或發(fā)送超時,這兩種情況會導(dǎo)致數(shù)據(jù)丟失,可以使用重試來解決。不依賴消息中間件,需要業(yè)務(wù)實(shí)現(xiàn)。
消費(fèi)者
:消費(fèi)者存在讀取消息未處理完就異常宕機(jī)了,消費(fèi)者要還能重新讀取消息。Stream和其他消息中間件都能做到。
隊(duì)列中間件
:中間件要保證數(shù)據(jù)不丟失。 Redis 在以下 2 個場景下,都會導(dǎo)致數(shù)據(jù)丟失:
- AOF 持久化配置為每秒寫盤,Redis 宕機(jī)時會存在丟失最后1秒數(shù)據(jù)的可能
- 主從復(fù)制的集群,主從切換時,從庫還未同步完成主庫發(fā)來的數(shù)據(jù),就被提成主庫,也存在丟失數(shù)據(jù)的可能。
基于以上原因可以推斷出,Redis 本身的無法保證嚴(yán)格的數(shù)據(jù)完整性。
專業(yè)隊(duì)列如何解決數(shù)據(jù)丟失問題:
RabbitMQ 或 Kafka 這類專業(yè)的隊(duì)列中間件,在使用時一般是部署一個集群。生產(chǎn)者在發(fā)布消息時,隊(duì)列中間件通常會寫「多個節(jié)點(diǎn)」,以此保證消息冗余。這樣一來,即便其中一個節(jié)點(diǎn)掛了,集群也能的數(shù)據(jù)不丟失。
消息積壓
因?yàn)?Redis 的數(shù)據(jù)都存儲在內(nèi)存中,這就意味著一旦發(fā)生消息積壓,則會導(dǎo)致 Redis 的內(nèi)存持續(xù)增長,如果超過機(jī)器內(nèi)存上限,就會面臨 OOM 的風(fēng)險。
所以,Redis 的 Stream 提供了可以指定隊(duì)列最大長度的功能,就是為了避免這種情況發(fā)生。
但 Kafka、RabbitMQ 這類消息隊(duì)列就不一樣了,它們的數(shù)據(jù)都會存儲在磁盤上,磁盤的成本要比內(nèi)存小得多,當(dāng)消息積壓時,無非就是多占用一些磁盤空間,磁盤相比于內(nèi)存在面對積壓時能輕松應(yīng)對。
總結(jié)
綜上可以看到,把 Redis 當(dāng)作隊(duì)列來使用時,始終面臨兩個問題:
- Redis 本身可能會丟數(shù)據(jù)
- 面對消息積壓,Redis 內(nèi)存資源緊張
優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
:
- 使用成本低。幾乎每一個項(xiàng)目都會使用Redis,用Stream做消息隊(duì)列就不需要額外再引入中間件,減少系統(tǒng)復(fù)雜性,運(yùn)維成本,硬件資源。
缺點(diǎn)
:
- Redis 的數(shù)據(jù)都存儲在內(nèi)存中,內(nèi)存持續(xù)增長超過機(jī)器內(nèi)存上限,就會面臨 OOM 的風(fēng)險
- Stream 作為Redis的一種數(shù)據(jù)結(jié)構(gòu),Redis 在持久化或主從切換時有丟失數(shù)據(jù)的風(fēng)險,所以Stream也有丟失消息的風(fēng)險
- 所有的消息會一直保存在Stream中,沒有刪除機(jī)制。要么定時清除,那么設(shè)置隊(duì)列的長度自動丟棄先入列消息
使用場景
適用
適用業(yè)務(wù)場景:
- 場景足夠簡單
- 對于數(shù)據(jù)丟失不敏感
- 消息積壓概率比較小
滿足以上場景把 Redis 當(dāng)作隊(duì)列是完全可以的。
基于redis的高性能和使用內(nèi)存的機(jī)制使得其的性能優(yōu)于大部分消息隊(duì)列。在小規(guī)模場景會有更出色的表現(xiàn)。
不適用
不適用業(yè)務(wù)場景:
- 對于數(shù)據(jù)丟失非常敏感,如訂單系統(tǒng)
- 寫入量非常大,并發(fā)請求大
- 消息積壓時會占用很多的內(nèi)存資源,消息數(shù)據(jù)量大
這些業(yè)務(wù)場景下建議使用專業(yè)的消息隊(duì)列中間件。
題外話
技術(shù)選型出了技術(shù)本身之外還要考慮公司團(tuán)隊(duì)能否匹配技術(shù)。
Kafka、RabbitMQ 是非常專業(yè)的消息中間件,但它們的部署和運(yùn)維,相比于 Redis 來說,也會更復(fù)雜一些。
如果在一個大公司,公司本身就有優(yōu)秀的運(yùn)維團(tuán)隊(duì),那么使用這些中間件肯定沒問題,因?yàn)橛凶銐騼?yōu)秀的人能 hold 住這些中間件,公司也會投入人力和時間在這個方向上。
但是在一個初創(chuàng)公司,業(yè)務(wù)正處在快速發(fā)展期,暫時沒有能 hold 住這些中間件的團(tuán)隊(duì)和人,如果貿(mào)然使用這些組件,當(dāng)發(fā)生故障時,排查問題也會變得很困難,甚至?xí)璧K業(yè)務(wù)的發(fā)展。
實(shí)際案例討論
同一個大型項(xiàng)目中子項(xiàng)目的互相調(diào)用。TMS調(diào)用ATS獲取數(shù)據(jù)集
改用Stream完成
理由:
- 丟失數(shù)據(jù)不敏感
- 業(yè)務(wù)場景簡單
- 消息積壓概率比較小
參考:
https://zhuanlan.zhihu.com/p/60501638
https://redis.io/commands/xclaim/
https://liziba.blog.csdn.net/article/details/120320018
https://juejin.cn/post/6962423461071290375#heading-2文章來源:http://www.zghlxwxcb.cn/news/detail-448890.html
準(zhǔn)備連載一系列關(guān)于python異步編程的文章。包括同異步框架性能對比、異步事情驅(qū)動原理等。首發(fā)微信公眾號,歡迎關(guān)注第一時間閱讀。文章來源地址http://www.zghlxwxcb.cn/news/detail-448890.html
到了這里,關(guān)于Redis 高級特性 Redis Stream使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!