1.業(yè)務(wù)場(chǎng)景
在實(shí)際業(yè)務(wù)場(chǎng)景中,我們經(jīng)常會(huì)碰到類似一下場(chǎng)景:
- 淘寶等購(gòu)物平臺(tái)在訂單支付時(shí),如果30分鐘內(nèi)未支付自動(dòng)取消。
- 騰訊會(huì)議預(yù)約會(huì)議后,在會(huì)議開(kāi)始前15分鐘提醒。
- 未使用的優(yōu)惠券有效期結(jié)束后,自動(dòng)將優(yōu)惠券狀態(tài)更新為已過(guò)期。
- 等等。。。
像這種支付超時(shí)取消的場(chǎng)景需求,其實(shí)有很多種實(shí)現(xiàn)方法,比如定時(shí)任務(wù)輪詢、Java中的延時(shí)隊(duì)列、時(shí)間輪算法、Redis過(guò)期監(jiān)聽(tīng)等,如下圖所示。
2.定時(shí)任務(wù)(Quartz)
Java中常見(jiàn)的定時(shí)任務(wù)框架包括 Quartz、Spring Task、Elastic-Job、XXL-Job等。下面將以 Quartz 為例實(shí)現(xiàn)業(yè)務(wù)場(chǎng)景(有關(guān)Elastic-Job 的使用可見(jiàn) Elastic Job 開(kāi)發(fā)使用篇)。
2.1.依賴導(dǎo)入
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
2.2.任務(wù)類
@Slf4j
public class PaymentJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
log.info("查詢數(shù)據(jù)庫(kù)獲取超時(shí)支付訂單,并取消該訂單");
}
}
2.3.任務(wù)調(diào)度類
public class RoundRobin {
private static Scheduler defaultScheduler;
public void timedTask() {
// 創(chuàng)建任務(wù)明細(xì)
JobDetail jobDetail = JobBuilder.newJob(PaymentJob.class)
.withIdentity("支付超時(shí)取消訂單任務(wù)", "payment_timeout_group")
.build();
// 創(chuàng)建觸發(fā)器
Trigger trigger = TriggerBuilder.newTrigger()
.withDescription("這是支付超時(shí)取消訂單任務(wù)觸發(fā)器")
.startNow()
// 設(shè)置任務(wù)執(zhí)行調(diào)度周期:cron表達(dá)式,每3秒執(zhí)行一次
.withSchedule(CronScheduleBuilder.cronSchedule("0/3 * * * * ?"))
.build();
// 創(chuàng)建scheduler調(diào)度器
try {
if (defaultScheduler == null) {
synchronized (this) {
if (defaultScheduler == null) {
defaultScheduler = StdSchedulerFactory.getDefaultScheduler();
}
}
}
// 執(zhí)行任務(wù)
defaultScheduler.scheduleJob(jobDetail, trigger);
defaultScheduler.start();
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
}
2.4.小結(jié)
定時(shí)任務(wù)輪詢的方式簡(jiǎn)單易行,但是這種方式也存在著顯著的局限性:
1.在支付訂單數(shù)量龐大的情況下,每次獲取超時(shí)訂單會(huì)走全表掃描,給數(shù)據(jù)庫(kù)帶來(lái)很大的IO負(fù)擔(dān)和CPU占用,特別是這種需要小時(shí)間間隔任務(wù)輪詢的全表掃描。其實(shí)這種也有不走全表掃描的方法,犧牲空間,就是對(duì)訂單創(chuàng)建時(shí)間建立索引,設(shè)過(guò)期時(shí)間為 當(dāng)前時(shí)間 - 30分鐘(假設(shè)是超時(shí)時(shí)間),走索引查詢過(guò)期時(shí)間之前的所有訂單,最后執(zhí)行取消訂單的操作。
2.精度問(wèn)題。如果將定時(shí)任務(wù)的時(shí)間間隔設(shè)置的比較長(zhǎng),會(huì)導(dǎo)致超時(shí)訂單取消延遲較長(zhǎng),影響業(yè)務(wù)流程。如果間隔時(shí)間過(guò)于短,在大量訂單的情況下,可能會(huì)出現(xiàn)大量重復(fù)訂單,需要考慮并發(fā)問(wèn)題和事務(wù)沖突。
3.延遲隊(duì)列(DelayQueue)
DelayQueue是一個(gè)無(wú)界的BlockingQueue,用于放置實(shí)現(xiàn)了Delayed接口的對(duì)象,其中的對(duì)象只能在其到期時(shí)才能從隊(duì)列中取走。當(dāng)生產(chǎn)者線程調(diào)用插入元素的方法加入元素時(shí),會(huì)觸發(fā)Delayed接口中的compareTo
方法進(jìn)行排序,也就是說(shuō)隊(duì)列中元素的順序是按到期時(shí)間排序的,而非它們進(jìn)入隊(duì)列的順序。排在隊(duì)列頭部的元素是最早到期的,越往后到期時(shí)間越晚。
3.1.任務(wù)類
@Slf4j
public class OrderDelay implements Delayed {
// 訂單id
private String orderId;
// 超時(shí)的最后時(shí)刻(單位毫秒)
private long timeout;
public OrderDelay(String orderId, long timeout) {
this.orderId = orderId;
this.timeout = timeout+System.currentTimeMillis();
}
// 返回距離超時(shí)還剩多少毫秒
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 和其他的訂單比較時(shí)間
@Override
public int compareTo(Delayed o) {
if (o == this) {
return 0;
} else {
OrderDelay t = (OrderDelay) o;
long l = this.timeout - t.timeout;
return l == 0 ? 0 : (l > 0 ? 1 : -1);
}
}
// 超時(shí)取消處理
public void timeoutCancel(){
log.info("訂單{}超時(shí),處理完畢",orderId);
}
}
3.2.測(cè)試案例
public class CancelTimeoutOrder {
public static void main(String[] args) {
// 先創(chuàng)建3個(gè)訂單
OrderDelay o1 = new OrderDelay("1", 2 * 1000);
OrderDelay o2 = new OrderDelay("2", 4 * 1000);
OrderDelay o3 = new OrderDelay("3", 6 * 1000);
// 創(chuàng)建延遲隊(duì)列
DelayQueue<Delayed> delayQueue = new DelayQueue<>();
delayQueue.put(o1);
delayQueue.put(o2);
delayQueue.put(o3);
// 開(kāi)始判斷訂單
while (true){
try {
OrderDelay take = (OrderDelay) delayQueue.take();
take.timeoutCancel();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
3.3.日志輸出
19:20:36.591 [main] INFO com.payment.demo.delay.OrderDelay - 訂單1超時(shí),處理完畢
19:20:38.588 [main] INFO com.payment.demo.delay.OrderDelay - 訂單2超時(shí),處理完畢
19:20:40.588 [main] INFO com.payment.demo.delay.OrderDelay - 訂單3超時(shí),處理完畢
3.4.小結(jié)
這種方式彌補(bǔ)了精度問(wèn)題,并且任務(wù)處理更加高效,也不需要考慮多線程并發(fā)性的問(wèn)題。但是所有訂單都需要保留在內(nèi)存,在大量訂單的情況下會(huì)有很大的內(nèi)存消耗,如果此時(shí)系統(tǒng)重啟或者崩潰,那么剩余未處理的訂單將會(huì)丟失。
4.時(shí)間輪算法
時(shí)間輪算法(Time Wheel Algorithm)是一種用于處理定時(shí)任務(wù)調(diào)度的算法,它使用循環(huán)數(shù)組和指針來(lái)實(shí)現(xiàn),在每個(gè)時(shí)刻都有一個(gè)指針指向當(dāng)前時(shí)間槽,每個(gè)時(shí)間槽中保存了需要執(zhí)行的任務(wù)列表。時(shí)間輪算法的核心是輪詢線程不再負(fù)責(zé)遍歷所有任務(wù),而是僅僅遍歷時(shí)間刻度。
時(shí)間輪算法主要原理如下:
-
時(shí)間輪的構(gòu)造:時(shí)間輪由多個(gè)槽(slot)組成,每個(gè)槽表示一個(gè)時(shí)間間隔。整個(gè)時(shí)間輪可以看作是一個(gè)環(huán)狀結(jié)構(gòu),每個(gè)槽都有一個(gè)索引來(lái)標(biāo)識(shí)。
-
時(shí)間輪的轉(zhuǎn)動(dòng):時(shí)間輪按照固定的速度不斷地轉(zhuǎn)動(dòng),每次轉(zhuǎn)動(dòng)一個(gè)槽的間隔(例如,每秒轉(zhuǎn)動(dòng)一次)。
-
任務(wù)插入:當(dāng)需要添加一個(gè)延遲任務(wù)時(shí),根據(jù)任務(wù)的延遲時(shí)間,計(jì)算應(yīng)該插入到哪個(gè)槽中。任務(wù)會(huì)被插入到離當(dāng)前時(shí)間一定間隔的槽中。
-
任務(wù)觸發(fā):時(shí)間輪的每次轉(zhuǎn)動(dòng)都會(huì)檢查當(dāng)前位置的槽是否有任務(wù),如果有,就執(zhí)行任務(wù)。
-
時(shí)間輪的級(jí)聯(lián):如果有多個(gè)時(shí)間輪,可以將多個(gè)時(shí)間輪級(jí)聯(lián),即把一個(gè)時(shí)間輪的一個(gè)槽作為下一個(gè)時(shí)間輪的一個(gè)槽。這樣可以擴(kuò)展時(shí)間輪的范圍和精度。
-
任務(wù)的刪除:當(dāng)延遲任務(wù)被取消或者執(zhí)行完成時(shí),需要從時(shí)間輪中刪除對(duì)應(yīng)的任務(wù)。
時(shí)間輪算法在實(shí)際應(yīng)用中有很多用途,比如網(wǎng)絡(luò)延遲調(diào)度、定時(shí)任務(wù)調(diào)度、消息隊(duì)列等。通過(guò)合理地調(diào)整時(shí)間輪的大小和刻度,可以實(shí)現(xiàn)高效的任務(wù)調(diào)度和處理。
4.1.依賴導(dǎo)入
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.94.Final</version>
</dependency>
4.2.任務(wù)類
@Slf4j
public class OrderTask implements TimerTask {
// 訂單id
private String orderId;
public OrderTask(String orderId) {
this.orderId = orderId;
}
// 任務(wù)執(zhí)行方法
@Override
public void run(Timeout timeout) throws Exception {
log.info("訂單{}超時(shí),處理完畢",orderId);
}
}
4.3.測(cè)試案例
public class TimeWheelUtil {
public static void main(String[] args) {
// 創(chuàng)建任務(wù)
OrderTask o1 = new OrderTask("1");
OrderTask o2 = new OrderTask("2");
OrderTask o3 = new OrderTask("3");
// 時(shí)間輪算法實(shí)現(xiàn)類
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
// 添加任務(wù)
hashedWheelTimer.newTimeout(o1,2, TimeUnit.SECONDS);
hashedWheelTimer.newTimeout(o2,4, TimeUnit.SECONDS);
hashedWheelTimer.newTimeout(o3,6, TimeUnit.SECONDS);
}
}
4.4.日志輸出
20:04:17.611 [pool-1-thread-1] INFO com.payment.demo.wheel.OrderTask - 訂單1超時(shí),處理完畢
20:04:19.610 [pool-1-thread-1] INFO com.payment.demo.wheel.OrderTask - 訂單2超時(shí),處理完畢
20:04:21.612 [pool-1-thread-1] INFO com.payment.demo.wheel.OrderTask - 訂單3超時(shí),處理完畢
4.5.小結(jié)
時(shí)間輪算法其實(shí)和延遲隊(duì)列比較相似。與延遲隊(duì)列相比,其性能更優(yōu)越,任務(wù)觸發(fā)時(shí)間延遲時(shí)間更低,代碼復(fù)雜度更簡(jiǎn)單。同樣,由于信息存儲(chǔ)于內(nèi)存中,所以容易因?yàn)橄到y(tǒng)重啟或宕機(jī)而丟失訂單信息。
5.Redis
我們都知道 Redis 中的 key 可以設(shè)置過(guò)期時(shí)間,顯而易見(jiàn),通過(guò)設(shè)置過(guò)期時(shí)間然后監(jiān)聽(tīng)這個(gè) key 是否過(guò)期就能判斷支付訂單是否超時(shí)了。而Redis本身就具備key過(guò)期監(jiān)聽(tīng)功能,即利用 Redis 的Keyspace Notifications
功能,當(dāng)一個(gè) key 過(guò)期時(shí),Redis 會(huì)向已訂閱了相關(guān) channel 的客戶端發(fā)送一個(gè)通知。
5.1.修改配置
首先我們需要打開(kāi) redis.conf 文件,開(kāi)啟Keyspace Notifications
功能,即修改如下配置。
notify-keyspace-events Ex
如圖所示。
隨后啟動(dòng) redis 服務(wù)端。
5.2.導(dǎo)入依賴
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
5.3.測(cè)試案例
@Slf4j
public class RedisKeyNotify {
static JedisPool jedisPool = null;
public static void main(String args[]) throws InterruptedException {
new Thread(() -> {
// 配置redis連接
jedisPool = new JedisPool("localhost", 6379);
// 訂閱redis的key過(guò)期通知
jedisPool.getResource().subscribe(new RedisSub(),"__keyevent@0__:expired");
}).start();
// 等待jedis初始化完
TimeUnit.SECONDS.sleep(1);
// 模擬一些數(shù)據(jù)
jedisPool.getResource().setex("1",3,"1");
jedisPool.getResource().setex("2",6,"2");
}
static class RedisSub extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
log.info("訂單{}超時(shí),處理完畢",message);
}
}
}
5.4.日志輸出
23:50:57.500 [Thread-0] INFO com.payment.demo.redis.RedisKeyNotify - 訂單1超時(shí),處理完畢
23:51:00.382 [Thread-0] INFO com.payment.demo.redis.RedisKeyNotify - 訂單2超時(shí),處理完畢
5.5.小結(jié)
Redis的鍵過(guò)期事件處理機(jī)制天然支持高并發(fā)場(chǎng)景,只要Redis集群足夠強(qiáng)大,可以輕松處理大量訂單的過(guò)期處理。但是這種方式有一個(gè)很嚴(yán)重的弊端,在官方網(wǎng)站中有如下提醒:
Note: Redis Pub/Sub is fire and forget that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.
注意:Redis 的發(fā)布/訂閱目前是即發(fā)即棄(fire and forget)模式的,也就是說(shuō),如果您的Pub/Sub客戶端斷開(kāi)連接,稍后再重新連接,則客戶端斷開(kāi)時(shí)傳遞的所有事件都將丟失。因此無(wú)法實(shí)現(xiàn)事件的可靠通知。
6.消息隊(duì)列(RocketMQ)
延遲隊(duì)列可以直接處理延遲消息,即消息在指定的延遲時(shí)間過(guò)后才被投遞給消費(fèi)者。在支付超時(shí)取消訂單的場(chǎng)景中,訂單創(chuàng)建時(shí)將訂單信息封裝成消息,并設(shè)置消息的延遲時(shí)間,當(dāng)訂單超時(shí)時(shí),消息自動(dòng)被投遞到處理超時(shí)訂單的隊(duì)列,消費(fèi)者接收到消息后執(zhí)行取消操作。
以 RocketMQ 為例,在 RocketMQ 中沒(méi)有延遲隊(duì)列這一概念,但是我們可以通過(guò)延遲消息(Delayed Message)實(shí)現(xiàn)這一功能。有關(guān)RocketMQ的安裝部署請(qǐng)移步 《RocketMQ安裝部署+簡(jiǎn)單實(shí)戰(zhàn)開(kāi)發(fā)》
6.1.延遲級(jí)別
RocketMQ 一共支持18個(gè)等級(jí)的延時(shí)投遞。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-855651.html
投遞等級(jí)(delay level) | 延遲時(shí)間 | 投遞等級(jí)(delay level) | 延遲時(shí)間 |
---|---|---|---|
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
6.2.生產(chǎn)者代碼
延時(shí)消息的實(shí)現(xiàn)邏輯需要先經(jīng)過(guò)定時(shí)存儲(chǔ)等待觸發(fā),延時(shí)時(shí)間到達(dá)后才會(huì)被投遞給消費(fèi)者。因此,如果將大量延時(shí)消息的定時(shí)時(shí)間設(shè)置為同一時(shí)刻,則到達(dá)該時(shí)刻后會(huì)有大量消息同時(shí)需要被處理,會(huì)造成系統(tǒng)壓力過(guò)大,導(dǎo)致消息分發(fā)延遲,影響定時(shí)精度。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-855651.html
@Component
@Slf4j
public class DelayMsgSend {
/**
* 導(dǎo)入RocketMQ模版工具
*/
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 發(fā)送延遲消息
*
* @param topic 主題
* @param msg 消息內(nèi)容 (本次案例為支付訂單id)
* @param timeout 超時(shí)時(shí)間(單位:毫秒)
* @param delayLevel 延遲級(jí)別
*/
public void sendDelayMsg(String topic, String msg, int timeout, int delayLevel) {
// 創(chuàng)建消息載體
Message<String> build = MessageBuilder.withPayload(msg).build();
// 同步發(fā)送(也可以選擇異步發(fā)送)
SendResult sendResult = rocketMQTemplate.syncSend(topic, build, timeout, delayLevel);
log.info("延遲消息發(fā)送成功。發(fā)送結(jié)果:{}",sendResult);
}
}
6.3.消費(fèi)者代碼
@Slf4j
@Component
@RocketMQMessageListener(
topic = "delay_topic",
consumerGroup = "order_consumer_group",
selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING
)
public class DelayMsgConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("接收到訂單id[{}]。判斷是否超時(shí),并執(zhí)行相關(guān)邏輯",message);
}
}
6.4.小結(jié)
- 優(yōu)點(diǎn)
- 訂單創(chuàng)建、消息發(fā)送、支付取消等業(yè)務(wù)功能都是獨(dú)立的,有利于系統(tǒng)的模塊化和拓展。
- RocketMQ采用了多種機(jī)制保證消息的可靠性傳輸,如同步刷盤、主從復(fù)制等。這意味著一旦消息發(fā)送成功,將會(huì)被可靠地傳輸?shù)较㈥?duì)列中,不易丟失。
- RocketMQ具備高吞吐量的特點(diǎn),能夠處理大量的消息,并且能動(dòng)態(tài)隨訂單量調(diào)整消費(fèi)速度。
- 缺點(diǎn)
- 由于引入了消息中間件,所以會(huì)涉及到消息中間件配置和管理,增加了系統(tǒng)的復(fù)雜性。
- 高度依賴消息中間件的可用性和穩(wěn)定性。
- 仍有小概率會(huì)丟失信息,這個(gè)也是不可避免的,任何方式都沒(méi)有絕對(duì)保證。
到了這里,關(guān)于支付超時(shí)取消訂單實(shí)現(xiàn)方案 - 定時(shí)任務(wù)、延遲隊(duì)列、消息隊(duì)列等的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!