国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)編程(上)

這篇具有很好參考價值的文章主要介紹了大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)編程(上)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、Flink狀態(tài)編程

有狀態(tài)的計算是流處理框架要實現(xiàn)的重要功能,因為稍復(fù)雜的流處理場景都需要記錄狀態(tài),然后在新流入數(shù)據(jù)的基礎(chǔ)上不斷更新狀態(tài)。
SparkStreaming在狀態(tài)管理這塊做的不好, 很多時候需要借助于外部存儲(例如Redis)來手動管理狀態(tài), 增加了編程的難度。
Flink的狀態(tài)管理是它的優(yōu)勢之一。

二、什么是狀態(tài)

在流式計算中有些操作一次處理一個獨立的事件(比如解析一個事件), 有些操作卻需要記住多個事件的信息(比如窗口操作)。

流式計算分為無狀態(tài)計算和有狀態(tài)計算兩種情況。
無狀態(tài)的計算觀察每個獨立事件,并根據(jù)最后一個事件輸出結(jié)果。例如,流處理應(yīng)用程序從傳感器接收水位數(shù)據(jù),并在水位超過指定高度時發(fā)出警告。

在簡單聚合、窗口聚合、處理函數(shù)的應(yīng)用,都會有狀態(tài)的身影出現(xiàn)。在Flink這樣的分布式系統(tǒng)中,我們不僅需要定義出狀態(tài)在任務(wù)并行時的處理方式,還需要考慮如何持久化保存、以便發(fā)生故障時能正確地恢復(fù),這就需要一套完整的管理機制來處理所有狀態(tài)。

三、為什么需要管理狀態(tài)

下面的幾個場景都需要使用流處理的狀態(tài)功能:
去重: 數(shù)據(jù)流中的數(shù)據(jù)有重復(fù),我們想對重復(fù)數(shù)據(jù)去重,需要記錄哪些數(shù)據(jù)已經(jīng)流入過應(yīng)用,當(dāng)新數(shù)據(jù)流入時,根據(jù)已流入過的數(shù)據(jù)來判斷去重。
檢測: 檢查輸入流是否符合某個特定的模式,需要將之前流入的元素以狀態(tài)的形式緩存下來。比如,判斷一個溫度傳感器數(shù)據(jù)流中的溫度是否在持續(xù)上升。
聚合: 對一個時間窗口內(nèi)的數(shù)據(jù)進行聚合分析,分析一個小時內(nèi)水位的情況
更新機器學(xué)習(xí)模型: 在線機器學(xué)習(xí)場景下,需要根據(jù)新流入數(shù)據(jù)不斷更新機器學(xué)習(xí)的模型參數(shù)。

四、Flink中的狀態(tài)分類

Managed State
狀態(tài)管理方式 Flink Runtime托管, 自動存儲, 自動恢復(fù), 自動伸縮
狀態(tài)數(shù)據(jù)結(jié)構(gòu) Flink提供多種常用數(shù)據(jù)結(jié)構(gòu), 例如:ListState, MapState等
使用場景 絕大數(shù)Flink算子。

Raw State
狀態(tài)管理方式 用戶自己管理
狀態(tài)數(shù)據(jù)結(jié)構(gòu) 字節(jié)數(shù)組: byte[]
使用場景 所有算子

從具體使用場景來說,絕大多數(shù)的算子都可以通過繼承Rich函數(shù)類或其他提供好的接口類,在里面使用Managed State。Raw State一般是在已有算子和Managed State不夠用時,用戶自定義算子時使用。
在我們平時的使用中Managed State已經(jīng)足夠我們使用。

大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)編程(上),大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-FLINK,大數(shù)據(jù),flink
對Managed State繼續(xù)細分,它又有2種類型
Operator State(算子狀態(tài))
Keyed State(鍵控狀態(tài))

Operator State
適用用算子類型: 可用于所有算子: 常用于source, sink,
例如:FlinkKafkaConsumer
狀態(tài)分配:一個算子的子任務(wù)對應(yīng)一個狀態(tài)
創(chuàng)建和訪問方式: 實現(xiàn)CheckpointedFunction或ListCheckpointed(已經(jīng)過時)接口
橫向擴展 :并發(fā)改變時有多重重寫分配方式可選: 均勻分配和合并后每個得到全量
支持的數(shù)據(jù)結(jié)構(gòu): ListState,UnionListStste和BroadCastState

