国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

《Flink學習筆記》——第八章 狀態(tài)管理

這篇具有很好參考價值的文章主要介紹了《Flink學習筆記》——第八章 狀態(tài)管理。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

8.1 Flink中的狀態(tài)

8.1.1 概述

在Flink中,算子任務可以分為無狀態(tài)和有狀態(tài)兩種情況。

**無狀態(tài)的算子:**每個事件不依賴其它數據,自己處理完就輸出,也不需要依賴中間結果。例如:打印操作,每個數據只需要它本身就可以完成。

**有狀態(tài)的算子:**事件需要依賴中間或者外其它數據才能完成計算。比如計算累加和,我們需要記錄當前的和是多少,等下一個數據來的時候我們直接將當前和加上該數更新當前累加和。所以我們需要保存當前和。而這里的中間結果和其它數據就是“狀態(tài)”。

8.1.2 狀態(tài)的分類
1)托管狀態(tài)和原始狀態(tài)

Flink的狀態(tài)有兩種:

  • 托管狀態(tài):由Flink統(tǒng)一管理的,狀態(tài)的存儲訪問、故障恢復和重組等一系列問題都由Flink實現,我們只需要調接口就可以。

  • 原始狀態(tài):自定義的,相當于開辟了一塊內存,需要我們自己管理,實現狀態(tài)的序列化和故障恢復。

通常我們采用Flink托管狀態(tài)來實現需求。

2)算子狀態(tài)和按鍵分區(qū)狀態(tài)

托管狀態(tài)又可以分為兩類:

  • 算子狀態(tài):只對當前并行子任務實例有效,這就意味著對于一個并行子任務,占據了一個“分區(qū)”,它所處理的所有數據都會訪問到相同的狀態(tài)。

    《Flink學習筆記》——第八章 狀態(tài)管理,# Flink,flink,學習,筆記

  • 按鍵分區(qū)狀態(tài):狀態(tài)是根據輸入流中定義的鍵來維護和訪問的,所以只能定義在按鍵分區(qū)流中,也就是keyBy之后。

    《Flink學習筆記》——第八章 狀態(tài)管理,# Flink,flink,學習,筆記

1.另外,也可以通過富函數類(Rich Function)來自定義Keyed State,所以只要提供了富函數類接口的算子,也都可以使用Keyed State。所以即使是map、filter這樣無狀態(tài)的基本轉換算子,我們也可以通過富函數類給它們“追加”Keyed State。所以從這個角度講,Flink中所有的算子都可以是有狀態(tài)的。

2.無論是Keyed State還是Operator State,它們都是在本地實例上維護的,也就是說每個并行子任務維護著對應的狀態(tài),算子的子任務之間狀態(tài)不共享。

8.2 按鍵分區(qū)狀態(tài)

任務按照鍵key來訪問和維護的狀態(tài),特點就是以key為作用范圍進行隔離

實際的值如何存儲,或者說存儲結構有以下類型:

8.2.1 值狀態(tài)

狀態(tài)中只保留一個“值”。源碼中定義如下:

public interface ValueState<T> extends State {
    T value() throws IOException;
    void update(T value) throws IOException;
}

T value():獲取當前狀態(tài)的值;

update(T value):對狀態(tài)進行更新,傳入的參數value就是要覆寫的狀態(tài)值

? 在具體使用時,為了讓運行時上下文清楚到底是哪個狀態(tài),我們還需要創(chuàng)建一個“狀態(tài)描述器”(StateDescriptor)來提供狀態(tài)的基本信息。例如源碼中,ValueState的狀態(tài)描述器構造方法如下:

public ValueStateDescriptor(String name, Class<T> typeClass) {
  super(name, typeClass, null);
}

**案例需求:**檢測每種傳感器的水位值,如果連續(xù)的兩個水位值超過10,就輸出報警。

public class KeyValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDs = env.socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );

        sensorDs.keyBy(WaterSensor::getId).process(new MyKeyedProFunc()).print();
        env.execute();
    }
}
package com.zlin.state.po;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.util.Objects;

