国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink State 狀態(tài)管理

這篇具有很好參考價值的文章主要介紹了Flink State 狀態(tài)管理。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。


前言

狀態(tài)在Flink中叫做State,用來保存中間計(jì)算結(jié)果或者緩存數(shù)據(jù)。要做到比較好的狀態(tài)管理,需要考慮以下幾點(diǎn)內(nèi)容:

  • 狀態(tài)數(shù)據(jù)的存儲和訪問
    在Task內(nèi)部,如何高效地保存狀態(tài)數(shù)據(jù)和使用狀態(tài)數(shù)據(jù)。
  • 狀態(tài)數(shù)據(jù)的備份和恢復(fù)
    作業(yè)失敗是無法避免的,那么就要考慮如何高效地將狀態(tài)數(shù)據(jù)保存下來,避免狀態(tài)備份降低集群的吞吐量,并且在Failover時恢復(fù)作業(yè)到失敗前的狀態(tài)。
  • 狀態(tài)數(shù)據(jù)的劃分和動態(tài)擴(kuò)容
    作業(yè)在集群內(nèi)并行執(zhí)行那么就要思考對于作業(yè)的Task而言如何使用統(tǒng)一的方式對狀態(tài)數(shù)據(jù)進(jìn)行切分,在作業(yè)修改并行度導(dǎo)致Task數(shù)據(jù)改變的時候,如何確保正確地恢復(fù)。

一、狀態(tài)分類

State按照是否有Key劃分KeyedState和OperatorState兩種。按照數(shù)據(jù)結(jié)構(gòu)不同,flink定義了多種state,分別應(yīng)用于不同的場景,具體實(shí)現(xiàn)如下:ValueState、ListState、MapState、ReducingState、AggregatingState。

  • ValueState: 保存一個可以更新和檢索的值(如上所述,每個值都對應(yīng)到當(dāng)前的輸入數(shù)據(jù)的 key,因此算子接收到的每個 key 都可能對應(yīng)一個值)。 這個值可以通過 update(T) 進(jìn)行更新,通過 T value() 進(jìn)行檢索。

  • ListState: 保存一個元素的列表。可以往這個列表中追加數(shù)據(jù),并在當(dāng)前的列表上進(jìn)行檢索。可以通過 add(T) 或者 addAll(List) 進(jìn)行添加元素,通過 Iterable get() 獲得整個列表。還可以通過 update(List) 覆蓋當(dāng)前的列表。

  • ReducingState: 保存一個單值,表示添加到狀態(tài)的所有值的聚合。接口與 ListState 類似,但使用 add(T) 增加元素,會使用提供的 ReduceFunction 進(jìn)行聚合。

  • AggregatingState<IN, OUT>: 保留一個單值,表示添加到狀態(tài)的所有值的聚合。和 ReducingState 相反的是, 聚合類型可能與 添加到狀態(tài)的元素的類型不同。 接口與 ListState 類似,但使用 add(IN) 添加的元素會用指定的 AggregateFunction 進(jìn)行聚合。

  • MapState<UK, UV>: 維護(hù)了一個映射列表。 你可以添加鍵值對到狀態(tài)中,也可以獲得反映當(dāng)前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 檢索特定 key。 使用 entries(),keys() 和 values() 分別檢索映射、鍵和值的可迭代視圖。你還可以通過 isEmpty() 來判斷是否包含任何鍵值對。

二、keyed代碼示例

更多代碼示例請下載Flink State體系剖析以及案例實(shí)踐

ListState

代碼如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-796627.html


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 需求:當(dāng)接收到的相同 key 的元素個數(shù)等于 3個,就計(jì)算這些元素的 value 的平均值。
 * 計(jì)算keyed stream中每3個元素的 value 的平均值
 */
public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        //獲取執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設(shè)置并行度
        env.setParallelism(12);
        //獲取數(shù)據(jù)源
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(2L, 2L),
                        Tuple2.of(2L, 6L));
        /**
         * 1L, 3L
         * 1L, 7L
         * 1L, 5L
         *
         * 1L,5.0 double
         *
         * 2L, 4L
         * 2L, 2L
         * 2L, 6L
         *
         * 2L,4.0 double
         *
         *
         */


        // 輸出:
        //(1,5.0)
        //(2,4.0)
        dataStreamSource
                .keyBy(tuple -> tuple.f0) //分組
                .flatMap(new CountAverageWithListState())
                .print();


        env.execute("TestStatefulApi");
    }
}

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.List;

