国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

原來kafka也有事務(wù)啊,再也不擔(dān)心消息不一致了

這篇具有很好參考價值的文章主要介紹了原來kafka也有事務(wù)啊,再也不擔(dān)心消息不一致了。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

前言

現(xiàn)在假定這么一個業(yè)務(wù)場景,從kafka中的topic獲取消息數(shù)據(jù),經(jīng)過一定加工處理后,發(fā)送到另外一個topic中,要求整個過程消息不能丟失,也不能重復(fù)發(fā)送,即實(shí)現(xiàn)端到端的Exactly-Once精確一次消息投遞。這該如何實(shí)現(xiàn)呢?

原來kafka也有事務(wù)啊,再也不擔(dān)心消息不一致了

kafka事務(wù)介紹

針對上面的業(yè)務(wù)場景,kafka已經(jīng)替我們想到了,在kafka 0.11版本以后,引入了一個重大的特性:冪等性和事務(wù)。

冪等性

這里提到冪等性的原因,主要是因?yàn)槭聞?wù)的啟用必須要先開啟冪等性,那么什么是冪等性呢?

冪等性是指生產(chǎn)者無論向kafka broker發(fā)送多少次重復(fù)的數(shù)據(jù),broker 端只會持久化一條,保證數(shù)據(jù)不會重復(fù)。

冪等性通過生產(chǎn)者配置項(xiàng)enable.idempotence=true開啟,默認(rèn)情況下為true。

冪等性實(shí)現(xiàn)原理

原來kafka也有事務(wù)啊,再也不擔(dān)心消息不一致了

  1. 每條消息都有一個主鍵,這個主鍵由 <PID, Partition, SeqNumber>組成。
  • PIDProducerID,每個生產(chǎn)者啟動時,Kafka 都會給它分配一個 ID,ProducerID 是生產(chǎn)者的唯一標(biāo)識,需要注意的是,Kafka 重啟也會重新分配 PID。
  • Partition:消息需要發(fā)往的分區(qū)號。
  • SeqNumber:生產(chǎn)者,他會記錄自己所發(fā)送的消息,給他們分配一個自增的 ID,這個 ID 就是 SeqNumber,是該消息的唯一標(biāo)識,每發(fā)送一條消息,序列號加 1。
  1. 對于主鍵相同的數(shù)據(jù),kafka 是不會重復(fù)持久化的,它只會接收一條。

冪等性缺點(diǎn)

根據(jù)冪等性的原理,我們發(fā)現(xiàn)它存在下面的缺點(diǎn):

  • 只能保證單分區(qū)、單會話內(nèi)的數(shù)據(jù)不重復(fù)
  • kafka 掛掉,重新給生產(chǎn)者分配了 PID,還是有可能產(chǎn)生重復(fù)的數(shù)據(jù)

那么如何實(shí)現(xiàn)跨分區(qū)、kafka broker重啟也能保證不重復(fù)呢?這就要使用事務(wù)了。

事務(wù)

所謂事務(wù),就是要求保證原子性,要么全部成功,要么全部失敗。那么具體該如何開啟呢?

  1. kafka要想開啟事務(wù)必須要啟用冪等性,即生產(chǎn)者配置enable.idempotence=true
  2. kafka生產(chǎn)者需要配置唯一的事務(wù)idtransactional.id, 最好為其設(shè)置一個有意義的名字。
  3. kafka消費(fèi)端也有一個配置項(xiàng)isolation.level和事務(wù)有很大關(guān)系。
  • read_uncommitted:默認(rèn)值,消費(fèi)端應(yīng)用可以看到(消費(fèi)到)未提交的事務(wù),當(dāng)然對于已提交的事務(wù)也是可見的。
  • read_committed:消費(fèi)端應(yīng)用只能消費(fèi)到提交的事務(wù)內(nèi)的消息。

kafka事務(wù) API

現(xiàn)在我們用java的api來實(shí)現(xiàn)一下前面這個“消費(fèi)-處理-生產(chǎn)“的例子吧。

  1. 引入依賴
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
  1. 創(chuàng)建事務(wù)的生產(chǎn)者
