国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink流批一體計算(11):PyFlink Tabel API之TableEnvironment

這篇具有很好參考價值的文章主要介紹了Flink流批一體計算(11):PyFlink Tabel API之TableEnvironment。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

概述

設(shè)置重啟策略

什么是flink的重啟策略(Restartstrategy)

flink的重啟策略(Restartstrategy)實戰(zhàn)

flink的4種重啟策略

FixedDelayRestartstrategy(固定延時重啟策略)

FailureRateRestartstrategy(故障率重啟策略)

NoRestartstrategy(不重啟策略)

配置State Backends 以及 Checkpointing

Checkpoint

啟用和配置

選擇 State backend

MemoryStateBackend

FsStateBackend

RocksDBStateBackend

State backend比較

概述

編寫 Flink Python Table API 程序的第一步是創(chuàng)建 TableEnvironment。這是 Python Table API 作業(yè)的入口類。

get_config()返回 table config,可以通過 table config 來定義 Table API 的運行時行為。

t_env = TableEnvironment.create(Environmentsettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

設(shè)置重啟策略

在TableConfig中,通過設(shè)置鍵值選項來配置它們。

什么是flink的重啟策略(Restartstrategy)

Restartstrategy,重啟策略,在遇到機器或者代碼等不可預(yù)知的問題時導(dǎo)致 Job 或者 Task 掛掉的時候,它會根據(jù)配置的重啟策略將 Job 或者受影響的 Task 拉起來重新執(zhí)行,以使得作業(yè)恢復(fù)到之前正常執(zhí)行狀態(tài)。Flink 中的重啟策略決定了是否要重啟 Job 或者 Task,以及重啟的次數(shù)和每次重啟的時間間隔。

flink的重啟策略(Restartstrategy)實戰(zhàn)

flink的 Restartstrategy 作用是提升任務(wù)健壯性和容錯性,保證任務(wù)可以實時產(chǎn)出數(shù)據(jù)。

設(shè)置重啟策略和公司處理數(shù)據(jù)業(yè)務(wù)需求有很大的關(guān)系,根據(jù)不同的業(yè)務(wù)需求設(shè)置處理任務(wù)的不同策略。

其實遇到上面這種問題比較常見,比如有時候因為數(shù)據(jù)的問題(不合規(guī)范、為 null 等),這時在處理這些臟數(shù)據(jù)的時候可能就會遇到各種各樣的異常錯誤,比如空指針、數(shù)組越界、數(shù)據(jù)類型轉(zhuǎn)換錯誤等。

可能你會說只要過濾掉這種臟數(shù)據(jù)就行了,或者進行異常捕獲就不會導(dǎo)致 Job 不斷重啟的問題了。

所以日常開發(fā)中我們要盡力的保證代碼的健壯性,但是也要配置好 Flink Job 的 Restartstrategy(重啟策略)。

flink的4種重啟策略

默認的重啟策略是通過Flink的flink-conf.yaml來指定的,這個配置參數(shù)restart-strategy定義了哪種策略會被采用。

如果checkpoint未啟動,就會采用no restart策略,如果啟動了checkpoint機制,但是未指定重啟策略的話,就會采用fixed-delay策略,重試Integer.MAX_VALUE次。

配置參數(shù) restart-strategy 定義了哪個策略被使用。

固定間隔 (Fixed delay)

失敗率 (Failure rate)

無重啟 (No restart)

FixedDelayRestartstrategy(固定延時重啟策略)

FixedDelayRestartstrategy是固定延遲重啟策略,程序按照集群配置文件中或者程序中額外設(shè)置的重啟次數(shù)嘗試重啟作業(yè),如果嘗試次數(shù)超過了給定的最大次數(shù),程序還沒有起來,則停止作業(yè),另外還可以配置連續(xù)兩次重啟之間的等待時間。

在 flink-conf.yaml 中可以像下面這樣配置:

restart-strategy: fixed-delay

#表示作業(yè)重啟的最大次數(shù),啟用 checkpoint 的話是 Integer.MAX_VALUE,否則是 1。

restart-strategy.fixed-delay.attempts: 3

#如果設(shè)置分鐘可以類似 1 min,該參數(shù)表示兩次重啟之間的時間間隔,當(dāng)程序與外部系統(tǒng)有連接交互時延遲重啟可能會有幫助,啟用 checkpoint 的話,延遲重啟的時間是 10 秒,否則使用 akka.ask.timeout 的值。

restart-strategy.fixed-delay.delay: 10 s

python程序設(shè)置重啟策略為 "fixed-delay":

table_env.get_config().get_configuration().set_string("restart-strategy", "fixed-delay")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.attempts", "3")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.delay", "30s")
FailureRateRestartstrategy(故障率重啟策略)

