国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink流批一體計算(14):PyFlink Tabel API之SQL查詢

這篇具有很好參考價值的文章主要介紹了Flink流批一體計算(14):PyFlink Tabel API之SQL查詢。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

舉個例子

查詢 source 表,同時執(zhí)行計算

# 通過 Table API 創(chuàng)建一張表:
source_table = table_env.from_path("datagen")
# 或者通過 SQL 查詢語句創(chuàng)建一張表:
source_table = table_env.sql_query("SELECT * FROM datagen")
result_table = source_table.select(source_table.id + 1, source_table.data)

Table API 查詢

Table 對象有許多方法,可以用于進(jìn)行關(guān)系操作。

這些方法返回新的 Table 對象,表示對輸入 Table 應(yīng)用關(guān)系操作之后的結(jié)果。

這些關(guān)系操作可以由多個方法調(diào)用組成,例如 table.group_by(...).select(...)。

Table API 文檔描述了流和批處理上所有支持的 Table API 操作。

以下示例展示了一個簡單的 Table API 聚合查詢:

from pyflink.table import Environmentsettings, TableEnvironment
# 通過 batch table environment 來執(zhí)行查詢
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],['name', 'country', 'revenue'])
# 計算所有來自法國客戶的收入
revenue = orders \
??? .select(orders.name, orders.country, orders.revenue) \
??? .where(orders.country == 'FRANCE') \
??? .group_by(orders.name) \
??? .select(orders.name, orders.revenue.sum.alias('rev_sum'))
revenue.to_pandas()

Table API 也支持行操作的 API, 這些行操作包括 Map Operation, FlatMap Operation, Aggregate Operation 和 FlatAggregate Operation.

以下示例展示了一個簡單的 Table API 基于行操作的查詢

from pyflink.table import Environmentsettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import pandas as pd

# 通過 batch table environment 來執(zhí)行查詢
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)], ['name', 'country', 'revenue'])
map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),
??? result_type=DataTypes.ROW(
??????? [DataTypes.FIELD("name", DataTypes.STRING()),
??????? DataTypes.FIELD("revenue", DataTypes.BIGINT())]),
??? func_type="pandas")
orders.map(map_function).alias('name', 'revenue').to_pandas()

SQL 查詢

Flink 的 SQL 基于 Apache Calcite,它實現(xiàn)了標(biāo)準(zhǔn)的 SQL。SQL 查詢語句使用字符串來表達(dá)。SQL 支持Flink 對流和批處理。

下面示例展示了一個簡單的 SQL 聚合查詢:

from pyflink.table import Environmentsettings, TableEnvironment

# 通過 stream table environment 來執(zhí)行查詢

env_settings = Environmentsettings.in_streaming_mode()

table_env = TableEnvironment.create(env_settings)

table_env.execute_sql("""

??? CREATE TABLE random_source (

??????? id BIGINT,

??????? data TINYINT

??? ) WITH (

??????? 'connector' = 'datagen',

??????? 'fields.id.kind'='sequence',

??????? 'fields.id.start'='1',

??????? 'fields.id.end'='8',

??????? 'fields.data.kind'='sequence',

??????? 'fields.data.start'='4',

??????? 'fields.data.end'='11'

??? )

""")

table_env.execute_sql("""

??? CREATE TABLE print_sink (

????? ??id BIGINT,

??????? data_sum TINYINT

??? ) WITH (

??????? 'connector' = 'print'

??? )

""")

table_env.execute_sql("""

??? INSERT INTO print_sink

??????? SELECT id, sum(data) as data_sum FROM

??????????? (SELECT id / 2 as id, data FROM random_source)

??????? WHERE id > 1

??????? GROUP BY id

""").wait()

Table API 和 SQL 的混合使用

Table API 中的 Table 對象和 SQL 中的 Table 可以自由地相互轉(zhuǎn)換。

下面例子展示了如何在 SQL 中使用 Table 對象:

create_temporary_view(view_path, table)? 將一個 `Table` 對象注冊為一張臨時表,類似于 SQL 的臨時表。

# 創(chuàng)建一張 sink 表來接收結(jié)果數(shù)據(jù)
table_env.execute_sql("""
??? CREATE TABLE table_sink (
??????? id BIGINT,
??????? data VARCHAR
??? ) WITH (
??????? 'connector' = 'print'
??? )
""")
# 將 Table API 表轉(zhuǎn)換成 SQL 中的視圖
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('table_api_table', table)
# 將 Table API 表的數(shù)據(jù)寫入結(jié)果表
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()