Properties prodcuerProps = new Properties();
// kafka地址
prodcuerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
// key序列化
prodcuerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value序列化
prodcuerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 啟用冪等性
producerProps.put("enable.idempotence", "true");
// 設(shè)置事務(wù)id
producerProps.put("transactional.id", "prod-1");
KafkaProducer<String, String> producer = new KafkaProducer(prodcuerProps);
  • enable.idempotence配置項(xiàng)目為true
  • 設(shè)置transactional.id
  1. 創(chuàng)建事務(wù)的消費(fèi)者
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put("group.id", "my-group-id");
// 設(shè)置consumer手動提交
consumerProps.put("enable.auto.commit", "false");
// 設(shè)置隔離級別,讀取事務(wù)已提交的消息
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
//訂閱主題
consumer.subscribe(Collections.singletonList("topic1"));
  • enable.auto.commit=false,設(shè)置手動提交消費(fèi)者offset
  • 設(shè)置isolation.level=read_committed,消費(fèi)事務(wù)已提交的消息
  1. 核心邏輯
// 初始化事務(wù) 
producer.initTransactions();
while(true) {
	// 拉取消息 
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
    if(!records.isEmpty()){
        // 準(zhǔn)備一個 hashmap 來記錄:"分區(qū)-消費(fèi)位移" 鍵值對
        HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
        // 開啟事務(wù) 
        producer.beginTransaction();
        try {
            // 獲取本批消息中所有的分區(qū)
            Set<TopicPartition> partitions = records.partitions();
            // 遍歷每個分區(qū)
            for (TopicPartition partition : partitions) {
                // 獲取該分區(qū)的消息
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                // 遍歷每條消息
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    // 執(zhí)行數(shù)據(jù)的業(yè)務(wù)處理邏輯
                    ProducerRecord<String, String> outRecord = new ProducerRecord<>("topic2", record.key(), record.value().toUpperCase());
                    // 將處理結(jié)果寫入 kafka
                    producer.send(outRecord);
                }

                // 將處理完的本分區(qū)對應(yīng)的消費(fèi)位移記錄到 hashmap 中
                long offset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // 事務(wù)提交的是即將到來的偏移量,這意味著我們需要加 1
                offsetsMap.put(partition,new OffsetAndMetadata(offset+1));
            }
            // 向事務(wù)管理器提交消費(fèi)位移 
            producer.sendOffsetsToTransaction(offsetsMap,"groupid");
            // 提交事務(wù) 
            producer.commitTransaction();
        } catch(Exeception e) {
            e.printStackTrace();
            // 終止事務(wù) 
            producer.abortTransaction();
        }
    }
}
  • initTransactions(): 初始化事務(wù)
  • beginTransaction(): 開啟事務(wù)
  • sendOffsetsToTransaction(): 在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量(主要用于消費(fèi)者)
  • commitTransaction(): 提交事務(wù)
  • abortTransaction(): 放棄事務(wù)

kafka事務(wù)實(shí)現(xiàn)原理

kafka事務(wù)的實(shí)現(xiàn)引入了事務(wù)協(xié)調(diào)器,如下圖所示:

原來kafka也有事務(wù)啊,再也不擔(dān)心消息不一致了

  1. 生產(chǎn)者使用事務(wù)必須配置事務(wù)id, kafka根據(jù)事務(wù)id計算分配事務(wù)協(xié)調(diào)器
  2. 事務(wù)協(xié)調(diào)器返回pid,前面的冪等性中需要
  3. 開始發(fā)送消息到topic中,不過這些消息與普通的消息不同,它們帶著一個字段標(biāo)識自己是事務(wù)消息
  4. 當(dāng)生產(chǎn)者事務(wù)內(nèi)的消息發(fā)送完畢,會向事務(wù)協(xié)調(diào)器發(fā)送 commitabort 請求,等待 kafka 響應(yīng)
  5. 事務(wù)協(xié)調(diào)器收到請求后先持久化到內(nèi)置事務(wù)主題__transaction_state中,__transaction_state默認(rèn)有50個分區(qū),每個分區(qū)負(fù)責(zé)一部分事務(wù)。事務(wù)劃分是根據(jù)transactional.idhashcode%50,計算出該事務(wù)屬于哪個分區(qū)。 該分區(qū)Leader副本所在的broker節(jié)點(diǎn)即為這個transactional.id對應(yīng)的Transaction Coordinator節(jié)點(diǎn),這也是上面第一步中的計算邏輯。
  6. 事務(wù)協(xié)調(diào)器后臺會跟topic通信,告訴它們事務(wù)是成功還是失敗的。
  • 如果是成功,topic會匯報自己已經(jīng)收到消息,協(xié)調(diào)者收到主題的回應(yīng)便確認(rèn)了事務(wù)完成,并持久化這一結(jié)果。
  • 如果是失敗的,主題會把這個事務(wù)內(nèi)的消息丟棄,并匯報給協(xié)調(diào)者,協(xié)調(diào)者收到所有結(jié)果后再持久化這一信息,事務(wù)結(jié)束。
  1. 持久化第6步中的事務(wù)成功或者失敗的信息, 如果kafka broker配置max.transaction.timeout.ms之前既不提交也不中止事務(wù), kafka broker將中止事務(wù)本身。 此屬性的默認(rèn)值為 15 分鐘。

