前言
· 分布式事務的問題常在業(yè)務與面試中被提及, 近日摸魚看到這篇文章, 闡述的非常通俗易懂, 固持久化下來我博客中, 也以便于我二刷
轉載源: 基于RocketMQ分布式事務 - 完整示例
- 本文代碼不只是簡單的demo,考慮到一些異常情況、冪等性消費和死信隊列等情況,盡量向可靠業(yè)務場景靠攏。
事務消息
在這里,筆者不想使用大量的文字贅述 RocketMQ事務消息的原理,我們只需要搞明白兩個概念。
-
Half Message
,半消息
暫時不能被 Consumer消費的消息。Producer已經(jīng)把消息發(fā)送到 Broker端,但是此消息的狀態(tài)被標記為不能投遞,處于這種狀態(tài)下的消息稱為半消息。事實上,該狀態(tài)下的消息會被放在一個叫做 RMQ_SYS_TRANS_HALF_TOPIC
的主題下
當 Producer端對它二次確認后,也就是 Commit之后,Consumer端才可以消費到;那么如果是Rollback,該消息則會被刪除,永遠不會被消費到。
- 事務狀態(tài)回查
我們想,可能會因為網(wǎng)絡原因、應用問題等,導致Producer端一直沒有對這個半消息進行確認,那么這時候 Broker服務器會定時掃描這些半消息,主動找Producer端查詢該消息的狀態(tài)。
當然,什么時候去掃描,包含掃描幾次,我們都可以配置,在后文我們再細說。
簡而言之,RocketMQ事務消息的實現(xiàn)原理就是基于兩階段提交和事務狀態(tài)回查,來決定消息最終是提交還是回滾的。
在本文,我們的代碼就以 訂單服務、積分服務 為例。結合上文來看,整體流程如下:
場景代碼示例
訂單服務
在訂單服務中,我們接收前端的請求創(chuàng)建訂單,保存相關數(shù)據(jù)到本地數(shù)據(jù)庫。
事務日志表
在訂單服務中,除了有一張訂單表之外,還需要一個事務日志表。 它的定義如下:
CREATE TABLE `transaction_log` (
`id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '事務ID',
`business` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '業(yè)務標識',
`foreign_key` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '對應業(yè)務表中的主鍵',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
這張表專門作用于事務狀態(tài)回查。當提交業(yè)務數(shù)據(jù)時,此表也插入一條數(shù)據(jù),它們共處一個本地事務中。通過事務ID查詢該表,如果返回記錄,則證明本地事務已提交;如果未返回記錄,則本地事務可能是未知狀態(tài)或者是回滾狀態(tài)。
TransactionMQProducer
我們知道,通過 RocketMQ發(fā)送消息,需先創(chuàng)建一個消息發(fā)送者。值得注意的是,如果發(fā)送事務消息,在這里我們的創(chuàng)建的實例必須是 TransactionMQProducer
。
@Component
public class TransactionProducer {
private String producerGroup = "order_trans_group";
private TransactionMQProducer producer;
//用于執(zhí)行本地事務和事務狀態(tài)回查的監(jiān)聽器
@Autowired
OrderTransactionListener orderTransactionListener;
//執(zhí)行任務的線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
@PostConstruct
public void init(){
producer = new TransactionMQProducer(producerGroup);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setSendMsgTimeout(Integer.MAX_VALUE);
producer.setExecutorService(executor);
producer.setTransactionListener(orderTransactionListener);
this.start();
}
private void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
//事務消息發(fā)送
public TransactionSendResult send(String data, String topic) throws MQClientException {
Message message = new Message(topic,data.getBytes());
return this.producer.sendMessageInTransaction(message, null);
}
}
上面的代碼中,主要就是創(chuàng)建事務消息的發(fā)送者。在這里,我們重點關注 OrderTransactionListener
,它負責執(zhí)行本地事務和事務狀態(tài)回查。
OrderTransactionListener
@Component
public class OrderTransactionListener implements TransactionListener {
@Autowired
OrderService orderService;
@Autowired
TransactionLogService transactionLogService;
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
logger.info("開始執(zhí)行本地事務....");
LocalTransactionState state;
try{
String body = new String(message.getBody());
OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);
orderService.createOrder(order,message.getTransactionId());
state = LocalTransactionState.COMMIT_MESSAGE;
logger.info("本地事務已提交。{}",message.getTransactionId());
}catch (Exception e){
logger.info("執(zhí)行本地事務失敗。{}",e);
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
return state;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
logger.info("開始回查本地事務狀態(tài)。{}",messageExt.getTransactionId());
LocalTransactionState state;
String transactionId = messageExt.getTransactionId();
if (transactionLogService.get(transactionId)>0){
state = LocalTransactionState.COMMIT_MESSAGE;
}else {
state = LocalTransactionState.UNKNOW;
}
logger.info("結束本地事務狀態(tài)查詢:{}",state);
return state;
}
}
在通過 producer.sendMessageInTransaction
發(fā)送事務消息后,如果消息發(fā)送成功,就會調(diào)用到這里的executeLocalTransaction
方法,來執(zhí)行本地事務。在這里,它會完成訂單數(shù)據(jù)和事務日志的插入。
該方法返回值 LocalTransactionState
代表本地事務狀態(tài),它是一個枚舉類。
public enum LocalTransactionState {
//提交事務消息,消費者可以看到此消息
COMMIT_MESSAGE,
//回滾事務消息,消費者不會看到此消息
ROLLBACK_MESSAGE,
//事務未知狀態(tài),需要調(diào)用事務狀態(tài)回查,確定此消息是提交還是回滾
UNKNOW;
}
那么, checkLocalTransaction
方法就是用于事務狀態(tài)查詢。在這里,我們通過事務ID查詢transaction_log這張表,如果可以查詢到結果,就提交事務消息;如果沒有查詢到,就返回未知狀態(tài)。
注意,這里還涉及到另外一個問題。如果是返回未知狀態(tài),RocketMQ Broker服務器會以1分鐘的間隔時間不斷回查,直至達到事務回查最大檢測數(shù),如果超過這個數(shù)字還未查詢到事務狀態(tài),則回滾此消息。
當然,事務回查的頻率和最大次數(shù),我們都可以配置。在 Broker 端,可以通過這樣來配置它:
brokerConfig.setTransactionCheckInterval(10000); //回查頻率10秒一次
brokerConfig.setTransactionCheckMax(3); //最大檢測次數(shù)為3
業(yè)務實現(xiàn)類
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
OrderMapper orderMapper;
@Autowired
TransactionLogMapper transactionLogMapper;
@Autowired
TransactionProducer producer;
Snowflake snowflake = new Snowflake(1,1);
Logger logger = LoggerFactory.getLogger(this.getClass());
//執(zhí)行本地事務時調(diào)用,將訂單數(shù)據(jù)和事務日志寫入本地數(shù)據(jù)庫
@Transactional
@Override
public void createOrder(OrderDTO orderDTO,String transactionId){
//1.創(chuàng)建訂單
Order order = new Order();
BeanUtils.copyProperties(orderDTO,order);
orderMapper.createOrder(order);
//2.寫入事務日志
TransactionLog log = new TransactionLog();
log.setId(transactionId);
log.setBusiness("order");
log.setForeignKey(String.valueOf(order.getId()));
transactionLogMapper.insert(log);
logger.info("訂單創(chuàng)建完成。{}",orderDTO);
}
//前端調(diào)用,只用于向RocketMQ發(fā)送事務消息
@Override
public void createOrder(OrderDTO order) throws MQClientException {
order.setId(snowflake.nextId());
order.setOrderNo(snowflake.nextIdStr());
producer.send(JSON.toJSONString(order),"order");
}
}
在訂單業(yè)務服務類中,我們有兩個方法。一個用于向RocketMQ發(fā)送事務消息,一個用于真正的業(yè)務數(shù)據(jù)落庫。
至于為什么這樣做,其實有一些原因的,我們后面再說。
調(diào)用
@RestController
public class OrderController {
@Autowired
OrderService orderService;
Logger logger = LoggerFactory.getLogger(this.getClass());
@PostMapping("/create_order")
public void createOrder(@RequestBody OrderDTO order) throws MQClientException {
logger.info("接收到訂單數(shù)據(jù):{}",order.getCommodityCode());
orderService.createOrder(order);
}
}
總結
目前已經(jīng)完成了訂單服務的業(yè)務邏輯。我們總結流程如下:
考慮到異常情況,這里的要點如下:
- 第一次調(diào)用createOrder,發(fā)送事務消息。如果發(fā)送失敗,導致報錯,則將異常返回,此時不會涉及到任何數(shù)據(jù)安全。
- 如果事務消息發(fā)送成功,但在執(zhí)行本地事務時發(fā)生異常,那么訂單數(shù)據(jù)和事務日志都不會被保存,因為它們是一個本地事務中。
- 如果執(zhí)行完本地事務,但未能及時的返回本地事務狀態(tài)或者返回了未知狀態(tài)。那么,會由Broker定時回查事務狀態(tài),然后根據(jù)事務日志表,就可以判斷訂單是否已完成,并寫入到數(shù)據(jù)庫。
基于這些要素,我們可以說,已經(jīng)保證了訂單服務和事務消息的一致性。那么,接下來就是積分服務如何正確的消費訂單數(shù)據(jù)并完成相應的業(yè)務操作。
積分服務
在積分服務中,主要就是消費訂單數(shù)據(jù),然后根據(jù)訂單內(nèi)容,給相應用戶增加積分。
積分記錄表
CREATE TABLE `t_points` (
`id` bigint(16) NOT NULL COMMENT '主鍵',
`user_id` bigint(16) NOT NULL COMMENT '用戶id',
`order_no` bigint(16) NOT NULL COMMENT '訂單編號',
`points` int(4) NOT NULL COMMENT '積分',
`remarks` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '備注',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
在這里,我們重點關注order_no字段,它是實現(xiàn)冪等消費的一種選擇。
消費者啟動
@Component
public class Consumer {
String consumerGroup = "consumer-group";
DefaultMQPushConsumer consumer;
@Autowired
OrderListener orderListener;
@PostConstruct
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("order","*");
consumer.registerMessageListener(orderListener);
consumer.start();
}
}
啟動一個消費者比較簡單,我們指定要消費的 topic 和監(jiān)聽器就好了。
消費者監(jiān)聽器
@Component
public class OrderListener implements MessageListenerConcurrently {
@Autowired
PointsService pointsService;
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
logger.info("消費者線程監(jiān)聽到消息。");
try{
for (MessageExt message:list) {
logger.info("開始處理訂單數(shù)據(jù),準備增加積分....");
OrderDTO order = JSONObject.parseObject(message.getBody(), OrderDTO.class);
pointsService.increasePoints(order);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
logger.error("處理消費者數(shù)據(jù)發(fā)生異常。{}",e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
監(jiān)聽到消息之后,調(diào)用業(yè)務服務類處理即可。處理完成則返回CONSUME_SUCCESS以提交,處理失敗則返回RECONSUME_LATER
來重試。
增加積分
在這里,主要就是對積分數(shù)據(jù)入庫。但注意,入庫之前需要先做判斷,來達到冪等性消費。
@Service
public class PointsServiceImpl implements PointsService {
@Autowired
PointsMapper pointsMapper;
Snowflake snowflake = new Snowflake(1,1);
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void increasePoints(OrderDTO order) {
//入庫之前先查詢,實現(xiàn)冪等
if (pointsMapper.getByOrderNo(order.getOrderNo())>0){
logger.info("積分添加完成,訂單已處理。{}",order.getOrderNo());
}else{
Points points = new Points();
points.setId(snowflake.nextId());
points.setUserId(order.getUserId());
points.setOrderNo(order.getOrderNo());
Double amount = order.getAmount();
points.setPoints(amount.intValue()*10);
points.setRemarks("商品消費共【"+order.getAmount()+"】元,獲得積分"+points.getPoints());
pointsMapper.insert(points);
logger.info("已為訂單號碼{}增加積分。",points.getOrderNo());
}
}
}
冪等性消費
實現(xiàn)冪等性消費的方式有很多種,具體怎么做,根據(jù)自己的情況來看。
比如,在本例中,我們直接將訂單號和積分記錄綁定在同一個表中,在增加積分之前,就可以先查詢此訂單是否已處理過。
或者,我們也可以額外創(chuàng)建一張表,來記錄訂單的處理情況。
再者,也可以將這些信息直接放到redis緩存里,在入庫之前先查詢緩存。
不管以哪種方式來做,總的思路就是在執(zhí)行業(yè)務前,必須先查詢該消息是否被處理過。那么這里就涉及到一個數(shù)據(jù)主鍵問題,在這個例子中,我們以訂單號為主鍵,也可以用事務ID作主鍵,如果是普通消息的話,我們也可以創(chuàng)建唯一的消息ID作為主鍵。
消費異常
我們知道,當消費者處理失敗后會返回 RECONSUME_LATER ,讓消息來重試,默認最多重試16次。
那,如果真的由于特殊原因,消息一直不能被正確處理,那怎么辦 ?
我們考慮兩種方式來解決這個問題。
第一,在代碼中設置消息重試次數(shù),如果達到指定次數(shù),就發(fā)郵件或者短信通知業(yè)務方人工介入處理。
@Component
public class OrderListener implements MessageListenerConcurrently {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
logger.info("消費者線程監(jiān)聽到消息。");
for (MessageExt message:list) {
if (!processor(message)){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
/**
* 消息處理,第3次處理失敗后,發(fā)送郵件通知人工介入
* @param message
* @return
*/
private boolean processor(MessageExt message){
String body = new String(message.getBody());
try {
logger.info("消息處理....{}",body);
int k = 1/0;
return true;
}catch (Exception e){
if(message.getReconsumeTimes()>=3){
logger.error("消息重試已達最大次數(shù),將通知業(yè)務人員排查問題。{}",message.getMsgId());
sendMail(message);
return true;
}
return false;
}
}
}
第二,等待消息重試最大次數(shù)后,進入死信隊列。
消息重試最大次數(shù)默認是16次,我們也可以在消費者端設置這個次數(shù)。
consumer.setMaxReconsumeTimes(3);//設置消息重試最大次數(shù)
死信隊列的主題名稱是 %DLQ% + 消費者組名稱,比如在訂單數(shù)據(jù)中,我們設置了消費者組名:
String consumerGroup = "order-consumer-group";
那么這個消費者,對應的死信隊列主題名稱就是%DLQ%order-consumer-group
如上圖,我們還需要點擊TOPIC配置,來修改里面的 perm 屬性,改為 6 即可。文章來源:http://www.zghlxwxcb.cn/news/detail-605689.html
最后就可以通過程序代碼監(jiān)聽這個主題,來通知人工介入處理或者直接在控制臺查看處理了。通過冪等性消費和對死信消息的處理,基本上就能保證消息一定會被處理。文章來源地址http://www.zghlxwxcb.cn/news/detail-605689.html
到了這里,關于RocketMQ分布式事務 -> 最終一致性實現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!