FailureRateRestartstrategy 是故障率重啟策略,在發(fā)生故障之后重啟作業(yè),如果固定時間間隔之內(nèi)發(fā)生故障的次數(shù)超過設(shè)置的值后,作業(yè)就會失敗停止,該重啟策略也支持設(shè)置連續(xù)兩次重啟之間的等待時間。

在 flink-conf.yaml 中可以像下面這樣配置:

restart-strategy: failure-rate

restart-strategy.failure-rate.max-failures-per-interval: 3?

#固定時間間隔內(nèi)允許的最大重啟次數(shù),默認 1

restart-strategy.failure-rate.failure-rate-interval: 5 min?

#固定時間間隔,默認 1 分鐘

restart-strategy.failure-rate.delay: 10 s

#連續(xù)兩次重啟嘗試之間的延遲時間,默認是 akka.ask.timeout

python程序設(shè)置重啟策略為 "fixed-delay":

table_env.get_config().get_configuration().set_string("restart-strategy", "failure-rate")
table_env.get_config().get_configuration().set_string("restart-strategy.failure-rate.delay", "1s")
table_env.get_config().get_configuration().set_string("restart-strategy.failure-rate.failure-rate-interval", "1 min")
table_env.get_config().get_configuration().set_string("restart-strategy.failure-rate.max-failures-per-interval", "1")
NoRestartstrategy(不重啟策略)

NoRestartstrategy 作業(yè)不重啟策略,直接失敗停止,在 flink-conf.yaml 中配置如下:

restart-strategy: none

配置State Backends 以及 Checkpointing

Checkpoint

Flink為了使 State 容錯,需要有 State checkpoint(狀態(tài)檢查點)。

Checkpoint 允許 Flink 恢復(fù)流的 State 和處理位置,從而為程序提供與無故障執(zhí)行相同的語義。

Checkpoint 使用的先決條件:

一個持久化的,能夠在一定時間范圍內(nèi)重放記錄的數(shù)據(jù)源。

例如,持久化消息隊列:Apache Kafka,RabbitMQ,Amazon Kinesis,Google PubSub 或文件系統(tǒng):HDFS,S3,GFS,NFS,Ceph...

State 持久化存儲系統(tǒng),通常是分布式文件系統(tǒng):HDFS,S3,GFS,NFS,Ceph...

啟用和配置

Checkpoint 默認情況下是不啟用的。StreamExecutionEnvironment 對象調(diào)用 enableCheckpointing(n) 啟用 Checkpoint,其中n是以毫秒為單位的 Checkpoint 間隔。

Checkpoint 的配置項包括:

#設(shè)置 checkpoint 模式為 EXACTLY_ONCE

#Checkpoint 支持這兩種模式:恰好一次exactly-once或至少一次at-least-once

#對于大多數(shù)應(yīng)用來說,EXACTLY_ONCE是優(yōu)選的。at-least-once可能在某些要求超低延遲(幾毫秒)的應(yīng)用程序使用。

table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")

#Checkpoint最小間隔時間,設(shè)置為5000,表示在上一個 checkpoint 完成后的至少5秒后才會啟動下一個 checkpoint

table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "3min")

#Checkpoint 超時時間:在超時時間內(nèi) checkpoint 未完成,則中止正在進行的 checkpoint。

table_env.get_config().get_configuration().set_string("execution.checkpointing.timeout", "10min")

#Checkpoint并發(fā)數(shù):最多可以同時運行checkpoint的數(shù)量,當(dāng)達到最大值,必須其中一個完成,才能新啟一個。

table_env.get_config().get_configuration().set_string("execution.checkpointing.max-concurrent-checkpoints", "2")
選擇 State backend

Checkpoint 的存儲的位置取決于配置的 State backend(JobManager 內(nèi)存,文件系統(tǒng),數(shù)據(jù)庫...)。

默認情況下,State 存儲在 TaskManager 內(nèi)存中,Checkpoint 存儲在 JobManager 內(nèi)存中。

Flink 自帶了以下幾種開箱即用的 state backend:

MemoryStateBackend

FsStateBackend

RocksDBStateBackend

在沒有配置的情況下,系統(tǒng)默認使用 MemoryStateBackend

MemoryStateBackend

使用 MemoryStateBackend,在 checkpoint 中對 State 做一次快照,并在向 JobManager 發(fā)送 checkpoint 確認完成的消息中帶上此快照數(shù)據(jù),然后快照就會存儲在 JobManager 的內(nèi)存堆中。