下面例子展示了如何在 Table API 中使用 SQL 表:

sql_query(query)?? 執(zhí)行一條 SQL 查詢,并將查詢的結(jié)果作為一個 `Table` 對象。

# 創(chuàng)建一張 SQL source 表
table_env.execute_sql("""
??? CREATE TABLE sql_source (
??????? id BIGINT,
??????? data TINYINT
??? ) WITH (
??????? 'connector' = 'datagen',
??????? 'fields.id.kind'='sequence',
? ??????'fields.id.start'='1',
??????? 'fields.id.end'='4',
??????? 'fields.data.kind'='sequence',
??????? 'fields.data.start'='4',
??????? 'fields.data.end'='7'
??? )
""")

# 將 SQL 表轉(zhuǎn)換成 Table API 表
table = table_env.from_path("sql_source")
# 或者通過 SQL 查詢語句創(chuàng)建表
table = table_env.sql_query("SELECT * FROM sql_source")
# 將表中的數(shù)據(jù)寫出
table.to_pandas()

優(yōu)化

數(shù)據(jù)傾斜

當(dāng)數(shù)據(jù)發(fā)生傾斜(某一部分?jǐn)?shù)據(jù)量特別大),雖然沒有GCGabage Collection,垃圾回收),但是task執(zhí)行時間嚴(yán)重不一致。

  • 需要重新設(shè)計key,以更小粒度的key使得task大小合理化。
  • 修改并行度。
  • 調(diào)用rebalance操作,使數(shù)據(jù)分區(qū)均勻。
緩沖區(qū)超時設(shè)置

由于task在執(zhí)行過程中存在數(shù)據(jù)通過網(wǎng)絡(luò)進(jìn)行交換,數(shù)據(jù)在不同服務(wù)器之間傳遞的緩沖區(qū)超時時間可以通過setBufferTimeout進(jìn)行設(shè)置。

當(dāng)設(shè)置“setBufferTimeout(-1)”,會等待緩沖區(qū)滿之后才會刷新,使其達(dá)到最大吞吐量;當(dāng)設(shè)置“setBufferTimeout(0)”時,可以最小化延遲,數(shù)據(jù)一旦接收到就會刷新;當(dāng)設(shè)置“setBufferTimeout”大于0時,緩沖區(qū)會在該時間之后超時,然后進(jìn)行緩沖區(qū)的刷新。

示例可以參考如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-658866.html

env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

