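Background
Operator union list state is a kind of state that is rarely used in day-to-day work. This article walks through the implementation of Flink's Kafka consumer to show how operator union list state is used.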
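Operator union list state
First, let's look at how operator union list state is restored when the application recovers from a failure, or when it is started from a savepoint with a different parallelism (scaling out or in).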
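With union redistribution, every subtask receives the complete list of entries on restore, that is, the concatenation of what all subtasks wrote at checkpoint time, regardless of the new parallelism. The following is a minimal plain-Java simulation of that behavior, not Flink code: the class `UnionRedistributionSketch`, the method `restoreUnion`, and the string entries are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class UnionRedistributionSketch {

    // Simulates union redistribution: every new subtask receives the
    // concatenation of all entries written by the old subtasks.
    static List<List<String>> restoreUnion(List<List<String>> snapshots, int newParallelism) {
        List<String> all = new ArrayList<>();
        for (List<String> perSubtask : snapshots) {
            all.addAll(perSubtask);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            // Each subtask gets its own full copy of the merged list.
            restored.add(new ArrayList<>(all));
        }
        return restored;
    }

    public static void main(String[] args) {
        // Two old subtasks, each having snapshotted the entries it owned
        // (hypothetical "partition@offset" strings).
        List<List<String>> snapshots = Arrays.asList(
                Arrays.asList("p0@100", "p2@300"),
                Arrays.asList("p1@200"));

        // Restart with three subtasks: each one sees all three entries.
        List<List<String>> restored = restoreUnion(snapshots, 3);
        for (int i = 0; i < restored.size(); i++) {
            System.out.println("subtask " + i + " restored " + restored.get(i));
        }
    }
}
```

Because each subtask sees everything, it is up to the operator itself to decide which of the restored entries it actually keeps.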
Operator union list state is handled mainly by the following two methods:
1. The initialization method:
public final void initializeState(FunctionInitializationContext context) throws Exception {
    OperatorStateStore stateStore = context.getOperatorStateStore();
    // Obtain the union list state in the initialization method
    this.unionOffsetStates =
            stateStore.getUnionListState(
                    new ListStateDescriptor<>(
                            OFFSETS_STATE_NAME,
                            createStateSerializer(getRuntimeContext().getExecutionConfig())));
    if (context.isRestored()) {
        restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
        // Copy every entry of the union list state into a local field of the class
        // populate actual holder for restored state
        for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
            restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
        }
        LOG.info(
                "Consumer subtask {} restored state: {}.",
                getRuntimeContext().getIndexOfThisSubtask(),
                restoredState);
    } else {
        LOG.info(
                "Consumer subtask {} has no restore state.",
                getRuntimeContext().getIndexOfThisSubtask());
    }
}
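Since the loop above copies every entry of the union state into `restoredState`, each subtask ends up holding the offsets of all partitions, and it still has to narrow that map down to the partitions it is responsible for. That step is not part of the method shown here. The sketch below illustrates the idea in plain Java under loud assumptions: partitions are plain integers, and the rule "partition p belongs to subtask p % parallelism" is a hypothetical stand-in, not the connector's real assignment formula.

```java
import java.util.Map;
import java.util.TreeMap;

class OwnedPartitionFilterSketch {

    // Hypothetical assignment rule (an assumption for illustration only):
    // partition p is owned by subtask p % parallelism.
    static boolean isOwned(int partition, int subtaskIndex, int parallelism) {
        return partition % parallelism == subtaskIndex;
    }

    // Keeps only the restored offsets whose partition belongs to this subtask.
    static Map<Integer, Long> keepOwned(
            Map<Integer, Long> restoredOffsets, int subtaskIndex, int parallelism) {
        Map<Integer, Long> owned = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : restoredOffsets.entrySet()) {
            if (isOwned(entry.getKey(), subtaskIndex, parallelism)) {
                owned.put(entry.getKey(), entry.getValue());
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // Every subtask restores the same full map from the union state.
        Map<Integer, Long> restored = new TreeMap<>();
        restored.put(0, 100L);
        restored.put(1, 200L);
        restored.put(2, 300L);
        restored.put(3, 400L);

        int parallelism = 2;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println(
                    "subtask " + subtask + " keeps " + keepOwned(restored, subtask, parallelism));
        }
    }
}
```

Whatever rule is used, it has to be deterministic and must assign each partition to exactly one subtask. Otherwise a partition would be read twice or not at all after rescaling.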
2. The method invoked when a checkpoint starts:
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (!running) {
        LOG.debug("snapshotState() called on closed source");
    } else {
        unionOffsetStates.clear();
        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            // the fetcher has not yet been initialized, which means we need to return the
            // originally restored offsets or the assigned partitions
            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
                    subscribedPartitionsToStartOffsets.entrySet()) {
                // When a checkpoint is taken, add the data to the union list state so it is saved
                unionOffsetStates.add(
                        Tuple2.of(
                                subscribedPartition.getKey(), subscribedPartition.getValue()));
            }
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call
                // can happen
                // on this function at a time: either snapshotState() or
                // notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
            }
        } else {
            HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call
                // can happen
                // on this function at a time: either snapshotState() or
                // notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
            }
            for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
                    currentOffsets.entrySet()) {
                unionOffsetStates.add(
                        Tuple2.of(
                                kafkaTopicPartitionLongEntry.getKey(),
                                kafkaTopicPartitionLongEntry.getValue()));
            }
        }
        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // truncate the map of pending offsets to commit, to prevent infinite growth
            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                pendingOffsetsToCommit.remove(0);
            }
        }
    }
}
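The final loop in the method above calls `pendingOffsetsToCommit.remove(0)` to drop the oldest pending checkpoint. The excerpt does not show the field's type. The call only means "remove the first inserted entry" if the map is insertion ordered and supports removal by index. On a plain `java.util.Map`, `remove(0)` would try to remove the key `0` instead. As a rough sketch of the same truncation using only the JDK, the example below uses a `LinkedHashMap` and its iterator. The class `PendingOffsetsSketch`, the method `truncate`, and the `maxPending` parameter are hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

class PendingOffsetsSketch {

    // Removes the oldest entries (in insertion order) until at most
    // maxPending checkpoints remain in the map.
    static void truncate(LinkedHashMap<Long, Map<Integer, Long>> pending, int maxPending) {
        Iterator<Long> oldestFirst = pending.keySet().iterator();
        while (pending.size() > maxPending && oldestFirst.hasNext()) {
            oldestFirst.next();
            oldestFirst.remove(); // removes the entry from the backing map
        }
    }

    public static void main(String[] args) {
        // checkpoint id -> (partition -> offset); all values are made up.
        LinkedHashMap<Long, Map<Integer, Long>> pending = new LinkedHashMap<>();
        for (long checkpointId = 1; checkpointId <= 5; checkpointId++) {
            Map<Integer, Long> offsets = new TreeMap<>();
            offsets.put(0, checkpointId * 100);
            pending.put(checkpointId, offsets);
        }

        truncate(pending, 3);
        System.out.println("remaining checkpoint ids: " + pending.keySet());
    }
}
```

Bounding the map this way keeps memory use flat even if completion notifications for some checkpoints never arrive.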