/**
 *  ListState<T> :這個狀態(tài)為每一個 key 保存集合的值
 *      get() 獲取狀態(tài)值
 *      add() / addAll() 更新狀態(tài)值,將數(shù)據(jù)放到狀態(tài)中
 *      clear() 清除狀態(tài)
 */
public class CountAverageWithListState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // managed keyed state
    /**
     * ValueState : 里面只能存一條元素
     * ListState : 里面可以存很多數(shù)據(jù)
     */
    private ListState<Tuple2<Long, Long>> elementsByKey;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 注冊狀態(tài)
        ListStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ListStateDescriptor<Tuple2<Long, Long>>(
                        "average",  // 狀態(tài)的名字
                        Types.TUPLE(Types.LONG, Types.LONG)); // 狀態(tài)存儲的數(shù)據(jù)類型
        elementsByKey = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Double>> out) throws Exception {
        // 拿到當(dāng)前的 key 的狀態(tài)值
        Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get();

        // 如果狀態(tài)值還沒有初始化,則初始化
        if (currentState == null) {
            elementsByKey.addAll(Collections.emptyList());
        }

        // 更新狀態(tài)
        elementsByKey.add(element);


        // 判斷,如果當(dāng)前的 key 出現(xiàn)了 3 次,則需要計(jì)算平均值,并且輸出
        List<Tuple2<Long, Long>> allElements = Lists.newArrayList(elementsByKey.get());

        if (allElements.size() == 3) {
            long count = 0;
            long sum = 0;
            for (Tuple2<Long, Long> ele : allElements) {
                count++;
                sum += ele.f1;
            }
            double avg = (double) sum / count;
            out.collect(Tuple2.of(element.f0, avg));
            // 清除狀態(tài)
            elementsByKey.clear();
        }
    }
}

MapState

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.UUID;

/**
 *  MapState<K, V> :這個狀態(tài)為每一個 key 保存一個 Map 集合
 *      put() 將對應(yīng)的 key 的鍵值對放到狀態(tài)中
 *      values() 拿到 MapState 中所有的 value
 *      clear() 清除狀態(tài)
 */
public class CountAverageWithMapState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // managed keyed state
    //1. MapState :key 是一個唯一的值,value 是接收到的相同的 key 對應(yīng)的 value 的值
    /**
     * MapState:
     *      Map集合的特點(diǎn),相同key,會覆蓋數(shù)據(jù)。
     */
    private MapState<String, Long> mapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 注冊狀態(tài)
        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<String, Long>(
                        "average",  // 狀態(tài)的名字
                        String.class, Long.class); // 狀態(tài)存儲的數(shù)據(jù)類型
        mapState = getRuntimeContext().getMapState(descriptor);
    }

    /**
     *
     * @param element
     * @param out
     * @throws Exception
     */
    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Double>> out) throws Exception {

        mapState.put(UUID.randomUUID().toString(), element.f1); //list

        // 判斷,如果當(dāng)前的 key 出現(xiàn)了 3 次,則需要計(jì)算平均值,并且輸出
        List<Long> allElements = Lists.newArrayList(mapState.values());

        if (allElements.size() == 3) {
            long count = 0;
            long sum = 0;
            for (Long ele : allElements) {
                count++;
                sum += ele;
            }
            double avg = (double) sum / count;
            //
            out.collect(Tuple2.of(element.f0, avg));
            // 清除狀態(tài)
            mapState.clear();

        }
    }
}

總結(jié)

  1. 是否存在當(dāng)前處理的 key(current key):operator state 是沒有當(dāng)前 key 的概念,而 keyed
    state 的數(shù)值總是與一個 current key 對應(yīng)。
  2. 存儲對象是否 on heap: 目前 operator state backend 僅有一種 on-heap 的實(shí)現(xiàn);而 keyed state
    backend 有 on-heap 和 off-heap(RocksDB)的多種實(shí)現(xiàn)。
  3. 是否需要手動聲明快照(snapshot)和恢復(fù) (restore) 方法:operator state 需要手動實(shí)現(xiàn)
    snapshot 和 restore 方法;而 keyed state 則由 backend 自行實(shí)現(xiàn),對用戶透明。
  4. 數(shù)據(jù)大?。阂话愣裕覀冋J(rèn)為 operator state 的數(shù)據(jù)規(guī)模是比較小的;認(rèn)為 keyed state 規(guī)模是
    相對比較大的。需要注意的是,這是一個經(jīng)驗(yàn)判斷,不是一個絕對的判斷區(qū)分標(biāo)準(zhǔn)。
    更多內(nèi)容和代碼示例請下載Flink State體系剖析以及案例實(shí)踐

