国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink流批一體計算(17):PyFlink DataStream API之StreamExecutionEnvironment

這篇具有很好參考價值的文章主要介紹了Flink流批一體計算(17):PyFlink DataStream API之StreamExecutionEnvironment。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

StreamExecutionEnvironment

Watermark

watermark策略簡介

使用 Watermark 策略

內(nèi)置水印生成器

處理空閑數(shù)據(jù)源

算子處理 Watermark 的方式

創(chuàng)建DataStream的方式

通過list對象創(chuàng)建

??????使用DataStream connectors創(chuàng)建

使用Table & SQL connectors創(chuàng)建


StreamExecutionEnvironment

編寫一個 Flink Python DataStream API 程序,首先需要聲明一個執(zhí)行環(huán)境StreamExecutionEnvironment,這是流式程序執(zhí)行的上下文。

你將通過它來設(shè)置作業(yè)的屬性(例如默認(rèn)并發(fā)度、重啟策略等)、創(chuàng)建源、并最終觸發(fā)作業(yè)的執(zhí)行。

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)

創(chuàng)建了 StreamExecutionEnvironment 之后,你可以使用它來聲明數(shù)據(jù)源。數(shù)據(jù)源從外部系統(tǒng)(如 Apache Kafka、Rabbit MQ 或 Apache Pulsar)拉取數(shù)據(jù)到 Flink 作業(yè)里。

為了簡單起見,本教程讀取文件作為數(shù)據(jù)源。

ds = env.from_source(
??? source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
?????????????????????? ????????????????????????input_path)
???????????????????? .process_static_file_set().build(),
??? watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
??? source_name="file_source"
)

Watermark

大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的Event Time順序排列的。

為了解決亂序數(shù)據(jù),flink引入watermark。引入watermark機(jī)制則會等待晚到的數(shù)據(jù)一段時間,等待時間到則觸發(fā)計算,如果數(shù)據(jù)延遲很大,通常也會被丟棄或者另外處理。

為了使用事件時間語義,Flink 應(yīng)用程序需要知道事件時間戳對應(yīng)的字段,意味著數(shù)據(jù)流中的每個元素都需要擁有可分配的事件時間戳。其通常通過使用 TimestampAssigner API 從元素中的某個字段去訪問/提取時間戳。

watermark策略簡介

時間戳的分配與 watermark 的生成是齊頭并進(jìn)的,其可以告訴 Flink 應(yīng)用程序事件時間的進(jìn)度。其可以通過指定 WatermarkGenerator 來配置 watermark 的生成方式。

使用 Flink API 時需要設(shè)置一個同時包含 TimestampAssigner WatermarkGenerator WatermarkStrategy。WatermarkStrategy 工具類中也提供了許多常用的 watermark 策略,并且用戶也可以在某些必要場景下構(gòu)建自己的 watermark 策略。

使用 Watermark 策略

WatermarkStrategy 可以在 Flink 應(yīng)用程序中的兩處使用,第一種是直接在數(shù)據(jù)源上使用,第二種是直接在非數(shù)據(jù)源的操作之后使用。

第一種方式相比會更好,因為數(shù)據(jù)源可以利用 watermark 生成邏輯中有關(guān)分片/分區(qū)(shards/partitions/splits)的信息。使用這種方式,數(shù)據(jù)源通??梢愿珳?zhǔn)地跟蹤 watermark,整體 watermark 生成將更精確。直接在源上指定 WatermarkStrategy 意味著你必須使用特定數(shù)據(jù)源接口。

僅當(dāng)無法直接在數(shù)據(jù)源上設(shè)置策略時,才應(yīng)該使用第二種方式(在任意轉(zhuǎn)換操作之后設(shè)置 WatermarkStrategy

內(nèi)置水印生成器

水印策略定義了如何在流源中生成水印。WatermarkStrategy是生成水印的WatermarkGenerator和分配記錄內(nèi)部時間戳的TimestampAssigner的生成器/工廠。

BoundedOutOfOrdernessDuration),為創(chuàng)建WatermarkStrategy常見的內(nèi)置策略。

for_bound_out_of_ordernness(max_out_of_ordernesspyflink.common.time.Duration)為記錄無序的情況創(chuàng)建水印策略,但可以設(shè)置事件無序程度的上限。

無序綁定B意味著一旦遇到時間戳為T的事件,就不會再出現(xiàn)早于(T-B)的事件。

for_bound_out_of_ordernness(5)

for_mononous_timestamps()為時間戳單調(diào)遞增的情況創(chuàng)建水印策略。

水印是定期生成的,并嚴(yán)格遵循數(shù)據(jù)中的最新時間戳。該策略引入的延遲主要是生成水印的周期間隔。

WatermarkStrategy.for_monotonous_timestamps()

with_timestamp_assigner(timestamp_assigner:pyflink.common.watermark_strategy.TimestampAssigner)

創(chuàng)建一個新的WatermarkStrategy,該策略通過實現(xiàn)TimestampAssigner接口使用給定的TimestampAssigner。

參數(shù): timestamp_assigner 給定的TimestampAssigner。

