国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全

這篇具有很好參考價值的文章主要介紹了flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1. 狀態(tài)、狀態(tài)后端、Checkpoint 三者之間的區(qū)別及關(guān)系?

拿五個字做比喻:“鐵鍋燉大鵝”,鐵鍋是狀態(tài)后端,大鵝是狀態(tài),Checkpoint 是燉的動作。

  1. 狀態(tài):本質(zhì)來說就是數(shù)據(jù),在 Flink 中,其實就是 Flink 提供給用戶的狀態(tài)編程接口。比如 flink 中的 MapState,ValueState,ListState。

  2. 狀態(tài)后端:Flink 提供的用于管理狀態(tài)的組件,狀態(tài)后端決定了以什么樣數(shù)據(jù)結(jié)構(gòu),什么樣的存儲方式去存儲和管理我們的狀態(tài)。Flink 目前官方提供了 memory、filesystem,rocksdb 三種狀態(tài)后端來存儲我們的狀態(tài)。
    但!flink1.13后 對狀態(tài)后端做了整合,只有這兩種了

  • HashMapStateBackend
  • EmbeddedRocksDBStateBackend

老版本(flink-1.12 版及以前) Fsstatebackend MemoryStatebackend RocksdbStateBackend
新版本中,F(xiàn)sstatebackend 和 MemoryStatebackend 整合成了 HashMapStateBackend 而且 HashMapStateBackend 和 EmBeddedRocksDBStateBackend 所生成的快照文件也統(tǒng)一了格式,因而 在 job 重新部署或者版本升級時,可以任意替換 statebackend

  1. Checkpoint(狀態(tài)管理):Flink 提供的用于定時將狀態(tài)后端中存儲的狀態(tài)同步到遠程的存儲系統(tǒng)的組件或者能力。為了防止 long run 的 Flink 任務(wù)掛了導(dǎo)致狀態(tài)丟失,產(chǎn)生數(shù)據(jù)質(zhì)量問題,F(xiàn)link 提供了狀態(tài)管理(Checkpoint,Savepoint)的能力把我們使用的狀態(tài)給管理起來,定時的保存到遠程。然后可以在 Flink 任務(wù) failover 時,從遠程把狀態(tài)數(shù)據(jù)恢復(fù)到 Flink 任務(wù)中,保障數(shù)據(jù)質(zhì)量。

2 算子狀態(tài)與鍵控狀態(tài)的區(qū)別

2.1 算子狀態(tài)

  • ? 算子狀態(tài),是每個 subtask 自己持有一份獨立的狀態(tài)數(shù)據(jù)(但如果在失敗恢復(fù)后,算子并行度發(fā) 生變化,則狀態(tài)將在新的 subtask 之間均勻分配);
  • ? 算子函數(shù)實現(xiàn) CheckpointedFunction 后,即可使用算子狀態(tài);
  • ? 算子狀態(tài),通常用于 source 算子中;其他場景下建議使用 KeyedState(鍵控狀態(tài));
  • 算子狀態(tài),在邏輯上,由算子 task 下所有 subtask 共享; 如何理解:正常運行時,subtask 自己讀寫自己的狀態(tài)數(shù)據(jù);而一旦 job 重啟且?guī)顟B(tài)算子發(fā)生了并行度的變化,則 之前的狀態(tài)數(shù)據(jù)將在新的一批 subtask 間均勻分配

2.2 鍵控狀態(tài)

  • ? 鍵控狀態(tài),只能應(yīng)用于 KeyedStream 的算子中(keyby 后的處理算子中);
  • ? 算子為每一個 key 綁定一份獨立的狀態(tài)數(shù)據(jù);

flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端

2.3 算子狀態(tài)api

要使用算子狀態(tài)(operator state),需要讓算子函數(shù)實現(xiàn) CheckpointedFunction 接口;