到了這里,關(guān)于Flink State 狀態(tài)管理的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink State backend狀態(tài)后端

    Flink在v1.12到v1.14的改進(jìn)當(dāng)中,其狀態(tài)后端也發(fā)生了變化。老版本的狀態(tài)后端有三個,分別是MemoryStateBackend、FsStateBackend、RocksDBStateBackend,在flink1.14中,這些狀態(tài)已經(jīng)被廢棄了,新版本的狀態(tài)后端是 HashMapStateBackend、EmbeddedRocksDBStateBackend。 有狀態(tài)流應(yīng)用中的檢查點(diǎn)(checkpoint),

    2024年01月25日
    瀏覽(26)
  • Flink理論—容錯之狀態(tài)后端(State Backends)

    Flink理論—容錯之狀態(tài)后端(State Backends)

    Flink 使用流重放 和 檢查點(diǎn) 的組合來實(shí)現(xiàn)容錯。檢查點(diǎn)標(biāo)記每個輸入流中的特定點(diǎn)以及每個運(yùn)算符的相應(yīng)狀態(tài)。通過恢復(fù)運(yùn)算符的狀態(tài)并從檢查點(diǎn)點(diǎn)重放記錄,可以從檢查點(diǎn)恢復(fù)流數(shù)據(jù)流,同時保持一致性 容錯機(jī)制不斷地繪制分布式流數(shù)據(jù)流的快照。對于小狀態(tài)的流式應(yīng)用程

    2024年02月20日
    瀏覽(24)
  • Flink狀態(tài)詳解:什么是時狀態(tài)(state)?狀態(tài)描述(StateDescriptor)及其重要性

    Flink狀態(tài)詳解:什么是時狀態(tài)(state)?狀態(tài)描述(StateDescriptor)及其重要性

    了解Flink中的狀態(tài)概念及其在流處理中的重要性。探討狀態(tài)在有狀態(tài)計(jì)算中的作用,狀態(tài)描述符(StateDescriptor)的基本概念和用法。理解狀態(tài)在Flink任務(wù)中的維護(hù)、恢復(fù)和與算子的關(guān)聯(lián)。

    2024年02月08日
    瀏覽(28)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介紹及示例(2) - operator state

    【flink番外篇】7、flink的State(Keyed State和operator state)介紹及示例(2) - operator state

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月22日
    瀏覽(29)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介紹及示例(1) - Keyed State

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月17日
    瀏覽(23)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介紹及示例 - 完整版

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(34)
  • flink學(xué)習(xí)之state

    state作用 保留當(dāng)前key的歷史狀態(tài)。 state用法 ListStateInteger vipList = getRuntimeContext().getListState(new ListStateDescriptorInteger(\\\"vipList\\\", TypeInformation.of(Integer.class))); 有valueState listState mapstate 。冒失沒有setstate state案例 比如起點(diǎn)的小說不能被下載。別人只能通過截屏,提取文字的方式盜版小

    2024年02月09日
    瀏覽(17)
  • flink 的 State

    flink 的 State

    目錄 一、前言 二、什么是State 2.1:什么時候需要?dú)v史數(shù)據(jù) 2.2:為什么要容錯,以及checkpoint如何進(jìn)行容錯 2.3:state basckend 又是什么 三、有哪些常見的是 State 四、 State的使用 五、State backend 5.1??MemoryStateBackend: 5.2? FsStatebackend: 5.3??RocksDBStateBackend: 六、Checkpoint 七、 Deep

    2023年04月18日
    瀏覽(19)
  • Flink_state 的優(yōu)化與 remote_state 的探索

    Flink_state 的優(yōu)化與 remote_state 的探索

    摘要:本文整理自 bilibili 資深開發(fā)工程師張楊,在 Flink Forward Asia 2022 核心技術(shù)專場的分享。本篇內(nèi)容主要分為四個部分: 相關(guān)背景 state 壓縮優(yōu)化 Remote state 探索 未來規(guī)劃 點(diǎn)擊查看原文視頻 演講PPT 1.1 業(yè)務(wù)概況 從業(yè)務(wù)規(guī)模來講,B 站目前大約是 4000+的 Flink 任務(wù),其中 95%是

    2024年02月11日
    瀏覽(21)
  • Flink源碼之State創(chuàng)建流程

    Flink源碼之State創(chuàng)建流程

    StreamOperatorStateHandler 在StreamTask啟動初始化時通過StreamTaskStateInitializerImpl::streamOperatorStateContext會為每個StreamOperator 創(chuàng)建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有個StreamOperatorStateHandler成員變量,調(diào)用AbstractStreamOperator::initializeState方法中會初始化StreamOperatorStateH

    2024年02月12日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包