一、Flink API的類型:
1.低級api:
????????提供了對時間和狀態(tài)的細粒度控制,簡潔性和易用性較差,主要應用在對一些復雜事件的處理邏輯上。
2.核心api:
????????要提供了針對流數(shù)據(jù)和離線數(shù)據(jù)的處理,對低級API進行了一些封裝,提供了filter、sum、max、min等高級函數(shù),簡單且易用,所以在工作中應用比較廣泛。核心api分成兩類
(1)DataStream API:用于處理無界數(shù)據(jù)流,提供了各種操作符來處理流數(shù)據(jù)。
(2)DataSet API:用于處理有界數(shù)據(jù)集,提供了各種操作符來處理批處理數(shù)據(jù)。
3.Table api/sql:
(1)Table API:一般與DataSet或者DataStream緊密關聯(lián),首先通過一個DataSet或DataStream創(chuàng)建出一個Table;然后用類似于filter、join或者select關系型轉化操作來轉化為一個新的Table對象;最后將一個Table對象轉回一個DataSet或DataStream。與SQL不同的是,Table API的查詢不是一個指定的SQL字符串,而是調用指定的API方法
(2)SQL:Flink的SQL集成是基于Apache Calcite的,Apache Calcite實現(xiàn)了標準的SQL,使用起來比其他API更加靈活,因為可以直接使用SQL語句。Table API和SQL可以很容易地結合在一塊使用,它們都返回Table對象
二、Flink DataStream的常用API:
DataStream API主要分為3塊:DataSource、Transformation、Sink。
1.DataSource 輸入源:
Flink針對DataStream提供了大量的已經(jīng)實現(xiàn)的DataSource(數(shù)據(jù)源)接口,比如下面4種。
(1)基于文件:讀取文本文件,文件遵循TextInputFormat逐行讀取規(guī)則并返回
(2)基于Socket:從Socket中讀取數(shù)據(jù),元素可以通過一個分隔符分開
(3)基于集合:通過Java的Collection集合創(chuàng)建一個數(shù)據(jù)流,集合中的所有元素必須是相同類型的
(4)自定義輸入:addSource可以實現(xiàn)讀取第三方數(shù)據(jù)源的數(shù)據(jù)。Flink也提供了一批內置的Connector(連接器)。連接器會提供對應的Source支持,如mq(kafka/RabbitMQ)、es、redis、mysql(通過JDBC連接器)等等
2.Transformation 轉換器:
????????它對一個或多個輸入數(shù)據(jù)源進行計算處理,比如Map、FlatMap和Filter等操作,F(xiàn)link針對DataStream提供了大量的已經(jīng)實現(xiàn)的算子:
(1)Map:輸入一個元素,然后返回一個元素,中間可以進行清洗轉換等操作。
(2)FlatMap:輸入一個元素,可以返回零個、一個或者多個元素。
(3)Filter:過濾函數(shù),對傳入的數(shù)據(jù)進行判斷,符合條件的數(shù)據(jù)會被留下。
(4)KeyBy:根據(jù)指定的Key進行分組,Key相同的數(shù)據(jù)會進入同一個分區(qū)。
????????KeyBy的兩種典型用法如下:
????????????????DataStream.keyBy("someKey") 指定對象中的someKey段作為分組Key。
????????????????DataStream.keyBy(0) 指定Tuple中的第一個元素作為分組Key
(5)Reduce:對數(shù)據(jù)進行聚合操作,結合當前元素和上一次Reduce返回的值進行聚合操作,然后返回一個新的值
(6)Aggregations:sum()、min()、max()等
(7)Union:合并多個流,新的流會包含所有流中的數(shù)據(jù),但是Union有一個限制,就是所有合并的流類型必須是一致的
(8)Connect:和Union類似,但是只能連接兩個流,兩個流的數(shù)據(jù)類型可以不同,會對兩個流中的數(shù)據(jù)應用不同的處理方法。
(9)coMap和coFlatMap:在ConnectedStream中需要使用這種函數(shù),類似于Map和flatMap
(10)Split:根據(jù)規(guī)則把一個數(shù)據(jù)流切分為多個流
(11)Select:和Split配合使用,選擇切分后的流
3.Sink 輸出源:
Flink針對DataStream提供了大量的已經(jīng)實現(xiàn)的數(shù)據(jù)目的地(Sink)
(1)writeAsText():將元素以字符串形式逐行寫入,這些字符串通過調用每個元素的toString()方法來獲取。
(2)print() / printToErr():打印每個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中。
(3)自定義輸出:addSink可以實現(xiàn)把數(shù)據(jù)輸出到第三方存儲介質中。如hdfs、mysql(通過JDBC連接器)、es、kafka、redis
三、Flink DataSet的常用API分析
DataSet API也可以分為3塊來分析:DataSource、Transformation和Sink,使用類似,這里只說對比DataStream的區(qū)別:
-
數(shù)據(jù)類型:DataSet API適用于處理有界數(shù)據(jù),即離線批處理;DataStream API適用于處理無界數(shù)據(jù),即實時流處理。
-
數(shù)據(jù)處理方式:DataSet API采用批處理方式,即將數(shù)據(jù)讀取到內存中,進行批量計算,然后將結果寫出;DataStream API采用流處理方式,即從數(shù)據(jù)流中逐個讀取數(shù)據(jù),進行實時計算,然后將結果發(fā)送到下游。
-
時間處理:在DataStream API中,時間處理非常重要,可以通過時間窗口、時間滑動窗口等方式對數(shù)據(jù)進行處理。而在DataSet API中,時間處理相對簡單,通常只需要使用時間戳進行排序和分組即可。
-
窗口:DataStream API中支持各種窗口,如滾動窗口、滑動窗口、會話窗口等,而DataSet API中則不支持窗口。
-
穩(wěn)定性:由于DataStream API中處理的是實時數(shù)據(jù)流,因此對數(shù)據(jù)的穩(wěn)定性要求較高,需要考慮數(shù)據(jù)丟失、重復等問題。而DataSet API中處理的是靜態(tài)數(shù)據(jù),不需要考慮這些問題。
四、Table API和SQL的基本使用
????????Flink針對標準的流處理和批處理提供了兩種關系型API:Table API和SQL。Table API允許用戶以一種很直觀的方式進行select、filter和join操作;Flink SQL支持基于 ApacheCalcite實現(xiàn)的標準SQL。針對批處理和流處理可以提供相同的處理語義和結果。
????????Table API和SQL是關系型API,用戶可以像操作MySQL數(shù)據(jù)庫表一樣來操作數(shù)據(jù),而不需要通過編寫Java代碼來完成Flink Function,更不需要手工為Java代碼調優(yōu)。另外,SQL作為一個非程序員可操作的語言,學習成本很低,如果一個系統(tǒng)提供SQL支持,將很容易被用戶接受。
1.Table API的基本使用
(1)創(chuàng)建TableEnvironment對象
TableEnvironment是Flink Table API的主要入口,它提供了各種方法來創(chuàng)建Table對象、注冊表、執(zhí)行查詢等操作??梢酝ㄟ^以下方式創(chuàng)建TableEnvironment對象:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
(2)創(chuàng)建Table對象
在Table API中,可以通過fromDataStream()方法將DataStream轉換為Table,也可以通過其他方法創(chuàng)建Table對象。例如,以下代碼創(chuàng)建了一個Table對象:
val table = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("city", DataTypes.STRING())
),
Rows.rowOf("Alice",25, "Beijing"),
Rows.rowOf("Bob",30, "Shanghai"),
Rows.rowOf("Charlie",35, "Hangzhou")
)
(3)執(zhí)行查詢
在Table對象上可以執(zhí)行各種查詢操作,比如filter、select、group by等。例如,以下代碼對Table進行了一個簡單的select操作:
val result = table.select("name, age").where("age >30")
(4)輸出結果
最后,可以通過toRetractStream()方法將Table對象轉換為DataStream輸出結果。例如,以下代碼將查詢結果輸出到控制臺:
result.toRetractStream[(String, Int)].print()
2.SQL的基本使用
(1)創(chuàng)建TableEnvironment對象:
和Table API一樣,SQL也需要TableEnvironment對象來執(zhí)行查詢等操作??梢酝ㄟ^以下方式創(chuàng)建TableEnvironment對象:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
(2)注冊表:
在SQL中,需要將數(shù)據(jù)源注冊為表,并為其定義表架構??梢酝ㄟ^以下代碼將DataStream注冊為表:
val ds: DataStream[(String, Int)] = ...
tEnv.createTemporaryView("my_table", ds, 'name, 'age)
其中,'name和'age是數(shù)據(jù)源中的字段名,用于定義表架構。
(3)執(zhí)行查詢:
在SQL中,可以通過executeSql()方法執(zhí)行SQL查詢。例如,以下代碼查詢了my_table表中年齡大于30的記錄:
val result = tEnv.executeSql("SELECT name, age FROM my_table WHERE age >30")
(4)輸出結果:
和Table API一樣,最后可以通過toRetractStream()方法將查詢結果轉換為DataStream輸出結果。例如,以下代碼將查詢結果輸出到控制臺:
result.toRetractStream[(String, Int)].print()
四、Flink支持的DataType分析
Flink支持Java和Scala中的大部分數(shù)據(jù)類型。
Java Tuple和Scala Case Class。
Java POJO:Java實體類。
Primitive Type:默認支持Java和Scala基本數(shù)據(jù)類型。
General Class Type:默認支持大多數(shù)Java和Scala Class。
Hadoop Writable:支持Hadoop中實現(xiàn)了org.apache.Hadoop.Writable的數(shù)據(jù)類型。
Special Type:比如Scala中的Either Option和Try。
根據(jù)類型分組:
1.基本數(shù)據(jù)類型:BOOLEAN、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DECIMAL、CHAR、VARCHAR、BINARY、VARBINARY。
2.時間類型:DATE、TIME、TIMESTAMP、INTERVAL YEAR、INTERVAL MONTH、INTERVAL DAY、INTERVAL HOUR、INTERVAL MINUTE、INTERVAL SECOND。
3.復合類型:ARRAY、MAP、ROW。
4.未知類型:NULL、RAW。文章來源:http://www.zghlxwxcb.cn/news/detail-402622.html
Flink通過DataType來描述表中列的數(shù)據(jù)類型,從而進行類型檢查和類型推斷。同時,F(xiàn)link還支持使用UDF(User-Defined Function)自定義的數(shù)據(jù)類型。文章來源地址http://www.zghlxwxcb.cn/news/detail-402622.html
到了這里,關于Flink學習筆記(二)Flink常用API詳解的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!