/*** * @author hunter.d * @qq 657270652 * @wx haitao-duan * @date 2022/4/10 **/
public class OperatorStateTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("execution.savepoint.path", "D:\\ckpt\\27270525e8f166834f2bbf7c617ad6d3\\chk-11");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(2000);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        //以下兩個 map 算子,其中一個是帶狀態(tài)的 
        // 如果修改代碼邏輯(如調(diào)整兩個 map 算子順序),且沒有設(shè)置 uid,則從 savepoints 恢復(fù)時將失敗
        source.map(new StatefulMapFunc()).uid("stateful-mapfunc-001").setParallelism(2).map(new NoStateMapFunc()).setParallelism(1).print().setParallelism(1);
        env.execute();
    }

    public static class NoStateMapFunc implements MapFunction<String, String> {
        @Override
        public String map(String value) throws Exception {
            return value.toUpperCase();
        }
    }

    // 帶狀態(tài)的 map 函數(shù)(將接收的字符串記在狀態(tài)中,以不斷拼接新數(shù)據(jù)返回) 

    public static class StatefulMapFunc extends RichMapFunction<String, String> implements CheckpointedFunction {
        ListState<String> lstState;

        /*** 正常的 map 映射邏輯方法 */

        @Override
        public String map(String value) throws Exception {
            lstState.add(value);
            StringBuilder sb = new StringBuilder();
            for (String s : lstState.get()) {
                sb.append(s).append(",");
            }
            return sb.toString();
        }

        /*** checkpoint 觸發(fā)時會調(diào)用的方法 */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
        }

        /*** 初始化算子任務(wù)時會調(diào)用的方法,以加載、初始化狀態(tài)數(shù)據(jù) */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            lstState = context.getOperatorStateStore().getListState(new ListStateDescriptor<String>("lst", String.class));
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            Iterable<String> iter = lstState.get();
            Iterator<String> it = iter.iterator(); 
            
            // 用于觀察 task 失敗恢復(fù)后的狀態(tài)恢復(fù)情況 
            System.out.println("-------" + indexOfThisSubtask + " - 初始化時,打印狀態(tài):-----");
            while (it.hasNext()) {
                System.out.println(indexOfThisSubtask + ":" + it.next());
            }
            System.out.println("-------" + indexOfThisSubtask + " -初始化時,打印狀態(tài):-----");
        }
    }
}
        

2.4 鍵控狀態(tài)api

要使用鍵控狀態(tài)(Keyed State),需要在實現(xiàn) RichFunction 的函數(shù)中;

 public class _15_ChannelEventsCntMapFunc extends RichMapFunction<EventLog, String> {
    ValueState<Integer> valueState;

    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.milliseconds(2000)).cleanupFullSnapshot().neverReturnExpired().useProc
        essingTime().updateTtlOnReadAndWrite().build();
        ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("cnt", Integer.class);
        desc.enableTimeToLive(ttlConfig); // 獲取單值狀態(tài)管理器 
        valueState = getRuntimeContext().getState(desc);
    }

    @Override
    public String map(EventLog eventLog) throws Exception {
        // 來一條數(shù)據(jù),就對狀態(tài)更新 
        valueState.update((valueState.value() == null ? 0 : valueState.value()) + 1);
        return eventLog.getChannel() + " : " + valueState.value();
    }
}

3 HashMapStateBackend 狀態(tài)后端

HashMapStateBackend:

  • ? 狀態(tài)數(shù)據(jù)是以 java 對象形式存儲在 heap 內(nèi)存中;

  • ? 內(nèi)存空間不夠時,也會溢出一部分數(shù)據(jù)到本地磁盤文件;

  • ? 可以支撐大規(guī)模的狀態(tài)數(shù)據(jù);(只不過在狀態(tài)數(shù)據(jù)規(guī)模超出內(nèi)存空間時,讀寫效率就會明顯降低)

  • 對于 KeyedState 來說:
    HashMapStateBackend 在內(nèi)存中是使用 CopyOnWriteStateMap 結(jié)構(gòu)來存儲用戶的狀態(tài)數(shù)據(jù); 注意,此數(shù)據(jù)結(jié)構(gòu)類,名為 Map,實非 Map,它其實是一個單向鏈表的數(shù)據(jù)結(jié)構(gòu)

  • 對于 OperatorState 來說:
    flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端
    可以清楚看出,它底層直接用一個 Map 集合來存儲用戶的狀態(tài)數(shù)據(jù):狀態(tài)名稱 --> 狀態(tài) List

4 EmBeddedRocksDbStateBackend 狀態(tài)后端

  • ? 狀態(tài)數(shù)據(jù)是交給 rocksdb 來管理;
  • ? Rocksdb 中的數(shù)據(jù)是以序列化的 kv 字節(jié)進行存儲;
  • ? Rockdb 中的數(shù)據(jù),有內(nèi)存緩存的部分,也有磁盤文件的部分;
  • ? Rockdb 的磁盤文件數(shù)據(jù)讀寫速度相對還是比較快的,所以在支持超大規(guī)模狀態(tài)數(shù)據(jù)時,數(shù)據(jù)的 讀寫效率不會有太大的降低

