国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink系列Table API和SQL之:表和流的轉(zhuǎn)換

這篇具有很好參考價值的文章主要介紹了Flink系列Table API和SQL之:表和流的轉(zhuǎn)換。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、表和流的轉(zhuǎn)換

  • 從創(chuàng)建表環(huán)境開始,歷經(jīng)表的創(chuàng)建、查詢轉(zhuǎn)換和輸出,已經(jīng)可以使用Table API和SQL進行完整的流處理了。不過在應用的開發(fā)過程中,我們測試業(yè)務(wù)邏輯一般不會直接將結(jié)果直接寫入到外部系統(tǒng),而是在本地控制臺打印輸出。對于DataStream非常容易,直接調(diào)用print()方法就可以看到結(jié)果數(shù)據(jù)流的內(nèi)容了。但對于Table就比較悲劇,沒有提供print()方法。
  • 在Flink中可以將Table再轉(zhuǎn)換成DataStream,然后進行打印輸出。這就涉及了表和流的轉(zhuǎn)換

二、將表(Table)轉(zhuǎn)換成流(DataStream)

調(diào)用toDataStream()方法

  • 將一個Table對象轉(zhuǎn)換成DataStream非常簡單,只要直接調(diào)用表環(huán)境的方法toDataStream()就可以了。
Table aliceVisitTable = tableEnv.sqlQuery(
	"SELECT user,url " +
	"FROM EventTable " +
	"WHERE user = 'Alice' "
);

將表轉(zhuǎn)換成數(shù)據(jù)流,這里需要將要轉(zhuǎn)換的Table對象作為參數(shù)傳入。

tableEnv.toDataStream(aliceVisitTable).print();

調(diào)用toChangelogStream()方法

tableEnv.createTemporaryView("clickTable",eventTable);
Table aggResult = tableEnv.sqlQuery("select user,COUNT(url) as cnt from clickTable group by user");

tableEnv.toChangelogStream(aggResult).print("agg");

三、將流轉(zhuǎn)換成表

調(diào)用fromDataStream()方法

  • 想要將一個DataStream轉(zhuǎn)換成表也很簡單,可以通過調(diào)用表環(huán)境的fromDataStream()方法來實現(xiàn),返回的就是一個Table對象。例如,可以直接將事件流eventStream轉(zhuǎn)換成一個表。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

獲取表環(huán)境

        //創(chuàng)建表執(zhí)行環(huán)境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

讀取數(shù)據(jù)源

SingleOutputStreamOperator<Event> eventStream = env.addSource(...)

將數(shù)據(jù)流轉(zhuǎn)換成表

Table eventTable = tableEnv.fromDataStream(eventStream);

由于流中的數(shù)據(jù)本身就是定義好的POJO類型Event,所以我們將流轉(zhuǎn)換成表之后,每一行數(shù)據(jù)就對應著一個Event,而表中的列名就對應著Event中的屬性。

另外,還可以在fromDataStream()方法中增加參數(shù),用來指定提取哪些屬性作為表中的字段名,并可以任意指定位置。

提取Event中的timestamp和url作為表中的列

Table eventTable2 = tableEnv.fromDataStream(eventStream,$("timestamp"),$("url"));

需要注意的是,timestamp本身是SQL中的關(guān)鍵字,所以我們在定義表名、列名時要盡量避免。這時可以通過表達式的as()方法對字段進行重命名。

Table eventTable2 = tableEnv.fromDataStream(eventStream,$("timestamp").as("ts"),$("url"));

調(diào)用createTemporaryView()方法

  • 調(diào)用fromDataStream()方法簡單直觀,可以直接實現(xiàn)DataStream到Table的轉(zhuǎn)換。不過如果希望直接在SQL中引用這張表,就還需要調(diào)用表環(huán)境的createTemporaryView()方法來創(chuàng)建虛擬視圖。
  • 對于這種場景,更簡潔的調(diào)用方式,可以直接調(diào)用createTemporaryView()方法創(chuàng)建虛擬表,傳入的兩個參數(shù),第一個依然是注冊的表名,而第二個可以直接就是DataStream。之后可以傳入多個參數(shù),用來指定表中的字段:
tableEnv.createTemporaryView("EventTable",eventStream,$("timestamp").as("ts"),$("url"));

這樣接下來就可以直接在SQL中引用表EventTable了。

