国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

基于RocketMQ實(shí)現(xiàn)分布式事務(wù)

這篇具有很好參考價(jià)值的文章主要介紹了基于RocketMQ實(shí)現(xiàn)分布式事務(wù)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

基于RocketMQ實(shí)現(xiàn)分布式事務(wù)

背景

在一個(gè)微服務(wù)架構(gòu)的項(xiàng)目中,一個(gè)業(yè)務(wù)操作可能涉及到多個(gè)服務(wù),這些服務(wù)往往是獨(dú)立部署,構(gòu)成一個(gè)個(gè)獨(dú)立的系統(tǒng)。這種分布式的系統(tǒng)架構(gòu)往往面臨著分布式事務(wù)的問題。為了保證系統(tǒng)數(shù)據(jù)的一致性,我們需要確保這些服務(wù)中的操作要么全部成功,要么全部失敗。通過使用RocketMQ實(shí)現(xiàn)分布式事務(wù),我們可以協(xié)調(diào)這些服務(wù)的操作,保證數(shù)據(jù)的一致性。

功能原理

RocketMQ的分布式事務(wù)消息功能,在普通消息基礎(chǔ)上,支持二階段的提交。將二階段提交和本地事務(wù)綁定,實(shí)現(xiàn)全局提交結(jié)果的一致性。

整個(gè)事務(wù)消息的詳細(xì)交互流程如下圖所示:

基于RocketMQ實(shí)現(xiàn)分布式事務(wù)

1、生產(chǎn)者將消息發(fā)送至RocketMQ服務(wù)端。

2、RocketMQ服務(wù)端將消息持久化成功之后,向生產(chǎn)者返回Ack確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息被標(biāo)記為"暫不能投遞",這種狀態(tài)下的消息即為半事務(wù)消息。

3、生產(chǎn)者開始執(zhí)行本地事務(wù)邏輯。

4、生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)結(jié)果(Commit或是Rollback),服務(wù)端收到確認(rèn)結(jié)果后處理邏輯如下:

  • 二次確認(rèn)結(jié)果為Commit:服務(wù)端將半事務(wù)消息標(biāo)記為可投遞,并投遞給消費(fèi)者。

  • 二次確認(rèn)結(jié)果為Rollback:服務(wù)端將回滾事務(wù),不會(huì)將半事務(wù)消息投遞給消費(fèi)者。

5、在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若服務(wù)端未收到生產(chǎn)者提交的二次確認(rèn)結(jié)果,或服務(wù)端收到的二次確認(rèn)結(jié)果為Unknown未知狀態(tài),經(jīng)過固定時(shí)間后,服務(wù)端將對(duì)消息生產(chǎn)者集群中任一生產(chǎn)者實(shí)例發(fā)起消息回查。

6、生產(chǎn)者收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。

7、生產(chǎn)者根據(jù)檢查到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對(duì)半事務(wù)消息進(jìn)行處理。

注意問題

消息類型
事務(wù)消息僅支持在MessageType為Transaction的主題使用,即事務(wù)消息只能發(fā)送至類型為事務(wù)消息的主題中。

消息消費(fèi)
RocketMQ事務(wù)消息保證生產(chǎn)者本地事務(wù)和下游消息發(fā)送事務(wù)的一致性,但不保證消息消費(fèi)結(jié)果和上游事務(wù)的一致性。因此需要下游業(yè)務(wù)自行保證消息正確處理,建議消費(fèi)端做好消費(fèi)重試。

中間狀態(tài)
RocketMQ事務(wù)消息一致性為最終一致性,即在消息提交到下游消費(fèi)端處理完成之前,下游和上游事務(wù)之間的狀態(tài)會(huì)不一致。因此,事務(wù)消息僅適合能接受異步執(zhí)行的場(chǎng)景。

事務(wù)超時(shí)
RocketMQ事務(wù)消息的生命周期存在超時(shí)機(jī)制,即半事務(wù)消息被生產(chǎn)者發(fā)送服務(wù)端后,如果在指定時(shí)間內(nèi)服務(wù)端無法確認(rèn)提交或者回滾狀態(tài),則消息默認(rèn)會(huì)被回滾。

示例代碼

以下為RocketMQ 4.x版本事務(wù)消息示例代碼,

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.*;

