国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

從Flink的Kafka消費(fèi)者看算子聯(lián)合列表狀態(tài)的使用

這篇具有很好參考價值的文章主要介紹了從Flink的Kafka消費(fèi)者看算子聯(lián)合列表狀態(tài)的使用。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

背景

算子的聯(lián)合列表狀態(tài)是平時使用的比較少的一種狀態(tài),本文通過kafka的消費(fèi)者實(shí)現(xiàn)來看一下怎么使用算子列表聯(lián)合狀態(tài)

算子聯(lián)合列表狀態(tài)

首先我們看一下算子聯(lián)合列表狀態(tài)的在進(jìn)行故障恢復(fù)或者從某個保存點(diǎn)進(jìn)行擴(kuò)縮容啟動應(yīng)用時狀態(tài)的恢復(fù)情況
從Flink的Kafka消費(fèi)者看算子聯(lián)合列表狀態(tài)的使用,flink,大數(shù)據(jù),kafka,flink,kafka,大數(shù)據(jù)
算子聯(lián)合列表狀態(tài)主要由這兩個方法處理:
1初始化方法

public final void initializeState(FunctionInitializationContext context) throws Exception {

        OperatorStateStore stateStore = context.getOperatorStateStore();
		// 在初始化方法中獲取聯(lián)合列表狀態(tài)
        this.unionOffsetStates =
                stateStore.getUnionListState(
                        new ListStateDescriptor<>(
                                OFFSETS_STATE_NAME,
                                createStateSerializer(getRuntimeContext().getExecutionConfig())));

        if (context.isRestored()) {
            restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把聯(lián)合列表狀態(tài)的數(shù)據(jù)都恢復(fù)成類的本地變量中
            // populate actual holder for restored state
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }

            LOG.info(
                    "Consumer subtask {} restored state: {}.",
                    getRuntimeContext().getIndexOfThisSubtask(),
                    restoredState);
        } else {
            LOG.info(
                    "Consumer subtask {} has no restore state.",
                    getRuntimeContext().getIndexOfThisSubtask());
        }
    }

2.開始通知檢查點(diǎn)開始的方法:文章來源地址http://www.zghlxwxcb.cn/news/detail-718737.html

public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            unionOffsetStates.clear();

            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                // the fetcher has not yet been initialized, which means we need to return the
                // originally restored offsets or the assigned partitions
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
                        subscribedPartitionsToStartOffsets.entrySet()) {
                        // 進(jìn)行checkpoint時,把數(shù)據(jù)保存到聯(lián)合列表狀態(tài)中進(jìn)行保存
                    unionOffsetStates.add(
                            Tuple2.of(
                                    subscribedPartition.getKey(), subscribedPartition.getValue()));
                }

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call
                    // can happen
                    // on this function at a time: either snapshotState() or
                    // notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call
                    // can happen
                    // on this function at a time: either snapshotState() or
                    // notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
                        currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(
                                    kafkaTopicPartitionLongEntry.getKey(),
                                    kafkaTopicPartitionLongEntry.getValue()));
                }
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
        }
    }