總結(jié)

本文講解了通過kafka事務(wù)可以實(shí)現(xiàn)端到端的精確一次的消息語義,通過事務(wù)機(jī)制,KAFKA 實(shí)現(xiàn)了對多個 topic 的多個 partition 的原子性的寫入,通過一個例子了解了一下如何使用事物。同時也簡單介紹了事務(wù)實(shí)現(xiàn)的原理,它底層必須要依賴kafka的冪等性機(jī)制,同時通過類似“二段提交”的方式保證事務(wù)的原子性。

歡迎關(guān)注個人公眾號【JAVA旭陽】交流學(xué)習(xí)!文章來源地址http://www.zghlxwxcb.cn/news/detail-472949.html

到了這里,關(guān)于原來kafka也有事務(wù)啊,再也不擔(dān)心消息不一致了的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 初識Linux(中).媽媽再也不用擔(dān)心我Linux找不到門了。

    初識Linux(中).媽媽再也不用擔(dān)心我Linux找不到門了。

    “我會定期分享我的學(xué)習(xí)和工作經(jīng)驗(yàn),也歡迎大家留言和交流,讓我們共同學(xué)習(xí)和進(jìn)步!感謝大家的支持!” 系列文章 初識Linux(上).媽媽再也不用擔(dān)心我Linux找不到門了。 初識Linux(中).媽媽再也不用擔(dān)心我Linux找不到門了。 初識Linux(下).媽媽再也不用擔(dān)心我Linux找不到門了。

    2024年02月05日
    瀏覽(26)
  • 初識Linux(下).媽媽再也不用擔(dān)心我Linux找不到門了

    初識Linux(下).媽媽再也不用擔(dān)心我Linux找不到門了

    “我會定期分享我的學(xué)習(xí)和工作經(jīng)驗(yàn),也歡迎大家留言和交流,讓我們共同學(xué)習(xí)和進(jìn)步!感謝大家的支持!” 系列文章 初識Linux(上).媽媽再也不用擔(dān)心我Linux找不到門了。 初識Linux(中).媽媽再也不用擔(dān)心我Linux找不到門了。 初識Linux(下).媽媽再也不用擔(dān)心我Linux找不到門了。

    2024年02月05日
    瀏覽(92)
  • 再也不用擔(dān)心變量類型錯誤!學(xué)會JS中如何輕松檢查變量類型

    今天要分享的問題就是: 如何在JS中檢查一個變量的類型? 先上結(jié)論: 如果判斷的是基本數(shù)據(jù)類型或JavaScript內(nèi)置對象,使用toString;如果要判斷的是自定義類型,請使用instanceof。 在 ECMAScript 規(guī)范中,共定義了 7 種數(shù)據(jù)類型,分為 基本類型 和 引用類型 兩大類。 基本類型

    2024年02月08日
    瀏覽(18)
  • Spring Boot 項(xiàng)目代碼混淆,實(shí)戰(zhàn)來了,再也不用擔(dān)心代碼泄露了!

    Spring Boot 項(xiàng)目代碼混淆,實(shí)戰(zhàn)來了,再也不用擔(dān)心代碼泄露了!

    簡單就是把代碼跑一哈,然后我們的代碼 .java文件 就被編譯成了 .class 文件 就是針對編譯生成的 jar/war 包 里面的 .class 文件 逆向還原回來,可以看到你的代碼寫的啥。 比較常用的反編譯工具 JD-GUI ,直接把編譯好的jar丟進(jìn)去,大部分都能反編譯看到源碼: 那如果不想給別人反

    2023年04月26日
    瀏覽(18)
  • 多數(shù)人都不會用,有了這些視頻APP,再也不擔(dān)心失效!

    多數(shù)人都不會用,有了這些視頻APP,再也不擔(dān)心失效!

    阿虛儲物間里一大熱門下載內(nèi)容就是影視類APP了 但相信有這類需求的粉絲都知道:這類APP要么你忍受煩人的廣告,要么就找去廣告版, 但去廣告版有個最大的問題就是經(jīng)!常!失!效! 其實(shí)阿虛早就介紹過不少更穩(wěn)定的影視APP了,只是可能很多粉絲都沒注意到 今天阿虛就來

    2024年02月11日
    瀏覽(75)
  • 有了這些開源 Icon 庫,媽媽再也不擔(dān)心我的 UI 太丑啦!

    有了這些開源 Icon 庫,媽媽再也不擔(dān)心我的 UI 太丑啦!

    Remix Icon 是一套面向設(shè)計師和開發(fā)者的開源圖標(biāo)庫,所有的圖標(biāo)均可免費(fèi)用于個人項(xiàng)目和商業(yè)項(xiàng)目。 與拼湊混搭的圖標(biāo)庫不同,Remix Icon 的每一枚圖標(biāo)都是由設(shè)計師按照統(tǒng)一規(guī)范精心繪制的,在擁有完美像素對齊的基礎(chǔ)上,確保每一枚圖標(biāo)風(fēng)格一致且簡潔易讀。 圖標(biāo)以 24x24

    2024年02月11日
    瀏覽(21)
  • 解析不同種類的StableDiffusion模型Models,再也不用擔(dān)心該用什么了

    解析不同種類的StableDiffusion模型Models,再也不用擔(dān)心該用什么了

    Stable Diffusion是一個基于Latent Diffusion Models(潛在擴(kuò)散模型,LDMs)的文圖生成(text-to-image)模型。具體來說,Stable Diffusion在 LAION-5B 的一個子集上訓(xùn)練了一個Latent Diffusion Models,該模型專門用于文圖生成。Latent Diffusion Models通過在一個潛在表示空間中迭代“去噪”數(shù)據(jù)來生成圖

    2023年04月19日
    瀏覽(18)
  • 使用ChatGPT+MindShow一鍵生成PPT,以后再也不用擔(dān)心制作PPT啦

    使用ChatGPT+MindShow一鍵生成PPT,以后再也不用擔(dān)心制作PPT啦

    ?? 作者簡介:大家好,我是阿牛,全棧領(lǐng)域優(yōu)質(zhì)創(chuàng)作者。?? ?? 個人主頁:館主阿牛?? ?? 支持我:點(diǎn)贊??+收藏??+留言?? ??格言:迄今所有人生都大寫著失敗,但不妨礙我繼續(xù)向前!?? 我們經(jīng)常會有制作ppt的需求,尤其大學(xué)里面的小組報告,什么班會團(tuán)課之類的,

    2023年04月23日
    瀏覽(19)
  • 用Python制作搶購腳本,自動搶購飛天茅臺,再也不要擔(dān)心手慢無了

    用Python制作搶購腳本,自動搶購飛天茅臺,再也不要擔(dān)心手慢無了

    前段時間老逛刷朋友圈,有個朋友發(fā)文說:每天早上 10 點(diǎn)守著,花了七天終于搶到了?。。〔⑴渖狭艘粋€茅臺的圖片。 老逛不喝酒也不懂酒,就去問了這哥們啥情況,這哥們說在京東搶了一瓶茅臺酒,只花了 1499 元,這瓶酒原價 3000 左右。 我去京東看了看,搜索「茅臺」第

    2024年02月05日
    瀏覽(26)
  • Z-Libary最新地址檢測,再也不用擔(dān)心找不到ZLibary了

    Z-Libary最新地址檢測,再也不用擔(dān)心找不到ZLibary了

    Z-Library。世界上最大的數(shù)字圖書館。?如果你知道了一本書的書名,那在Z-Library上基本上都可以找到進(jìn)行下載, Z-Library 有很多入口,分為官方和民間鏡像。官方自己做了個跳轉(zhuǎn)站點(diǎn),會自動尋找官方可用網(wǎng)站。一般用官方入口即可,但也存在所有官方入口均封閉情況,此時建議

    2024年02月08日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包