目錄
StreamExecutionEnvironment
Watermark
watermark策略簡介
使用 Watermark 策略
內(nèi)置水印生成器
處理空閑數(shù)據(jù)源
算子處理 Watermark 的方式
創(chuàng)建DataStream的方式
通過list對象創(chuàng)建
??????使用DataStream connectors創(chuàng)建
使用Table & SQL connectors創(chuàng)建
StreamExecutionEnvironment
編寫一個 Flink Python DataStream API 程序,首先需要聲明一個執(zhí)行環(huán)境StreamExecutionEnvironment,這是流式程序執(zhí)行的上下文。
你將通過它來設(shè)置作業(yè)的屬性(例如默認(rèn)并發(fā)度、重啟策略等)、創(chuàng)建源、并最終觸發(fā)作業(yè)的執(zhí)行。
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)
創(chuàng)建了 StreamExecutionEnvironment 之后,你可以使用它來聲明數(shù)據(jù)源。數(shù)據(jù)源從外部系統(tǒng)(如 Apache Kafka、Rabbit MQ 或 Apache Pulsar)拉取數(shù)據(jù)到 Flink 作業(yè)里。
為了簡單起見,本教程讀取文件作為數(shù)據(jù)源。
ds = env.from_source(
??? source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
?????????????????????? ????????????????????????input_path)
???????????????????? .process_static_file_set().build(),
??? watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
??? source_name="file_source"
)
Watermark
大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的Event Time順序排列的。
為了解決亂序數(shù)據(jù),flink引入watermark。引入watermark機(jī)制則會等待晚到的數(shù)據(jù)一段時間,等待時間到則觸發(fā)計算,如果數(shù)據(jù)延遲很大,通常也會被丟棄或者另外處理。
為了使用事件時間語義,Flink 應(yīng)用程序需要知道事件時間戳對應(yīng)的字段,意味著數(shù)據(jù)流中的每個元素都需要擁有可分配的事件時間戳。其通常通過使用 TimestampAssigner API 從元素中的某個字段去訪問/提取時間戳。
watermark策略簡介
時間戳的分配與 watermark 的生成是齊頭并進(jìn)的,其可以告訴 Flink 應(yīng)用程序事件時間的進(jìn)度。其可以通過指定 WatermarkGenerator 來配置 watermark 的生成方式。
使用 Flink API 時需要設(shè)置一個同時包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具類中也提供了許多常用的 watermark 策略,并且用戶也可以在某些必要場景下構(gòu)建自己的 watermark 策略。
使用 Watermark 策略
WatermarkStrategy 可以在 Flink 應(yīng)用程序中的兩處使用,第一種是直接在數(shù)據(jù)源上使用,第二種是直接在非數(shù)據(jù)源的操作之后使用。
第一種方式相比會更好,因為數(shù)據(jù)源可以利用 watermark 生成邏輯中有關(guān)分片/分區(qū)(shards/partitions/splits)的信息。使用這種方式,數(shù)據(jù)源通??梢愿珳?zhǔn)地跟蹤 watermark,整體 watermark 生成將更精確。直接在源上指定 WatermarkStrategy 意味著你必須使用特定數(shù)據(jù)源接口。
僅當(dāng)無法直接在數(shù)據(jù)源上設(shè)置策略時,才應(yīng)該使用第二種方式(在任意轉(zhuǎn)換操作之后設(shè)置 WatermarkStrategy)
內(nèi)置水印生成器
水印策略定義了如何在流源中生成水印。WatermarkStrategy是生成水印的WatermarkGenerator和分配記錄內(nèi)部時間戳的TimestampAssigner的生成器/工廠。
BoundedOutOfOrderness(Duration),為創(chuàng)建WatermarkStrategy常見的內(nèi)置策略。
for_bound_out_of_ordernness(max_out_of_orderness:pyflink.common.time.Duration)為記錄無序的情況創(chuàng)建水印策略,但可以設(shè)置事件無序程度的上限。
無序綁定B意味著一旦遇到時間戳為T的事件,就不會再出現(xiàn)早于(T-B)的事件。
for_bound_out_of_ordernness(5)
for_mononous_timestamps()為時間戳單調(diào)遞增的情況創(chuàng)建水印策略。
水印是定期生成的,并嚴(yán)格遵循數(shù)據(jù)中的最新時間戳。該策略引入的延遲主要是生成水印的周期間隔。
WatermarkStrategy.for_monotonous_timestamps()
with_timestamp_assigner(timestamp_assigner:pyflink.common.watermark_strategy.TimestampAssigner)
創(chuàng)建一個新的WatermarkStrategy,該策略通過實現(xiàn)TimestampAssigner接口使用給定的TimestampAssigner。
參數(shù): timestamp_assigner 給定的TimestampAssigner。
Return: 包裝TimestampAssigner的WaterMarkStrategy。
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()
??? with_timestamp_assigner(MyTimestampAssigner())
處理空閑數(shù)據(jù)源
如果數(shù)據(jù)源中的某一個分區(qū)/分片在一段時間內(nèi)未發(fā)送事件數(shù)據(jù),則意味著 WatermarkGenerator 也不會獲得任何新數(shù)據(jù)去生成 watermark。我們稱這類數(shù)據(jù)源為空閑輸入或空閑源。在這種情況下,當(dāng)某些其他分區(qū)仍然發(fā)送事件數(shù)據(jù)的時候就會出現(xiàn)問題。由于下游算子 watermark 的計算方式是取所有不同的上游并行數(shù)據(jù)源 watermark 的最小值,則其 watermark 將不會發(fā)生變化。
為了解決這個問題,你可以使用 WatermarkStrategy 來檢測空閑輸入并將其標(biāo)記為空閑狀態(tài)。WatermarkStrategy 為此提供了一個工具接口withIdleness(Duration.ofMinutes(1))
with_idleness(idle_timeout:pyfrink.common.time.Duration)
創(chuàng)建一個新的豐富的WatermarkStrategy,它也在創(chuàng)建的WatermarkGenerator中執(zhí)行空閑檢測。
參數(shù):idle_timeout–空閑超時。
Return:配置了空閑檢測的新水印策略。
算子處理 Watermark 的方式
一般情況下,在將 watermark 轉(zhuǎn)發(fā)到下游之前,需要算子對其進(jìn)行觸發(fā)的事件完全進(jìn)行處理。例如,WindowOperator 將首先計算該 watermark 觸發(fā)的所有窗口數(shù)據(jù),當(dāng)且僅當(dāng)由此 watermark 觸發(fā)計算進(jìn)而生成的所有數(shù)據(jù)被轉(zhuǎn)發(fā)到下游之后,其才會被發(fā)送到下游。換句話說,由于此 watermark 的出現(xiàn)而產(chǎn)生的所有數(shù)據(jù)元素都將在此 watermark 之前發(fā)出。
相同的規(guī)則也適用于 TwoInputstreamOperator。但是,在這種情況下,算子當(dāng)前的 watermark 會取其兩個輸入的最小值。
創(chuàng)建DataStream的方式
通過list對象創(chuàng)建
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
??? collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
??? type_info=Types.ROW([Types.INT(), Types.STRING()]))
??????使用DataStream connectors創(chuàng)建
使用add_source函數(shù),此函數(shù)僅支持FlinkKafkaConsumer,僅在streaming執(zhí)行模式下使用
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
deserialization_schema = JsonRowDeserializationSchema.builder() \
??? .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
kafka_consumer = FlinkKafkaConsumer(
??? topics='test_source_topic',
??? deserialization_schema=deserialization_schema,
??? properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
ds = env.add_source(kafka_consumer)
使用from_source函數(shù),此函數(shù)僅支持NumberSequenceSource和FileSource自定義數(shù)據(jù)源,僅在streaming執(zhí)行模式下使用文章來源:http://www.zghlxwxcb.cn/news/detail-678222.html
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource
env = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 1000)
ds = env.from_source(
??? source=seq_num_source,
??? watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
??? source_name='seq_num_source',
??? type_info=Types.LONG())
???????使用Table & SQL connectors創(chuàng)建
首先用Table & SQL connectors創(chuàng)建表,再轉(zhuǎn)換為DataStream.文章來源地址http://www.zghlxwxcb.cn/news/detail-678222.html
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.execute_sql("""
??????? CREATE TABLE my_source (
????????? a INT,
????????? b VARCHAR
??????? ) WITH (
????????? 'connector' = 'datagen',
????????? 'number-of-rows' = '10'
??????? )
??? """)
ds = t_env.to_append_stream(
??? t_env.from_path('my_source'),
??? Types.ROW([Types.INT(), Types.STRING()]))
到了這里,關(guān)于Flink流批一體計算(17):PyFlink DataStream API之StreamExecutionEnvironment的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!