舉個例子
查詢 source 表,同時執(zhí)行計算
# 通過 Table API 創(chuàng)建一張表:
source_table = table_env.from_path("datagen")
# 或者通過 SQL 查詢語句創(chuàng)建一張表:
source_table = table_env.sql_query("SELECT * FROM datagen")
result_table = source_table.select(source_table.id + 1, source_table.data)
Table API 查詢
Table 對象有許多方法,可以用于進(jìn)行關(guān)系操作。
這些方法返回新的 Table 對象,表示對輸入 Table 應(yīng)用關(guān)系操作之后的結(jié)果。
這些關(guān)系操作可以由多個方法調(diào)用組成,例如 table.group_by(...).select(...)。
Table API 文檔描述了流和批處理上所有支持的 Table API 操作。
以下示例展示了一個簡單的 Table API 聚合查詢:
from pyflink.table import Environmentsettings, TableEnvironment
# 通過 batch table environment 來執(zhí)行查詢
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],['name', 'country', 'revenue'])
# 計算所有來自法國客戶的收入
revenue = orders \
??? .select(orders.name, orders.country, orders.revenue) \
??? .where(orders.country == 'FRANCE') \
??? .group_by(orders.name) \
??? .select(orders.name, orders.revenue.sum.alias('rev_sum'))
revenue.to_pandas()
Table API 也支持行操作的 API, 這些行操作包括 Map Operation, FlatMap Operation, Aggregate Operation 和 FlatAggregate Operation.
以下示例展示了一個簡單的 Table API 基于行操作的查詢
from pyflink.table import Environmentsettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import pandas as pd
# 通過 batch table environment 來執(zhí)行查詢
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)], ['name', 'country', 'revenue'])
map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),
??? result_type=DataTypes.ROW(
??????? [DataTypes.FIELD("name", DataTypes.STRING()),
??????? DataTypes.FIELD("revenue", DataTypes.BIGINT())]),
??? func_type="pandas")
orders.map(map_function).alias('name', 'revenue').to_pandas()
SQL 查詢
Flink 的 SQL 基于 Apache Calcite,它實現(xiàn)了標(biāo)準(zhǔn)的 SQL。SQL 查詢語句使用字符串來表達(dá)。SQL 支持Flink 對流和批處理。
下面示例展示了一個簡單的 SQL 聚合查詢:
from pyflink.table import Environmentsettings, TableEnvironment
# 通過 stream table environment 來執(zhí)行查詢
env_settings = Environmentsettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql("""
??? CREATE TABLE random_source (
??????? id BIGINT,
??????? data TINYINT
??? ) WITH (
??????? 'connector' = 'datagen',
??????? 'fields.id.kind'='sequence',
??????? 'fields.id.start'='1',
??????? 'fields.id.end'='8',
??????? 'fields.data.kind'='sequence',
??????? 'fields.data.start'='4',
??????? 'fields.data.end'='11'
??? )
""")
table_env.execute_sql("""
??? CREATE TABLE print_sink (
????? ??id BIGINT,
??????? data_sum TINYINT
??? ) WITH (
??????? 'connector' = 'print'
??? )
""")
table_env.execute_sql("""
??? INSERT INTO print_sink
??????? SELECT id, sum(data) as data_sum FROM
??????????? (SELECT id / 2 as id, data FROM random_source)
??????? WHERE id > 1
??????? GROUP BY id
""").wait()
Table API 和 SQL 的混合使用
Table API 中的 Table 對象和 SQL 中的 Table 可以自由地相互轉(zhuǎn)換。
下面例子展示了如何在 SQL 中使用 Table 對象:
create_temporary_view(view_path, table)? 將一個 `Table` 對象注冊為一張臨時表,類似于 SQL 的臨時表。
# 創(chuàng)建一張 sink 表來接收結(jié)果數(shù)據(jù)
table_env.execute_sql("""
??? CREATE TABLE table_sink (
??????? id BIGINT,
??????? data VARCHAR
??? ) WITH (
??????? 'connector' = 'print'
??? )
""")
# 將 Table API 表轉(zhuǎn)換成 SQL 中的視圖
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('table_api_table', table)
# 將 Table API 表的數(shù)據(jù)寫入結(jié)果表
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()
下面例子展示了如何在 Table API 中使用 SQL 表:
sql_query(query)?? 執(zhí)行一條 SQL 查詢,并將查詢的結(jié)果作為一個 `Table` 對象。
# 創(chuàng)建一張 SQL source 表
table_env.execute_sql("""
??? CREATE TABLE sql_source (
??????? id BIGINT,
??????? data TINYINT
??? ) WITH (
??????? 'connector' = 'datagen',
??????? 'fields.id.kind'='sequence',
? ??????'fields.id.start'='1',
??????? 'fields.id.end'='4',
??????? 'fields.data.kind'='sequence',
??????? 'fields.data.start'='4',
??????? 'fields.data.end'='7'
??? )
""")
# 將 SQL 表轉(zhuǎn)換成 Table API 表
table = table_env.from_path("sql_source")
# 或者通過 SQL 查詢語句創(chuàng)建表
table = table_env.sql_query("SELECT * FROM sql_source")
# 將表中的數(shù)據(jù)寫出
table.to_pandas()
優(yōu)化
數(shù)據(jù)傾斜
當(dāng)數(shù)據(jù)發(fā)生傾斜(某一部分?jǐn)?shù)據(jù)量特別大),雖然沒有GC(Gabage Collection,垃圾回收),但是task執(zhí)行時間嚴(yán)重不一致。
- 需要重新設(shè)計key,以更小粒度的key使得task大小合理化。
- 修改并行度。
- 調(diào)用rebalance操作,使數(shù)據(jù)分區(qū)均勻。
緩沖區(qū)超時設(shè)置
由于task在執(zhí)行過程中存在數(shù)據(jù)通過網(wǎng)絡(luò)進(jìn)行交換,數(shù)據(jù)在不同服務(wù)器之間傳遞的緩沖區(qū)超時時間可以通過setBufferTimeout進(jìn)行設(shè)置。
當(dāng)設(shè)置“setBufferTimeout(-1)”,會等待緩沖區(qū)滿之后才會刷新,使其達(dá)到最大吞吐量;當(dāng)設(shè)置“setBufferTimeout(0)”時,可以最小化延遲,數(shù)據(jù)一旦接收到就會刷新;當(dāng)設(shè)置“setBufferTimeout”大于0時,緩沖區(qū)會在該時間之后超時,然后進(jìn)行緩沖區(qū)的刷新。文章來源:http://www.zghlxwxcb.cn/news/detail-658866.html
示例可以參考如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-658866.html
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
到了這里,關(guān)于Flink流批一體計算(14):PyFlink Tabel API之SQL查詢的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!