Return: 包裝TimestampAssigner的WaterMarkStrategy。

watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()

??? with_timestamp_assigner(MyTimestampAssigner())

處理空閑數(shù)據(jù)源

如果數(shù)據(jù)源中的某一個分區(qū)/分片在一段時間內(nèi)未發(fā)送事件數(shù)據(jù),則意味著 WatermarkGenerator 也不會獲得任何新數(shù)據(jù)去生成 watermark。我們稱這類數(shù)據(jù)源為空閑輸入或空閑源。在這種情況下,當(dāng)某些其他分區(qū)仍然發(fā)送事件數(shù)據(jù)的時候就會出現(xiàn)問題。由于下游算子 watermark 的計算方式是取所有不同的上游并行數(shù)據(jù)源 watermark 的最小值,則其 watermark 將不會發(fā)生變化。

為了解決這個問題,你可以使用 WatermarkStrategy 來檢測空閑輸入并將其標(biāo)記為空閑狀態(tài)。WatermarkStrategy 為此提供了一個工具接口withIdleness(Duration.ofMinutes(1))

with_idleness(idle_timeout:pyfrink.common.time.Duration)

創(chuàng)建一個新的豐富的WatermarkStrategy,它也在創(chuàng)建的WatermarkGenerator中執(zhí)行空閑檢測。

參數(shù):idle_timeout–空閑超時。

Return:配置了空閑檢測的新水印策略。

算子處理 Watermark 的方式

一般情況下,在將 watermark 轉(zhuǎn)發(fā)到下游之前,需要算子對其進(jìn)行觸發(fā)的事件完全進(jìn)行處理。例如,WindowOperator 將首先計算該 watermark 觸發(fā)的所有窗口數(shù)據(jù),當(dāng)且僅當(dāng)由此 watermark 觸發(fā)計算進(jìn)而生成的所有數(shù)據(jù)被轉(zhuǎn)發(fā)到下游之后,其才會被發(fā)送到下游。換句話說,由于此 watermark 的出現(xiàn)而產(chǎn)生的所有數(shù)據(jù)元素都將在此 watermark 之前發(fā)出。

相同的規(guī)則也適用于 TwoInputstreamOperator。但是,在這種情況下,算子當(dāng)前的 watermark 會取其兩個輸入的最小值。

創(chuàng)建DataStream的方式

通過list對象創(chuàng)建
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment


env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
??? collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
??? type_info=Types.ROW([Types.INT(), Types.STRING()]))
??????使用DataStream connectors創(chuàng)建

使用add_source函數(shù),此函數(shù)僅支持FlinkKafkaConsumer,僅在streaming執(zhí)行模式下使用

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer


env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
deserialization_schema = JsonRowDeserializationSchema.builder() \
??? .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_consumer = FlinkKafkaConsumer(
??? topics='test_source_topic',
??? deserialization_schema=deserialization_schema,
??? properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

ds = env.add_source(kafka_consumer)

使用from_source函數(shù),此函數(shù)僅支持NumberSequenceSource和FileSource自定義數(shù)據(jù)源,僅在streaming執(zhí)行模式下使用

from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource

env = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 1000)
ds = env.from_source(
??? source=seq_num_source,
??? watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
??? source_name='seq_num_source',
??? type_info=Types.LONG())
???????使用Table & SQL connectors創(chuàng)建

首先用Table & SQL connectors創(chuàng)建表,再轉(zhuǎn)換為DataStream.文章來源地址http://www.zghlxwxcb.cn/news/detail-678222.html

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

t_env.execute_sql("""
??????? CREATE TABLE my_source (
????????? a INT,
????????? b VARCHAR
??????? ) WITH (
????????? 'connector' = 'datagen',
????????? 'number-of-rows' = '10'
??????? )
??? """)

ds = t_env.to_append_stream(

??? t_env.from_path('my_source'),

??? Types.ROW([Types.INT(), Types.STRING()]))