public class RocketMqTransactionDemo {
	public static void main(String[] args) throws Exception {
		// 創(chuàng)建事務(wù)消息生產(chǎn)者
		TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
		producer.setNamesrvAddr("127.0.0.1:9876");

		// 設(shè)置事務(wù)監(jiān)聽器
		TransactionListener transactionListener = new MyTransactionListener();
		producer.setTransactionListener(transactionListener);

		// 設(shè)置事務(wù)回查的線程池,可以不必設(shè)置,如果不設(shè)置也會(huì)默認(rèn)生成一個(gè)
		ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue <Runnable> (2000), new ThreadFactory() {
			@Override
			public Thread newThread(Runnable r) {
				Thread thread = new Thread(r);
				thread.setName("client-transaction-msg-check-thread");
				return thread;
			}
		});
		producer.setExecutorService(executorService);

		// 啟動(dòng)生產(chǎn)者
		producer.start();

		// 發(fā)送事務(wù)消息
		Message message = new Message("transaction_topic", "test_tag", "test_key", "Hello RocketMQ".getBytes());
		producer.sendMessageInTransaction(message, null);

		// 關(guān)閉生產(chǎn)者
		producer.shutdown();
	}
}

/**
 * 事務(wù)監(jiān)聽器
 */
class MyTransactionListener implements TransactionListener {
	@Override
	public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		// 執(zhí)行本地事務(wù)操作
		System.out.println("執(zhí)行本地事務(wù)操作,消息內(nèi)容:" + new String(msg.getBody()));
		return LocalTransactionState.COMMIT_MESSAGE; // 提交事務(wù),允許消費(fèi)者消費(fèi)該消息
		// return LocalTransactionState.ROLLBACK_MESSAGE;// 回滾事務(wù),消息將被丟棄不允許消費(fèi)。
		// return LocalTransactionState.UNKNOW;// 暫時(shí)無法判斷狀態(tài),等待固定時(shí)間以后Broker端根據(jù)回查規(guī)則向生產(chǎn)者進(jìn)行消息回查。
	}

	@Override
	public LocalTransactionState checkLocalTransaction(MessageExt msg) {
		// 檢查本地事務(wù)狀態(tài)
		System.out.println("檢查本地事務(wù)狀態(tài),消息內(nèi)容:" + new String(msg.getBody()));
		return LocalTransactionState.COMMIT_MESSAGE;
	}
}

代碼解釋:
1、事務(wù)消息的生產(chǎn)者使用TransactionMQProducer創(chuàng)建。
2、MyTransactionListener作為事務(wù)監(jiān)聽器,實(shí)現(xiàn)了接口TransactionListener,該接口有兩個(gè)方法,分別是:

  • executeLocalTransaction
    半事務(wù)消息發(fā)送成功后,執(zhí)行本地事務(wù)的方法,具體執(zhí)行完本地事務(wù)后,可以在該方法中返回以下三種狀態(tài):
    LocalTransactionState.COMMIT_MESSAGE: 提交事務(wù),允許消費(fèi)者消費(fèi)該消息。
    LocalTransactionState.ROLLBACK_MESSAGE: 回滾事務(wù),消息將被丟棄不允許消費(fèi)。
    LocalTransactionState.UNKNOW: 暫時(shí)無法判斷狀態(tài),等待固定時(shí)間以后RocketMQ服務(wù)端根據(jù)回查規(guī)則向生產(chǎn)者進(jìn)行消息回查。

  • checkLocalTransaction
    二次確認(rèn)消息沒有收到,RocketMQ服務(wù)端回查生產(chǎn)者端事務(wù)結(jié)果的方法?;夭橐?guī)則:本地事務(wù)執(zhí)行完成后,若RocketMQ服務(wù)端收到的本地事務(wù)返回狀態(tài)為L(zhǎng)ocalTransactionState.UNKNOW,或生產(chǎn)者應(yīng)用退出導(dǎo)致本地事務(wù)未提交任何狀態(tài)。則RocketMQ服務(wù)端會(huì)向消息生產(chǎn)者發(fā)起事務(wù)回查,第一次回查后仍未獲取到事務(wù)狀態(tài),則之后每隔一段時(shí)間會(huì)再次回查。文章來源地址http://www.zghlxwxcb.cn/news/detail-838864.html

