目錄
舉個例子
寫入Sink的各種情況
1. 將結(jié)果數(shù)據(jù)收集到客戶端
2. 將結(jié)果數(shù)據(jù)轉(zhuǎn)換為Pandas DataFrame,并收集到客戶端
3. 將結(jié)果寫入到一張 Sink 表中
4. 將結(jié)果寫入多張 Sink 表中
舉個例子
將計(jì)算結(jié)果寫入給 sink 表
#將Table API結(jié)果表數(shù)據(jù)寫入sink表:
result_table.execute_insert("print").wait()
# 或者通過SQL查詢語句來寫入sink表:
table_env.execute_sql("INSERT INTO print SELECT * FROM datagen").wait()
寫入Sink的各種情況
1. 將結(jié)果數(shù)據(jù)收集到客戶端
你可以使用 TableResult.collect 將 Table 的結(jié)果收集到客戶端,結(jié)果的類型為迭代器類型。
以下代碼展示了如何使用 TableResult.collect() 方法:
#準(zhǔn)備source表
source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
#得到TableResult
res = table_env.execute_sql("select a + 1, b, c from %s" % source)
#遍歷結(jié)果
with res.collect() as results:
?? for result in results:
?????? print(result)
2. 將結(jié)果數(shù)據(jù)轉(zhuǎn)換為Pandas DataFrame,并收集到客戶端
3. 將結(jié)果寫入到一張 Sink 表中
你可以調(diào)用 execute_insert 方法來將 Table 對象中的數(shù)據(jù)寫入到一張 sink 表中:
table_env.execute_sql("""
??? CREATE TABLE sink_table (
??????? id BIGINT,
??????? data VARCHAR
??? ) WITH (
??????? 'connector' = 'print'
??? )
""")
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table.execute_insert("sink_table").wait()
也可以通過 SQL 來完成
table_env.create_temporary_view("table_source", table)
table_env.execute_sql("INSERT INTO sink_table SELECT * FROM table_source").wait()
4. 將結(jié)果寫入多張 Sink 表中
你也可以使用 Statementset 在一個作業(yè)中將 Table 中的數(shù)據(jù)寫入到多張 sink 表中:文章來源:http://www.zghlxwxcb.cn/news/detail-670650.html
create_statement_set() 創(chuàng)建一個可接受 DML 語句或表的 Statementset 實(shí)例。 它可用于執(zhí)行包含多個 sink 的作業(yè)。文章來源地址http://www.zghlxwxcb.cn/news/detail-670650.html
# 準(zhǔn)備 source 表和 sink 表
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view("simple_source", table)
table_env.execute_sql("""
??? CREATE TABLE first_sink_table (
??????? id BIGINT,
??????? data VARCHAR
??? ) WITH (
??????? 'connector' = 'print'
??? )
""")
table_env.execute_sql("""
??? CREATE TABLE second_sink_table (
??????? id BIGINT,
??????? data VARCHAR
??? ) WITH (
??????? 'connector' = 'print'
??? )
""")
# 創(chuàng)建 statement set
statement_set = table_env.create_statement_set()
# 將 "table" 的數(shù)據(jù)寫入 "first_sink_table"
statement_set.add_insert("first_sink_table", table)
# 通過一條 sql 插入語句將數(shù)據(jù)從 "simple_source" 寫入到 "second_sink_table"
statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")
# 執(zhí)行 statement set
statement_set.execute().wait()
到了這里,關(guān)于Flink流批一體計(jì)算(15):PyFlink Tabel API之SQL寫入Sink的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!