調(diào)用fromChangelogStream()方法
表環(huán)境還提供了一個方法fromChangelogStream(),可以將一個更新日志流轉(zhuǎn)換成表。這個方法要求流中的數(shù)據(jù)類型只能是Row,而且每一個數(shù)據(jù)都需要指定當前航的更新類型(RowKind)。所以一般是由連接器幫我們實現(xiàn)的。

四、支持的數(shù)據(jù)類型

  • DataStream,流中的數(shù)據(jù)類型都是定義好的POJO類。如果DataStream中的類型是簡單的基本類型,還可以直接轉(zhuǎn)換成表么?這就涉及了Table中支持的數(shù)據(jù)類型。
  • 整體來看,DataStream中支持的數(shù)據(jù)類型,Table中也都是支持的,只不過在進行轉(zhuǎn)換時需要注意一些細節(jié)。

原子類型:

  • 在Flink中,基礎(chǔ)數(shù)據(jù)類型(Integer、Double、String)和通用數(shù)據(jù)類型(也就是不可再拆分的數(shù)據(jù)類型)統(tǒng)一稱做原子類型。原子類型的DataStream,轉(zhuǎn)換之后就成了只有一列的Table,列字段(field)的數(shù)據(jù)類型可以由原子類型推斷出。另外,還可以在fromDataStream()方法里增加參數(shù),用來重新命名列字段。
StreamTableEnvironment tableEnv = ...;

DataStream<Long> stream = ...;

將數(shù)據(jù)流轉(zhuǎn)換成動態(tài)表,動態(tài)表只有一個字段,重命名為myLong

Table table = tableEnv.fromDataStream(stream,$("myLong"));

Tuple類型

  • 當原子類型不做重命名時,默認的字段名就是"f0",容易想到,其實就是將原子類型看做了一元組Tuple1的處理結(jié)果。
  • Table支持Flink中定義的元組類型Tuple,對應在表中字段名默認就是元祖中元素的屬性名f0、f1、f2…。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段還可以通過調(diào)用表達式的as()方法來進行重命名。
StreamTableEnvironment tableEnv = ...;

DataStream<Tuple2<Long,Integer>> stream = ... ;

將數(shù)據(jù)流轉(zhuǎn)換成只包含f1字段的表

Table table = tableEnv.fromDataStream(stream,$("f1"));

將數(shù)據(jù)流轉(zhuǎn)換成包含f0和f1字段的表,在表中f0和f1位置交換

Table table = tableEnv.fromDataStream(stream,$("f1"),$("f0"));

將f1字段命名為myInt,f0命名為myLong

Table table = tableEnv.fromDataStream(stream,$("f1").as("myInt"),$("f0").as("myLong"));

