Flink API
總共分為4
層這里主要整理Table API
的使用
Table API
是流處理和批處理通用的關系型API
,Table API
可以基于流輸入或者批輸入來運行而不需要進行任何修改。Table API
是SQL
語言的超集并專門為Apache Flink
設計的,Table API
是Scala
和Java
語言集成式的API
。與常規(guī)SQL
語言中將查詢指定為字符串不同,Table API
查詢是以Java
或Scala
中的語言嵌入樣式來定義的,具有IDE
支持如:自動完成和語法檢測。需要引入的pom
依賴如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.12</artifactId>
<version>1.7.2</version>
</dependency>
Table API & SQL
TableAPI: WordCount
案例
tab.groupBy("word").select("word,count(1) as count")
SQL: WordCount
案例
SELECT word,COUNT(*) AS cnt FROM MyTable GROUP BY word
【1】聲明式: 用戶只關系做什么,不用關心怎么做;
【2】高性能: 支持查詢優(yōu)化,可以獲取更好的執(zhí)行性能,因為它的底層有一個優(yōu)化器,跟SQL
底層有優(yōu)化器是一樣的。
【3】流批統(tǒng)一: 相同的統(tǒng)計邏輯,即可以流模型運行,也可以批模式運行;
【4】標準穩(wěn)定: 語義遵循SQL
標準,不易改動。當升級等底層修改,不用考慮API
兼容問題;
【5】易理解: 語義明確,所見即所得;
Table API 特點
Table API
使得多聲明的數(shù)據處理寫起來比較容易。
1 #例如,我們將a<10的數(shù)據過濾插入到xxx表中
2 table.filter(a<10).insertInto("xxx")
3 #我們將a>10的數(shù)據過濾插入到y(tǒng)yy表中
4 table.filter(a>10).insertInto("yyy")
Talbe
是Flink
自身的一種API
使得更容易擴展標準的SQL
(當且僅當需要的時候),兩者的關系如下:
Table API 編程
WordCount
編程示例
package org.apache.flink.table.api.example.stream;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
public class JavaStreamWordCount {
public static void main(String[] args) throws Exception {
//獲取執(zhí)行環(huán)境:CTRL + ALT + V
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
//指定一個路徑
String path = JavaStreamWordCount.class.getClassLoader().getResource("words.txt").getPath();
//指定文件格式和分隔符,對應的Schema(架構)這里只有一列,類型是String
tEnv.connect(new FileSystem().path(path))
.withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
.withSchema(new Schema().field("word", Types.STRING))
.inAppendMode()
.registerTableSource("fileSource");//將source注冊到env中
//通過 scan 拿到table,然后執(zhí)行table的操作。
Table result = tEnv.scan("fileSource")
.groupBy("word")
.select("word, count(1) as count");
//將table輸出
tEnv.toRetractStream(result, Row.class).print();
//執(zhí)行
env.execute();
}
}
怎么定義一個 Table
Table myTable = tableEnvironment.scan("myTable")
都是從Environment
中scan
出來的。而這個myTable
又是我們注冊進去的。問題就是有哪些方式可以注冊Table
。
【1】Table descriptor: 類似于上述的WordCount
,指定一個文件系統(tǒng)fs
,也可以是kafka
等,還需要一些格式和Schema
等。
tEnv.connect(new FileSystem().path(path))
.withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
.withSchema(new Schema().field("word", Types.STRING))
.inAppendMode()
.registerTableSource("fileSource");//將source注冊到env中
【2】自定義一個 Table source: 然后把自己的Table source
注冊進去。
TableSource csvSource = new CsvTableSource(path,new String[]{"word"},new TypeInformation[]{Types.STRING});
tEnv.registerTableSource("sourceTable2", csvSource);
【3】注冊一個 DataStream: 例如下面一個String
類型的DataStream
,命名為myTable3
對應的schema
只有一列叫word
。
DataStream<String> stream = ...
// register the DataStream as table " myTable3" with
// fields "word"
tableEnv.registerDataStream("myTable3", stream, "word");
動態(tài)表
如果流中的數(shù)據類型是case class
可以直接根據case class
的結構生成table
tableEnv.fromDataStream(ecommerceLogDstream)
或者根據字段順序單獨命名:用單引放到字段前面來標識字段名。
tableEnv.fromDataStream(ecommerceLogDstream,'mid,'uid ......)
最后的動態(tài)表可以轉換為流進行輸出,如果不是簡單的插入就使用toRetractStream
table.toAppendStream[(String,String)]
如何輸出一個table
當我們獲取到一個結構表的時候(table
類型)執(zhí)行insertInto
目標表中:resultTable.insertInto("TargetTable");
【1】Table descriptor: 類似于注入,最終使用Sink進行輸出,例如如下輸出到targetTable
中,主要是最后一段的區(qū)別。
tEnv
.connect(new FileSystem().path(path)).withFormat(new OldCsv().field("word", Types.STRING)
.lineDelimiter("\n")).withSchema(new Schema()
.field("word", Types.STRING))
.registerTableSink("targetTable");
【2】自定義一個 Table sink: 輸出到自己的 sinkTable2注冊進去。
TableSink csvSink = new CsvTableSink(path,new String[]{"word"},new TypeInformation[]{Types.STRING});
tEnv.registerTableSink("sinkTable2", csvSink);
【3】輸出一個 DataStream: 例如下面產生一個RetractStream
,對應要給Tuple2
的聯(lián)系。Boolean
這行記錄時add
還是delete
。如果使用了groupby
,table
轉化為流的時候只能使用toRetractStream
。得到的第一個boolean
型字段標識 true
就是最新的數(shù)據(Insert
),false
表示過期老數(shù)據(Delete
)。如果使用的api
包括時間窗口,那么窗口的字段必須出現(xiàn)在groupBy
中。
// emit the result table to a DataStream
DataStream<Tuple2<Boolean, Row>> stream = tableEnv.toRetractStream(resultTable, Row.class)
stream.filter(_._1).print()
案例代碼:
package com.zzx.flink
import java.util.Properties
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.table.api.java.Tumble
import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableEnvironment}
object FlinkTableAndSql {
def main(args: Array[String]): Unit = {
//執(zhí)行環(huán)境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//設置 時間特定為 EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//讀取數(shù)據 MyKafkaConsumer 為自定義的 kafka 工具類,并傳入 topic
val dstream: DataStream[String] = env.addSource(MyKafkaConsumer.getConsumer("FLINKTABLE&SQL"))
//將字符串轉換為對象
val ecommerceLogDstream:DataStream[SensorReding] = dstream.map{
/* 引入如下依賴
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.36</version>
</dependency>*/
//將 String 轉換為 SensorReding
jsonString => JSON.parseObject(jsonString,classOf[SensorReding])
}
//告知 watermark 和 evetTime如何提取
val ecommerceLogWithEventTimeDStream: DataStream[SensorReding] =ecommerceLogDstream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[SensorReding](Time.seconds(0)) {
override def extractTimestamp(t: SensorReding): Long = {
t.timestamp
}
})
//設置并行度
ecommerceLogDstream.setParallelism(1)
//創(chuàng)建 Table 執(zhí)行環(huán)境
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
var ecommerceTable: Table = tableEnv.fromTableSource(ecommerceLogWithEventTimeDStream ,'mid,'uid,'ch,'ts.rowtime)
//通過 table api進行操作
//每10秒統(tǒng)計一次各個渠道的個數(shù) table api解決
//groupby window=滾動式窗口 用 eventtime 來確定開窗時間
val resultTalbe: Table = ecommerceTable.window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch,'tt).select('ch,'ch.count)
var ecommerceTalbe: String = "xxx"
//通過 SQL 執(zhí)行
val resultSQLTable: Table = tableEnv.sqlQuery("select ch,count(ch) from "+ ecommerceTalbe +"group by ch,Tumble(ts,interval '10' SECOND")
//把 Table 轉化成流輸出
//val appstoreDStream: DataStream[(String,String,Long)] = appstoreTable.toAppendStream[(String,String,Long)]
val resultDStream: DataStream[(Boolean,(String,Long))] = resultSQLTable.toRetractStream[(String,Long)]
//過濾
resultDStream.filter(_._1)
env.execute()
}
}
object MyKafkaConsumer {
def getConsumer(sourceTopic: String): FlinkKafkaConsumer011[String] ={
val bootstrapServers = "hadoop1:9092"
// kafkaConsumer 需要的配置參數(shù)
val props = new Properties
// 定義kakfa 服務的地址,不需要將所有broker指定上
props.put("bootstrap.servers", bootstrapServers)
// 制定consumer group
props.put("group.id", "test")
// 是否自動確認offset
props.put("enable.auto.commit", "true")
// 自動確認offset的時間間隔
props.put("auto.commit.interval.ms", "1000")
// key的序列化類
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
// value的序列化類
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
//從kafka讀取數(shù)據,需要實現(xiàn) SourceFunction 他給我們提供了一個
val consumer = new FlinkKafkaConsumer011[String](sourceTopic, new SimpleStringSchema, props)
consumer
}
}
關于時間窗口
【1】用到時間窗口,必須提前聲明時間字段,如果是processTime
直接在創(chuàng)建動態(tài)表時進行追加就可以。如下的ps.proctime
。
val ecommerceLogTable: Table = tableEnv
.fromDataStream( ecommerceLogWithEtDstream,
`mid,`uid,`appid,`area,`os,`ps.proctime )
【2】如果是EventTime
要在創(chuàng)建動態(tài)表時聲明。如下的ts.rowtime
。
val ecommerceLogTable: Table = tableEnv
.fromDataStream( ecommerceLogWithEtDstream,
'mid,'uid,'appid,'area,'os,'ts.rowtime)
【3】滾動窗口可以使用Tumble over 10000.millis on
來表示
val table: Table = ecommerceLogTable.filter("ch = 'appstore'")
.window(Tumble over 10000.millis on 'ts as 'tt)
.groupBy('ch,'tt)
.select("ch,ch.count")
如何查詢一個 table
為了會有GroupedTable
等,為了增加限制,寫出正確的API
。
Table API 操作分類
1、與sql
對齊的操作,select
、as
、filter
等;
2、提升Table API
易用性的操作;
——Columns Operation
易用性: 假設有一張100
列的表,我們需要去掉一列,需要怎么操作?第三個API
可以幫你完成。我們先獲取表中的所有Column
,然后通過dropColumn
去掉不需要的列即可。主要是一個Table
上的算子。
Operators | Examples |
---|---|
AddColumns | Table orders = tableEnv.scan(“Orders”); Table result = orders.addColumns(“concat(c,‘sunny’)as desc”); 添加新的列,要求是列名不能重復。 |
addOrReplaceColumns | Table orders = tableEnv.scan(“Orders”); Table result = order.addOrReplaceColumns(“concat(c,‘sunny’) as desc”);添加列,如果存在則覆蓋 |
DropColumns | Table orders = tableEnv.scan(“Orders”); Table result = orders.dropColumns(“b c”); |
RenameColumns | Table orders = tableEnv.scan(“Orders”); Table result = orders.RenameColumns("b as b2,c as c2);列重命名 |
——Columns Function
易用性: 假設有一張表,我么需要獲取第20-80
列,該如何獲取。類似一個函數(shù),可以用在列選擇的任何地方,例如:Table.select(withColumns(a,1 to 10))
、GroupBy
等等。
語法 | 描述 |
---|---|
withColumns(…) | 選擇你指定的列 |
withoutColumns(…) | 反選你指定的列 |
列的操作語法(建議): 如下,它們都是上層包含下層的關系。
columnOperation:
withColumns(columnExprs) / withoutColumns(columnExprs) #可以接收多個參數(shù) columnExpr
columnExprs:
columnExpr [, columnExpr]* #可以分為如下三種情況
columnExpr:
columnRef | columnIndex to columnIndex | columnName to columnName #1 cloumn引用 2下標范圍操作 3名字的范圍操作
columnRef:
columnName(The field name that exists in the table) | columnIndex(a positive integer starting at 1)
Example: withColumns(a, b, 2 to 10, w to z)
Row based operation
/Map operation
易用性:
//方法簽名: 接收一個 scalarFunction 參數(shù),返回一個 Table
def map(scalarFunction: Expression): Table
class MyMap extends ScalarFunction {
var param : String = ""
//eval 方法接收一些輸入
def eval([user defined inputs]): Row = {
val result = new Row(3)
// Business processing based on data and parameters
// 根據數(shù)據和參數(shù)進行業(yè)務處理,返回最終結果
result
}
//指定結果對應的類型,例如這里 Row的類型,Row有三列
override def getResultType(signature: Array[Class[_]]):
TypeInformation[_] = {
Types.ROW(Types.STRING, Types.INT, Types.LONG)
}
}
//使用 fun('e) 得到一個 Row 并定義名稱 abc 然后獲取 ac列
val res = tab
.map(fun('e)).as('a, 'b, 'c)
.select('a, 'c)
//好處:當你的列很多的時候,并且每一類都需要返回一個結果的時候
table.select(udf1(), udf2(), udf3()….)
VS
table.map(udf())
Map
是輸入一條輸出一條FlatMap operation
易用性:
//方法簽名:出入一個tableFunction
def flatMap(tableFunction: Expression): Table
#tableFunction 實現(xiàn)的列子,返回一個 User類型,是一個 POJOs類型,Flink能夠自動識別類型。
case class User(name: String, age: Int)
class MyFlatMap extends TableFunction[User] {
def eval([user defined inputs]): Unit = {
for(..){
collect(User(name, age))
}
}
}
//使用
val res = tab
.flatMap(fun('e,'f)).as('name, 'age)
.select('name, 'age)
Benefit
//好處
table.joinLateral(udtf) VS table.flatMap(udtf())
FlatMap
是輸入一行輸出多行FlatAggregate operation
功能性:文章來源:http://www.zghlxwxcb.cn/news/detail-761693.html
#方法簽名:輸入 tableAggregateFunction 與 AggregateFunction 很相似
def flatAggregate(tableAggregateFunction: Expression): FlatAggregateTable
class FlatAggregateTable(table: Table, groupKey: Seq[Expression], tableAggFun: Expression)
class TopNAcc {
var data: MapView[JInt, JLong] = _ // (rank -> value)
...
}
class TopN(n: Int) extends TableAggregateFunction[(Int, Long), TopNAccum] {
def accumulate(acc: TopNAcc, [user defined inputs]) {
...
}
#可以那多 column,進行多個輸出
def emitValue(acc: TopNAcc, out: Collector[(Int, Long)]): Unit = {
...
}
...retract/merge
}
#用法
val res = tab
.groupBy(‘a)
.flatAggregate(
flatAggFunc(‘e,’f) as (‘a, ‘b, ‘c))
.select(‘a, ‘c)
#好處
新增了一種agg,輸出多行
FlatAggregate operation
輸入多行輸出多行Aggregate
與FlatAggregate
的區(qū)別: 使用Max
和Top2
的場景比較Aggregate
和FlatAggregate
之間的差別。如下有一張輸入表,表有三列(ID
、NAME
、PRICE
),然后對Price
求最大指和Top2
。Max
操作是藍線,首先創(chuàng)建累加器,然后在累加器上accumulate
操作,例如6過去是6,3過去沒有6大還是6等等。得到最終得到8的結果。TOP2
操作時紅線,首先創(chuàng)建累加器,然后在累加器上accumulate
操作,例如6過去是6,3過去因為是兩個元素所以3也保存,當5過來時,和最小的比較,3就被淘汰了等等。得到最終得到8和6的結果。
總結:文章來源地址http://www.zghlxwxcb.cn/news/detail-761693.html
到了這里,關于Flink Table API 與 SQL 編程整理的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!