注意:上述 2 中狀態(tài)后端,在生成 checkpoint 快照文件時,生成的文件格式是完全一致的; 所以,用戶的 flink 程序在更改狀態(tài)后端后,重啟時依然可以加載和恢復(fù)此前的快照文件數(shù)據(jù);
老版本中,狀態(tài)與狀態(tài)后端的關(guān)系是:
flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端

5 狀態(tài)數(shù)據(jù)結(jié)構(gòu)介紹

5.1 算子狀態(tài)提供的數(shù)據(jù)結(jié)構(gòu)

ListState , UnionListState
UnionListState 和普通 ListState 的區(qū)別:

  1. UnionListState 的快照存儲數(shù)據(jù),在系統(tǒng)重啟后,list 數(shù)據(jù)的重分配模式為: 廣播模式; 在每個 subtask 上都擁有一份完整的數(shù)據(jù);
  2. ListState 的快照存儲數(shù)據(jù),系統(tǒng)重啟后,list 數(shù)據(jù)的重分配模式為: round-robin 輪詢平均分配

5.2 鍵控狀態(tài)提供的數(shù)據(jù)結(jié)構(gòu)

? ValueState ? ListState ? MapState ? ReducingState ? AggregateState

6 Reducing 聚合狀態(tài)

用戶傳入一個增量聚合函數(shù)后,狀態(tài)實現(xiàn)自動增量聚合(輸入數(shù)據(jù)與聚合結(jié)果類型必須一致)

// 獲取一個 reduce 聚合狀態(tài) 
    reduceState =runtimeContext.getReducingState(new ReducingStateDescriptor<Integer>("reduceState",new ReduceFunction<Integer>()

    {
        @Override public Integer reduce (Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
    },Integer .class));

7 廣播狀態(tài)

/*** * @author hunter.d * @qq 657270652 * @wx haitao-duan * @date 2022/4/10 **/
public class OperatorStateTest {
    // 主數(shù)據(jù)流 
    SingleOutputStreamOperator<Student> s1;
    // 待廣播出去的流
    SingleOutputStreamOperator<StuInfo> s2;
    // 定義廣播狀態(tài)的狀態(tài)描述對象
    MapStateDescriptor<Integer, StuInfo> stateDescriptor = new MapStateDescriptor<>("info", Integer.class, StuInfo.class);
    // 將 s2 流廣播出去 BroadcastStream<StuInfo>
    stuInfoBroadcastStream =s2.broadcast(stateDescriptor);
    // 用主數(shù)據(jù)流 connect 連接 廣播數(shù)據(jù)流,并處理
    s1.connect(stuInfoBroadcastStream).

    process(new BroadcastProcessFunction<Student, StuInfo, String>() {
        @Override public void processElement (Student student, ReadOnlyContext
        readOnlyContext, Collector < String > collector) throws Exception
        {
            // 對 "主流" 中的元素進行處理 
            readOnlyContext.getBroadCastState(); // 只讀狀態(tài)
        }
        @Override public void processBroadcastElement (StuInfo stuInfo, Context context, Collector < String > collector) throws
        Exception {
            // 對 "廣播流" 中的元素進行處理
            context.getBroadCastState(); // 可讀可寫 
        }
    });

}

8. flink重啟時,修改并行度,state會發(fā)生什么變化?鍵值狀態(tài)分區(qū)策略,解決數(shù)據(jù)傾斜

假設(shè)原來是3個并行度
flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端

重啟之后給兩個并行度,state會發(fā)生什么呢?他依然可以加載之前的快照數(shù)據(jù)

這里面引入一下 : subtask 是什么呢,相當于 每一個算子,就是一個subtask,像下面的 4個sink 就是4個subtask,4個并行度。
flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端

那么說回來,3個并行度 改成了兩個,少了一個,這個subtask上存儲的state 要怎么辦呢。
假設(shè)你用的是liststate。重啟的時候 ,會自動做分配到剩余的兩個state里

flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端
也可能是,直接重新分配給某一個state,以前 三個并行度,分別讀kafka的三個分區(qū),1分區(qū) 1000,2 分區(qū) 500, 3分區(qū) 800
重新分配后,就變成了 1分區(qū) 1000+3分區(qū) 800 ; 2 分區(qū) 500 這種情況。

在鍵值狀態(tài)API的設(shè)計思路小節(jié)中,我們提到鍵值狀態(tài)在重分布時要和KeyedStream的哈希數(shù)據(jù)分區(qū)策略保持完全一致。原理理解起來簡單,但實際上Flink鍵值狀態(tài)重分布的機制在此基礎(chǔ)上還做了很多的性能優(yōu)化,本節(jié)我們就詳細剖析鍵值狀態(tài)重分布的過程,掌握這部分知識將會對我們在生產(chǎn)環(huán)境中解決數(shù)據(jù)傾斜問題有很大的幫助。

我們以電商場景中計算每種商品累計銷售額的場景為例,邏輯數(shù)據(jù)流為Source→KeyBy\Map→Sink,我們在KeyBy\Map算子中使用鍵值狀態(tài)ValueState來保存每種商品的累計銷售額。

接下來我們看看ValueState鍵值狀態(tài)在KeyBy\Map算子并行度從2變?yōu)?時鍵值狀態(tài)的重分布過程。如圖6-28所示,我們用parallelism代表算子并行度,假設(shè)KeyedStream的哈希數(shù)據(jù)分區(qū)策略的計算公式為SubTask(key)=hash(key)%parallelism(符號%代表取余計算),該計算公式用于計算某個key的數(shù)據(jù)要被發(fā)往KeyBy\Map算子的SubTask下標。

