1.狀態(tài)分類
相對于其他流計算框架,F(xiàn)link?一個比較重要的特性就是其支持有狀態(tài)計算。即你可以將中間的計算結(jié)果進行保存,并提供給后續(xù)的計算使用:
具體而言,F(xiàn)link?又將狀態(tài)?(State)?分為?Keyed?State?與?Operator?State:
1.1?算子狀態(tài)
算子狀態(tài)?(Operator?State):顧名思義,狀態(tài)是和算子進行綁定的,一個算子的狀態(tài)不能被其他算子所訪問到。官方文檔上對?Operator?State?的解釋是:each?operator?state?is?bound?to?one?parallel?operator?instance,所以更為確切的說一個算子狀態(tài)是與一個并發(fā)的算子實例所綁定的,即假設(shè)算子的并行度是?2,那么其應(yīng)有兩個對應(yīng)的算子狀態(tài):
2.2?鍵控狀態(tài)
鍵控狀態(tài)?(Keyed?State)?:是一種特殊的算子狀態(tài),即狀態(tài)是根據(jù)?key?值進行區(qū)分的,F(xiàn)link?會為每類鍵值維護一個狀態(tài)實例。如下圖所示,每個顏色代表不同?key?值,對應(yīng)四個不同的狀態(tài)實例。需要注意的是鍵控狀態(tài)只能在?KeyedStream?上進行使用,我們可以通過?stream.keyBy(...)?來得到?KeyedStream?。
2.狀態(tài)編程
2.1?鍵控狀態(tài)
Flink?提供了以下數(shù)據(jù)格式來管理和存儲鍵控狀態(tài)?(Keyed?State):
- ValueState:存儲單值類型的狀態(tài)。可以使用??update(T)?進行更新,并通過?T?value()?進行檢索。?
- ListState:存儲列表類型的狀態(tài)。可以使用?add(T)?或?addAll(List)?添加元素;并通過?get()?獲得整個列表。
- ReducingState:用于存儲經(jīng)過?ReduceFunction?計算后的結(jié)果,使用?add(T)?增加元素。
- AggregatingState:用于存儲經(jīng)過?AggregatingState?計算后的結(jié)果,使用?add(IN)?添加元素。
- FoldingState:已被標(biāo)識為廢棄,會在未來版本中移除,官方推薦使用?AggregatingState?代替。
- MapState:維護?Map?類型的狀態(tài)。
以上所有增刪改查方法不必硬記,在使用時通過語法提示來調(diào)用即可。這里給出一個具體的使用示例:假設(shè)我們正在開發(fā)一個監(jiān)控系統(tǒng),當(dāng)監(jiān)控數(shù)據(jù)超過閾值一定次數(shù)后,需要發(fā)出報警信息。這里之所以要達(dá)到一定次數(shù),是因為由于偶發(fā)原因,偶爾一次超過閾值并不能代表什么,故需要達(dá)到一定次數(shù)后才觸發(fā)報警,這就需要使用到?Flink?的狀態(tài)編程。相關(guān)代碼如下:
public class ThresholdWarning extends
RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {
// 通過ListState來存儲非正常數(shù)據(jù)的狀態(tài)
private transient ListState<Long> abnormalData;
// 需要監(jiān)控的閾值
private Long threshold;
// 觸發(fā)報警的次數(shù)
private Integer numberOfTimes;
ThresholdWarning(Long threshold, Integer numberOfTimes) {
this.threshold = threshold;
this.numberOfTimes = numberOfTimes;
}
@Override
public void open(Configuration parameters) {
// 通過狀態(tài)名稱(句柄)獲取狀態(tài)實例,如果不存在則會自動創(chuàng)建
abnormalData = getRuntimeContext().getListState(
new ListStateDescriptor<>("abnormalData", Long.class));
}
@Override
public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out)
throws Exception {
Long inputValue = value.f1;
// 如果輸入值超過閾值,則記錄該次不正常的數(shù)據(jù)信息
if (inputValue >= threshold) {
abnormalData.add(inputValue);
}
ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());
// 如果不正常的數(shù)據(jù)出現(xiàn)達(dá)到一定次數(shù),則輸出報警信息
if (list.size() >= numberOfTimes) {
out.collect(Tuple2.of(value.f0 + " 超過指定閾值 ", list));
// 報警信息輸出后,清空狀態(tài)
abnormalData.clear();
}
}
}
調(diào)用自定義的狀態(tài)監(jiān)控,這里我們使用?a,b?來代表不同類型的監(jiān)控數(shù)據(jù),分別對其數(shù)據(jù)進行監(jiān)控:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(
Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
tuple2DataStreamSource
.keyBy(0)
.flatMap(new ThresholdWarning(100L, 3)) // 超過100的閾值3次后就進行報警
.printToErr();
env.execute("Managed Keyed State");
輸出如下結(jié)果如下:
2.2?狀態(tài)有效期
以上任何類型的?keyed?state?都支持配置有效期?(TTL)?,示例如下:
StateTtlConfig ttlConfig = StateTtlConfig
// 設(shè)置有效期為 10 秒
.newBuilder(Time.seconds(10))
// 設(shè)置有效期更新規(guī)則,這里設(shè)置為當(dāng)創(chuàng)建和寫入時,都重置其有效期到規(guī)定的10秒
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
/*設(shè)置只要值過期就不可見,另外一個可選值是ReturnExpiredIfNotCleanedUp,
代表即使值過期了,但如果還沒有被物理刪除,就是可見的*/
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("abnormalData", Long.class);
descriptor.enableTimeToLive(ttlConfig);
2.3?算子狀態(tài)
相比于鍵控狀態(tài),算子狀態(tài)目前支持的存儲類型只有以下三種:
- ListState:存儲列表類型的狀態(tài)。
- UnionListState:存儲列表類型的狀態(tài),與?ListState?的區(qū)別在于:如果并行度發(fā)生變化,ListState?會將該算子的所有并發(fā)的狀態(tài)實例進行匯總,然后均分給新的?Task;而?UnionListState?只是將所有并發(fā)的狀態(tài)實例匯總起來,具體的劃分行為則由用戶進行定義。
- BroadcastState:用于廣播的算子狀態(tài)。
這里我們繼續(xù)沿用上面的例子,假設(shè)此時我們不需要區(qū)分監(jiān)控數(shù)據(jù)的類型,只要有監(jiān)控數(shù)據(jù)超過閾值并達(dá)到指定的次數(shù)后,就進行報警,代碼如下:
public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String, Long>,
Tuple2<String, List<Tuple2<String, Long>>>> implements CheckpointedFunction {
// 非正常數(shù)據(jù)
private List<Tuple2<String, Long>> bufferedData;
// checkPointedState
private transient ListState<Tuple2<String, Long>> checkPointedState;
// 需要監(jiān)控的閾值
private Long threshold;
// 次數(shù)
private Integer numberOfTimes;
ThresholdWarning(Long threshold, Integer numberOfTimes) {
this.threshold = threshold;
this.numberOfTimes = numberOfTimes;
this.bufferedData = new ArrayList<>();
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 注意這里獲取的是OperatorStateStore
checkPointedState = context.getOperatorStateStore().
getListState(new ListStateDescriptor<>("abnormalData",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
})));
// 如果發(fā)生重啟,則需要從快照中將狀態(tài)進行恢復(fù)
if (context.isRestored()) {
for (Tuple2<String, Long> element : checkPointedState.get()) {
bufferedData.add(element);
}
}
}
@Override
public void flatMap(Tuple2<String, Long> value,
Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) {
Long inputValue = value.f1;
// 超過閾值則進行記錄
if (inputValue >= threshold) {
bufferedData.add(value);
}
// 超過指定次數(shù)則輸出報警信息
if (bufferedData.size() >= numberOfTimes) {
// 順便輸出狀態(tài)實例的hashcode
out.collect(Tuple2.of(checkPointedState.hashCode() + "閾值警報!", bufferedData));
bufferedData.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 在進行快照時,將數(shù)據(jù)存儲到checkPointedState
checkPointedState.clear();
for (Tuple2<String, Long> element : bufferedData) {
checkPointedState.add(element);
}
}
}
調(diào)用自定義算子狀態(tài),這里需要將并行度設(shè)置為?1:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 開啟檢查點機制
env.enableCheckpointing(1000);
// 設(shè)置并行度為1
DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.setParallelism(1).fromElements(
Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
tuple2DataStreamSource
.flatMap(new ThresholdWarning(100L, 3))
.printToErr();
env.execute("Managed Keyed State");
}
此時輸出如下:
在上面的調(diào)用代碼中,我們將程序的并行度設(shè)置為?1,可以看到三次輸出中狀態(tài)實例的?hashcode?全是一致的,證明它們都同一個狀態(tài)實例。假設(shè)將并行度設(shè)置為?2,此時輸出如下:
可以看到此時兩次輸出中狀態(tài)實例的?hashcode?是不一致的,代表它們不是同一個狀態(tài)實例,這也就是上文提到的,一個算子狀態(tài)是與一個并發(fā)的算子實例所綁定的。同時這里只輸出兩次,是因為在并發(fā)處理的情況下,線程?1?可能拿到?5?個非正常值,線程?2?可能拿到?4?個非正常值,因為要大于?3?次才能輸出,所以在這種情況下就會出現(xiàn)只輸出兩條記錄的情況,所以需要將程序的并行度設(shè)置為?1。
3.檢查點機制
3.1?CheckPoints
為了使?Flink?的狀態(tài)具有良好的容錯性,F(xiàn)link?提供了檢查點機制?(CheckPoints)??。通過檢查點機制,F(xiàn)link?定期在數(shù)據(jù)流上生成?checkpoint?barrier?,當(dāng)某個算子收到?barrier?時,即會基于當(dāng)前狀態(tài)生成一份快照,然后再將該?barrier?傳遞到下游算子,下游算子接收到該?barrier?后,也基于當(dāng)前狀態(tài)生成一份快照,依次傳遞直至到最后的?Sink?算子上。當(dāng)出現(xiàn)異常后,F(xiàn)link?就可以根據(jù)最近的一次的快照數(shù)據(jù)將所有算子恢復(fù)到先前的狀態(tài)。
3.2?開啟檢查點
默認(rèn)情況下,檢查點機制是關(guān)閉的,需要在程序中進行開啟:
//?開啟檢查點機制,并指定狀態(tài)檢查點之間的時間間隔
env.enableCheckpointing(1000);
//?其他可選配置如下:
//?設(shè)置語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//?設(shè)置兩個檢查點之間的最小時間間隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//?設(shè)置執(zhí)行Checkpoint操作時的超時時間
env.getCheckpointConfig().setCheckpointTimeout(60000);
//?設(shè)置最大并發(fā)執(zhí)行的檢查點的數(shù)量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//?將檢查點持久化到外部存儲
env.getCheckpointConfig().enableExternalizedCheckpoints(
????ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//?如果有更近的保存點時,是否將作業(yè)回退到該檢查點
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
3.3?保存點機制
保存點機制?(Savepoints)?是檢查點機制的一種特殊的實現(xiàn),它允許你通過手工的方式來觸發(fā)?Checkpoint,并將結(jié)果持久化存儲到指定路徑中,主要用于避免?Flink?集群在重啟或升級時導(dǎo)致狀態(tài)丟失。示例如下:
#?觸發(fā)指定id的作業(yè)的Savepoint,并將結(jié)果存儲到指定目錄下
bin/flink?savepoint?:jobId?[:targetDirectory]
4.狀態(tài)后端
4.1?狀態(tài)管理器分類
默認(rèn)情況下,所有的狀態(tài)都存儲在?JVM?的堆內(nèi)存中,在狀態(tài)數(shù)據(jù)過多的情況下,這種方式很有可能導(dǎo)致內(nèi)存溢出,因此?Flink?該提供了其它方式來存儲狀態(tài)數(shù)據(jù),這些存儲方式統(tǒng)一稱為狀態(tài)后端?(或狀態(tài)管理器):
主要有以下三種:
1.?MemoryStateBackend
默認(rèn)的方式,即基于?JVM?的堆內(nèi)存進行存儲,主要適用于本地開發(fā)和調(diào)試。
2.?FsStateBackend
基于文件系統(tǒng)進行存儲,可以是本地文件系統(tǒng),也可以是?HDFS?等分布式文件系統(tǒng)。?需要注意而是雖然選擇使用了?FsStateBackend?,但正在進行的數(shù)據(jù)仍然是存儲在?TaskManager?的內(nèi)存中的,只有在?checkpoint?時,才會將狀態(tài)快照寫入到指定文件系統(tǒng)上。
3.?RocksDBStateBackend
RocksDBStateBackend?是?Flink?內(nèi)置的第三方狀態(tài)管理器,采用嵌入式的?key-value?型數(shù)據(jù)庫?RocksDB?來存儲正在進行的數(shù)據(jù)。等到?checkpoint?時,再將其中的數(shù)據(jù)持久化到指定的文件系統(tǒng)中,所以采用?RocksDBStateBackend?時也需要配置持久化存儲的文件系統(tǒng)。之所以這樣做是因為?RocksDB?作為嵌入式數(shù)據(jù)庫安全性比較低,但比起全文件系統(tǒng)的方式,其讀取速率更快;比起全內(nèi)存的方式,其存儲空間更大,因此它是一種比較均衡的方案。
4.2?配置方式
Flink?支持使用兩種方式來配置后端管理器:
第一種方式:基于代碼方式進行配置,只對當(dāng)前作業(yè)生效:
//?配置?FsStateBackend
env.setStateBackend(new?FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
//?配置?RocksDBStateBackend
env.setStateBackend(new?RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
配置?RocksDBStateBackend?時,需要額外導(dǎo)入下面的依賴:文章來源:http://www.zghlxwxcb.cn/news/detail-723937.html
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.14.2</version>
</dependency>
第二種方式:基于?flink-conf.yaml?配置文件的方式進行配置,對所有部署在該集群上的作業(yè)都生效:文章來源地址http://www.zghlxwxcb.cn/news/detail-723937.html
state.backend:?filesystem
state.checkpoints.dir:?hdfs://namenode:40010/flink/checkpoints
到了這里,關(guān)于Flink狀態(tài)管理與檢查點機制的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!