到了這里,關(guān)于Flink流批一體計算(17):PyFlink DataStream API之StreamExecutionEnvironment的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink流批一體計算(12):PyFlink Tabel API之構(gòu)建作業(yè)

    目錄 1.創(chuàng)建源表和結(jié)果表。 創(chuàng)建及注冊表名分別為 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通過 DDL 語句來注冊源表和結(jié)果表 2. 創(chuàng)建一個作業(yè) 3. 提交作業(yè)Submitting PyFlink Jobs 1.創(chuàng)建源表和結(jié)果表。 創(chuàng)建及注冊表名分別為 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    瀏覽(21)
  • Flink流批一體計算(11):PyFlink Tabel API之TableEnvironment

    目錄 概述 設(shè)置重啟策略 什么是flink的重啟策略(Restartstrategy) flink的重啟策略(Restartstrategy)實戰(zhàn) flink的4種重啟策略 FixedDelayRestartstrategy(固定延時重啟策略) FailureRateRestartstrategy(故障率重啟策略) NoRestartstrategy(不重啟策略) 配置State Backends 以及 Checkpointing Checkpoint 啟用和配置

    2024年02月13日
    瀏覽(46)
  • Flink流批一體計算(20):DataStream API和Table API互轉(zhuǎn)

    目錄 舉個例子 連接器 下載連接器(connector)和格式(format)jar 包 依賴管理 ?如何使用連接器 舉個例子 StreamExecutionEnvironment 集成了DataStream API,通過額外的函數(shù)擴(kuò)展了TableEnvironment。 下面代碼演示兩種API如何互轉(zhuǎn) TableEnvironment 將采用StreamExecutionEnvironment所有的配置選項。 建

    2024年02月10日
    瀏覽(24)
  • Flink流批一體計算(14):PyFlink Tabel API之SQL查詢

    舉個例子 查詢 source 表,同時執(zhí)行計算 Table API 查詢 Table 對象有許多方法,可以用于進(jìn)行關(guān)系操作。 這些方法返回新的 Table 對象,表示對輸入 Table 應(yīng)用關(guān)系操作之后的結(jié)果。 這些關(guān)系操作可以由多個方法調(diào)用組成,例如 table.group_by(...).select(...)。 Table API 文檔描述了流和批

    2024年02月12日
    瀏覽(23)
  • Flink流批一體計算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 創(chuàng)建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用來: ·創(chuàng)建 Table ·將 Table 注冊成臨時表 ·執(zhí)行 SQL 查詢 ·注冊用戶自定義的 (標(biāo)量,表值,或者聚合) 函數(shù) ·配置作業(yè) ·管理 Python 依賴 ·提交作業(yè)執(zhí)行 創(chuàng)建 source 表 創(chuàng)建 sink

    2024年02月12日
    瀏覽(23)
  • Flink流批一體計算(15):PyFlink Tabel API之SQL寫入Sink

    目錄 舉個例子 寫入Sink的各種情況 1. 將結(jié)果數(shù)據(jù)收集到客戶端 2. 將結(jié)果數(shù)據(jù)轉(zhuǎn)換為Pandas DataFrame,并收集到客戶端 3. 將結(jié)果寫入到一張 Sink 表中 4. 將結(jié)果寫入多張 Sink 表中 舉個例子 將計算結(jié)果寫入給 sink 表 寫入Sink的各種情況 1. 將結(jié)果數(shù)據(jù)收集到客戶端 你可以使用 TableR

    2024年02月11日
    瀏覽(18)
  • 流批一體計算引擎-7-[Flink]的DataStream連接器

    流批一體計算引擎-7-[Flink]的DataStream連接器

    參考官方手冊DataStream Connectors 一、預(yù)定義的Source和Sink 一些比較基本的Source和Sink已經(jīng)內(nèi)置在Flink里。 1、預(yù)定義data sources支持從文件、目錄、socket,以及collections和iterators中讀取數(shù)據(jù)。 2、預(yù)定義data sinks支持把數(shù)據(jù)寫入文件、標(biāo)準(zhǔn)輸出(stdout)、標(biāo)準(zhǔn)錯誤輸出(stderr)和 sock

    2023年04月08日
    瀏覽(22)
  • Flink流批一體計算(1):流批一體和Flink概述

    Apache Flink應(yīng)運而生 數(shù)字化經(jīng)濟(jì)革命的浪潮正在顛覆性地改變著人類的工作方式和生活方式,數(shù)字化經(jīng)濟(jì)在全球經(jīng)濟(jì)增長中扮演著越來越重要的角色,以互聯(lián)網(wǎng)、云計算、大數(shù)據(jù)、物聯(lián)網(wǎng)、人工智能為代表的數(shù)字技術(shù)近幾年發(fā)展迅猛,數(shù)字技術(shù)與傳統(tǒng)產(chǎn)業(yè)的深度融合釋放出巨大

    2024年02月10日
    瀏覽(23)
  • flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上)

    flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上)

    前言:今天是學(xué)習(xí)flink的第四天啦!學(xué)習(xí)了物理分區(qū)的知識點,這一次學(xué)習(xí)了前4個簡單的物理分區(qū),稱之為簡單分區(qū)篇! Tips:我相信自己會越來會好的,明天攻克困難分區(qū)篇,加油! 3. 物理分區(qū) 3.1 Global Partitioner 該分區(qū)器會將所有的數(shù)據(jù)都發(fā)送到下游的某個算子實例(subta

    2024年02月19日
    瀏覽(23)
  • flink重溫筆記(五):Flink 流批一體 API 開發(fā)——物理分區(qū)(下)

    flink重溫筆記(五):Flink 流批一體 API 開發(fā)——物理分區(qū)(下)

    前言 :今天是學(xué)習(xí) flink 的第五天啦! 主要學(xué)習(xí)了物理分區(qū)較難理解的部分,在這個部分的三個分區(qū)的學(xué)習(xí)中, rescale partition 和 forward partition 其原理可以歸類 pointwise 模式,其他的 partition 其原理可以歸類 all_to_all 模式,而比較有趣的是 custom partitioning,這個可以進(jìn)行根據(jù)值

    2024年02月19日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包