Keyed State
適用用算子類型: 只能用于用于KeyedStream上的算子
狀態(tài)分配 :一個Key對應(yīng)一個State: 一個算子會處理多個Key, 則訪問相應(yīng)的多個State
創(chuàng)建和訪問方式:重寫RichFunction, 通過里面的RuntimeContext訪問w
橫向擴展 :并發(fā)改變, State隨著Key在實例間遷移
支持的數(shù)據(jù)結(jié)構(gòu):ValueState, ListState,MapState ReduceState, AggregatingState

五、算子狀態(tài)的使用

Operator State可以用在所有算子上,每個算子子任務(wù)或者說每個算子實例共享一個狀態(tài),流入這個算子子任務(wù)的數(shù)據(jù)可以訪問和更新這個狀態(tài)。

注意: 算子子任務(wù)之間的狀態(tài)不能互相訪問
Operator State的實際應(yīng)用場景不如Keyed State多,它經(jīng)常被用在Source或Sink等算子上,用來保存流入數(shù)據(jù)的偏移量或?qū)敵鰯?shù)據(jù)做緩存,以保證Flink應(yīng)用的Exactly-Once語義。

Flink為算子狀態(tài)提供三種基本數(shù)據(jù)結(jié)構(gòu):
列表狀態(tài)(List state),將狀態(tài)表示為一組數(shù)據(jù)的列表

聯(lián)合列表狀態(tài)(Union list state),也是將狀態(tài)表示為數(shù)據(jù)的列表。它與常規(guī)列表狀態(tài)的區(qū)別在于,在發(fā)生故障時,或者從保存點(savepoint)啟動應(yīng)用程序時如何恢復(fù)。一種是均勻分配(List state),另外一種是將所有 State 合并為全量 State 再分發(fā)給每個實例(Union list state)。

廣播狀態(tài)(Broadcast state)
是一種特殊的算子狀態(tài). 如果一個算子有多項任務(wù),而它的每項任務(wù)狀態(tài)又都相同,那么這種特殊情況最適合應(yīng)用廣播狀態(tài)。

三種狀態(tài)的實現(xiàn)
大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)編程(上),大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-FLINK,大數(shù)據(jù),flink

六、鍵控狀態(tài)的使用

鍵控狀態(tài)是根據(jù)輸入數(shù)據(jù)流中定義的鍵(key)來維護和訪問的,只能用于KeyedStream(keyBy算子處理之后)。相同key的所有數(shù)據(jù)都會訪問相同的狀態(tài)。
鍵控狀態(tài)支持的數(shù)據(jù)類型
大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)編程(上),大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-FLINK,大數(shù)據(jù),flink
注意:
a)所有的類型都有clear(), 清空當(dāng)前key的狀態(tài)
b)這些狀態(tài)對象僅用于用戶與狀態(tài)進行交互.
c)狀態(tài)不是必須存儲到內(nèi)存, 也可以存儲在磁盤或者任意其他地方
d)從狀態(tài)獲取的值與輸入元素的key相關(guān)

七、狀態(tài)后端

狀態(tài)的存儲、訪問以及維護,由一個可插入的組件決定,這個組件就叫做狀態(tài)后端。
狀態(tài)后端主要負責(zé)兩件事:
本地(taskmanager)的狀態(tài)管理
將檢查點(checkpoint)狀態(tài)寫入遠程存儲