到了這里,關(guān)于從Flink的Kafka消費(fèi)者看算子聯(lián)合列表狀態(tài)的使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 13、Kafka ------ kafka 消費(fèi)者API用法(消費(fèi)者消費(fèi)消息代碼演示)

    13、Kafka ------ kafka 消費(fèi)者API用法(消費(fèi)者消費(fèi)消息代碼演示)

    消費(fèi)者API的核心類是 KafkaConsumer,它提供了如下常用方法: 下面這些方法都體現(xiàn)了Kafka是一個數(shù)據(jù)流平臺,消費(fèi)者通過這些方法可以從分區(qū)的任意位置、重新開始讀取數(shù)據(jù)。 根據(jù)KafkaConsumer不難看出,使用消費(fèi)者API拉取消息很簡單,基本只要幾步: 1、創(chuàng)建KafkaConsumer對象,創(chuàng)建

    2024年04月11日
    瀏覽(30)
  • 分布式 - 消息隊(duì)列Kafka:Kafka消費(fèi)者和消費(fèi)者組

    分布式 - 消息隊(duì)列Kafka:Kafka消費(fèi)者和消費(fèi)者組

    1. Kafka 消費(fèi)者是什么? 消費(fèi)者負(fù)責(zé)訂閱Kafka中的主題,并且從訂閱的主題上拉取消息。與其他一些消息中間件不同的是:在Kafka的消費(fèi)理念中還有一層消費(fèi)組的概念,每個消費(fèi)者都有一個對應(yīng)的消費(fèi)組。當(dāng)消息發(fā)布到主題后,只會被投遞給訂閱它的每個消費(fèi)組中的一個消費(fèi)者

    2024年02月13日
    瀏覽(29)
  • kafka配置多個消費(fèi)者groupid kafka多個消費(fèi)者消費(fèi)同一個partition(java)

    kafka配置多個消費(fèi)者groupid kafka多個消費(fèi)者消費(fèi)同一個partition(java)

    kafka是由Apache軟件基金會開發(fā)的一個開源流處理平臺。kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者在網(wǎng)站中的所有動作流數(shù)據(jù)。 kafka中partition類似數(shù)據(jù)庫中的分表數(shù)據(jù),可以起到水平擴(kuò)展數(shù)據(jù)的目的,比如有a,b,c,d,e,f 6個數(shù)據(jù),某個topic有兩個partition,一

    2024年01月22日
    瀏覽(161)
  • Kafka3.0.0版本——消費(fèi)者(消費(fèi)者組詳細(xì)消費(fèi)流程圖解及消費(fèi)者重要參數(shù))

    Kafka3.0.0版本——消費(fèi)者(消費(fèi)者組詳細(xì)消費(fèi)流程圖解及消費(fèi)者重要參數(shù))

    創(chuàng)建一個消費(fèi)者網(wǎng)絡(luò)連接客戶端,主要用于與kafka集群進(jìn)行交互,如下圖所示: 調(diào)用sendFetches發(fā)送消費(fèi)請求,如下圖所示: (1)、Fetch.min.bytes每批次最小抓取大小,默認(rèn)1字節(jié) (2)、fetch.max.wait.ms一批數(shù)據(jù)最小值未達(dá)到的超時時間,默認(rèn)500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    瀏覽(21)
  • 10、Kafka ------ 消費(fèi)者組 和 消費(fèi)者實(shí)例,分區(qū) 和 消費(fèi)者實(shí)例 之間的分配策略

    10、Kafka ------ 消費(fèi)者組 和 消費(fèi)者實(shí)例,分區(qū) 和 消費(fèi)者實(shí)例 之間的分配策略

    形象來說:你可以把主題內(nèi)的多個分區(qū)當(dāng)成多個子任務(wù)、多個子任務(wù)組成項(xiàng)目,每個消費(fèi)者實(shí)例就相當(dāng)于一個員工,假如你們 team 包含2個員工。 同理: 同一主題下,每個分區(qū)最多只會分給同一個組內(nèi)的一個消費(fèi)者實(shí)例 消費(fèi)者以組的名義來訂閱主題,前面的 kafka-console-consu

    2024年01月19日
    瀏覽(19)
  • Kafka消費(fèi)者不消費(fèi)數(shù)據(jù)

    Kafka消費(fèi)者不消費(fèi)數(shù)據(jù)

    背景: 工作往往是千篇一律,真正能學(xué)到點(diǎn)知識都是在上線后。使用Skywalking+Kafka+ES進(jìn)行應(yīng)用監(jiān)控。 現(xiàn)象: 公司使用Skywalking在開發(fā)測試環(huán)境中Kafka順利消費(fèi)數(shù)據(jù),到了UAT環(huán)境一開始還正常,后面接入了更多的應(yīng)用后出現(xiàn)了問題:OAP服務(wù)正常但是ES里不再有數(shù)據(jù)。 排查: 通過

    2023年04月14日
    瀏覽(54)
  • Kafka-消費(fèi)者組消費(fèi)流程

    Kafka-消費(fèi)者組消費(fèi)流程

    消費(fèi)者向kafka集群發(fā)送消費(fèi)請求,消費(fèi)者客戶端默認(rèn)每次從kafka集群拉取50M數(shù)據(jù),放到緩沖隊(duì)列中,消費(fèi)者從緩沖隊(duì)列中每次拉取500條數(shù)據(jù)進(jìn)行消費(fèi)。? ?

    2024年02月12日
    瀏覽(23)
  • Kafka3.0.0版本——消費(fèi)者(消費(fèi)者組原理)

    Kafka3.0.0版本——消費(fèi)者(消費(fèi)者組原理)

    1.1、消費(fèi)者組概述 Consumer Group(CG):消費(fèi)者組,由多個consumer組成。形成一個消費(fèi)者組的條件,是所有消費(fèi)者的groupid相同。 注意: (1)、消費(fèi)者組內(nèi)每個消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內(nèi)消費(fèi)者消費(fèi)。 (2)、消費(fèi)者組之間互不影響。所有的消費(fèi)者

    2024年02月09日
    瀏覽(28)
  • 【Kafka】Kafka消費(fèi)者

    【Kafka】Kafka消費(fèi)者

    pull(拉)模式:consumer采用從broker中主動拉取數(shù)據(jù)。 Kafka采用這種方式。 push(推)模式:Kafka沒有采用這種方式,因?yàn)橛蒪roker決定消息發(fā)送速率,很難適應(yīng)所有的消費(fèi)者的消費(fèi)速率。例如推送的速度是50m/s,consumer1和consumer2舊來不及處理消息。 pull模式不足之處是,如果Kafka沒有數(shù)

    2024年02月13日
    瀏覽(25)
  • Kafka消費(fèi)者無法消費(fèi)數(shù)據(jù),解決

    作為一個在項(xiàng)目中邊學(xué)邊用的實(shí)習(xí)生,真的被昨天還好好的今天就不能消費(fèi)數(shù)據(jù)的kafka折磨到了,下面提供一點(diǎn)建議,希望能對大家有所幫助。 //操作前集群都關(guān)了 1.首先去kafka-home的config目錄下找到server.properties文件, 加入advertised.listeners=PLAINTEXT://ip:9092? ? 如果有配置liste

    2024年02月17日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包