當KeyBy\Map算子的并行度為2時,哈希數(shù)據(jù)分區(qū)策略就為SubTask(key)=hash(key)%2,假設(shè)這時經(jīng)過計算后,key為商品3的數(shù)據(jù)會被發(fā)送到KeyBy\Map[1]中,那么商品3的累計銷售額的狀態(tài)數(shù)據(jù)就會存儲在KeyBy\Map[1]本地。當用戶將KeyBy\Map算子的并行度擴展為3后,哈希數(shù)據(jù)分區(qū)策略就變?yōu)榱薙ubTask(key)=hash(key)%3,由于數(shù)據(jù)分區(qū)策略的計算公式變化了,因此每一個key的數(shù)據(jù)要發(fā)往的SubTask也會發(fā)生改變。假設(shè)這時key為商品3的數(shù)據(jù)會被發(fā)送到KeyBy\Map[0]中,那么商品3的累計銷售額狀態(tài)數(shù)據(jù)必然要被重分布到KeyBy\Map[0]中,如圖所示。

flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端
雖然鍵值狀態(tài)的重分布策略能夠降低用戶的開發(fā)成本,但是這種重分布策略卻對鍵值狀態(tài)重分布的性能提出了巨大的挑戰(zhàn)。如圖6-28所示,當算子并行度為2時,每個SubTask在執(zhí)行快照時會將本地的狀態(tài)數(shù)據(jù)順序地寫入到遠程分布式文件系統(tǒng)中,SubTask0和SubTask1分別寫入文件1和文件2。當算子并行度變?yōu)?后,根據(jù)新的Hash分區(qū)策略計算,key為商品0、商品3、商品6和商品9的數(shù)據(jù)要被恢復(fù)到SubTask0中,那么SubTask0就要同時讀取文件1和文件2,SubTask1和SubTask2的恢復(fù)過程也相同,都分別需要從文件1和文件2中恢復(fù)一部分key的狀態(tài)數(shù)據(jù)。