到了這里,關(guān)于Flink流批一體計算(14):PyFlink Tabel API之SQL查詢的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink流批一體計算(11):PyFlink Tabel API之TableEnvironment

    目錄 概述 設(shè)置重啟策略 什么是flink的重啟策略(Restartstrategy) flink的重啟策略(Restartstrategy)實戰(zhàn) flink的4種重啟策略 FixedDelayRestartstrategy(固定延時重啟策略) FailureRateRestartstrategy(故障率重啟策略) NoRestartstrategy(不重啟策略) 配置State Backends 以及 Checkpointing Checkpoint 啟用和配置

    2024年02月13日
    瀏覽(46)
  • Flink流批一體計算(16):PyFlink DataStream API

    Flink流批一體計算(16):PyFlink DataStream API

    目錄 概述 Pipeline Dataflow 代碼示例WorldCount.py 執(zhí)行腳本W(wǎng)orldCount.py 概述 Apache Flink 提供了 DataStream API,用于構(gòu)建健壯的、有狀態(tài)的流式應(yīng)用程序。它提供了對狀態(tài)和時間細(xì)粒度控制,從而允許實現(xiàn)高級事件驅(qū)動系統(tǒng)。 用戶實現(xiàn)的Flink程序是由Stream和Transformation這兩個基本構(gòu)建塊組

    2024年02月11日
    瀏覽(25)
  • Flink流批一體計算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目錄 StreamExecutionEnvironment Watermark watermark策略簡介 使用 Watermark 策略 內(nèi)置水印生成器 處理空閑數(shù)據(jù)源 算子處理 Watermark 的方式 創(chuàng)建DataStream的方式 通過list對象創(chuàng)建 ??????使用DataStream connectors創(chuàng)建 使用Table SQL connectors創(chuàng)建 StreamExecutionEnvironment 編寫一個 Flink Python DataSt

    2024年02月11日
    瀏覽(54)
  • Flink流批一體計算(19):PyFlink DataStream API之State

    目錄 keyed state Keyed DataStream 使用 Keyed State 實現(xiàn)了一個簡單的計數(shù)窗口 狀態(tài)有效期 (TTL) 過期數(shù)據(jù)的清理 全量快照時進(jìn)行清理 增量數(shù)據(jù)清理 在 RocksDB 壓縮時清理 Operator State算子狀態(tài) Broadcast State廣播狀態(tài) keyed state Keyed DataStream 使用 keyed state,首先需要為DataStream指定 key(主鍵)

    2024年02月10日
    瀏覽(43)
  • Flink流批一體計算(18):PyFlink DataStream API之計算和Sink

    Flink流批一體計算(18):PyFlink DataStream API之計算和Sink

    目錄 1. 在上節(jié)數(shù)據(jù)流上執(zhí)行轉(zhuǎn)換操作,或者使用 sink 將數(shù)據(jù)寫入外部系統(tǒng)。 2. File Sink File Sink Format Types? Row-encoded Formats? Bulk-encoded Formats? 桶分配 滾動策略 3. 如何輸出結(jié)果 Print 集合數(shù)據(jù)到客戶端,execute_and_collect方法將收集數(shù)據(jù)到客戶端內(nèi)存 將結(jié)果發(fā)送到DataStream sink conne

    2024年02月11日
    瀏覽(23)
  • Flink流批一體計算(20):DataStream API和Table API互轉(zhuǎn)

    目錄 舉個例子 連接器 下載連接器(connector)和格式(format)jar 包 依賴管理 ?如何使用連接器 舉個例子 StreamExecutionEnvironment 集成了DataStream API,通過額外的函數(shù)擴(kuò)展了TableEnvironment。 下面代碼演示兩種API如何互轉(zhuǎn) TableEnvironment 將采用StreamExecutionEnvironment所有的配置選項。 建

    2024年02月10日
    瀏覽(24)
  • Flink流批一體計算(1):流批一體和Flink概述

    Apache Flink應(yīng)運而生 數(shù)字化經(jīng)濟(jì)革命的浪潮正在顛覆性地改變著人類的工作方式和生活方式,數(shù)字化經(jīng)濟(jì)在全球經(jīng)濟(jì)增長中扮演著越來越重要的角色,以互聯(lián)網(wǎng)、云計算、大數(shù)據(jù)、物聯(lián)網(wǎng)、人工智能為代表的數(shù)字技術(shù)近幾年發(fā)展迅猛,數(shù)字技術(shù)與傳統(tǒng)產(chǎn)業(yè)的深度融合釋放出巨大

    2024年02月10日
    瀏覽(23)
  • flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上)

    flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上)

    前言:今天是學(xué)習(xí)flink的第四天啦!學(xué)習(xí)了物理分區(qū)的知識點,這一次學(xué)習(xí)了前4個簡單的物理分區(qū),稱之為簡單分區(qū)篇! Tips:我相信自己會越來會好的,明天攻克困難分區(qū)篇,加油! 3. 物理分區(qū) 3.1 Global Partitioner 該分區(qū)器會將所有的數(shù)據(jù)都發(fā)送到下游的某個算子實例(subta

    2024年02月19日
    瀏覽(23)
  • flink重溫筆記(五):Flink 流批一體 API 開發(fā)——物理分區(qū)(下)

    flink重溫筆記(五):Flink 流批一體 API 開發(fā)——物理分區(qū)(下)

    前言 :今天是學(xué)習(xí) flink 的第五天啦! 主要學(xué)習(xí)了物理分區(qū)較難理解的部分,在這個部分的三個分區(qū)的學(xué)習(xí)中, rescale partition 和 forward partition 其原理可以歸類 pointwise 模式,其他的 partition 其原理可以歸類 all_to_all 模式,而比較有趣的是 custom partitioning,這個可以進(jìn)行根據(jù)值

    2024年02月19日
    瀏覽(21)
  • Flink流批一體計算(7):Flink優(yōu)化

    目錄 配置內(nèi)存 設(shè)置并行度 操作場景 具體設(shè)置 補充 配置進(jìn)程參數(shù) 操作場景 具體配置 配置netty網(wǎng)絡(luò)通信 操作場景 具體配置 配置內(nèi)存 Flink 是依賴內(nèi)存計算,計算過程中內(nèi)存不夠?qū)?Flink 的執(zhí)行效率影響很大??梢酝ㄟ^監(jiān)控 GC ( Garbage Collection ),評估內(nèi)存使用及剩余情況來判

    2024年02月12日
    瀏覽(54)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包