狀態(tài)后端的分類及配置
狀態(tài)后端作為一個可插入的組件, 沒有固定的配置, 我們可以根據(jù)需要選擇一個合適的狀態(tài)后端。
大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)編程(上),大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-FLINK,大數(shù)據(jù),flink
MemoryStateBackend
內(nèi)存級別的狀態(tài)后端(默認),
存儲方式:本地狀態(tài)存儲在TaskManager的內(nèi)存中, checkpoint 存儲在JobManager的內(nèi)存中.
特點:快速, 低延遲, 但不穩(wěn)定
使用場景: 1. 本地測試 2. 幾乎無狀態(tài)的作業(yè)(ETL) 3. JobManager不容易掛, 或者掛了影響不大. 4. 不推薦在生產(chǎn)環(huán)境下使用
FsStateBackend
存儲方式: 本地狀態(tài)在TaskManager內(nèi)存, Checkpoint時, 存儲在文件系統(tǒng)(hdfs)中
特點: 擁有內(nèi)存級別的本地訪問速度, 和更好的容錯保證
使用場景: 1. 常規(guī)使用狀態(tài)的作業(yè). 例如分鐘級別窗口聚合, join等 2. 需要開啟HA的作業(yè) 3. 可以應(yīng)用在生產(chǎn)環(huán)境中
RocksDBStateBackend
將所有的狀態(tài)序列化之后, 存入本地的RocksDB數(shù)據(jù)庫中.(一種NoSql數(shù)據(jù)庫, KV形式存儲)
存儲方式: 1. 本地狀態(tài)存儲在TaskManager的RocksDB數(shù)據(jù)庫中(實際是內(nèi)存+磁盤) 2. Checkpoint在外部文件系統(tǒng)(hdfs)中.
使用場景: 1. 超大狀態(tài)的作業(yè), 例如天級的窗口聚合 2. 需要開啟HA的作業(yè) 3. 對讀寫狀態(tài)性能要求不高的作業(yè) 4. 可以使用在生產(chǎn)環(huán)境文章來源地址http://www.zghlxwxcb.cn/news/detail-698652.html

八、案例列表狀態(tài)

package com.lyh.flink09;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;

public class state_programe1_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.socketTextStream("hadoop100",9999)
                .map(new MyMapFunctin())
                .print();
        env.execute();

    }
    public static class MyMapFunctin implements MapFunction<String,Long>, CheckpointedFunction {
        private Long count = 0L;
        private ListState<Long> state;
        @Override
        public Long map(String value) throws Exception {
            count++;
            return count;
        }
        // 初始化時會調(diào)用這個方法,向本地狀態(tài)中填充數(shù)據(jù). 每個子任務(wù)調(diào)用一次
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initialize.....");
            state = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<Long>("state",Long.class));

                for (Long c : state.get()) {
                    count += c;
                }
            }

        // Checkpoint時會調(diào)用這個方法,我們要實現(xiàn)具體的snapshot邏輯,比如將哪些本地狀態(tài)持久化
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("snapshot.....");
            state.clear();
            state.add(count);
        }
    }
}

九、案例廣播狀態(tài)

package com.lyh.flink09;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class state_broad1_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment()
                .setParallelism(3);
        DataStreamSource<String> dataStream = env.socketTextStream("hadoop100", 9999);
        DataStreamSource<String> controlStream = env.socketTextStream("hadoop100", 8888);


        MapStateDescriptor<String, String> stateDescriptor = new MapStateDescriptor<>("state", String.class, String.class);
        // 廣播流
        BroadcastStream<String> broadcastStream = controlStream.broadcast(stateDescriptor);
        dataStream
                .connect(broadcastStream)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // 從廣播狀態(tài)中取值, 不同的值做不同的業(yè)務(wù)
                        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);
                        if ("1".equals(state.get("switch"))) {
                            out.collect("切換到1號配置....");
                        } else if ("0".equals(state.get("switch"))) {
                            out.collect("切換到0號配置....");
                        } else {
                            out.collect("切換到其他配置....");
                        }
                    }

                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                        BroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);
                        // 把值放入廣播狀態(tài)
                        state.put("switch", value);
                    }
                })
                .print();

        env.execute();
    }

}

