目錄
概述
設(shè)置重啟策略
什么是flink的重啟策略(Restartstrategy)
flink的重啟策略(Restartstrategy)實戰(zhàn)
flink的4種重啟策略
FixedDelayRestartstrategy(固定延時重啟策略)
FailureRateRestartstrategy(故障率重啟策略)
NoRestartstrategy(不重啟策略)
配置State Backends 以及 Checkpointing
Checkpoint
啟用和配置
選擇 State backend
MemoryStateBackend
FsStateBackend
RocksDBStateBackend
State backend比較
概述
編寫 Flink Python Table API 程序的第一步是創(chuàng)建 TableEnvironment。這是 Python Table API 作業(yè)的入口類。
get_config()返回 table config,可以通過 table config 來定義 Table API 的運行時行為。
t_env = TableEnvironment.create(Environmentsettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
設(shè)置重啟策略
在TableConfig中,通過設(shè)置鍵值選項來配置它們。
什么是flink的重啟策略(Restartstrategy)
Restartstrategy,重啟策略,在遇到機器或者代碼等不可預(yù)知的問題時導(dǎo)致 Job 或者 Task 掛掉的時候,它會根據(jù)配置的重啟策略將 Job 或者受影響的 Task 拉起來重新執(zhí)行,以使得作業(yè)恢復(fù)到之前正常執(zhí)行狀態(tài)。Flink 中的重啟策略決定了是否要重啟 Job 或者 Task,以及重啟的次數(shù)和每次重啟的時間間隔。
flink的重啟策略(Restartstrategy)實戰(zhàn)
flink的 Restartstrategy 作用是提升任務(wù)健壯性和容錯性,保證任務(wù)可以實時產(chǎn)出數(shù)據(jù)。
設(shè)置重啟策略和公司處理數(shù)據(jù)業(yè)務(wù)需求有很大的關(guān)系,根據(jù)不同的業(yè)務(wù)需求設(shè)置處理任務(wù)的不同策略。
其實遇到上面這種問題比較常見,比如有時候因為數(shù)據(jù)的問題(不合規(guī)范、為 null 等),這時在處理這些臟數(shù)據(jù)的時候可能就會遇到各種各樣的異常錯誤,比如空指針、數(shù)組越界、數(shù)據(jù)類型轉(zhuǎn)換錯誤等。
可能你會說只要過濾掉這種臟數(shù)據(jù)就行了,或者進行異常捕獲就不會導(dǎo)致 Job 不斷重啟的問題了。
所以日常開發(fā)中我們要盡力的保證代碼的健壯性,但是也要配置好 Flink Job 的 Restartstrategy(重啟策略)。
flink的4種重啟策略
默認的重啟策略是通過Flink的flink-conf.yaml來指定的,這個配置參數(shù)restart-strategy定義了哪種策略會被采用。
如果checkpoint未啟動,就會采用no restart策略,如果啟動了checkpoint機制,但是未指定重啟策略的話,就會采用fixed-delay策略,重試Integer.MAX_VALUE次。
配置參數(shù) restart-strategy 定義了哪個策略被使用。
固定間隔 (Fixed delay)
失敗率 (Failure rate)
無重啟 (No restart)
FixedDelayRestartstrategy(固定延時重啟策略)
FixedDelayRestartstrategy是固定延遲重啟策略,程序按照集群配置文件中或者程序中額外設(shè)置的重啟次數(shù)嘗試重啟作業(yè),如果嘗試次數(shù)超過了給定的最大次數(shù),程序還沒有起來,則停止作業(yè),另外還可以配置連續(xù)兩次重啟之間的等待時間。
在 flink-conf.yaml 中可以像下面這樣配置:
restart-strategy: fixed-delay
#表示作業(yè)重啟的最大次數(shù),啟用 checkpoint 的話是 Integer.MAX_VALUE,否則是 1。
restart-strategy.fixed-delay.attempts: 3
#如果設(shè)置分鐘可以類似 1 min,該參數(shù)表示兩次重啟之間的時間間隔,當(dāng)程序與外部系統(tǒng)有連接交互時延遲重啟可能會有幫助,啟用 checkpoint 的話,延遲重啟的時間是 10 秒,否則使用 akka.ask.timeout 的值。
restart-strategy.fixed-delay.delay: 10 s
python程序設(shè)置重啟策略為 "fixed-delay":
table_env.get_config().get_configuration().set_string("restart-strategy", "fixed-delay")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.attempts", "3")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.delay", "30s")
FailureRateRestartstrategy(故障率重啟策略)
FailureRateRestartstrategy 是故障率重啟策略,在發(fā)生故障之后重啟作業(yè),如果固定時間間隔之內(nèi)發(fā)生故障的次數(shù)超過設(shè)置的值后,作業(yè)就會失敗停止,該重啟策略也支持設(shè)置連續(xù)兩次重啟之間的等待時間。
在 flink-conf.yaml 中可以像下面這樣配置:
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3?
#固定時間間隔內(nèi)允許的最大重啟次數(shù),默認 1
restart-strategy.failure-rate.failure-rate-interval: 5 min?
#固定時間間隔,默認 1 分鐘
restart-strategy.failure-rate.delay: 10 s
#連續(xù)兩次重啟嘗試之間的延遲時間,默認是 akka.ask.timeout
python程序設(shè)置重啟策略為 "fixed-delay":
table_env.get_config().get_configuration().set_string("restart-strategy", "failure-rate")
table_env.get_config().get_configuration().set_string("restart-strategy.failure-rate.delay", "1s")
table_env.get_config().get_configuration().set_string("restart-strategy.failure-rate.failure-rate-interval", "1 min")
table_env.get_config().get_configuration().set_string("restart-strategy.failure-rate.max-failures-per-interval", "1")
NoRestartstrategy(不重啟策略)
NoRestartstrategy 作業(yè)不重啟策略,直接失敗停止,在 flink-conf.yaml 中配置如下:
restart-strategy: none
配置State Backends 以及 Checkpointing
Checkpoint
Flink為了使 State 容錯,需要有 State checkpoint(狀態(tài)檢查點)。
Checkpoint 允許 Flink 恢復(fù)流的 State 和處理位置,從而為程序提供與無故障執(zhí)行相同的語義。
Checkpoint 使用的先決條件:
一個持久化的,能夠在一定時間范圍內(nèi)重放記錄的數(shù)據(jù)源。
例如,持久化消息隊列:Apache Kafka,RabbitMQ,Amazon Kinesis,Google PubSub 或文件系統(tǒng):HDFS,S3,GFS,NFS,Ceph...
State 持久化存儲系統(tǒng),通常是分布式文件系統(tǒng):HDFS,S3,GFS,NFS,Ceph...
啟用和配置
Checkpoint 默認情況下是不啟用的。StreamExecutionEnvironment 對象調(diào)用 enableCheckpointing(n) 啟用 Checkpoint,其中n是以毫秒為單位的 Checkpoint 間隔。
Checkpoint 的配置項包括:
#設(shè)置 checkpoint 模式為 EXACTLY_ONCE
#Checkpoint 支持這兩種模式:恰好一次exactly-once或至少一次at-least-once
#對于大多數(shù)應(yīng)用來說,EXACTLY_ONCE是優(yōu)選的。at-least-once可能在某些要求超低延遲(幾毫秒)的應(yīng)用程序使用。
table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
#Checkpoint最小間隔時間,設(shè)置為5000,表示在上一個 checkpoint 完成后的至少5秒后才會啟動下一個 checkpoint
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "3min")
#Checkpoint 超時時間:在超時時間內(nèi) checkpoint 未完成,則中止正在進行的 checkpoint。
table_env.get_config().get_configuration().set_string("execution.checkpointing.timeout", "10min")
#Checkpoint并發(fā)數(shù):最多可以同時運行checkpoint的數(shù)量,當(dāng)達到最大值,必須其中一個完成,才能新啟一個。
table_env.get_config().get_configuration().set_string("execution.checkpointing.max-concurrent-checkpoints", "2")
選擇 State backend
Checkpoint 的存儲的位置取決于配置的 State backend(JobManager 內(nèi)存,文件系統(tǒng),數(shù)據(jù)庫...)。
默認情況下,State 存儲在 TaskManager 內(nèi)存中,Checkpoint 存儲在 JobManager 內(nèi)存中。
Flink 自帶了以下幾種開箱即用的 state backend:
MemoryStateBackend
FsStateBackend
RocksDBStateBackend
在沒有配置的情況下,系統(tǒng)默認使用 MemoryStateBackend
MemoryStateBackend
使用 MemoryStateBackend,在 checkpoint 中對 State 做一次快照,并在向 JobManager 發(fā)送 checkpoint 確認完成的消息中帶上此快照數(shù)據(jù),然后快照就會存儲在 JobManager 的內(nèi)存堆中。
FsStateBackend
FsStateBackend 需要配置一個文件系統(tǒng)的URL來,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。
FsStateBackend 在 TaskManager 的內(nèi)存中持有正在處理的數(shù)據(jù)。
Checkpoint 時將 state snapshot 寫入文件系統(tǒng)目錄下的文件中,文件的路徑會傳遞給 JobManager,存在其內(nèi)存中。
FsStateBackend 默認是異步操作,以避免在寫 state snapshot 時阻塞處理程序。如果要禁用異步,可以在 FsStateBackend 構(gòu)造函數(shù)中設(shè)置
RocksDBStateBackend
RocksDBStateBackend 需要配置一個文件系統(tǒng)的URL來,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。
RocksDBStateBackend 在 RocksDB 中持有正在處理的數(shù)據(jù),RocksDB 在 TaskManager 的數(shù)據(jù)目錄下。
Checkpoint 時將整個 RocksDB 寫入文件系統(tǒng)目錄下的文件中,文件的路徑會傳遞給 JobManager,存在其內(nèi)存中。
RocksDBStateBackend 通常也是異步的。目前唯一支持增量 checkpoint。
使用 RocksDB 可以保存的狀態(tài)量僅受可用磁盤空間量的限制。這也意味著可以實現(xiàn)的最大吞吐量更低,后臺的所有讀/寫都必須通過序列化和反序列化來檢索/存儲 State,這也比使用基于堆內(nèi)存的方式代價更昂貴。
State backend比較
StateBackend |
in-flight |
checkpoint |
吞吐 |
推薦使用場景 |
MemoryStateBackend? |
TM Memory |
JM Memory |
高 |
調(diào)試、無狀態(tài)或?qū)?shù)據(jù)丟失或重復(fù)無要求 |
FsStateBackend????? |
TM Memory |
FS/HDFS |
高 |
普通狀態(tài)、窗口、KV 結(jié)構(gòu) |
RocksDBStateBackend |
RocksDB on TM |
FS/HDFS |
低 |
超大狀態(tài)、超長窗口、大型 KV 結(jié)構(gòu) |
# 設(shè)置 statebackend 類型為 "rocksdb",其他可選項有 "filesystem" 和 "jobmanager"
# 你也可以將這個屬性設(shè)置為 StateBackendFactory 的完整類名
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory文章來源:http://www.zghlxwxcb.cn/news/detail-539189.html
table_env.get_config().get_configuration().set_string("state.backend", "rocksdb")
# 設(shè)置 RocksDB statebackend 所需要的 checkpoint 目錄文章來源地址http://www.zghlxwxcb.cn/news/detail-539189.html
table_env.get_config().get_configuration().set_string("state.checkpoints.dir", "file:///tmp/checkpoints/")
到了這里,關(guān)于Flink流批一體計算(11):PyFlink Tabel API之TableEnvironment的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!