目錄
商城業(yè)務(wù)-訂單服務(wù)-RabbitMQ延時隊列
商城業(yè)務(wù)-訂單服務(wù)-延時隊列定時關(guān)單模擬
商城業(yè)務(wù)-訂單服務(wù)-創(chuàng)建業(yè)務(wù)交換機&隊列
商城業(yè)務(wù)-訂單服務(wù)-監(jiān)聽庫存解鎖
商城業(yè)務(wù)-訂單服務(wù)-庫存解鎖邏輯
商城業(yè)務(wù)-訂單服務(wù)-庫存自動解鎖完成
商城業(yè)務(wù)-訂單服務(wù)-測試庫存自動解鎖
商城業(yè)務(wù)-訂單服務(wù)-定時關(guān)單完成
商城業(yè)務(wù)-訂單服務(wù)-消息丟失、積壓、重復(fù)等解決方案
商城業(yè)務(wù)-訂單服務(wù)-RabbitMQ延時隊列
使用消息隊列的目的是:保證數(shù)據(jù)的最終一致性
采用定時任務(wù)的方式:每隔一段時間進(jìn)行全表的掃描,會消耗系統(tǒng)內(nèi)存和增加數(shù)據(jù)庫的壓力,最致命的是存在較大的時間誤差
假如:10:00定時任務(wù)開始執(zhí)行,則10:01有用戶下訂單但未支付,10:30的時候定時任務(wù)再次執(zhí)行,這個訂單還差1分鐘才能進(jìn)行關(guān)單操作,因此,下一次掃描到它要等到11:00,存在著29分鐘的誤差時間。
采用消息隊列可以完美的解決定時任務(wù)所帶來的缺陷?
假如:10:00下訂單,再下訂單之前先給消息隊列發(fā)送一條下單消息,等30分鐘自動發(fā)送關(guān)閉訂單消息,監(jiān)聽服務(wù)收到消息,去查看此訂單是否完成支付,若未完成支付則關(guān)閉訂單。誤差也就一兩分鐘。對于解鎖庫存也是同理。
設(shè)置消息的過期時間: 在過期時間內(nèi)都沒有被消費則此消息將會被丟棄并稱之為死信
設(shè)置隊列的過期時間:在此過期時間內(nèi)都沒有隊列被客戶端連接則隊列里的所有消息都被成為死信
死信路由: 消息過期未被消費的,則消息會被交給一個指定的路由器,這個路由器由于只接收死信所以被成為死信路由
RabbitMQ實現(xiàn)延時隊列的原理:通過設(shè)置隊列的過期時間使消息都變成死信,此隊列是不能被任何服務(wù)監(jiān)聽的,當(dāng)消息過期時,通過死信路由將死信路由給指定隊列,指定隊列只接收死信也就是延時消息,服務(wù)器專門監(jiān)聽指定隊列從而達(dá)到定時任務(wù)的效果。
實現(xiàn)1:給隊列設(shè)置過期時間,推薦使用
實現(xiàn)方式2:給消息設(shè)置過期時間,不推薦使用
不推薦使用的原因是:RabbitMQ采用的是懶檢查,假如第一個消息設(shè)置的是5分鐘過期,第二個消息設(shè)置的是2分鐘過期,第三個消息設(shè)置的是30s過期,RabbitMQ過來一看消息5分鐘后才過期,那么5分鐘之后才會來將消息路由并不會關(guān)注后面消息的過期時間。
商城業(yè)務(wù)-訂單服務(wù)-延時隊列定時關(guān)單模擬
按照下圖邏輯,模擬下單成功1分鐘后,收到關(guān)閉訂單的消息
編寫隊列、交換機,綁定關(guān)系?
容器中的 Binding、Queue、Exchange 都會自動創(chuàng)建(RabbitMQ沒有的情況)
RabbitMQ只要有,@Bean聲明的屬性發(fā)生變化也不會覆蓋
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MyMQConfig {
@Bean
public Queue orderDelayQueue(){
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","order-event-exchange");
arguments.put("x-dead-letter-routing-key","order.release.order");
arguments.put("x-message-ttl",60000);
// String name, boolean durable, boolean exclusive, boolean autoDelete,
// @Nullable Map<String, Object> arguments
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
return queue;
}
@Bean
public Queue orderReleaseOrderQueue(){
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}
@Bean
public Exchange orderEventExchange(){
// String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
TopicExchange exchange = new TopicExchange("order-event-exchange", true, false);
return exchange;
}
@Bean
public Binding orderCreateOrderBinding(){
// String destination, DestinationType destinationType, String exchange, String routingKey,
// @Nullable Map<String, Object> arguments
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE, "order-event-exchange",
"order.create.order",
null);
}
@Bean
public Binding orderReleaseOrderBinding(){
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE, "order-event-exchange",
"order.release.order",
null);
}
}
監(jiān)聽關(guān)單事件
模擬訂單完成?
商城業(yè)務(wù)-訂單服務(wù)-創(chuàng)建業(yè)務(wù)交換機&隊列
解鎖庫存的實現(xiàn):
①庫存服務(wù)導(dǎo)入RabbitMQ的依賴
<!-- RabbitMQ的依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
② RabbitMQ的配置
spring:
rabbitmq:
host: 192.168.56.22
virtual-host: /
③?配置RabbitMQ的序列化機制
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyRabbitMQConfig {
@Bean
public AbstractMessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
④ 開啟RabbitMQ
⑤ 按照下圖創(chuàng)建交換機、隊列、綁定關(guān)系
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MyRabbitMQConfig {
@RabbitListener(queues = "stock.release.stock.queue")
public void handle(Message message){
}
@Bean
public AbstractMessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public Exchange stockEventExchange(){
// String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
return new TopicExchange("stock-event-exchange",true,false);
}
@Bean
public Queue stockReleaseStockQueue(){
// String name, boolean durable, boolean exclusive, boolean autoDelete,@Nullable Map<String, Object> arguments
return new Queue("stock.release.stock.queue",true,false,false,null);
}
@Bean
public Queue stockDelayQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","stock-event-exchange");
arguments.put("x-dead-letter-routing-key","stock.release");
arguments.put("x-message-ttl",120000);
// String name, boolean durable, boolean exclusive, boolean autoDelete,@Nullable Map<String, Object> arguments
return new Queue("stock.delay.queue",true,false,false,arguments);
}
@Bean
public Binding stockReleaseBinding(){
// String destination, DestinationType destinationType, String exchange, String routingKey,@Nullable Map<String, Object> arguments
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);
}
@Bean
public Binding stockLockedBinding(){
return new Binding("stock.delay.queue",Binding.DestinationType.QUEUE,
"stock-event-exchange","stock.locked",null);
}
}
出現(xiàn)問題:?并沒創(chuàng)建交換機、隊列、綁定關(guān)系
出現(xiàn)問題的原因:只有當(dāng)?shù)谝淮芜B接上RabbitMQ時,發(fā)現(xiàn)沒有這些東西才會創(chuàng)建
解決方案:監(jiān)聽隊列
交換機、隊列、綁定關(guān)系創(chuàng)建成功后,將上述代碼注釋
商城業(yè)務(wù)-訂單服務(wù)-監(jiān)聽庫存解鎖
庫存解鎖的兩種場景:
①下單成功,訂單過期沒有支付被系統(tǒng)自動取消、被用戶手動取消。都要解鎖
②下單成功,庫存鎖定成功,接下來的業(yè)務(wù)調(diào)用失敗,導(dǎo)致訂單回滾。之前鎖定的庫存就要自動解鎖
①?加上全參和無參構(gòu)造器注解
② 保存工作單詳情方便回溯
③ Common服務(wù)中創(chuàng)建To,方便MQ發(fā)送消息
如果To僅僅保存這個兩個數(shù)據(jù)的話,會存在一些問題, 當(dāng)1號訂單在1號倉庫扣減1件商品成功,2號訂單在2號倉庫扣減2件商品成功,3號訂單在3號倉庫扣減3件商品失敗時,庫存工作單的數(shù)據(jù)將會回滾,此時,數(shù)據(jù)庫中將查不到1號和2號訂單的庫存工作單的數(shù)據(jù),但是庫存扣減是成功的,導(dǎo)致無法解鎖庫存
解決方案: 保存庫存工作詳情To
④ 向MQ發(fā)送庫存鎖定成功的消息
?
商城業(yè)務(wù)-訂單服務(wù)-庫存解鎖邏輯&庫存自動解鎖完成&測試庫存自動解鎖
解鎖場景:
1.下單成功,庫存鎖定成功,接下來的業(yè)務(wù)調(diào)用失敗導(dǎo)致訂單回滾。之前鎖定的庫存就要自動解鎖。
2.鎖庫存失敗無需解鎖
解決方案:通過查詢訂單的鎖庫存信息,如果有則僅僅說明庫存鎖定成功,還需判斷是否有訂單信息,如果有訂單信息則判斷訂單狀態(tài),若訂單狀態(tài)已取消則解鎖庫存,反之:不能解鎖庫存,如果沒有訂單信息則需要解鎖庫存,如果沒有鎖庫存信息則無需任何操作。
1.編寫Vo,通過拷貝訂單實體,用于接收訂單信息
2. 遠(yuǎn)程服務(wù)編寫,獲取訂單狀態(tài)
?3.監(jiān)聽事件
/**
* 解鎖庫存服務(wù)
* @param stockLockedTo
*/
@Override
public void unlockStock(StockLockedTo stockLockedTo){
StockDetailTo detail = stockLockedTo.getDetail();
Long detailId = detail.getId();
// 查詢庫存工作單的信息 有:解鎖庫存 沒有:庫存鎖定失敗,數(shù)據(jù)自定義回滾無需解鎖
WareOrderTaskDetailEntity detailEntity = wareOrderTaskDetailService.getById(detailId);
if (null!=detailEntity){
// 有,解鎖庫存
Long id = stockLockedTo.getId(); // 庫存工作單的id
WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskService.getById(id);
String orderSn = wareOrderTaskEntity.getOrderSn();
// 遠(yuǎn)程服務(wù)調(diào)用,獲取訂單狀態(tài)信息
// 先判斷訂單是否存在
R r = orderFeignService.getOrderStatus(orderSn);
if (r.getCode().equals(0)){
OrderVo orderVo = r.getData(new TypeReference<OrderVo>() {});
if (null==orderVo || orderVo.getStatus().equals(4)){
// 訂單不存在或者訂單被關(guān)閉,都需要去解鎖庫存
// 當(dāng)且僅當(dāng) 鎖定狀態(tài)為 已鎖定 時 才去解鎖
if (detailEntity.getLockStatus().equals(1)){
releaseStock(detailEntity.getSkuId(),detailEntity.getWareId(),detail.getSkuNum(),detailEntity.getId());
}
}
}else {
// 遠(yuǎn)程服務(wù)調(diào)用失敗,拋出異常 將消息放回消息隊列中
throw new RuntimeException("遠(yuǎn)程調(diào)用訂單服務(wù)失?。。?!");
}
}else {
// 訂單的庫存詳情信息不存在,無需解鎖
}
}
private void releaseStock(Long skuId, Long wareId, Integer skuNum,Long taskDetailId) {
// 解鎖庫存
wareSkuDao.unLockStock(skuId,wareId,skuNum);
// 更新庫存工作單狀態(tài)
WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();
wareOrderTaskDetailEntity.setId(taskDetailId);
wareOrderTaskDetailEntity.setLockStatus(2);
wareOrderTaskDetailService.updateById(wareOrderTaskDetailEntity);
}
4. 遠(yuǎn)程服務(wù)調(diào)用可能會出現(xiàn)失敗,需要設(shè)置手動ACK,確保其它服務(wù)能消費此消息
#手動ACK設(shè)置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
出現(xiàn)問題: 遠(yuǎn)程調(diào)用訂單服務(wù)時被攔截器攔截
解決方案:請求路徑適配放行
商城業(yè)務(wù)-訂單服務(wù)-定時關(guān)單完成
1.定時關(guān)單代碼編寫
①訂單創(chuàng)建成功,給MQ發(fā)送關(guān)單消息
② 監(jiān)聽事件,進(jìn)行關(guān)單
??
訂單釋放和庫存解鎖邏輯: 當(dāng)訂單創(chuàng)建成功之后,向MQ發(fā)送關(guān)單消息,過期時間為1分鐘,向MQ發(fā)送解鎖庫存消息,過期時間為2分鐘,關(guān)單操作完成之后,過了1分鐘解鎖庫存操作。
存在問題:由于機器卡頓、消息延遲等導(dǎo)致關(guān)單消息未延遲發(fā)送,解鎖庫存消息正常發(fā)送和監(jiān)聽,導(dǎo)致解鎖庫存消息被消費,當(dāng)執(zhí)行完關(guān)單操作后便無法再執(zhí)行解鎖庫存操作,導(dǎo)致卡頓的訂單永遠(yuǎn)無法解鎖庫存。
解決方案:采取主動補償?shù)牟呗浴.?dāng)關(guān)單操作正常完成之后,主動去發(fā)送解鎖庫存消息給MQ,監(jiān)聽解鎖庫存消息進(jìn)行解鎖。
③ 按上圖創(chuàng)建綁定關(guān)系
④ common服務(wù)中,創(chuàng)建CreateTo(拷貝order實體)?
⑤ 向MQ發(fā)送解鎖庫存消息
⑥ 解鎖庫存操作
商城業(yè)務(wù)-訂單服務(wù)-消息丟失、積壓、重復(fù)等解決方案
情況一: 消息發(fā)送出去但是由于網(wǎng)絡(luò)原因未到達(dá)服務(wù)器,解決方案:采用try-catch將發(fā)送失敗的消息持久化到數(shù)據(jù)庫中,采用定期掃描重發(fā)的方式。
drop table if exists mq_message;
CREATE TABLE `mq_message` (
`message_id` CHAR(32) NOT NULL,
`content` TEXT,
`to_exchange` VARCHAR(255) DEFAULT NULL,
`routing_key` VARCHAR(255) DEFAULT NULL,
`class_type` VARCHAR(255) DEFAULT NULL,
`message_status` INT(1) DEFAULT '0' COMMENT '0-新建 1-已發(fā)送 2-錯誤抵達(dá) 3-已抵達(dá)',
`create_time` DATETIME DEFAULT NULL,
`update_time` DATETIME DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4
情況二:消息抵達(dá)服務(wù)器的隊列中才算完成消息的持久化,解決方案publish的ack機制
情況三: 防止自動ack帶來的缺陷,采用手動ack,解決方案上面都有這里不再細(xì)說
消息被成功消費,ack時宕機,消息由unack變成ready,Broker又重新發(fā)送。解決方案:將消費者的業(yè)務(wù)消費接口應(yīng)該設(shè)計為冪等性的,比如扣庫存有工作單的狀態(tài)標(biāo)志。
文章來源:http://www.zghlxwxcb.cn/news/detail-403066.html
消息積壓即消費者的消費能力不夠, 上線更多的消費者進(jìn)行正常的消費。文章來源地址http://www.zghlxwxcb.cn/news/detail-403066.html
到了這里,關(guān)于谷粒商城-訂單服務(wù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!