一、Flink狀態(tài)編程
有狀態(tài)的計算是流處理框架要實現(xiàn)的重要功能,因為稍復(fù)雜的流處理場景都需要記錄狀態(tài),然后在新流入數(shù)據(jù)的基礎(chǔ)上不斷更新狀態(tài)。
SparkStreaming在狀態(tài)管理這塊做的不好, 很多時候需要借助于外部存儲(例如Redis)來手動管理狀態(tài), 增加了編程的難度。
Flink的狀態(tài)管理是它的優(yōu)勢之一。
二、什么是狀態(tài)
在流式計算中有些操作一次處理一個獨立的事件(比如解析一個事件), 有些操作卻需要記住多個事件的信息(比如窗口操作)。
流式計算分為無狀態(tài)計算和有狀態(tài)計算兩種情況。
無狀態(tài)的計算觀察每個獨立事件,并根據(jù)最后一個事件輸出結(jié)果。例如,流處理應(yīng)用程序從傳感器接收水位數(shù)據(jù),并在水位超過指定高度時發(fā)出警告。
在簡單聚合、窗口聚合、處理函數(shù)的應(yīng)用,都會有狀態(tài)的身影出現(xiàn)。在Flink這樣的分布式系統(tǒng)中,我們不僅需要定義出狀態(tài)在任務(wù)并行時的處理方式,還需要考慮如何持久化保存、以便發(fā)生故障時能正確地恢復(fù),這就需要一套完整的管理機制來處理所有狀態(tài)。
三、為什么需要管理狀態(tài)
下面的幾個場景都需要使用流處理的狀態(tài)功能:
去重: 數(shù)據(jù)流中的數(shù)據(jù)有重復(fù),我們想對重復(fù)數(shù)據(jù)去重,需要記錄哪些數(shù)據(jù)已經(jīng)流入過應(yīng)用,當(dāng)新數(shù)據(jù)流入時,根據(jù)已流入過的數(shù)據(jù)來判斷去重。
檢測: 檢查輸入流是否符合某個特定的模式,需要將之前流入的元素以狀態(tài)的形式緩存下來。比如,判斷一個溫度傳感器數(shù)據(jù)流中的溫度是否在持續(xù)上升。
聚合: 對一個時間窗口內(nèi)的數(shù)據(jù)進行聚合分析,分析一個小時內(nèi)水位的情況
更新機器學(xué)習(xí)模型: 在線機器學(xué)習(xí)場景下,需要根據(jù)新流入數(shù)據(jù)不斷更新機器學(xué)習(xí)的模型參數(shù)。
四、Flink中的狀態(tài)分類
Managed State
狀態(tài)管理方式 Flink Runtime托管, 自動存儲, 自動恢復(fù), 自動伸縮
狀態(tài)數(shù)據(jù)結(jié)構(gòu) Flink提供多種常用數(shù)據(jù)結(jié)構(gòu), 例如:ListState, MapState等
使用場景 絕大數(shù)Flink算子。
Raw State
狀態(tài)管理方式 用戶自己管理
狀態(tài)數(shù)據(jù)結(jié)構(gòu) 字節(jié)數(shù)組: byte[]
使用場景 所有算子
從具體使用場景來說,絕大多數(shù)的算子都可以通過繼承Rich函數(shù)類或其他提供好的接口類,在里面使用Managed State。Raw State一般是在已有算子和Managed State不夠用時,用戶自定義算子時使用。
在我們平時的使用中Managed State已經(jīng)足夠我們使用。
對Managed State繼續(xù)細分,它又有2種類型
Operator State(算子狀態(tài))
Keyed State(鍵控狀態(tài))
Operator State
適用用算子類型: 可用于所有算子: 常用于source, sink,
例如:FlinkKafkaConsumer
狀態(tài)分配:一個算子的子任務(wù)對應(yīng)一個狀態(tài)
創(chuàng)建和訪問方式: 實現(xiàn)CheckpointedFunction或ListCheckpointed(已經(jīng)過時)接口
橫向擴展 :并發(fā)改變時有多重重寫分配方式可選: 均勻分配和合并后每個得到全量
支持的數(shù)據(jù)結(jié)構(gòu): ListState,UnionListStste和BroadCastState
Keyed State
適用用算子類型: 只能用于用于KeyedStream上的算子
狀態(tài)分配 :一個Key對應(yīng)一個State: 一個算子會處理多個Key, 則訪問相應(yīng)的多個State
創(chuàng)建和訪問方式:重寫RichFunction, 通過里面的RuntimeContext訪問w
橫向擴展 :并發(fā)改變, State隨著Key在實例間遷移
支持的數(shù)據(jù)結(jié)構(gòu):ValueState, ListState,MapState ReduceState, AggregatingState
五、算子狀態(tài)的使用
Operator State可以用在所有算子上,每個算子子任務(wù)或者說每個算子實例共享一個狀態(tài),流入這個算子子任務(wù)的數(shù)據(jù)可以訪問和更新這個狀態(tài)。
注意: 算子子任務(wù)之間的狀態(tài)不能互相訪問
Operator State的實際應(yīng)用場景不如Keyed State多,它經(jīng)常被用在Source或Sink等算子上,用來保存流入數(shù)據(jù)的偏移量或?qū)敵鰯?shù)據(jù)做緩存,以保證Flink應(yīng)用的Exactly-Once語義。
Flink為算子狀態(tài)提供三種基本數(shù)據(jù)結(jié)構(gòu):
列表狀態(tài)(List state),將狀態(tài)表示為一組數(shù)據(jù)的列表
聯(lián)合列表狀態(tài)(Union list state),也是將狀態(tài)表示為數(shù)據(jù)的列表。它與常規(guī)列表狀態(tài)的區(qū)別在于,在發(fā)生故障時,或者從保存點(savepoint)啟動應(yīng)用程序時如何恢復(fù)。一種是均勻分配(List state),另外一種是將所有 State 合并為全量 State 再分發(fā)給每個實例(Union list state)。
廣播狀態(tài)(Broadcast state)
是一種特殊的算子狀態(tài). 如果一個算子有多項任務(wù),而它的每項任務(wù)狀態(tài)又都相同,那么這種特殊情況最適合應(yīng)用廣播狀態(tài)。
三種狀態(tài)的實現(xiàn)
六、鍵控狀態(tài)的使用
鍵控狀態(tài)是根據(jù)輸入數(shù)據(jù)流中定義的鍵(key)來維護和訪問的,只能用于KeyedStream(keyBy算子處理之后)。相同key的所有數(shù)據(jù)都會訪問相同的狀態(tài)。
鍵控狀態(tài)支持的數(shù)據(jù)類型
注意:
a)所有的類型都有clear(), 清空當(dāng)前key的狀態(tài)
b)這些狀態(tài)對象僅用于用戶與狀態(tài)進行交互.
c)狀態(tài)不是必須存儲到內(nèi)存, 也可以存儲在磁盤或者任意其他地方
d)從狀態(tài)獲取的值與輸入元素的key相關(guān)
七、狀態(tài)后端
狀態(tài)的存儲、訪問以及維護,由一個可插入的組件決定,這個組件就叫做狀態(tài)后端。
狀態(tài)后端主要負責(zé)兩件事:
本地(taskmanager)的狀態(tài)管理
將檢查點(checkpoint)狀態(tài)寫入遠程存儲文章來源:http://www.zghlxwxcb.cn/news/detail-698652.html
狀態(tài)后端的分類及配置
狀態(tài)后端作為一個可插入的組件, 沒有固定的配置, 我們可以根據(jù)需要選擇一個合適的狀態(tài)后端。
MemoryStateBackend
內(nèi)存級別的狀態(tài)后端(默認),
存儲方式:本地狀態(tài)存儲在TaskManager的內(nèi)存中, checkpoint 存儲在JobManager的內(nèi)存中.
特點:快速, 低延遲, 但不穩(wěn)定
使用場景: 1. 本地測試 2. 幾乎無狀態(tài)的作業(yè)(ETL) 3. JobManager不容易掛, 或者掛了影響不大. 4. 不推薦在生產(chǎn)環(huán)境下使用
FsStateBackend
存儲方式: 本地狀態(tài)在TaskManager內(nèi)存, Checkpoint時, 存儲在文件系統(tǒng)(hdfs)中
特點: 擁有內(nèi)存級別的本地訪問速度, 和更好的容錯保證
使用場景: 1. 常規(guī)使用狀態(tài)的作業(yè). 例如分鐘級別窗口聚合, join等 2. 需要開啟HA的作業(yè) 3. 可以應(yīng)用在生產(chǎn)環(huán)境中
RocksDBStateBackend
將所有的狀態(tài)序列化之后, 存入本地的RocksDB數(shù)據(jù)庫中.(一種NoSql數(shù)據(jù)庫, KV形式存儲)
存儲方式: 1. 本地狀態(tài)存儲在TaskManager的RocksDB數(shù)據(jù)庫中(實際是內(nèi)存+磁盤) 2. Checkpoint在外部文件系統(tǒng)(hdfs)中.
使用場景: 1. 超大狀態(tài)的作業(yè), 例如天級的窗口聚合 2. 需要開啟HA的作業(yè) 3. 對讀寫狀態(tài)性能要求不高的作業(yè) 4. 可以使用在生產(chǎn)環(huán)境文章來源地址http://www.zghlxwxcb.cn/news/detail-698652.html
八、案例列表狀態(tài)
package com.lyh.flink09;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
public class state_programe1_s {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.socketTextStream("hadoop100",9999)
.map(new MyMapFunctin())
.print();
env.execute();
}
public static class MyMapFunctin implements MapFunction<String,Long>, CheckpointedFunction {
private Long count = 0L;
private ListState<Long> state;
@Override
public Long map(String value) throws Exception {
count++;
return count;
}
// 初始化時會調(diào)用這個方法,向本地狀態(tài)中填充數(shù)據(jù). 每個子任務(wù)調(diào)用一次
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
System.out.println("initialize.....");
state = context.getOperatorStateStore()
.getListState(new ListStateDescriptor<Long>("state",Long.class));
for (Long c : state.get()) {
count += c;
}
}
// Checkpoint時會調(diào)用這個方法,我們要實現(xiàn)具體的snapshot邏輯,比如將哪些本地狀態(tài)持久化
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
System.out.println("snapshot.....");
state.clear();
state.add(count);
}
}
}
九、案例廣播狀態(tài)
package com.lyh.flink09;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
public class state_broad1_s {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment()
.setParallelism(3);
DataStreamSource<String> dataStream = env.socketTextStream("hadoop100", 9999);
DataStreamSource<String> controlStream = env.socketTextStream("hadoop100", 8888);
MapStateDescriptor<String, String> stateDescriptor = new MapStateDescriptor<>("state", String.class, String.class);
// 廣播流
BroadcastStream<String> broadcastStream = controlStream.broadcast(stateDescriptor);
dataStream
.connect(broadcastStream)
.process(new BroadcastProcessFunction<String, String, String>() {
@Override
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
// 從廣播狀態(tài)中取值, 不同的值做不同的業(yè)務(wù)
ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);
if ("1".equals(state.get("switch"))) {
out.collect("切換到1號配置....");
} else if ("0".equals(state.get("switch"))) {
out.collect("切換到0號配置....");
} else {
out.collect("切換到其他配置....");
}
}
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
BroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);
// 把值放入廣播狀態(tài)
state.put("switch", value);
}
})
.print();
env.execute();
}
}
到了這里,關(guān)于大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)編程(上)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!