国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink流批一體計(jì)算(15):PyFlink Tabel API之SQL寫入Sink

這篇具有很好參考價(jià)值的文章主要介紹了Flink流批一體計(jì)算(15):PyFlink Tabel API之SQL寫入Sink。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

舉個例子

寫入Sink的各種情況

1. 將結(jié)果數(shù)據(jù)收集到客戶端

2. 將結(jié)果數(shù)據(jù)轉(zhuǎn)換為Pandas DataFrame,并收集到客戶端

3. 將結(jié)果寫入到一張 Sink 表中

4. 將結(jié)果寫入多張 Sink 表中


舉個例子

將計(jì)算結(jié)果寫入給 sink 表

#將Table API結(jié)果表數(shù)據(jù)寫入sink表:
result_table.execute_insert("print").wait()
# 或者通過SQL查詢語句來寫入sink表:
table_env.execute_sql("INSERT INTO print SELECT * FROM datagen").wait()

寫入Sink的各種情況

1. 將結(jié)果數(shù)據(jù)收集到客戶端

你可以使用 TableResult.collect 將 Table 的結(jié)果收集到客戶端,結(jié)果的類型為迭代器類型。

以下代碼展示了如何使用 TableResult.collect() 方法:

#準(zhǔn)備source表
source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
#得到TableResult
res = table_env.execute_sql("select a + 1, b, c from %s" % source)
#遍歷結(jié)果
with res.collect() as results:
?? for result in results:
?????? print(result)
2. 將結(jié)果數(shù)據(jù)轉(zhuǎn)換為Pandas DataFrame,并收集到客戶端
3. 將結(jié)果寫入到一張 Sink 表中

你可以調(diào)用 execute_insert 方法來將 Table 對象中的數(shù)據(jù)寫入到一張 sink 表中:

table_env.execute_sql("""
??? CREATE TABLE sink_table (
??????? id BIGINT,
??????? data VARCHAR
??? ) WITH (
??????? 'connector' = 'print'
??? )
""")
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table.execute_insert("sink_table").wait()

也可以通過 SQL 來完成

table_env.create_temporary_view("table_source", table)
table_env.execute_sql("INSERT INTO sink_table SELECT * FROM table_source").wait()
4. 將結(jié)果寫入多張 Sink 表中

你也可以使用 Statementset 在一個作業(yè)中將 Table 中的數(shù)據(jù)寫入到多張 sink 表中:

create_statement_set() 創(chuàng)建一個可接受 DML 語句或表的 Statementset 實(shí)例。 它可用于執(zhí)行包含多個 sink 的作業(yè)。文章來源地址http://www.zghlxwxcb.cn/news/detail-670650.html

# 準(zhǔn)備 source 表和 sink 表
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view("simple_source", table)
table_env.execute_sql("""
??? CREATE TABLE first_sink_table (
??????? id BIGINT,
??????? data VARCHAR
??? ) WITH (
??????? 'connector' = 'print'
??? )
""")
table_env.execute_sql("""
??? CREATE TABLE second_sink_table (
??????? id BIGINT,
??????? data VARCHAR
??? ) WITH (
??????? 'connector' = 'print'
??? )
""")
# 創(chuàng)建 statement set
statement_set = table_env.create_statement_set()
# 將 "table" 的數(shù)據(jù)寫入 "first_sink_table"
statement_set.add_insert("first_sink_table", table)
# 通過一條 sql 插入語句將數(shù)據(jù)從 "simple_source" 寫入到 "second_sink_table"
statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")
# 執(zhí)行 statement set
statement_set.execute().wait()