這時我們發(fā)現(xiàn)如果要讓每個SubTask都完整且正確的恢復(fù)狀態(tài)數(shù)據(jù),就需要讓每個SubTask都從分布式文件系統(tǒng)中讀取到所有的快照文件,然后再過濾出屬于當前SubTask的key的狀態(tài)數(shù)據(jù),但是按照這樣的流程執(zhí)行,就會出現(xiàn)以下兩個問題。

  • 狀態(tài)恢復(fù)時的性能問題:在算子以新的并行度啟動并從快照恢復(fù)時,算子的每個SubTask都會讀取大量不屬于當前SubTask的key的狀態(tài)數(shù)據(jù),同時還需要從中篩選出屬于當前SubTask的狀態(tài)數(shù)據(jù),而這會導(dǎo)致SubTask的啟動過程耗費大量的時間,作業(yè)的恢復(fù)過程很漫長。舉例來說,如果算子的并行度為500,每一個SubTask中都有100萬個key的狀態(tài)數(shù)據(jù),那么整個作業(yè)總計會有5億個key,這時如果我們將算子并行度擴展為1000,那么對這1000個SubTask來說,每一個SubTask都要讀取到這5億個key的快照文件,然后再過濾出屬于自己的key的狀態(tài)數(shù)據(jù),但是平均下來每個SubTask最終只會保留50萬個key的狀態(tài)數(shù)據(jù),其余的4.995億個key的數(shù)據(jù)都會被過濾。
  • 分布式文件系統(tǒng)的穩(wěn)定性問題:在算子從快照時,所有的SubTask都會對分布式文件系統(tǒng)發(fā)起大量讀取相同文件的請求,這對分布式文件系統(tǒng)穩(wěn)定性也會造成影響,并且隨著算子并行度的增大,這種情況會越來越嚴重。

綜上所述,使用該方案來恢復(fù)狀態(tài)數(shù)據(jù)時,性能是無法達到預(yù)期的。其低效的原因就在于從狀態(tài)恢復(fù)時,SubTask不知道分布式文件系統(tǒng)中的每一份快照文件中存儲了哪些key的狀態(tài)數(shù)據(jù),也不知道這些key的狀態(tài)數(shù)據(jù)在快照文件中的偏移量,所以只能全量讀取后再按照key一個一個的進行過濾。

這里插一句,在20240125 此時此刻,flink計劃做的2.0中最核心的一塊就是狀態(tài)的存算分離,解決的就是大狀態(tài)的性能場景問題,與這里引出的狀態(tài)重分配導(dǎo)致的問題,本質(zhì)上是一個情況,所以這個問題正常情況來說,flink2.0上線以后,可以解決。

9.Flink State TTL 是怎么做到數(shù)據(jù)過期的?首先我們來想想,要做到 TTL 的話,要具備什么條件呢?

想想 Redis 的 TTL 設(shè)置,如果我們要設(shè)置 TTL 則必然需要給一條數(shù)據(jù)給一個時間戳,只有這樣才能判斷這條數(shù)據(jù)是否過期了。

在 Flink 中設(shè)置 State TTL,就會有這樣一個時間戳,具體實現(xiàn)時,F(xiàn)link 會把時間戳字段和具體數(shù)據(jù)字段存儲作為同級存儲到 State 中。

舉個例子,我要將一個 String 存儲到 State 中時:

  1. ? 沒有設(shè)置 State TTL 時,則直接將 String 存儲在 State 中
  2. ? 如果設(shè)置 State TTL 時,則 Flink 會將 <String, Long> 存儲在 State 中,其中 Long 為時間戳,用于判斷是否過期。

接下來以 FileSystem 狀態(tài)后端下的 MapState 作為案例來說:

? 如果沒有設(shè)置 State TTL,則生產(chǎn)的 MapState 的字段類型如下(可以看到生成的就是 HeapMapState 實例):
flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端
? 如果設(shè)置了 State TTL,則生成的 MapState 的字段類型如下(可以看到使用到了裝飾器的設(shè)計模式生成是 TtlMapState):
flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端

注意:
任務(wù)設(shè)置了 State TTL 和不設(shè)置 State TTL 的狀態(tài)是不兼容的。這里大家在使用時一定要注意。防止出現(xiàn)任務(wù)從 Checkpoint 恢復(fù)不了的情況。但是你可以去修改 TTL 時長,因為修改時長并不會改變 State 存儲結(jié)構(gòu)。

注意:
? 存活時長的計時器可以在數(shù)據(jù)被讀、寫時重置;
? Ttl 存活管理粒度是到元素級的(如 liststate 中的每個元素,mapstate 中的每個 entry)