到了這里,關(guān)于大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)編程(上)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink第七章:狀態(tài)編程

    Flink第七章:狀態(tài)編程

    Flink第一章:環(huán)境搭建 Flink第二章:基本操作. Flink第三章:基本操作(二) Flink第四章:水位線和窗口 Flink第五章:處理函數(shù) Flink第六章:多流操作 Flink第七章:狀態(tài)編程 這次我們來學(xué)習(xí)Flink中的狀態(tài)學(xué)習(xí)部分,創(chuàng)建以下scala文件 這個文件里有幾個常用的狀態(tài)創(chuàng)建 按鍵分區(qū)中值狀態(tài)編程案

    2024年02月06日
    瀏覽(24)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink窗口函數(shù)

    前面指定了窗口的分配器, 接著我們需要來指定如何計算, 這事由window function來負責(zé). 一旦窗口關(guān)閉, window function 去計算處理窗口中的每個元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一種. ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以對

    2024年02月11日
    瀏覽(37)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink營銷對賬

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink營銷對賬

    在電商網(wǎng)站中,訂單的支付作為直接與營銷收入掛鉤的一環(huán),在業(yè)務(wù)流程中非常重要。對于訂單而言,為了正確控制業(yè)務(wù)流程,也為了增加用戶的支付意愿,網(wǎng)站一般會設(shè)置一個支付失效時間,超過一段時間不支付的訂單就會被取消。另外,對于訂單的支付,我們還應(yīng)保證用

    2024年02月11日
    瀏覽(20)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink 容錯機制

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink 容錯機制

    在分布式架構(gòu)中,當(dāng)某個節(jié)點出現(xiàn)故障,其他節(jié)點基本不受影響。在 Flink 中,有一套完整的容錯機制,最重要就是檢查點(checkpoint)。 在流處理中,我們可以用存檔讀檔的思路,把之前的計算結(jié)果做個保存,這樣重啟之后就可以繼續(xù)處理新數(shù)據(jù)、而不需要重新計算了。所以

    2024年02月07日
    瀏覽(22)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink RedisSink

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink RedisSink

    具體版本根據(jù)實際情況確定 參見大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Redis 安裝與使用 可以根據(jù)要寫入的redis的不同數(shù)據(jù)類型進行調(diào)整

    2024年02月13日
    瀏覽(16)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink定時器

    基于處理時間或者事件時間處理過一個元素之后, 注冊一個定時器, 然后指定的時間執(zhí)行. Context和OnTimerContext所持有的TimerService對象擁有以下方法: currentProcessingTime(): Long 返回當(dāng)前處理時間 currentWatermark(): Long 返回當(dāng)前watermark的時間戳 registerProcessingTimeTimer(timestamp: Long): Unit 會注

    2024年02月10日
    瀏覽(20)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink惡意登錄監(jiān)控

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink惡意登錄監(jiān)控

    對于網(wǎng)站而言,用戶登錄并不是頻繁的業(yè)務(wù)操作。如果一個用戶短時間內(nèi)頻繁登錄失敗,就有可能是出現(xiàn)了程序的惡意攻擊,比如密碼暴力破解。 因此我們考慮,應(yīng)該對用戶的登錄失敗動作進行統(tǒng)計,具體來說,如果同一用戶(可以是不同IP)在2秒之內(nèi)連續(xù)兩次登錄失敗,就

    2024年02月07日
    瀏覽(14)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink時間滾動動窗口

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink時間滾動動窗口

    在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當(dāng)然我們可以每來一個消息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點擊了我們的網(wǎng)頁。在這種情況下,我們必須定義一個窗口,用來收集

    2024年02月11日
    瀏覽(22)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink-Transform

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink-Transform

    轉(zhuǎn)換算子可以把一個或多個DataStream轉(zhuǎn)成一個新的DataStream.程序可以把多個復(fù)雜的轉(zhuǎn)換組合成復(fù)雜的數(shù)據(jù)流拓撲. 2.1、map(映射) 將數(shù)據(jù)流中的數(shù)據(jù)進行轉(zhuǎn)換, 形成新的數(shù)據(jù)流,消費一個元素并產(chǎn)出一個元素 2.2、filter(過濾) 根據(jù)指定的規(guī)則將滿足條件(true)的數(shù)據(jù)保留,不

    2024年02月13日
    瀏覽(18)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink 網(wǎng)站UV統(tǒng)計

    在實際應(yīng)用中,我們往往會關(guān)注,到底有多少不同的用戶訪問了網(wǎng)站,所以另外一個統(tǒng)計流量的重要指標是網(wǎng)站的獨立訪客數(shù)(Unique Visitor,UV)。 對于UserBehavior數(shù)據(jù)源來說,我們直接可以根據(jù)userId來區(qū)分不同的用戶。 將userid放到SET集合里面,統(tǒng)計集合長度,便可以統(tǒng)計到網(wǎng)

    2024年02月11日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包