FsStateBackend

FsStateBackend 需要配置一個文件系統(tǒng)的URL來,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。

FsStateBackend 在 TaskManager 的內(nèi)存中持有正在處理的數(shù)據(jù)。

Checkpoint 時將 state snapshot 寫入文件系統(tǒng)目錄下的文件中,文件的路徑會傳遞給 JobManager,存在其內(nèi)存中。

FsStateBackend 默認是異步操作,以避免在寫 state snapshot 時阻塞處理程序。如果要禁用異步,可以在 FsStateBackend 構(gòu)造函數(shù)中設(shè)置

RocksDBStateBackend

RocksDBStateBackend 需要配置一個文件系統(tǒng)的URL來,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。

RocksDBStateBackend 在 RocksDB 中持有正在處理的數(shù)據(jù),RocksDB 在 TaskManager 的數(shù)據(jù)目錄下。

Checkpoint 時將整個 RocksDB 寫入文件系統(tǒng)目錄下的文件中,文件的路徑會傳遞給 JobManager,存在其內(nèi)存中。

RocksDBStateBackend 通常也是異步的。目前唯一支持增量 checkpoint。

使用 RocksDB 可以保存的狀態(tài)量僅受可用磁盤空間量的限制。這也意味著可以實現(xiàn)的最大吞吐量更低,后臺的所有讀/寫都必須通過序列化和反序列化來檢索/存儲 State,這也比使用基于堆內(nèi)存的方式代價更昂貴。

State backend比較

StateBackend

in-flight

checkpoint

吞吐

推薦使用場景

MemoryStateBackend?

TM Memory

JM Memory

調(diào)試、無狀態(tài)或?qū)?shù)據(jù)丟失或重復(fù)無要求

FsStateBackend?????

TM Memory

FS/HDFS

普通狀態(tài)、窗口、KV 結(jié)構(gòu)

RocksDBStateBackend

RocksDB on TM

FS/HDFS

超大狀態(tài)、超長窗口、大型 KV 結(jié)構(gòu)

# 設(shè)置 statebackend 類型為 "rocksdb",其他可選項有 "filesystem" "jobmanager"

# 你也可以將這個屬性設(shè)置為 StateBackendFactory 的完整類名

# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

table_env.get_config().get_configuration().set_string("state.backend", "rocksdb")

# 設(shè)置 RocksDB statebackend 所需要的 checkpoint 目錄文章來源地址http://www.zghlxwxcb.cn/news/detail-539189.html

table_env.get_config().get_configuration().set_string("state.checkpoints.dir", "file:///tmp/checkpoints/")