cleanupStrategies(過期數(shù)據(jù)清理策略,目前支持的策略有)

  1. cleanupIncrementally : 增量清除 每當訪問狀態(tài)時,都會驅(qū)動一次過期檢查(算子注冊了很多 key 的 state,一次檢查只針對其中一部分: 由參數(shù) cleanupSize 決定) 算子持有一個包含所有 key 的迭代器,每次檢查后,迭代器都會向前 advance 指定的 key 數(shù)量; 本策略,針對“本地狀態(tài)空間”,且只用于 HashMapStateBackend
    flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端

  2. cleanupFullSnapshot
    在進行全量快照(checkpoint)時,清理掉過期數(shù)據(jù); 注意:只是在生成的 checkpoint 數(shù)據(jù)中不包含過期數(shù)據(jù);在本地狀態(tài)空間中,并沒有做清理; 本策略,針對“快照”生效

  3. cleanupInRocksdbCompactFilter 只針對 rocksdbStateBackend 有效; 它是利用 rocksdb 的 compact 功能,在 rocksdb 進行 compact 時,清除掉過期數(shù)據(jù); 本策略,針對“本地狀態(tài)空間”,且只用于 EmbeddedRocksDbStateBackend

10.Flink SQL API State TTL 的過期機制是 onCreateAndUpdate 還是onReadAndWrite

? 結(jié)論:Flink SQL API State TTL 的過期機制目前只支持 onCreateAndUpdate,DataStream API 兩個都支持

flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端
? 剖析:

  • onCreateAndUpdate:是在創(chuàng)建 State 和更新 State 時【更新 State TTL】
  • onReadAndWrite:是在訪問 State 和寫入 State 時【更新 State TTL】

? 實際踩坑場景:Flink SQL Deduplicate 寫法,row_number partition by user_id order by proctime asc,此 SQL 最后生成的算子只會在第一條數(shù)據(jù)來的時候更新 state,后續(xù)訪問不會更新 state TTL,因此 state 會在用戶設(shè)置的 state TTL 時間之后過期。

5.operator-state 和 keyed-state 兩者的區(qū)別?最大并行度又和它們有什么關(guān)系?舉個生產(chǎn)環(huán)境中經(jīng)常出現(xiàn)的案例,當用戶停止任務(wù)、更新代碼邏輯并且改變?nèi)蝿?wù)并發(fā)度時,兩種 state 都是怎樣進行恢復(fù)的?

? 總結(jié)如下:
flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端

? operator-state:

flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端

  • ? 狀態(tài)適用算子:所有算子都可以使用 operator-state,沒有限制。

  • ? 狀態(tài)的創(chuàng)建方式:如果需要使用 operator-state,需要實現(xiàn) CheckpointedFunction 或 ListCheckpointed 接口

  • ? DataStream API 中,operator-state 提供了 ListState、BroadcastState、UnionListState 3 種用戶接口

  • ? 狀態(tài)的存儲粒度:以單算子單并行度粒度訪問、更新狀態(tài)

  • ? 并行度變化時:a. ListState:均勻劃分到算子的每個 sub-task 上,比如 Flink Kafka Source 中就使用了 ListState 存儲消費 Kafka 的 offset,其 rescale 如下圖flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端

  • BroadcastState:每個 sub-task 的廣播狀態(tài)都一樣 c. UnionListState:將原來所有元素合并,合并后的數(shù)據(jù)每個算子都有一份全量狀態(tài)數(shù)據(jù)
    flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端

? keyed-state:

flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端文章來源地址http://www.zghlxwxcb.cn/news/detail-834640.html

  • ? 狀態(tài)適用算子:keyed-stream 后的算子使用。注意這里很多同學(xué)會犯一個錯誤,就是大家會認為 keyby 后面跟的所有算子都使用的是 keyed-state,但這是錯誤的 ?,比如有 keyby.process.flatmap,其中 flatmap 中使用狀態(tài)的話是 operator-state
  • ? 狀態(tài)的創(chuàng)建方式:從 context 接口獲取具體的 keyed-state
  • ? DataStream API 中,keyed-state 提供了 ValueState、MapState、ListState 等用戶接口,其中最常用 ValueState、MapState
  • ? 狀態(tài)的存儲粒度:以單 key 粒度訪問、更新狀態(tài)。舉例,當我們使用 keyby.process,在 process 中處理邏輯時,其實每一次 process 的處理 context 都會對應(yīng)到一個 key,所以在 process 中的處理都是以 key 為粒度的。這里很多同學(xué)會犯一個錯 ?,比如想在 open 方法中訪問、更新 state,這是不行的,因為 open 方法在執(zhí)行時,還沒有到正式的數(shù)據(jù)處理環(huán)節(jié),上下文中是沒有 key 的。
  • ? 并行度變化時:keyed-state 的重新劃分是隨著 key-group 進行的。其中 key-group 的個數(shù)就是最大并發(fā)度的個數(shù)。其中一個 key-group 處理一段區(qū)間 key 的數(shù)據(jù),不同 key-group 處理的 key 是完全不同的。當任務(wù)并行度變化時,會將 key-group 重新劃分到算子不同的 sub-task 上,任務(wù)啟動后,任務(wù)數(shù)據(jù)在做 keyby 進行數(shù)據(jù) shuffle 時,依然能夠按照當前數(shù)據(jù)的 key 發(fā)到下游能夠處理這個 key 的 key-group 中進行處理,如下圖所示。注意:最大并行度和 key-group 的個數(shù)綁定,所以如果想恢復(fù)任務(wù) state,最大并行度是不能修改的。大家需要提前預(yù)估最大并行度個數(shù)。
    flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全,flink技術(shù)原理,flink,java,前端