到了這里,關(guān)于基于RocketMQ實(shí)現(xiàn)分布式事務(wù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【分布式技術(shù)專題】RocketMQ延遲消息實(shí)現(xiàn)原理和源碼分析

    【分布式技術(shù)專題】RocketMQ延遲消息實(shí)現(xiàn)原理和源碼分析

    痛點(diǎn)背景 業(yè)務(wù)場(chǎng)景 假設(shè)有這么一個(gè)需求,用戶下單后如果30分鐘未支付,則該訂單需要被關(guān)閉。你會(huì)怎么做? 之前方案 最簡(jiǎn)單的做法,可以服務(wù)端啟動(dòng)個(gè)定時(shí)器,隔個(gè)幾秒掃描數(shù)據(jù)庫(kù)中待支付的訂單,如果(當(dāng)前時(shí)間-訂單創(chuàng)建時(shí)間)30分鐘,則關(guān)閉訂單。 方案評(píng)估 優(yōu)點(diǎn):是實(shí)

    2024年02月13日
    瀏覽(24)
  • (快手一面)分布式系統(tǒng)是什么?為什么要分布式系統(tǒng)?分布式環(huán)境下會(huì)有哪些問題?分布式系統(tǒng)是如何實(shí)現(xiàn)事務(wù)的?

    《分布式系統(tǒng)原理與泛型》中這么定義分布式系統(tǒng): “ 分布式系統(tǒng)是若干獨(dú)立計(jì)算機(jī)的集合, 這些計(jì)算機(jī)對(duì)于用戶來說就像單個(gè)相關(guān)系統(tǒng) ”, 分布式系統(tǒng)(distributed system)是建立在網(wǎng)絡(luò)之上的軟件系統(tǒng)。 就比如:用戶在使用京東這個(gè)分布式系統(tǒng)的時(shí)候,會(huì)感覺是在使用一

    2024年02月08日
    瀏覽(26)
  • JAVA微服務(wù)分布式事務(wù)的幾種實(shí)現(xiàn)方式

    JAVA微服務(wù)分布式事務(wù)的幾種實(shí)現(xiàn)方式

    一致性(Consistency) :在分布式系統(tǒng)中所有的數(shù)據(jù)備份,在同一時(shí)刻都保持一致狀態(tài),如無法保證狀態(tài)一致,直接返回錯(cuò)誤; 可用性(Availability):在集群中一部分節(jié)點(diǎn)故障,也能保證客戶端訪問系統(tǒng)并得到正確響應(yīng),允許一定時(shí)間內(nèi)數(shù)據(jù)狀態(tài)不一致; 分區(qū)容錯(cuò)性(Partiti

    2024年02月12日
    瀏覽(20)
  • Spring Boot實(shí)現(xiàn)分布式事務(wù)的協(xié)調(diào)和管理

    在現(xiàn)代的分布式系統(tǒng)中,往往存在多個(gè)服務(wù)協(xié)同完成一個(gè)業(yè)務(wù)操作的情況。而在這種情況下,如何保證所有服務(wù)的數(shù)據(jù)一致性成為了一個(gè)重要的問題。Spring Boot作為一個(gè)流行的Java開發(fā)框架,提供了多種方法來實(shí)現(xiàn)分布式事務(wù)的協(xié)調(diào)和管理。本文將介紹一些常用的方式和技術(shù)來

    2024年02月08日
    瀏覽(23)
  • 分布式系統(tǒng)的多數(shù)據(jù)庫(kù),實(shí)現(xiàn)分布式事務(wù)回滾(1.7.0 seata整合2.0.4nacos)

    分布式系統(tǒng)的多數(shù)據(jù)庫(kù),實(shí)現(xiàn)分布式事務(wù)回滾(1.7.0 seata整合2.0.4nacos)

    1、解決的應(yīng)用場(chǎng)景是分布式事務(wù),每個(gè)服務(wù)有獨(dú)立的數(shù)據(jù)庫(kù)。 2、例如:A服務(wù)的數(shù)據(jù)庫(kù)是A1,B服務(wù)的數(shù)據(jù)庫(kù)是B2,A服務(wù)通過feign接口調(diào)用B服務(wù),B涉及提交數(shù)據(jù)到B2,業(yè)務(wù)是在B提交數(shù)據(jù)之后,在A服務(wù)內(nèi)報(bào)錯(cuò)。 所以,希望B能回滾事務(wù)。這就是跨庫(kù)的數(shù)據(jù)回滾 seata下載地址 注意

    2024年02月11日
    瀏覽(24)
  • springboot dubbo seata nacos集成 分布式事務(wù)seata實(shí)現(xiàn)

    springboot dubbo seata nacos集成 分布式事務(wù)seata實(shí)現(xiàn)

    官網(wǎng):http://seata.io/zh-cn/docs/overview/what-is-seata.html Seata 是一款開源的分布式事務(wù)解決方案,致力于提供高性能和簡(jiǎn)單易用的分布式事務(wù)服務(wù)。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務(wù)模式,為用戶打造一站式的分布式解決方案。 官網(wǎng);https://cn.dubbo.apache.org/zh-cn/overview/what/

    2024年02月13日
    瀏覽(28)
  • 實(shí)現(xiàn)聲明式鎖,支持分布式鎖自定義鎖、SpEL和結(jié)合事務(wù)

    實(shí)現(xiàn)聲明式鎖,支持分布式鎖自定義鎖、SpEL和結(jié)合事務(wù)

    目錄 2.實(shí)現(xiàn) 2.1 定義注解 2.2 定義鎖接口 2.3 鎖的實(shí)現(xiàn) 2.3.1 什么是SPI 2.3.2 通過SPI實(shí)現(xiàn)鎖的多個(gè)實(shí)現(xiàn)類 2.3.3 通過SPI自定義實(shí)現(xiàn)鎖 3.定義切面 3.1 切面實(shí)現(xiàn) 3.2 SpEL表達(dá)式獲取動(dòng)態(tài)key 3.3 鎖與事務(wù)的結(jié)合 4.測(cè)試 4.1 ReentrantLock測(cè)試 4.2 RedissonClient測(cè)試 4.3 自定義鎖測(cè)試 5.尾聲 5.1 todo list

    2023年04月19日
    瀏覽(82)
  • 【103期】RabbitMQ 實(shí)現(xiàn)多系統(tǒng)間的分布式事務(wù),保證數(shù)據(jù)一致性

    【103期】RabbitMQ 實(shí)現(xiàn)多系統(tǒng)間的分布式事務(wù),保證數(shù)據(jù)一致性

    org.springframework.boot spring-boot-starter-amqp mysql mysql-connector-java runtime org.projectlombok lombok true org.springframework.boot spring-boot-starter-jdbc com.alibaba fastjson 1.2.17 3.2.1.2配置文件內(nèi)容: server: port:?8080 spring: datasource: driver-class-name:?com.mysql.cj.jdbc.Driver url:?jdbc:mysql://localhost:3306/test?useUnicode=tru

    2024年04月14日
    瀏覽(32)
  • 分布式:一文吃透分布式事務(wù)和seata事務(wù)

    分布式:一文吃透分布式事務(wù)和seata事務(wù)

    什么是事務(wù) 事務(wù)是并發(fā)控制的單位,是用戶定義的一個(gè)操作序列。 事務(wù)特性 原子性(Atomicity): 事務(wù)是數(shù)據(jù)庫(kù)的邏輯工作單位,事務(wù)中包括的諸操作要么全做,要么全不做。 一致性(Consistency): 事務(wù)執(zhí)行的結(jié)果必須是使數(shù)據(jù)庫(kù)從一個(gè)一致性狀態(tài)變到另一個(gè)一致性狀態(tài)。一致性

    2024年02月07日
    瀏覽(21)
  • 【萬字長(zhǎng)文】SpringBoot整合Atomikos實(shí)現(xiàn)多數(shù)據(jù)源分布式事務(wù)(提供Gitee源碼)

    前言:在最近的實(shí)際開發(fā)的過程中,遇到了在多數(shù)據(jù)源的情況下要保證原子性的問題,這個(gè)問題當(dāng)時(shí)遇到了也是思考了一段時(shí)間,后來通過搜集大量資料與學(xué)習(xí),最后是采用了分布式事務(wù)來解決這個(gè)問題,在講解之前,在我往期的博客提前搭好了一個(gè)SpringBoot整合MyBatis搭建M

    2024年02月14日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包