到了這里,關(guān)于Flink流批一體計算(11):PyFlink Tabel API之TableEnvironment的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink流批一體計算(15):PyFlink Tabel API之SQL寫入Sink

    目錄 舉個例子 寫入Sink的各種情況 1. 將結(jié)果數(shù)據(jù)收集到客戶端 2. 將結(jié)果數(shù)據(jù)轉(zhuǎn)換為Pandas DataFrame,并收集到客戶端 3. 將結(jié)果寫入到一張 Sink 表中 4. 將結(jié)果寫入多張 Sink 表中 舉個例子 將計算結(jié)果寫入給 sink 表 寫入Sink的各種情況 1. 將結(jié)果數(shù)據(jù)收集到客戶端 你可以使用 TableR

    2024年02月11日
    瀏覽(18)
  • Flink流批一體計算(16):PyFlink DataStream API

    Flink流批一體計算(16):PyFlink DataStream API

    目錄 概述 Pipeline Dataflow 代碼示例WorldCount.py 執(zhí)行腳本W(wǎng)orldCount.py 概述 Apache Flink 提供了 DataStream API,用于構(gòu)建健壯的、有狀態(tài)的流式應(yīng)用程序。它提供了對狀態(tài)和時間細粒度控制,從而允許實現(xiàn)高級事件驅(qū)動系統(tǒng)。 用戶實現(xiàn)的Flink程序是由Stream和Transformation這兩個基本構(gòu)建塊組

    2024年02月11日
    瀏覽(25)
  • Flink流批一體計算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目錄 StreamExecutionEnvironment Watermark watermark策略簡介 使用 Watermark 策略 內(nèi)置水印生成器 處理空閑數(shù)據(jù)源 算子處理 Watermark 的方式 創(chuàng)建DataStream的方式 通過list對象創(chuàng)建 ??????使用DataStream connectors創(chuàng)建 使用Table SQL connectors創(chuàng)建 StreamExecutionEnvironment 編寫一個 Flink Python DataSt

    2024年02月11日
    瀏覽(54)
  • Flink流批一體計算(19):PyFlink DataStream API之State

    目錄 keyed state Keyed DataStream 使用 Keyed State 實現(xiàn)了一個簡單的計數(shù)窗口 狀態(tài)有效期 (TTL) 過期數(shù)據(jù)的清理 全量快照時進行清理 增量數(shù)據(jù)清理 在 RocksDB 壓縮時清理 Operator State算子狀態(tài) Broadcast State廣播狀態(tài) keyed state Keyed DataStream 使用 keyed state,首先需要為DataStream指定 key(主鍵)

    2024年02月10日
    瀏覽(43)
  • Flink流批一體計算(18):PyFlink DataStream API之計算和Sink

    Flink流批一體計算(18):PyFlink DataStream API之計算和Sink

    目錄 1. 在上節(jié)數(shù)據(jù)流上執(zhí)行轉(zhuǎn)換操作,或者使用 sink 將數(shù)據(jù)寫入外部系統(tǒng)。 2. File Sink File Sink Format Types? Row-encoded Formats? Bulk-encoded Formats? 桶分配 滾動策略 3. 如何輸出結(jié)果 Print 集合數(shù)據(jù)到客戶端,execute_and_collect方法將收集數(shù)據(jù)到客戶端內(nèi)存 將結(jié)果發(fā)送到DataStream sink conne

    2024年02月11日
    瀏覽(23)
  • Flink流批一體計算(20):DataStream API和Table API互轉(zhuǎn)

    目錄 舉個例子 連接器 下載連接器(connector)和格式(format)jar 包 依賴管理 ?如何使用連接器 舉個例子 StreamExecutionEnvironment 集成了DataStream API,通過額外的函數(shù)擴展了TableEnvironment。 下面代碼演示兩種API如何互轉(zhuǎn) TableEnvironment 將采用StreamExecutionEnvironment所有的配置選項。 建

    2024年02月10日
    瀏覽(24)
  • Flink流批一體計算(1):流批一體和Flink概述

    Apache Flink應(yīng)運而生 數(shù)字化經(jīng)濟革命的浪潮正在顛覆性地改變著人類的工作方式和生活方式,數(shù)字化經(jīng)濟在全球經(jīng)濟增長中扮演著越來越重要的角色,以互聯(lián)網(wǎng)、云計算、大數(shù)據(jù)、物聯(lián)網(wǎng)、人工智能為代表的數(shù)字技術(shù)近幾年發(fā)展迅猛,數(shù)字技術(shù)與傳統(tǒng)產(chǎn)業(yè)的深度融合釋放出巨大

    2024年02月10日
    瀏覽(23)
  • flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上)

    flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上)

    前言:今天是學(xué)習(xí)flink的第四天啦!學(xué)習(xí)了物理分區(qū)的知識點,這一次學(xué)習(xí)了前4個簡單的物理分區(qū),稱之為簡單分區(qū)篇! Tips:我相信自己會越來會好的,明天攻克困難分區(qū)篇,加油! 3. 物理分區(qū) 3.1 Global Partitioner 該分區(qū)器會將所有的數(shù)據(jù)都發(fā)送到下游的某個算子實例(subta

    2024年02月19日
    瀏覽(23)
  • flink重溫筆記(五):Flink 流批一體 API 開發(fā)——物理分區(qū)(下)

    flink重溫筆記(五):Flink 流批一體 API 開發(fā)——物理分區(qū)(下)

    前言 :今天是學(xué)習(xí) flink 的第五天啦! 主要學(xué)習(xí)了物理分區(qū)較難理解的部分,在這個部分的三個分區(qū)的學(xué)習(xí)中, rescale partition 和 forward partition 其原理可以歸類 pointwise 模式,其他的 partition 其原理可以歸類 all_to_all 模式,而比較有趣的是 custom partitioning,這個可以進行根據(jù)值

    2024年02月19日
    瀏覽(21)
  • Flink流批一體計算(7):Flink優(yōu)化

    目錄 配置內(nèi)存 設(shè)置并行度 操作場景 具體設(shè)置 補充 配置進程參數(shù) 操作場景 具體配置 配置netty網(wǎng)絡(luò)通信 操作場景 具體配置 配置內(nèi)存 Flink 是依賴內(nèi)存計算,計算過程中內(nèi)存不夠?qū)?Flink 的執(zhí)行效率影響很大。可以通過監(jiān)控 GC ( Garbage Collection ),評估內(nèi)存使用及剩余情況來判

    2024年02月12日
    瀏覽(54)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包