11.ValueState 和 MapState 各自適合的應(yīng)用場景?

  1. ? ValueState
  • 應(yīng)用場景:簡單的一個變量存儲,比如 Long\String 等。如果狀態(tài)后端為 RocksDB,極其不建議在 ValueState 中存儲一個大 Map,這種場景下序列化和反序列化的成本非常高,這種常見適合使用 MapState。其實這種場景也是很多小伙伴一開始使用 State 的誤用之痛,一定要避免。
  • TTL:針對整個 Value 起作用
  1. ? MapState
  • 應(yīng)用場景:和 Map 使用方式一樣一樣的
  • TTL:針對 Map 的 key 生效,每個 key 一個 TTL

到了這里,關(guān)于flink state原理,TTL,狀態(tài)后端,數(shù)據(jù)傾斜一文全的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • flink對狀態(tài)ttl進行單元測試

    在處理鍵值分區(qū)狀態(tài)時,使用ttl設(shè)置過期時間是我們經(jīng)常使用的,但是任何代碼的修改都需要首先進行單元測試,本文就使用單元測試來驗證一下狀態(tài)ttl的設(shè)置是否正確 首先看一下處理函數(shù): 單元測試代碼: 測試代碼中已經(jīng)包含了詳細的注解,我們實現(xiàn)自己的ttl單元測試時可

    2024年02月05日
    瀏覽(21)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)后端(下)

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)后端(下)

    每傳入一條數(shù)據(jù),有狀態(tài)的算子任務(wù)都會讀取和更新狀態(tài)。由于有效的狀態(tài)訪問對于處理數(shù)據(jù)的低延遲至關(guān)重要,因此每個并行任務(wù)(子任務(wù))都會在本地維護其狀態(tài),以確保快速的狀態(tài)訪問。 狀態(tài)的存儲、訪問以及維護,由一個可插入的組件決定,這個組件就叫做狀態(tài)后端(

    2024年02月09日
    瀏覽(21)
  • Flink 學(xué)習(xí)七 Flink 狀態(tài)(flink state)

    Flink 學(xué)習(xí)七 Flink 狀態(tài)(flink state)

    流式計算邏輯中,比如sum,max; 需要記錄和后面計算使用到一些歷史的累計數(shù)據(jù), 狀態(tài)就是 :用戶在程序邏輯中用于記錄信息的變量 在Flink 中 ,狀態(tài)state 不僅僅是要記錄狀態(tài);在程序運行中如果失敗,是需要重新恢復(fù),所以這個狀態(tài)也是需要持久化;一遍后續(xù)程序繼續(xù)運行 1.1 row state 我

    2024年02月09日
    瀏覽(33)
  • Flink State 狀態(tài)管理

    狀態(tài)在Flink中叫做State,用來保存中間計算結(jié)果或者緩存數(shù)據(jù)。要做到比較好的狀態(tài)管理,需要考慮以下幾點內(nèi)容: 狀態(tài)數(shù)據(jù)的存儲和訪問 在Task內(nèi)部,如何高效地保存狀態(tài)數(shù)據(jù)和使用狀態(tài)數(shù)據(jù)。 狀態(tài)數(shù)據(jù)的備份和恢復(fù) 作業(yè)失敗是無法避免的,那么就要考慮如何高效地將狀態(tài)

    2024年01月17日
    瀏覽(24)
  • 【狀態(tài)管理|概述】Flink的狀態(tài)管理:為什么需要state、怎么保存state、對于state過大怎么處理

    按照數(shù)據(jù)的劃分和擴張方式,F(xiàn)link中大致分為2類: Keyed States:記錄每個Key對應(yīng)的狀態(tài)值 因為一個任務(wù)的并行度有多少,就會有多少個子任務(wù),當key的范圍大于并行度時,就會出現(xiàn)一個subTask上可能包含多個Key(),但不同Task上不會出現(xiàn)相同的Key(解決了shuffle的問題?) ? 常

    2024年02月01日
    瀏覽(19)
  • Flink狀態(tài)詳解:什么是時狀態(tài)(state)?狀態(tài)描述(StateDescriptor)及其重要性

    Flink狀態(tài)詳解:什么是時狀態(tài)(state)?狀態(tài)描述(StateDescriptor)及其重要性

    了解Flink中的狀態(tài)概念及其在流處理中的重要性。探討狀態(tài)在有狀態(tài)計算中的作用,狀態(tài)描述符(StateDescriptor)的基本概念和用法。理解狀態(tài)在Flink任務(wù)中的維護、恢復(fù)和與算子的關(guān)聯(lián)。

    2024年02月08日
    瀏覽(28)
  • Flink 狀態(tài)后端

    狀態(tài)后端 (state backend) : 負責(zé)管理本地狀態(tài)的存儲方式, 位置 Flink 的狀態(tài)后端有兩類 : 哈希表狀態(tài)后端 (HashMapStateBackend) : 狀態(tài)放在內(nèi)存 內(nèi)嵌 RocksDB 狀態(tài)后端 (EmbeddedRocksDBStateBackend) : 狀態(tài)放在 RocksDB 數(shù)據(jù)庫 哈希表狀態(tài)后端 : 實現(xiàn) : 將狀態(tài)當作對象 (objects) , 保存在 Taskmanager 的

    2024年02月13日
    瀏覽(25)
  • 209.Flink(四):狀態(tài),按鍵分區(qū),算子狀態(tài),狀態(tài)后端。容錯機制,檢查點,保存點。狀態(tài)一致性。flink與kafka整合

    算子任務(wù)可以分為有狀態(tài)、無狀態(tài)兩種。 無狀態(tài):filter,map這種,每次都是獨立事件 有狀態(tài):sum這種,每次處理數(shù)據(jù)需要額外一個狀態(tài)值來輔助。這個額外的值就叫“狀態(tài)” (1)托管狀態(tài)(Managed State)和原始狀態(tài)(Raw State) 托管狀態(tài) 就是由Flink統(tǒng)一管理的,狀態(tài)的存儲訪問

    2024年02月06日
    瀏覽(22)
  • 回聲狀態(tài)網(wǎng)絡(luò)(Echo State Networks,ESN)詳細原理講解及Python代碼實現(xiàn)

    回聲狀態(tài)網(wǎng)絡(luò)(Echo State Networks,ESN)詳細原理講解及Python代碼實現(xiàn)

    回聲狀態(tài)網(wǎng)絡(luò)是一種循環(huán)神經(jīng)網(wǎng)絡(luò)。ESN 訓(xùn)練方式與傳統(tǒng) RNN 不同。網(wǎng)絡(luò)結(jié)構(gòu)如下圖: (1)儲層(Reservoir):中文翻譯有叫儲備池、儲層、儲蓄池等等各種名稱。ESN 中的儲層是互連神經(jīng)元的集合,其中連接及其權(quán)重是隨機初始化和固定的。該儲層充當動態(tài)儲層,其目的是將輸

    2024年04月17日
    瀏覽(22)
  • 如何解決Flink任務(wù)的數(shù)據(jù)傾斜

    如何解決flink任務(wù)的數(shù)據(jù)傾斜問題 Flink 任務(wù)的數(shù)據(jù)傾斜問題可以通過以下幾種方法來解決: 使用滑動窗口:滑動窗口可以將窗口劃分成多個子窗口,從而使數(shù)據(jù)更加均衡地分配到不同的計算節(jié)點中。同時,滑動窗口還可以使窗口內(nèi)的數(shù)據(jù)更加連續(xù),從而減少數(shù)據(jù)傾斜的情況。

    2024年02月14日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包