到了這里,關(guān)于Flink流批一體計(jì)算(15):PyFlink Tabel API之SQL寫入Sink的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink流批一體計(jì)算(12):PyFlink Tabel API之構(gòu)建作業(yè)

    目錄 1.創(chuàng)建源表和結(jié)果表。 創(chuàng)建及注冊表名分別為 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通過 DDL 語句來注冊源表和結(jié)果表 2. 創(chuàng)建一個作業(yè) 3. 提交作業(yè)Submitting PyFlink Jobs 1.創(chuàng)建源表和結(jié)果表。 創(chuàng)建及注冊表名分別為 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    瀏覽(21)
  • Flink流批一體計(jì)算(16):PyFlink DataStream API

    Flink流批一體計(jì)算(16):PyFlink DataStream API

    目錄 概述 Pipeline Dataflow 代碼示例WorldCount.py 執(zhí)行腳本W(wǎng)orldCount.py 概述 Apache Flink 提供了 DataStream API,用于構(gòu)建健壯的、有狀態(tài)的流式應(yīng)用程序。它提供了對狀態(tài)和時間細(xì)粒度控制,從而允許實(shí)現(xiàn)高級事件驅(qū)動系統(tǒng)。 用戶實(shí)現(xiàn)的Flink程序是由Stream和Transformation這兩個基本構(gòu)建塊組

    2024年02月11日
    瀏覽(25)
  • Flink流批一體計(jì)算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目錄 StreamExecutionEnvironment Watermark watermark策略簡介 使用 Watermark 策略 內(nèi)置水印生成器 處理空閑數(shù)據(jù)源 算子處理 Watermark 的方式 創(chuàng)建DataStream的方式 通過list對象創(chuàng)建 ??????使用DataStream connectors創(chuàng)建 使用Table SQL connectors創(chuàng)建 StreamExecutionEnvironment 編寫一個 Flink Python DataSt

    2024年02月11日
    瀏覽(55)
  • Flink流批一體計(jì)算(19):PyFlink DataStream API之State

    目錄 keyed state Keyed DataStream 使用 Keyed State 實(shí)現(xiàn)了一個簡單的計(jì)數(shù)窗口 狀態(tài)有效期 (TTL) 過期數(shù)據(jù)的清理 全量快照時進(jìn)行清理 增量數(shù)據(jù)清理 在 RocksDB 壓縮時清理 Operator State算子狀態(tài) Broadcast State廣播狀態(tài) keyed state Keyed DataStream 使用 keyed state,首先需要為DataStream指定 key(主鍵)

    2024年02月10日
    瀏覽(43)
  • Flink流批一體計(jì)算(18):PyFlink DataStream API之計(jì)算和Sink

    Flink流批一體計(jì)算(18):PyFlink DataStream API之計(jì)算和Sink

    目錄 1. 在上節(jié)數(shù)據(jù)流上執(zhí)行轉(zhuǎn)換操作,或者使用 sink 將數(shù)據(jù)寫入外部系統(tǒng)。 2. File Sink File Sink Format Types? Row-encoded Formats? Bulk-encoded Formats? 桶分配 滾動策略 3. 如何輸出結(jié)果 Print 集合數(shù)據(jù)到客戶端,execute_and_collect方法將收集數(shù)據(jù)到客戶端內(nèi)存 將結(jié)果發(fā)送到DataStream sink conne

    2024年02月11日
    瀏覽(23)
  • Flink流批一體計(jì)算(20):DataStream API和Table API互轉(zhuǎn)

    目錄 舉個例子 連接器 下載連接器(connector)和格式(format)jar 包 依賴管理 ?如何使用連接器 舉個例子 StreamExecutionEnvironment 集成了DataStream API,通過額外的函數(shù)擴(kuò)展了TableEnvironment。 下面代碼演示兩種API如何互轉(zhuǎn) TableEnvironment 將采用StreamExecutionEnvironment所有的配置選項(xiàng)。 建

    2024年02月10日
    瀏覽(24)
  • Flink流批一體計(jì)算(1):流批一體和Flink概述

    Apache Flink應(yīng)運(yùn)而生 數(shù)字化經(jīng)濟(jì)革命的浪潮正在顛覆性地改變著人類的工作方式和生活方式,數(shù)字化經(jīng)濟(jì)在全球經(jīng)濟(jì)增長中扮演著越來越重要的角色,以互聯(lián)網(wǎng)、云計(jì)算、大數(shù)據(jù)、物聯(lián)網(wǎng)、人工智能為代表的數(shù)字技術(shù)近幾年發(fā)展迅猛,數(shù)字技術(shù)與傳統(tǒng)產(chǎn)業(yè)的深度融合釋放出巨大

    2024年02月10日
    瀏覽(23)
  • flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上)

    flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上)

    前言:今天是學(xué)習(xí)flink的第四天啦!學(xué)習(xí)了物理分區(qū)的知識點(diǎn),這一次學(xué)習(xí)了前4個簡單的物理分區(qū),稱之為簡單分區(qū)篇! Tips:我相信自己會越來會好的,明天攻克困難分區(qū)篇,加油! 3. 物理分區(qū) 3.1 Global Partitioner 該分區(qū)器會將所有的數(shù)據(jù)都發(fā)送到下游的某個算子實(shí)例(subta

    2024年02月19日
    瀏覽(23)
  • flink重溫筆記(五):Flink 流批一體 API 開發(fā)——物理分區(qū)(下)

    flink重溫筆記(五):Flink 流批一體 API 開發(fā)——物理分區(qū)(下)

    前言 :今天是學(xué)習(xí) flink 的第五天啦! 主要學(xué)習(xí)了物理分區(qū)較難理解的部分,在這個部分的三個分區(qū)的學(xué)習(xí)中, rescale partition 和 forward partition 其原理可以歸類 pointwise 模式,其他的 partition 其原理可以歸類 all_to_all 模式,而比較有趣的是 custom partitioning,這個可以進(jìn)行根據(jù)值

    2024年02月19日
    瀏覽(21)
  • Flink流批一體計(jì)算(7):Flink優(yōu)化

    目錄 配置內(nèi)存 設(shè)置并行度 操作場景 具體設(shè)置 補(bǔ)充 配置進(jìn)程參數(shù) 操作場景 具體配置 配置netty網(wǎng)絡(luò)通信 操作場景 具體配置 配置內(nèi)存 Flink 是依賴內(nèi)存計(jì)算,計(jì)算過程中內(nèi)存不夠?qū)?Flink 的執(zhí)行效率影響很大??梢酝ㄟ^監(jiān)控 GC ( Garbage Collection ),評估內(nèi)存使用及剩余情況來判

    2024年02月12日
    瀏覽(54)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包