/**
 * @author ZLin
 * @since 2023/8/27
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class WaterSensor {
    // 傳感器id
    public String id;
    // 時間戳
    public Long ts;
    // 水位值
    public Integer vc;

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WaterSensor that = (WaterSensor) o;
        return Objects.equals(id, that.id) &&
                Objects.equals(ts, that.ts) &&
                Objects.equals(vc, that.vc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }

}
public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
    @Override
    public WaterSensor map(String value) throws Exception {
        String[] values = value.split(",");
        return new WaterSensor(values[0], Long.valueOf(values[1]), Integer.valueOf(values[2]));
    }
}
public class MyKeyedProFunc extends KeyedProcessFunction<String, WaterSensor, String> {
    // TODO 1.定義狀態(tài)
    ValueState<Integer> lastVcState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // TODO 2.在open方法中,初始化狀態(tài)
        // 狀態(tài)描述器兩個參數:第一個參數,起個名字,不重復;第二個參數,存儲的類型
        lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));
    }


    @Override
    public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context context, Collector<String> out) throws Exception {
        //     lastVcState.value();  // 取出 本組 值狀態(tài) 的數據
        //     lastVcState.update(); // 更新 本組 值狀態(tài) 的數據
        //     lastVcState.clear();  // 清除 本組 值狀態(tài) 的數據

        // 1. 取出上一條數據的水位值(Integer默認值是null,判斷)
        int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();

        // 2. 求差值的絕對值,判斷是否超過10
        Integer vc = value.getVc();
        if (Math.abs(vc - lastVc) > 10) {
            out.collect("傳感器=" + value.getId() + "==>當前水位值=" + vc + ",與上一條水位值=" + lastVc + ",相差超過10!!?。?);
        }

        // 3. 更新狀態(tài)里的水位值
        lastVcState.update(vc);

    }
}
8.2.2 列表狀態(tài)

將需要保存的數據,以列表(List)的形式組織起來。在ListState接口中同樣有一個類型參數T,表示列表中數據的類型。ListState也提供了一系列的方法來操作狀態(tài),使用方式與一般的List非常相似。

類似地,ListState的狀態(tài)描述器就叫作ListStateDescriptor

ListState:

  • Iterable get():獲取當前的列表狀態(tài),返回的是一個可迭代類型Iterable;

  • update(List values):傳入一個列表values,直接對狀態(tài)進行覆蓋;

  • add(T value):在狀態(tài)列表中添加一個元素value;

  • addAll(List values):向列表中添加多個元素,以列表values形式傳入

  • clear():清空

案例:針對每種傳感器輸出最高的3個水位值

package com.zlin.state;

import com.zlin.state.func.WaterSensorMapFunction;
import com.zlin.state.po.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * @author ZLin
 * @since 2023/8/27
 */
public class KeyedListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );

        sensorDS.keyBy(r -> r.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            ListState<Integer> vcListState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                // 1.來一條,存到list狀態(tài)里
                                vcListState.add(value.getVc());

                                // 2.從list狀態(tài)拿出來(Iterable), 拷貝到一個List中,排序, 只留3個最大的
                                Iterable<Integer> vcListIt = vcListState.get();
                                // 2.1 拷貝到List中
                                List<Integer> vcList = new ArrayList<>();
                                for (Integer vc : vcListIt) {
                                    vcList.add(vc);
                                }
                                // 2.2 對List進行降序排序
                                vcList.sort((o1, o2) -> o2 - o1);
                                // 2.3 只保留最大的3個(list中的個數一定是連續(xù)變大,一超過3就立即清理即可)
                                if (vcList.size() > 3) {
                                    // 將最后一個元素清除(第4個)
                                    vcList.remove(3);
                                }

                                out.collect("傳感器id為" + value.getId() + ",最大的3個水位值=" + vcList.toString());

                                // 3.更新list狀態(tài)
                                vcListState.update(vcList);


//                                vcListState.get();            //取出 list狀態(tài) 本組的數據,是一個Iterable
//                                vcListState.add();            // 向 list狀態(tài) 本組 添加一個元素
//                                vcListState.addAll();         // 向 list狀態(tài) 本組 添加多個元素
//                                vcListState.update();         // 更新 list狀態(tài) 本組數據(覆蓋)
//                                vcListState.clear();          // 清空List狀態(tài) 本組數據
                            }
                        }
                )
                .print();

        env.execute();

    }
}
8.2.3 Map狀態(tài)

把一些鍵值對(key-value)作為狀態(tài)整體保存起來,可以認為就是一組key-value映射的列表。對應的MapState<UK, UV>接口中,就會有UK、UV兩個泛型,分別表示保存的key和value的類型。同樣,MapState提供了操作映射狀態(tài)的方法,與Map的使用非常類似。