Row類型

  • Flink中還定義了一個在關(guān)系型表中更加通用的數(shù)據(jù)類型——行(Row),它是Table中數(shù)據(jù)的基本組織形式。Row類型也是一種復合類型,它的長度固定,而且無法直接推斷出每個字段的類型,所以在使用時必須指明具體的類型信息。在創(chuàng)建Table時調(diào)用的CREATE語句就會將所有的字段名稱和類型指定,這在Flink中被稱為表的模式結(jié)構(gòu)(Schema)。除此之外,Row類型還附加了一個屬性RowKind,用來表示當前行在更新操作中的類型。這樣,Row就可以用來表示更新日志流(changelog stream)中的數(shù)據(jù),從而架起了Flink中流和表的轉(zhuǎn)換橋梁。
  • 所以在更新日志流中,元素的類型必須是Row,而且需要調(diào)用ofKind(0方法來指定更新類型。下面是一個具體的例子:
DataStream<Row> dataStream = env.fromElements(
	Row.ofKind(RowKind.INSERT,"Alice",12),
	Row.ofKind(RowKind.INSERT,"Bob",5),
	Row.ofKind(RowKind.UPDATE_BEFORE,"Alice",12),
	Row.ofKind(RowKind.UPDATE_AFTER,"Alice",100)
);

將更新日志流轉(zhuǎn)換為表文章來源地址http://www.zghlxwxcb.cn/news/detail-400803.html

Table table = tableEnv.fromChangelogStream(dataStream);

到了這里,關(guān)于Flink系列Table API和SQL之:表和流的轉(zhuǎn)換的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 22、Flink 的table api與sql之創(chuàng)建表的DDL

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月12日
    瀏覽(24)
  • Flink系列Table API和SQL之:滾動窗口、滑動窗口、累計窗口、分組聚合

    Flink系列Table API和SQL之:滾動窗口、滑動窗口、累計窗口、分組聚合

    有了時間屬性,接下來就可以定義窗口進行計算了。窗口可以將無界流切割成大小有限的桶(bucket)來做計算,通過截取有限數(shù)據(jù)集來處理無限的流數(shù)據(jù)。在DataStream API中提供了對不同類型的窗口進行定義和處理的接口,而在Table API和SQL中,類似的功能也都可以實現(xiàn)。 在Flink 1

    2023年04月27日
    瀏覽(23)
  • Flink Table API 與 SQL 編程整理

    Flink Table API 與 SQL 編程整理

    Flink API 總共分為 4 層這里主要整理 Table API 的使用 Table API 是流處理和批處理通用的關(guān)系型 API , Table API 可以基于流輸入或者批輸入來運行而不需要進行任何修改。 Table API 是 SQL 語言的超集并專門為 Apache Flink 設(shè)計的, Table API 是 Scala 和 Java 語言集成式的 API 。與常規(guī) SQL 語言

    2024年02月04日
    瀏覽(25)
  • Flink Table API/SQL 多分支sink

    在某個場景中,需要從Kafka中獲取數(shù)據(jù),經(jīng)過轉(zhuǎn)換處理后,需要同時sink到多個輸出源中(kafka、mysql、hologres)等。兩次調(diào)用execute, 阿里云Flink vvr引擎報錯: 使用 StreamStatementSet. 具體參考官網(wǎng): https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-betwe

    2024年02月11日
    瀏覽(23)
  • 【Flink SQL】Flink SQL 基礎(chǔ)概念(一):SQL & Table 運行環(huán)境、基本概念及常用 API

    《 Flink SQL 基礎(chǔ)概念 》系列,共包含以下 5 篇文章: Flink SQL 基礎(chǔ)概念(一):SQL Table 運行環(huán)境、基本概念及常用 API Flink SQL 基礎(chǔ)概念(二):數(shù)據(jù)類型 Flink SQL 基礎(chǔ)概念(三):SQL 動態(tài)表 連續(xù)查詢 Flink SQL 基礎(chǔ)概念(四):SQL 的時間屬性 Flink SQL 基礎(chǔ)概念(五):SQL 時區(qū)問

    2024年03月21日
    瀏覽(99)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    ? ? ? ?今天一天爭取搞完最后這一部分,學完趕緊把 Kafka 和 Flume 學完,就要開始做實時數(shù)倉了。據(jù)說是應屆生得把實時數(shù)倉搞個 80%~90% 才能差不多找個工作,太牛馬了。 ????????之前我們已經(jīng)用過了一些簡單的內(nèi)置連接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官網(wǎng):

    2024年01月24日
    瀏覽(52)
  • 《十堂課學習 Flink》第五章:Table API 以及 Flink SQL 入門

    《十堂課學習 Flink》第五章:Table API 以及 Flink SQL 入門

    第四章中介紹了 DataStream API 以及 DataSet API 的入門案例,本章開始介紹 Table API 以及基于此的高層應用 Flink SQL 的基礎(chǔ)。 Flink 提供了兩個關(guān)系A(chǔ)PI——Table API 和 SQL——用于統(tǒng)一的流和批處理。Table API 是一種針對Java、Scala和Python的語言集成查詢API,它允許以非常直觀的方式組合來

    2024年02月03日
    瀏覽(48)
  • Flink(十三)Flink 的table api與sql的基本概念、通用api介紹及入門示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月15日
    瀏覽(23)
  • Flink SQL和Table API實現(xiàn)消費kafka寫入mysql

    Flink SQL和Table API實現(xiàn)消費kafka寫入mysql

    1、構(gòu)建 table環(huán)境 2、構(gòu)建source kafka 方式一:API 方式二:Flink SQL 3、構(gòu)建sink mysql? 4、寫入將source表寫入sink表 方式一:API 方式二:Flink SQL 5、手動執(zhí)行 6、測試 (1)連接kafka生產(chǎn)者 (2)造數(shù)據(jù) (3)mysql查看入庫情況

    2024年01月16日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包