一、表和流的轉(zhuǎn)換
- 從創(chuàng)建表環(huán)境開始,歷經(jīng)表的創(chuàng)建、查詢轉(zhuǎn)換和輸出,已經(jīng)可以使用Table API和SQL進行完整的流處理了。不過在應用的開發(fā)過程中,我們測試業(yè)務(wù)邏輯一般不會直接將結(jié)果直接寫入到外部系統(tǒng),而是在本地控制臺打印輸出。對于DataStream非常容易,直接調(diào)用print()方法就可以看到結(jié)果數(shù)據(jù)流的內(nèi)容了。但對于Table就比較悲劇,沒有提供print()方法。
- 在Flink中可以將Table再轉(zhuǎn)換成DataStream,然后進行打印輸出。這就涉及了表和流的轉(zhuǎn)換
二、將表(Table)轉(zhuǎn)換成流(DataStream)
調(diào)用toDataStream()方法
- 將一個Table對象轉(zhuǎn)換成DataStream非常簡單,只要直接調(diào)用表環(huán)境的方法toDataStream()就可以了。
Table aliceVisitTable = tableEnv.sqlQuery(
"SELECT user,url " +
"FROM EventTable " +
"WHERE user = 'Alice' "
);
將表轉(zhuǎn)換成數(shù)據(jù)流,這里需要將要轉(zhuǎn)換的Table對象作為參數(shù)傳入。
tableEnv.toDataStream(aliceVisitTable).print();
調(diào)用toChangelogStream()方法
tableEnv.createTemporaryView("clickTable",eventTable);
Table aggResult = tableEnv.sqlQuery("select user,COUNT(url) as cnt from clickTable group by user");
tableEnv.toChangelogStream(aggResult).print("agg");
三、將流轉(zhuǎn)換成表
調(diào)用fromDataStream()方法
- 想要將一個DataStream轉(zhuǎn)換成表也很簡單,可以通過調(diào)用表環(huán)境的fromDataStream()方法來實現(xiàn),返回的就是一個Table對象。例如,可以直接將事件流eventStream轉(zhuǎn)換成一個表。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
獲取表環(huán)境
//創(chuàng)建表執(zhí)行環(huán)境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
讀取數(shù)據(jù)源
SingleOutputStreamOperator<Event> eventStream = env.addSource(...)
將數(shù)據(jù)流轉(zhuǎn)換成表
Table eventTable = tableEnv.fromDataStream(eventStream);
由于流中的數(shù)據(jù)本身就是定義好的POJO類型Event,所以我們將流轉(zhuǎn)換成表之后,每一行數(shù)據(jù)就對應著一個Event,而表中的列名就對應著Event中的屬性。
另外,還可以在fromDataStream()方法中增加參數(shù),用來指定提取哪些屬性作為表中的字段名,并可以任意指定位置。
提取Event中的timestamp和url作為表中的列
Table eventTable2 = tableEnv.fromDataStream(eventStream,$("timestamp"),$("url"));
需要注意的是,timestamp本身是SQL中的關(guān)鍵字,所以我們在定義表名、列名時要盡量避免。這時可以通過表達式的as()方法對字段進行重命名。
Table eventTable2 = tableEnv.fromDataStream(eventStream,$("timestamp").as("ts"),$("url"));
調(diào)用createTemporaryView()方法
- 調(diào)用fromDataStream()方法簡單直觀,可以直接實現(xiàn)DataStream到Table的轉(zhuǎn)換。不過如果希望直接在SQL中引用這張表,就還需要調(diào)用表環(huán)境的createTemporaryView()方法來創(chuàng)建虛擬視圖。
- 對于這種場景,更簡潔的調(diào)用方式,可以直接調(diào)用createTemporaryView()方法創(chuàng)建虛擬表,傳入的兩個參數(shù),第一個依然是注冊的表名,而第二個可以直接就是DataStream。之后可以傳入多個參數(shù),用來指定表中的字段:
tableEnv.createTemporaryView("EventTable",eventStream,$("timestamp").as("ts"),$("url"));
這樣接下來就可以直接在SQL中引用表EventTable了。
調(diào)用fromChangelogStream()方法
表環(huán)境還提供了一個方法fromChangelogStream(),可以將一個更新日志流轉(zhuǎn)換成表。這個方法要求流中的數(shù)據(jù)類型只能是Row,而且每一個數(shù)據(jù)都需要指定當前航的更新類型(RowKind)。所以一般是由連接器幫我們實現(xiàn)的。
四、支持的數(shù)據(jù)類型
- DataStream,流中的數(shù)據(jù)類型都是定義好的POJO類。如果DataStream中的類型是簡單的基本類型,還可以直接轉(zhuǎn)換成表么?這就涉及了Table中支持的數(shù)據(jù)類型。
- 整體來看,DataStream中支持的數(shù)據(jù)類型,Table中也都是支持的,只不過在進行轉(zhuǎn)換時需要注意一些細節(jié)。
原子類型:
- 在Flink中,基礎(chǔ)數(shù)據(jù)類型(Integer、Double、String)和通用數(shù)據(jù)類型(也就是不可再拆分的數(shù)據(jù)類型)統(tǒng)一稱做原子類型。原子類型的DataStream,轉(zhuǎn)換之后就成了只有一列的Table,列字段(field)的數(shù)據(jù)類型可以由原子類型推斷出。另外,還可以在fromDataStream()方法里增加參數(shù),用來重新命名列字段。
StreamTableEnvironment tableEnv = ...;
DataStream<Long> stream = ...;
將數(shù)據(jù)流轉(zhuǎn)換成動態(tài)表,動態(tài)表只有一個字段,重命名為myLong
Table table = tableEnv.fromDataStream(stream,$("myLong"));
Tuple類型
- 當原子類型不做重命名時,默認的字段名就是"f0",容易想到,其實就是將原子類型看做了一元組Tuple1的處理結(jié)果。
- Table支持Flink中定義的元組類型Tuple,對應在表中字段名默認就是元祖中元素的屬性名f0、f1、f2…。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段還可以通過調(diào)用表達式的as()方法來進行重命名。
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long,Integer>> stream = ... ;
將數(shù)據(jù)流轉(zhuǎn)換成只包含f1字段的表
Table table = tableEnv.fromDataStream(stream,$("f1"));
將數(shù)據(jù)流轉(zhuǎn)換成包含f0和f1字段的表,在表中f0和f1位置交換
Table table = tableEnv.fromDataStream(stream,$("f1"),$("f0"));
將f1字段命名為myInt,f0命名為myLong
Table table = tableEnv.fromDataStream(stream,$("f1").as("myInt"),$("f0").as("myLong"));
Row類型文章來源:http://www.zghlxwxcb.cn/news/detail-400803.html
- Flink中還定義了一個在關(guān)系型表中更加通用的數(shù)據(jù)類型——行(Row),它是Table中數(shù)據(jù)的基本組織形式。Row類型也是一種復合類型,它的長度固定,而且無法直接推斷出每個字段的類型,所以在使用時必須指明具體的類型信息。在創(chuàng)建Table時調(diào)用的CREATE語句就會將所有的字段名稱和類型指定,這在Flink中被稱為表的模式結(jié)構(gòu)(Schema)。除此之外,Row類型還附加了一個屬性RowKind,用來表示當前行在更新操作中的類型。這樣,Row就可以用來表示更新日志流(changelog stream)中的數(shù)據(jù),從而架起了Flink中流和表的轉(zhuǎn)換橋梁。
- 所以在更新日志流中,元素的類型必須是Row,而且需要調(diào)用ofKind(0方法來指定更新類型。下面是一個具體的例子:
DataStream<Row> dataStream = env.fromElements(
Row.ofKind(RowKind.INSERT,"Alice",12),
Row.ofKind(RowKind.INSERT,"Bob",5),
Row.ofKind(RowKind.UPDATE_BEFORE,"Alice",12),
Row.ofKind(RowKind.UPDATE_AFTER,"Alice",100)
);
將更新日志流轉(zhuǎn)換為表文章來源地址http://www.zghlxwxcb.cn/news/detail-400803.html
Table table = tableEnv.fromChangelogStream(dataStream);
到了這里,關(guān)于Flink系列Table API和SQL之:表和流的轉(zhuǎn)換的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!