MapState<UK, UV>:

  • UV get(UK key):傳入一個key作為參數,查詢對應的value值;

  • put(UK key, UV value):傳入一個鍵值對,更新key對應的value值;

  • putAll(Map<UK, UV> map):將傳入的映射map中所有的鍵值對,全部添加到映射狀態(tài)中;

  • remove(UK key):將指定key對應的鍵值對刪除;

  • boolean contains(UK key):判斷是否存在指定的key,返回一個boolean值。

另外,MapState也提供了獲取整個映射相關信息的方法;

  • Iterable<Map.Entry<UK, UV>> entries():獲取映射狀態(tài)中所有的鍵值對;

  • Iterable keys():獲取映射狀態(tài)中所有的鍵(key),返回一個可迭代Iterable類型;

  • Iterable values():獲取映射狀態(tài)中所有的值(value),返回一個可迭代Iterable類型;

  • boolean isEmpty():判斷映射是否為空,返回一個boolean值

  • clear():清空

**案例需求:**統(tǒng)計每種傳感器每種水位值出現的次數。

public class KeyedMapStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );

        sensorDS.keyBy(r -> r.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            MapState<Integer, Integer> vcCountMapState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Types.INT, Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                // 1.判斷是否存在vc對應的key
                                Integer vc = value.getVc();
                                if (vcCountMapState.contains(vc)) {
                                    // 1.1 如果包含這個vc的key,直接對value+1
                                    Integer count = vcCountMapState.get(vc);
                                    vcCountMapState.put(vc, ++count);
                                } else {
                                    // 1.2 如果不包含這個vc的key,初始化put進去
                                    vcCountMapState.put(vc, 1);
                                }

                                // 2.遍歷Map狀態(tài),輸出每個k-v的值
                                StringBuilder outStr = new StringBuilder();
                                outStr.append("======================================\n");
                                outStr.append("傳感器id為" + value.getId() + "\n");
                                for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
                                    outStr.append(vcCount.toString() + "\n");
                                }
                                outStr.append("======================================\n");

                                out.collect(outStr.toString());


//                                vcCountMapState.get();          // 對本組的Map狀態(tài),根據key,獲取value
//                                vcCountMapState.contains();     // 對本組的Map狀態(tài),判斷key是否存在
//                                vcCountMapState.put(, );        // 對本組的Map狀態(tài),添加一個 鍵值對
//                                vcCountMapState.putAll();  // 對本組的Map狀態(tài),添加多個 鍵值對
//                                vcCountMapState.entries();      // 對本組的Map狀態(tài),獲取所有鍵值對
//                                vcCountMapState.keys();         // 對本組的Map狀態(tài),獲取所有鍵
//                                vcCountMapState.values();       // 對本組的Map狀態(tài),獲取所有值
//                                vcCountMapState.remove();   // 對本組的Map狀態(tài),根據指定key,移除鍵值對
//                                vcCountMapState.isEmpty();      // 對本組的Map狀態(tài),判斷是否為空
//                                vcCountMapState.iterator();     // 對本組的Map狀態(tài),獲取迭代器
//                                vcCountMapState.clear();        // 對本組的Map狀態(tài),清空

                            }
                        }
                )
                .print();

        env.execute();
    }
}

8.2.4 歸約狀態(tài)

? 類似于值狀態(tài)(Value),不過需要對添加進來的所有數據進行歸約,將歸約聚合之后的值作為狀態(tài)保存下來。ReducingState這個接口調用的方法類似于ListState,只不過它保存的只是一個聚合值,所以調用.add()方法時,不是在狀態(tài)列表里添加元素,而是直接把新數據和之前的狀態(tài)進行歸約,并用得到的結果更新狀態(tài)。

? 歸約邏輯的定義,是在歸約狀態(tài)描述器(ReducingStateDescriptor)中,通過傳入一個歸約函數(ReduceFunction)來實現的。這里的歸約函數,就是我們之前介紹reduce聚合算子時講到的ReduceFunction,所以狀態(tài)類型跟輸入的數據類型是一樣的

ReducingState<T>
public ReducingStateDescriptor(
    String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}

name:名稱
reduceFunction:歸約聚合邏輯
typeClass:類型

