第8章
分流
1.使用側(cè)輸出流
2.合流
2.1 union :使用 ProcessFunction 處理合流后的數(shù)據(jù)
2.2 Connect :
兩條流的格式可以不一樣, map操作使用CoMapFunction,process 傳入:CoProcessFunction
2.2 BroadcastConnectedStream
keyBy 進行了按鍵分區(qū),那么要傳入的就是 KeyedBroadcastProcessFunction;
如果沒有按鍵分區(qū),就傳入 BroadcastProcessFunction
3.基于時間的合流——雙流聯(lián)結(jié)(Join)
3.1 窗口聯(lián)結(jié)(Window Join)
stream1.join(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
3.2 間隔聯(lián)結(jié)(Interval Join)
所以匹配的條件為:a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
process函數(shù)傳入:ProcessJoinFunction
3.3 窗口同組聯(lián)結(jié)(Window CoGroup)
stream1.coGroup(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.apply(<CoGroupFunction>)
第九章:狀態(tài)編程
1 狀態(tài)的分類:托管狀態(tài)(Managed State)和原始狀態(tài)(Raw State)
1.托管狀態(tài)分為兩類:算子狀態(tài)(Operator State)和按鍵分區(qū)狀態(tài)(Keyed State)
1.1算子狀態(tài)可以用在所有算子上,使用的時候其實就跟一個本地變量沒什么區(qū)別——因為本地變量的作用域也是當(dāng)前任務(wù)實例。在使用時,我們還需進一步實現(xiàn) CheckpointedFunction 接口。
ListState、UnionListState 和 BroadcastState
1.2 按鍵分區(qū)狀態(tài)(Keyed State):狀態(tài)是根據(jù)輸入流中定義的鍵(key)來維護和訪問的,所以只能定義在按鍵分區(qū)流(KeyedStream)中,也就 keyBy 之后才可以使用
支持的數(shù)據(jù)結(jié)構(gòu):值狀態(tài)(ValueState)、列表狀態(tài)(ListState)、映射狀態(tài)(MapState)、歸約狀態(tài)(ReducingState)、聚合狀態(tài)(AggregatingState)
open中聲明狀態(tài):getRuntimeContext.getMapState(new MapStateDescriptor[String,String]("my-map-state",classOf[String],classOf[String]))
2.狀態(tài)生存時間(TTL)
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(10))//這就是設(shè)定的狀態(tài)生存時間
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//創(chuàng)建狀態(tài)和更改狀態(tài)(寫操作)時更新失效時間
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)//表示從不返回過期值
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my
state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
3.狀態(tài)持久化和狀態(tài)后端
1. 狀態(tài)后端的分類:“哈希表狀態(tài)后端”(HashMapStateBackend)、內(nèi)嵌 RocksDB 狀態(tài)后端”(EmbeddedRocksDBStateBackend)。
第十章:檢查點(Checkpoint)文章來源:http://www.zghlxwxcb.cn/news/detail-657240.html
1.從檢查點來恢復(fù)狀態(tài)了。具體的步驟為:
(1)重啟應(yīng)用,所有任務(wù)的狀態(tài)會清空
(2)讀取檢查點,重置狀態(tài)。找到最近一次保存的檢查點,從中讀出每個算子任務(wù)狀態(tài)的快照,分別填充到對應(yīng)的狀態(tài)中。
(3)重放數(shù)據(jù):保存檢查點后開始重新讀取數(shù)據(jù),這可以通過 Source 任務(wù)向外部數(shù)據(jù)源重新提交偏移量(offset)來實現(xiàn)
(4)繼續(xù)處理數(shù)據(jù)
2.檢查點算法:
Flink 使用了 Chandy-Lamport 算法的一種變體,被稱為“異步分界線快照”(asynchronous barrier snapshotting)算法。算法的核心就是兩個原則:當(dāng)上游任務(wù)向多個并行
下游任務(wù)發(fā)送 barrier 時,需要廣播出去;而當(dāng)多個上游任務(wù)向同一個下游任務(wù)傳遞 barrier 時,
需要在下游任務(wù)執(zhí)行“分界線對齊”(barrier alignment)操作,也就是需要等到所有并行分區(qū)
的 barrier 都到齊,才可以開始狀態(tài)的保存。
具體過程如下:
(1)JobManager 發(fā)送指令,觸發(fā)檢查點的保存;Source 任務(wù)保存狀態(tài),插入分界線
(2)狀態(tài)快照保存完成,分界線向下游傳遞
(3)向下游多個并行子任務(wù)廣播分界線,執(zhí)行分界線對齊
(4)分界線對齊后,保存狀態(tài)到持久化存儲
(5)先處理緩存數(shù)據(jù),然后正常繼續(xù)處理
3 端到端精確一次(end-to-end exactly-once)
3.1 輸入端保證
外部數(shù)據(jù)源就必須擁有重放數(shù)據(jù)的能力
3.2輸出端保證
冪等寫入
事務(wù)寫入:預(yù)寫日志(WAL)和兩階段提交(2PC)
(1)預(yù)寫日志(write-ahead-log,WAL):缺點:再次確認可能會導(dǎo)致數(shù)據(jù)寫出成功,但是確認消息失敗,導(dǎo)致的數(shù)據(jù)重復(fù)寫入
①先把結(jié)果數(shù)據(jù)作為日志(log)狀態(tài)保存起來
②進行檢查點保存時,也會將這些結(jié)果數(shù)據(jù)一并做持久化存儲
③在收到檢查點完成的通知時,將所有結(jié)果一次性寫入外部系統(tǒng)。
(2)兩階段提交(two-phase-commit,2PC)文章來源地址http://www.zghlxwxcb.cn/news/detail-657240.html
具體的實現(xiàn)步驟為:
①當(dāng)?shù)谝粭l數(shù)據(jù)到來時,或者收到檢查點的分界線時,Sink 任務(wù)都會啟動一個事務(wù)。
②接下來接收到的所有數(shù)據(jù),都通過這個事務(wù)寫入外部系統(tǒng);這時由于事務(wù)沒有提交,所
以數(shù)據(jù)盡管寫入了外部系統(tǒng),但是不可用,是“預(yù)提交”的狀態(tài)。
③當(dāng) Sink 任務(wù)收到 JobManager 發(fā)來檢查點完成的通知時,正式提交事務(wù),寫入的結(jié)果就
真正可用了。
當(dāng)中間發(fā)生故障時,當(dāng)前未提交的事務(wù)就會回滾,于是所有寫入外部系統(tǒng)的數(shù)據(jù)也就實現(xiàn)
了撤回
2PC 對外部系統(tǒng)的要求
外部系統(tǒng)必須提供事務(wù)支持,或者 Sink 任務(wù)必須能夠模擬外部系統(tǒng)上的事務(wù)。
? 在檢查點的間隔期間里,必須能夠開啟一個事務(wù)并接受數(shù)據(jù)寫入。
? 在收到檢查點完成的通知之前,事務(wù)必須是“等待提交”的狀態(tài)。在故障恢復(fù)的情況
下,這可能需要一些時間。如果這個時候外部系統(tǒng)關(guān)閉事務(wù)(例如超時了),那么未
提交的數(shù)據(jù)就會丟失。
? Sink 任務(wù)必須能夠在進程失敗后恢復(fù)事務(wù)。
? 提交事務(wù)必須是冪等操作。也就是說,事務(wù)的重復(fù)提交應(yīng)該是無效的。
(3) kafka-flink-kafka 實現(xiàn)端到端 exactly-once 的具體過程可以分解如下
1.啟動檢查點保存:標志著我們進入了兩階段提交協(xié)議的“預(yù)提交”階段
2.算子任務(wù)對狀態(tài)做快照保存到狀態(tài)后端
3.Sink 任務(wù)開啟事務(wù),進行預(yù)提交
4.檢查點保存完成,提交事務(wù)
當(dāng)所有算子的快照都完成,JobManager 會向所有任務(wù)發(fā)確認通知,告訴大家當(dāng)前檢查點已成功保存,當(dāng) Sink 任務(wù)收到確認通知后,就會正式提交之前的事務(wù)
需要的配置:必須啟用檢查點、 FlinkKafkaProducer 的構(gòu)造函數(shù)中傳入?yún)?shù) Semantic.EXACTLY_ONCE、Kafka 讀取數(shù)據(jù)的消費者的隔離級別(read_committed)、事務(wù)超時配置
到了這里,關(guān)于Flink分流,合流,狀態(tài),checkpoint和精準一次筆記的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!