前言
狀態(tài)在Flink中叫做State,用來保存中間計(jì)算結(jié)果或者緩存數(shù)據(jù)。要做到比較好的狀態(tài)管理,需要考慮以下幾點(diǎn)內(nèi)容:
- 狀態(tài)數(shù)據(jù)的存儲和訪問
在Task內(nèi)部,如何高效地保存狀態(tài)數(shù)據(jù)和使用狀態(tài)數(shù)據(jù)。 - 狀態(tài)數(shù)據(jù)的備份和恢復(fù)
作業(yè)失敗是無法避免的,那么就要考慮如何高效地將狀態(tài)數(shù)據(jù)保存下來,避免狀態(tài)備份降低集群的吞吐量,并且在Failover時恢復(fù)作業(yè)到失敗前的狀態(tài)。 - 狀態(tài)數(shù)據(jù)的劃分和動態(tài)擴(kuò)容
作業(yè)在集群內(nèi)并行執(zhí)行那么就要思考對于作業(yè)的Task而言如何使用統(tǒng)一的方式對狀態(tài)數(shù)據(jù)進(jìn)行切分,在作業(yè)修改并行度導(dǎo)致Task數(shù)據(jù)改變的時候,如何確保正確地恢復(fù)。
一、狀態(tài)分類
State按照是否有Key劃分KeyedState和OperatorState兩種。按照數(shù)據(jù)結(jié)構(gòu)不同,flink定義了多種state,分別應(yīng)用于不同的場景,具體實(shí)現(xiàn)如下:ValueState、ListState、MapState、ReducingState、AggregatingState。
-
ValueState: 保存一個可以更新和檢索的值(如上所述,每個值都對應(yīng)到當(dāng)前的輸入數(shù)據(jù)的 key,因此算子接收到的每個 key 都可能對應(yīng)一個值)。 這個值可以通過 update(T) 進(jìn)行更新,通過 T value() 進(jìn)行檢索。
-
ListState: 保存一個元素的列表。可以往這個列表中追加數(shù)據(jù),并在當(dāng)前的列表上進(jìn)行檢索。可以通過 add(T) 或者 addAll(List) 進(jìn)行添加元素,通過 Iterable get() 獲得整個列表。還可以通過 update(List) 覆蓋當(dāng)前的列表。
-
ReducingState: 保存一個單值,表示添加到狀態(tài)的所有值的聚合。接口與 ListState 類似,但使用 add(T) 增加元素,會使用提供的 ReduceFunction 進(jìn)行聚合。
-
AggregatingState<IN, OUT>: 保留一個單值,表示添加到狀態(tài)的所有值的聚合。和 ReducingState 相反的是, 聚合類型可能與 添加到狀態(tài)的元素的類型不同。 接口與 ListState 類似,但使用 add(IN) 添加的元素會用指定的 AggregateFunction 進(jìn)行聚合。
-
MapState<UK, UV>: 維護(hù)了一個映射列表。 你可以添加鍵值對到狀態(tài)中,也可以獲得反映當(dāng)前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 檢索特定 key。 使用 entries(),keys() 和 values() 分別檢索映射、鍵和值的可迭代視圖。你還可以通過 isEmpty() 來判斷是否包含任何鍵值對。
二、keyed代碼示例
更多代碼示例請下載Flink State體系剖析以及案例實(shí)踐文章來源:http://www.zghlxwxcb.cn/news/detail-796627.html
ListState
代碼如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-796627.html
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 需求:當(dāng)接收到的相同 key 的元素個數(shù)等于 3個,就計(jì)算這些元素的 value 的平均值。
* 計(jì)算keyed stream中每3個元素的 value 的平均值
*/
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
//獲取執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設(shè)置并行度
env.setParallelism(12);
//獲取數(shù)據(jù)源
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(
Tuple2.of(1L, 3L),
Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L),
Tuple2.of(1L, 5L),
Tuple2.of(2L, 2L),
Tuple2.of(2L, 6L));
/**
* 1L, 3L
* 1L, 7L
* 1L, 5L
*
* 1L,5.0 double
*
* 2L, 4L
* 2L, 2L
* 2L, 6L
*
* 2L,4.0 double
*
*
*/
// 輸出:
//(1,5.0)
//(2,4.0)
dataStreamSource
.keyBy(tuple -> tuple.f0) //分組
.flatMap(new CountAverageWithListState())
.print();
env.execute("TestStatefulApi");
}
}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
* ListState<T> :這個狀態(tài)為每一個 key 保存集合的值
* get() 獲取狀態(tài)值
* add() / addAll() 更新狀態(tài)值,將數(shù)據(jù)放到狀態(tài)中
* clear() 清除狀態(tài)
*/
public class CountAverageWithListState
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
// managed keyed state
/**
* ValueState : 里面只能存一條元素
* ListState : 里面可以存很多數(shù)據(jù)
*/
private ListState<Tuple2<Long, Long>> elementsByKey;
@Override
public void open(Configuration parameters) throws Exception {
// 注冊狀態(tài)
ListStateDescriptor<Tuple2<Long, Long>> descriptor =
new ListStateDescriptor<Tuple2<Long, Long>>(
"average", // 狀態(tài)的名字
Types.TUPLE(Types.LONG, Types.LONG)); // 狀態(tài)存儲的數(shù)據(jù)類型
elementsByKey = getRuntimeContext().getListState(descriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, Double>> out) throws Exception {
// 拿到當(dāng)前的 key 的狀態(tài)值
Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get();
// 如果狀態(tài)值還沒有初始化,則初始化
if (currentState == null) {
elementsByKey.addAll(Collections.emptyList());
}
// 更新狀態(tài)
elementsByKey.add(element);
// 判斷,如果當(dāng)前的 key 出現(xiàn)了 3 次,則需要計(jì)算平均值,并且輸出
List<Tuple2<Long, Long>> allElements = Lists.newArrayList(elementsByKey.get());
if (allElements.size() == 3) {
long count = 0;
long sum = 0;
for (Tuple2<Long, Long> ele : allElements) {
count++;
sum += ele.f1;
}
double avg = (double) sum / count;
out.collect(Tuple2.of(element.f0, avg));
// 清除狀態(tài)
elementsByKey.clear();
}
}
}
MapState
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.UUID;
/**
* MapState<K, V> :這個狀態(tài)為每一個 key 保存一個 Map 集合
* put() 將對應(yīng)的 key 的鍵值對放到狀態(tài)中
* values() 拿到 MapState 中所有的 value
* clear() 清除狀態(tài)
*/
public class CountAverageWithMapState
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
// managed keyed state
//1. MapState :key 是一個唯一的值,value 是接收到的相同的 key 對應(yīng)的 value 的值
/**
* MapState:
* Map集合的特點(diǎn),相同key,會覆蓋數(shù)據(jù)。
*/
private MapState<String, Long> mapState;
@Override
public void open(Configuration parameters) throws Exception {
// 注冊狀態(tài)
MapStateDescriptor<String, Long> descriptor =
new MapStateDescriptor<String, Long>(
"average", // 狀態(tài)的名字
String.class, Long.class); // 狀態(tài)存儲的數(shù)據(jù)類型
mapState = getRuntimeContext().getMapState(descriptor);
}
/**
*
* @param element
* @param out
* @throws Exception
*/
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, Double>> out) throws Exception {
mapState.put(UUID.randomUUID().toString(), element.f1); //list
// 判斷,如果當(dāng)前的 key 出現(xiàn)了 3 次,則需要計(jì)算平均值,并且輸出
List<Long> allElements = Lists.newArrayList(mapState.values());
if (allElements.size() == 3) {
long count = 0;
long sum = 0;
for (Long ele : allElements) {
count++;
sum += ele;
}
double avg = (double) sum / count;
//
out.collect(Tuple2.of(element.f0, avg));
// 清除狀態(tài)
mapState.clear();
}
}
}
總結(jié)
- 是否存在當(dāng)前處理的 key(current key):operator state 是沒有當(dāng)前 key 的概念,而 keyed
state 的數(shù)值總是與一個 current key 對應(yīng)。 - 存儲對象是否 on heap: 目前 operator state backend 僅有一種 on-heap 的實(shí)現(xiàn);而 keyed state
backend 有 on-heap 和 off-heap(RocksDB)的多種實(shí)現(xiàn)。 - 是否需要手動聲明快照(snapshot)和恢復(fù) (restore) 方法:operator state 需要手動實(shí)現(xiàn)
snapshot 和 restore 方法;而 keyed state 則由 backend 自行實(shí)現(xiàn),對用戶透明。 - 數(shù)據(jù)大?。阂话愣裕覀冋J(rèn)為 operator state 的數(shù)據(jù)規(guī)模是比較小的;認(rèn)為 keyed state 規(guī)模是
相對比較大的。需要注意的是,這是一個經(jīng)驗(yàn)判斷,不是一個絕對的判斷區(qū)分標(biāo)準(zhǔn)。
更多內(nèi)容和代碼示例請下載Flink State體系剖析以及案例實(shí)踐
到了這里,關(guān)于Flink State 狀態(tài)管理的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!