**案例:**計算每種傳感器的水位和

public class KeyedProcessFuncReducingState extends KeyedProcessFunction<String, WaterSensor,Integer> {
    private ReducingState<Integer> sumVcState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        sumVcState = this.getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("sumVcState",
                Integer::sum, Integer.class));
    }

    @Override
    public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, Integer>.Context context,
                               Collector<Integer> out) throws Exception {
        sumVcState.add(value.getVc());
        out.collect(sumVcState.get());
    }
}

8.2.5 聚合狀態(tài)

? 與歸約狀態(tài)非常類似,聚合狀態(tài)也是一個值,用來保存添加進來的所有數據的聚合結果。與ReducingState不同的是,它的聚合邏輯是由在描述器中傳入一個更加一般化的聚合函數(AggregateFunction)來定義的;這也就是之前我們講過的AggregateFunction,里面通過一個累加器(Accumulator)來表示狀態(tài),所以聚合的狀態(tài)類型可以跟添加進來的數據類型完全不同,使用更加靈活。

? 同樣地,AggregatingState接口調用方法也與ReducingState相同,調用.add()方法添加元素時,會直接使用指定的AggregateFunction進行聚合并更新狀態(tài)。

AggregatingState<IN, OUT>
public AggregatingStateDescriptor(
            String name,
            AggregateFunction<IN, ACC, OUT> aggFunction,
            TypeInformation<ACC> stateType)
name:描述器名稱
aggFunction:聚合函數,聚合邏輯
stateType:狀態(tài)里各個元素的數據類型

**案例需求:**計算每種傳感器的平均水位

public class AggregatingStateKeyedProcessFunc extends KeyedProcessFunction<String, WaterSensor, String> {
    private AggregatingState<Integer, Double> vcAvgAggregatingState;


    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        vcAvgAggregatingState = this.getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
                "vcAvgAggregatingState",
                new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                    @Override
                    public Tuple2<Integer, Integer> createAccumulator() {
                        return Tuple2.of(0, 0);
                    }

                    @Override
                    public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                        return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                    }

                    @Override
                    public Double getResult(Tuple2<Integer, Integer> accumulator) {
                        return accumulator.f0 * 1D / accumulator.f1;
                    }

                    @Override
                    public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                    }
                }, Types.TUPLE(Types.INT, Types.INT)));
    }

    @Override
    public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context context,
                               Collector<String> out) throws Exception {
        // 將水位值添加到聚合狀態(tài)中
        vcAvgAggregatingState.add(value.getVc());

        // 從聚合狀態(tài)中獲取結果
        Double vcAvg = vcAvgAggregatingState.get();

        out.collect("傳感器id為" + value.getId() + ",平均水位值=" + vcAvg);
    }
}

區(qū)別:

// 歸約函數
ReduceFunction<T>

// 聚合函數
AggregateFunction<IN, ACC, OUT>

可以看到當輸入輸出類型不一致時,我們使用聚合函數處理更方便。

8.2.6 狀態(tài)生存時間(TTL)

? 在實際應用中,很多狀態(tài)會隨著時間的推移逐漸增長,如果不加以限制,最終就會導致存儲空間的耗盡。一個優(yōu)化的思路是直接在代碼中調用.clear()方法去清除狀態(tài),但是有時候我們的邏輯要求不能直接清除。這時就需要配置一個狀態(tài)的“生存時間”(time-to-live,TTL),當狀態(tài)在內存中存在的時間超出這個值時,就將它清除。

? 具體實現上,如果用一個進程不停地掃描所有狀態(tài)看是否過期,顯然會占用大量資源做無用功。狀態(tài)的失效其實不需要立即刪除,所以我們可以給狀態(tài)附加一個屬性,也就是狀態(tài)的“失效時間”。狀態(tài)創(chuàng)建的時候,設置 失效時間 = 當前時間 + TTL;之后如果有對狀態(tài)的訪問和修改,我們可以再對失效時間進行更新;當設置的清除條件被觸發(fā)時(比如,狀態(tài)被訪問的時候,或者每隔一段時間掃描一次失效狀態(tài)),就可以判斷狀態(tài)是否失效、從而進行清除了。

? 配置狀態(tài)的TTL時,需要創(chuàng)建一個StateTtlConfig配置對象,然后調用狀態(tài)描述器的.enableTimeToLive()方法啟動TTL功能。

StateTtlConfig stateTtlConfig = StateTtlConfig
    .newBuilder(Time.seconds(10))
    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
/*
newBuilder(@Nonnull Time ttl): ttl——設定的狀態(tài)生存時間
setUpdateType(UpdateType updateType):更新狀態(tài)失效時間,
	Disabled:狀態(tài)不會過期
	OnCreateAndWrite:當創(chuàng)建或者寫的時候,更新狀態(tài)失效時間
	OnReadAndWrite:當讀或者寫的時候更新狀態(tài)失效時間
setStateVisibility(@Nonnull StateVisibility stateVisibility):設置狀態(tài)的可見性,所謂的“狀態(tài)可見性”,是指因為清除操作并不是實時的,所以當狀態(tài)過期之后還有可能繼續(xù)存在,這時如果對它進行訪問,能否正常讀取到就是一個問題了。
	ReturnExpiredIfNotCleanedUp:默認行為,表示從不返回過期值,也就是只要過期就認為它已經被清除了,應用不能繼續(xù)讀取
	NeverReturnExpired:就是如果過期狀態(tài)還存在,就返回它的值
*/


ReducingStateDescriptor<Integer> reducingStateDescriptor = new ReducingStateDescriptor<Integer>("sumVcState", Integer::sum, Integer.class);
reducingStateDescriptor.enableTimeToLive(stateTtlConfig);

? 除此之外,TTL配置還可以設置在保存檢查點(checkpoint)時觸發(fā)清除操作,或者配置增量的清理(incremental cleanup),還可以針對RocksDB狀態(tài)后端使用壓縮過濾器(compaction filter)進行后臺清理。這里需要注意,目前的TTL設置只支持處理時間。

?。?!注意:目前的TTL設置只支持處理時間!?。?/p>

8.3 算子狀態(tài)

? 算子狀態(tài)(Operator State)就是一個算子并行實例上定義的狀態(tài),作用范圍被限定為當前算子任務。算子狀態(tài)跟數據的key無關,所以不同key的數據只要被分發(fā)到同一個并行子任務,就會訪問到同一個Operator State

? 當算子的并行度發(fā)生變化時,算子狀態(tài)也支持在并行的算子任務實例之間做重組分配。根據狀態(tài)的類型不同,重組分配的方案也會不同。

? 算子狀態(tài)也支持不同的結構類型,主要有三種:ListState、UnionListState和BroadcastState。

8.3.1 列表狀態(tài)(ListState)

與Keyed State中的ListState一樣,將狀態(tài)表示為一組數據的列表。

與Keyed State中的列表狀態(tài)的區(qū)別是:在算子狀態(tài)的上下文中,不會按鍵(key)分別處理狀態(tài),所以每一個并行子任務上只會保留一個“列表”(list),也就是當前并行子任務上所有狀態(tài)項的集合。列表中的狀態(tài)項就是可以重新分配的最細粒度,彼此之間完全獨立。

當算子并行度進行縮放調整時,算子的列表狀態(tài)中的所有元素項會被統(tǒng)一收集起來,相當于把多個分區(qū)的列表合并成了一個“大列表”,然后再均勻地分配給所有并行任務。這種“均勻分配”的具體方法就是“輪詢”(round-robin),與之前介紹的rebanlance數據傳輸方式類似,是通過逐一“發(fā)牌”的方式將狀態(tài)項平均分配的。這種方式也叫作“平均分割重組”(even-split redistribution)。

算子狀態(tài)中不會存在“鍵組”(key group)這樣的結構,所以為了方便重組分配,就把它直接定義成了“列表”(list)。這也就解釋了,為什么算子狀態(tài)中沒有最簡單的值狀態(tài)(ValueState)。

public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env
                .socketTextStream("hadoop102", 7777)
                .map(new MyCountMapFunction())
                .print();
        
        env.execute();
    }
}
public class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {
    private Long count = 0L;
    private ListState<Long> state;

    @Override
    public Long map(String value) throws Exception {
        return ++count;
    }

    /**
     * 本地變量持久化:將 本地變量 拷貝到 算子狀態(tài)中,開啟checkpoint時才會調用
     * @param functionSnapshotContext context
     * @throws Exception e
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        System.out.println("snapshotState...");
        // 清空算子狀態(tài)
        state.clear();
        // 將本地變量添加到算子狀態(tài)中
        state.add(count);

    }

    /**
     * 初始化本地變量:程序啟動和恢復時, 從狀態(tài)中 把數據添加到 本地變量,每個子任務調用一次
     * @param context context
     * @throws Exception e
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        System.out.println("initializeState...");
        // 從 上下文 初始化 算子狀態(tài)
        state = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<Long>("state", Types.LONG));
        // 從 算子狀態(tài)中 把數據 拷貝到 本地變量
        if (context.isRestored()) {
            for (Long c : state.get()) {
                count += c;
            }
        }

    }
}
8.3.2 聯合列表狀態(tài)(UnionListState)

與ListState類似,聯合列表狀態(tài)也會將狀態(tài)表示為一個列表。它與常規(guī)列表狀態(tài)的區(qū)別在于,算子并行度進行縮放調整時對于狀態(tài)的分配方式不同。

UnionListState的重點就在于“聯合”(union)。在并行度調整時,常規(guī)列表狀態(tài)是輪詢分配狀態(tài)項,而聯合列表狀態(tài)的算子則會直接廣播狀態(tài)的完整列表。這樣,并行度縮放之后的并行子任務就獲取到了聯合后完整的“大列表”,可以自行選擇要使用的狀態(tài)項和要丟棄的狀態(tài)項。這種分配也叫作“聯合重組”(union redistribution)。如果列表中狀態(tài)項數量太多,為資源和效率考慮一般不建議使用聯合重組的方式。

state = context
              .getOperatorStateStore()
              .getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));
8.3.3 廣播狀態(tài)(BroadcastState)

有時我們希望算子并行子任務都保持同一份“全局”狀態(tài),用來做統(tǒng)一的配置和規(guī)則設定。這時所有分區(qū)的所有數據都會訪問到同一個狀態(tài),狀態(tài)就像被“廣播”到所有分區(qū)一樣,這種特殊的算子狀態(tài),就叫作廣播狀態(tài)(BroadcastState)。

因為廣播狀態(tài)在每個并行子任務上的實例都一樣,所以在并行度調整的時候就比較簡單,只要復制一份到新的并行任務就可以實現擴展;而對于并行度縮小的情況,可以將多余的并行子任務連同狀態(tài)直接砍掉——因為狀態(tài)都是復制出來的,并不會丟失。

**案例實操:**水位超過指定的閾值發(fā)送告警,閾值可以動態(tài)修改

public class BroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 數據流
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        // 配置流(用來廣播配置)
        DataStreamSource<String> configDS = env.socketTextStream("hadoop102", 8888);

        // TODO 1. 將 配置流 廣播
        MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
        BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);

        // TODO 2.把 數據流 和 廣播后的配置流 connect
        BroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);

        sensorBCS.process(new BroadcastStateProcessFunc(broadcastMapState)).print();

        env.execute();
    }
}
package com.zlin.state.func;

import com.zlin.state.po.WaterSensor;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author ZLin
 * @since 2023/8/28
 */
public class BroadcastStateProcessFunc extends BroadcastProcessFunction<WaterSensor, String, String> {

    MapStateDescriptor<String, Integer> broadcastMapState;

    public BroadcastStateProcessFunc(MapStateDescriptor<String, Integer> broadcastMapState) {
        this.broadcastMapState = broadcastMapState;
    }

    /**
     * 數據流的處理,廣播狀態(tài),只能讀取,不能修改
     * @param value 數據
     * @param context 只讀上下文
     * @param out 收集器
     * @throws Exception e
     */
    @Override
    public void processElement(WaterSensor value, BroadcastProcessFunction<WaterSensor, String, String>.ReadOnlyContext context, Collector<String> out) throws Exception {

        // 通過上下文獲取廣播狀態(tài),獲取里面的值
        ReadOnlyBroadcastState<String, Integer> broadcastState = context.getBroadcastState(broadcastMapState);
        Integer threshold = broadcastState.get("threshold");
        threshold = (threshold == null ? 0 : threshold);

        if (value.getVc() > threshold) {
            out.collect(value + ",水位超過指定的閾值:" + threshold + "!!!");
        }

    }


    /**
     * 廣播后的配置數據的處理方法:只有廣播流才能修改廣播狀態(tài)
     * @param s 配置值
     * @param context 上下文
     * @param collector 收集器
     * @throws Exception e
     */
    @Override
    public void processBroadcastElement(String s, BroadcastProcessFunction<WaterSensor, String, String>.Context context, Collector<String> collector) throws Exception {
        BroadcastState<String, Integer> broadcastState = context.getBroadcastState(broadcastMapState);
        broadcastState.put("threshold", Integer.valueOf(s));
    }
}

8.4 狀態(tài)后端(State Backends)

? 在Flink中,狀態(tài)的存儲、訪問以及維護,都是由一個可插拔的組件決定的,這個組件就叫作狀態(tài)后端(state backend)。狀態(tài)后端主要負責管理本地狀態(tài)的存儲方式和位置。

8.4.1 狀態(tài)后端的分類(HashMapStateBackend/RocksDB

狀態(tài)后端是一個“開箱即用”的組件,可以在不改變應用程序邏輯的情況下獨立配置。

兩種:

  • 哈希表狀態(tài)后端(HashMapStateBackend):默認;HashMapStateBackend是把狀態(tài)存放在內存里。把狀態(tài)當作對象(objects),保存在Taskmanager的JVM堆上,以鍵值對的形式存儲起來。
  • 內嵌RocksDB狀態(tài)后端(EmbeddedRocksDBStateBackend):RocksDB是一種內嵌的key-value存儲介質,可以把數據持久化到本地硬盤。配置EmbeddedRocksDBStateBackend后,會將處理中的數據全部放入RocksDB數據庫中,RocksDB默認存儲在TaskManager的本地數據目錄里。RocksDB的狀態(tài)數據被存儲為序列化的字節(jié)數組,讀寫操作需要序列化/反序列化,因此狀態(tài)的訪問性能要差一些。另外,因為做了序列化,key的比較也會按照字節(jié)進行,而不是直接調用.hashCode()和.equals()方法。EmbeddedRocksDBStateBackend始終執(zhí)行的是異步快照,所以不會因為保存檢查點而阻塞數據的處理;而且它還提供了增量式保存檢查點的機制,這在很多情況下可以大大提升保存效率。
8.4.2 如何選擇正確的狀態(tài)后端

HashMapStateBackend:

  • 內存計算,讀寫速度非???/li>
  • 狀態(tài)的大小會受到集群可用內存的限制,如果應用的狀態(tài)隨著時間不停地增長,就會耗盡內存資源。

EmbeddedRocksDBStateBackend:

  • RocksDB是硬盤存儲,所以可以根據可用的磁盤空間進行擴展,所以它非常適合于超級海量狀態(tài)的存儲。
  • 由于每個狀態(tài)的讀寫都需要做序列化/反序列化,而且可能需要直接從磁盤讀取數據,這就會導致性能的降低,平均讀寫性能要比HashMapStateBackend慢一個數量級。
8.4.3 狀態(tài)后端的配置

(1)配置文件

如果沒有配置,則使用默認的集群配置文件flink-conf.yaml中指定的,配置的鍵名稱為state.backend。一般用來配置默認的狀態(tài)后端。

(2)為每個作業(yè)單獨配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
/
env.setStateBackend(new EmbeddedRocksDBStateBackend());

如果想在IDE中使用EmbeddedRocksDBStateBackend,需要為Flink項目添加依賴:文章來源地址http://www.zghlxwxcb.cn/news/detail-679571.html

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>${flink.version}</version>
</dependency>

到了這里,關于《Flink學習筆記》——第八章 狀態(tài)管理的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Flink中的狀態(tài)管理

    Flink中的狀態(tài)管理

    在Flink中,算子任務可以分為 有狀態(tài) 和 無狀態(tài) 兩種狀態(tài)。 無狀態(tài)的算子任務只需要觀察每個獨立事件,根據當前輸入的數據直接轉換輸出結果。 例如 Map 、 Filter 、 FlatMap 都是屬于 無狀態(tài)算子 。? 而 有狀態(tài)的算子任務,就是除了當前數據外,還需要一些其他的數據來得到

    2024年01月23日
    瀏覽(20)
  • Flink State 狀態(tài)管理

    狀態(tài)在Flink中叫做State,用來保存中間計算結果或者緩存數據。要做到比較好的狀態(tài)管理,需要考慮以下幾點內容: 狀態(tài)數據的存儲和訪問 在Task內部,如何高效地保存狀態(tài)數據和使用狀態(tài)數據。 狀態(tài)數據的備份和恢復 作業(yè)失敗是無法避免的,那么就要考慮如何高效地將狀態(tài)

    2024年01月17日
    瀏覽(23)
  • Flink 學習七 Flink 狀態(tài)(flink state)

    Flink 學習七 Flink 狀態(tài)(flink state)

    流式計算邏輯中,比如sum,max; 需要記錄和后面計算使用到一些歷史的累計數據, 狀態(tài)就是 :用戶在程序邏輯中用于記錄信息的變量 在Flink 中 ,狀態(tài)state 不僅僅是要記錄狀態(tài);在程序運行中如果失敗,是需要重新恢復,所以這個狀態(tài)也是需要持久化;一遍后續(xù)程序繼續(xù)運行 1.1 row state 我

    2024年02月09日
    瀏覽(33)
  • 【大數據】Flink 架構(四):狀態(tài)管理

    【大數據】Flink 架構(四):狀態(tài)管理

    《 Flink 架構 》系列(已完結),共包含以下 6 篇文章: Flink 架構(一):系統(tǒng)架構 Flink 架構(二):數據傳輸 Flink 架構(三):事件時間處理 Flink 架構(四):狀態(tài)管理 Flink 架構(五):檢查點 Checkpoint(看完即懂) Flink 架構(六):保存點 Savepoint ?? 如果您覺得這篇

    2024年02月19日
    瀏覽(22)
  • Flink狀態(tài)管理與檢查點機制

    Flink狀態(tài)管理與檢查點機制

    本專欄案例代碼和數據集鏈接:? https://download.csdn.net/download/shangjg03/88477960 相對于其他流計算框架,Flink?一個比較重要的特性就是其支持有狀態(tài)計算。即你可以將中間的計算結果進行保存,并提供給后續(xù)的計算使用: 具體而言,Flink?又將狀態(tài)?(State)?分為?Keyed?State?與?O

    2024年02月07日
    瀏覽(90)
  • 【Flink狀態(tài)管理五】Checkpoint的設計與實現

    【Flink狀態(tài)管理五】Checkpoint的設計與實現

    由于系統(tǒng)原因導致Flink作業(yè)無法正常運行的情況非常多,且很多時候都是無法避免的。對于Flink集群來講,能夠快速從異常狀態(tài)中恢復,同時保證處理數據的正確性和一致性非常重要。Flink主要借助Checkpoint的方式保障整個系統(tǒng)狀態(tài)數據的一致性,也就是基于ABS算法實現輕量級快

    2024年02月21日
    瀏覽(20)
  • Flink 狀態(tài)管理與容錯機制(CheckPoint & SavePoint)的關系

    Flink 狀態(tài)管理與容錯機制(CheckPoint & SavePoint)的關系

    無狀態(tài)計算的例子: 例如一個加法算子,第一次輸入 2+3=5 那么以后我多次數據 2+3 的時候得到的結果都是 5 。得出的結論就是,相同的輸入都會得到相同的結果,與次數無關。 有狀態(tài)計算的例子: 訪問量的統(tǒng)計,我們都知道 Nginx 的訪問日志一個請求一條日志,基于此我們就

    2024年02月04日
    瀏覽(26)
  • Flink分流,合流,狀態(tài),checkpoint和精準一次筆記

    第8章 分流 1.使用側輸出流 2.合流 2.1 union :使用 ProcessFunction 處理合流后的數據 2.2 Connect : 兩條流的格式可以不一樣, map操作使用CoMapFunction,process 傳入:CoProcessFunction 2.2 BroadcastConnectedStream keyBy 進行了按鍵分區(qū),那么要傳入的就是 KeyedBroadcastProcessFunction; 如果沒有按鍵分

    2024年02月12日
    瀏覽(20)
  • 【狀態(tài)管理|概述】Flink的狀態(tài)管理:為什么需要state、怎么保存state、對于state過大怎么處理

    按照數據的劃分和擴張方式,Flink中大致分為2類: Keyed States:記錄每個Key對應的狀態(tài)值 因為一個任務的并行度有多少,就會有多少個子任務,當key的范圍大于并行度時,就會出現一個subTask上可能包含多個Key(),但不同Task上不會出現相同的Key(解決了shuffle的問題?) ? 常

    2024年02月01日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包