為 Flink 量身定制的序列化框架
大家都知道現(xiàn)在大數(shù)據(jù)生態(tài)非常火,大多數(shù)技術(shù)組件都是運行在JVM
上的,Flink
也是運行在JVM
上,基于JVM
的數(shù)據(jù)分析引擎都需要將大量的數(shù)據(jù)存儲在內(nèi)存中,這就不得不面臨JVM
的一些問題,比如Java
對象存儲密度較低等。針對這些問題,最常用的方法就是實現(xiàn)一個顯式的內(nèi)存管理,也就是說用自定義的內(nèi)存池來進(jìn)行內(nèi)存的分配回收,接著將序列化后的對象存儲到內(nèi)存塊中。
現(xiàn)在Java
生態(tài)圈中已經(jīng)有許多序列化框架,比如說Java serialization, Kryo,Apache Avro
等等。但是Flink
依然是選擇了自己定制的序列化框架,那么到底有什么意義呢?若Flink
選擇自己定制的序列化框架,對類型信息了解越多,可以在早期完成類型檢查,更好的選取序列化方式,進(jìn)行數(shù)據(jù)布局,節(jié)省數(shù)據(jù)的存儲空間,直接操作二進(jìn)制數(shù)據(jù)。Flink
在其內(nèi)部構(gòu)建了一套自己的類型系統(tǒng),Flink
現(xiàn)階段支持的類型分類如圖所示,從圖中可以看到Flink
類型可以分為基礎(chǔ)類型Basic
、數(shù)組Arrays
、復(fù)合類型Composite
、輔助類型Auxiliary
、泛型和其它類型Generic
。Flink
支持任意的Java
或是Scala
類型。不需要像Hadoop
一樣去實現(xiàn)一個特定的接口org.apache.hadoop.io.Writable
,Flink
能夠自動識別數(shù)據(jù)類型。TypeInformation
的思維導(dǎo)圖如圖所示,從圖中可以看出,在Flink
中每一個具體的類型都對應(yīng)了一個具體的TypeInformation
實現(xiàn)類,例如BasicTypeInformation
中的IntegerTypeInformation
和FractionalTypeInformation
都具體的對應(yīng)了一個TypeInformation
。然后還有BasicArrayTypeInformation
、CompositeType
以及一些其它類型,也都具體對應(yīng)了一個TypeInformation
。
TypeInformation
是Flink
類型系統(tǒng)的核心類。對于用戶自定義的Function
來說,Flink
需要一個類型信息來作為該函數(shù)的輸入輸出類型,即TypeInfomation
。該類型信息類作為一個工具來生成對應(yīng)類型的序列化器TypeSerializer
,并用于執(zhí)行語義檢查,比如當(dāng)一些字段在作為join
或grouping
的鍵時,檢查這些字段是否在該類型中存在。
Flink 的序列化過程
在Flink
序列化過程中,進(jìn)行序列化操作必須要有序列化器,那么序列化器從何而來?
每一個具體的數(shù)據(jù)類型都對應(yīng)一個TypeInformation
的具體實現(xiàn),每一個TypeInformation
都會為對應(yīng)的具體數(shù)據(jù)類型提供一個專屬的序列化器。通過 Flink
的序列化過程圖可以看到TypeInformation
會提供一個createSerialize()
方法,通過這個方法就可以得到該類型進(jìn)行數(shù)據(jù)序列化操作與反序化操作的對象TypeSerializer
。對于大多數(shù)數(shù)據(jù)類型
Flink
可以自動生成對應(yīng)的序列化器,能非常高效地對數(shù)據(jù)集進(jìn)行序列化和反序列化,比如,BasicTypeInfo
、WritableTypeInfo
等,但針對GenericTypeInfo
類型,Flink
會使用Kyro
進(jìn)行序列化和反序列化。其中,Tuple
、Pojo
和CaseClass
類型是復(fù)合類型,它們可能嵌套一個或者多個數(shù)據(jù)類型。在這種情況下,它們的序列化器同樣是復(fù)合的。它們會將內(nèi)嵌類型的序列化委托給對應(yīng)類型的序列化器。
簡單的介紹下Pojo
的類型規(guī)則,即在滿足一些條件的情況下,才會選用Pojo
的序列化進(jìn)行相應(yīng)的序列化與反序列化的一個操作。即類必須是Public
的,且類有一個public
的無參數(shù)構(gòu)造函數(shù),該類(以及所有超類)中的所有非靜態(tài)no-static
、非瞬態(tài)no-transient
字段都是public
的(和非最終的final
)或者具有公共getter
和setter
方法,該方法遵循getter
和setter
的Java bean
命名約定。當(dāng)用戶定義的數(shù)據(jù)類型無法識別為POJO
類型時,必須將其作為GenericType
處理并使用Kryo
進(jìn)行序列化。
Flink
自帶了很多TypeSerializer
子類,大多數(shù)情況下各種自定義類型都是常用類型的排列組合,因而可以直接復(fù)用,如果內(nèi)建的數(shù)據(jù)類型和序列化方式不能滿足你的需求,Flink
的類型信息系統(tǒng)也支持用戶拓展。若用戶有一些特殊的需求,只需要實現(xiàn) TypeInformation
、TypeSerializer
和TypeComparator
即可定制自己類型的序列化和比較大小方式,來提升數(shù)據(jù)類型在序列化和比較時的性能。
序列化就是將數(shù)據(jù)結(jié)構(gòu)或者對象轉(zhuǎn)換成一個二進(jìn)制串的過程,在Java
里面可以簡單地理解成一個byte
數(shù)組。而反序列化恰恰相反,就是將序列化過程中所生成的二進(jìn)制串轉(zhuǎn)換成數(shù)據(jù)結(jié)構(gòu)或者對象的過程。下面就以內(nèi)嵌型的Tuple3
這個對象為例,簡述一下它的序列化過程。
Tuple3
包含三個層面,一是int
類型,一是double
類型,還有一個是Person
。Person
包含兩個字段,一是int
型的ID
,另一個是 String
類型的name
,它在序列化操作時,會委托相應(yīng)具體序列化的序列化器進(jìn)行相應(yīng)的序列化操作。從圖中可以看到Tuple3
會把 int
類型通過IntSerializer
進(jìn)行序列化操作,此時int
只需要占用四個字節(jié)就可以了。根據(jù)int
占用四個字節(jié),這個能夠體現(xiàn)出Flink
可序列化過程中的一個優(yōu)勢,即在知道數(shù)據(jù)類型的前提下,可以更好的進(jìn)行相應(yīng)的序列化與反序列化操作。相反,如果采用Java
的序列化,雖然能夠存儲更多的屬性信息,但一次占據(jù)的存儲空間會受到一定的損耗。Person
類會被當(dāng)成一個Pojo
對象來進(jìn)行處理,PojoSerializer
序列化器會把一些屬性信息使用一個字節(jié)存儲起來。同樣,其字段則采取相對應(yīng)的序列化器進(jìn)行相應(yīng)序列化,在序列化完的結(jié)果中,可以看到所有的數(shù)據(jù)都是由MemorySegment
去支持。
MemorySegment
具有什么作用呢? MemorySegment
在Flink
中會將對象序列化到預(yù)分配的內(nèi)存塊上,它代表1
個固定長度的內(nèi)存,默認(rèn)大小為32 kb
。MemorySegment
代表Flink
中的一個最小的內(nèi)存分配單元,相當(dāng)于是Java
的一個byte
數(shù)組。 每條記錄都會以序列化的形式存儲在一個或多個MemorySegment
中。
Flink 序列化的最佳實踐
Flink
常見的應(yīng)用場景有四種,即注冊子類型、注冊自定義序列化器、添加類型提示、手動創(chuàng)建TypeInformation
,具體如下:
【1】注冊子類型: 如果函數(shù)簽名只描述了超類型,但是它們實際上在執(zhí)行期間使用了超類型的子類型,那么讓Flink
了解這些子類型會大大提高性能??梢栽?code>StreamExecutionEnvironment或ExecutionEnvironment
中調(diào)用.registertype (clazz)
注冊子類型信息。
【2】注冊自定義序列化器: 對于不適用于自己的序列化框架的數(shù)據(jù)類型,Flink
會使用Kryo
來進(jìn)行序列化,并不是所有的類型都與Kryo
無縫連接,具體注冊方法在下文介紹。
【3】添加類型提示: 有時,當(dāng)Flink
用盡各種手段都無法推測出泛型信息時,用戶需要傳入一個類型提示TypeHint
,這個通常只在Java API
中需要。
【4】手動創(chuàng)建TypeInformation
: 在某些API
調(diào)用中,這可能是必需的,因為Java
的泛型類型擦除導(dǎo)致Flink
無法推斷數(shù)據(jù)類型。
其實在大多數(shù)情況下,用戶不必?fù)?dān)心序列化框架和注冊類型,因為Flink
已經(jīng)提供了大量的序列化操作,不需要去定義自己的一些序列化器,但是在一些特殊場景下,需要去做一些相應(yīng)的處理。
實踐 - 類型聲明: 類型聲明去創(chuàng)建一個類型信息的對象是通過哪種方式?通常是用TypeInformation.of()
方法來創(chuàng)建一個類型信息的對象,具體說明如下:
【1】對于非泛型類,直接傳入class
對象即可
PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);
【2】對于泛型類,需要通過TypeHint
來保存泛型類型信息
final TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});
【3】預(yù)定義常量: 如BasicTypeInfo
,這個類定義了一系列常用類型的快捷方式,對于String
、Boolean
、Byte
、Short
、Integer
、Long
、Float
、Double
、Char
等基本類型的類型聲明,可以直接使用。而且Flink
還提供了完全等價的Types
類org.apache.flink.api.common.typeinfo.Types
。特別需要注意的是,flink-table
模塊也有一個Types
類org.apache.flink.table.api.Types
,用于table
模塊內(nèi)部的類型定義信息,用法稍有不同。使用IDE
的自動import
時一定要小心。
【4】自定義TypeInfo
和TypeInfoFactory
: 通過自定義TypeInfo
為任意類提供Flink
原生內(nèi)存管理(而非Kryo
),使存
儲更緊湊,運行時也更高效。需要注意在自定義類上使用@TypeInfo
注解,隨后創(chuàng)建相應(yīng)的TypeInfoFactory
并覆蓋createTypeInfo()
方法。
@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0,T1>{
public T0 myfield0;
public T1 myfield1;
}
public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple>{
@Override
public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInfomation<?>> genericParameters){
return new MyTupleTypeInfo(genericParameters.get("T0").genericParameters.get("T1"));
}
}
實踐 - 注冊子類型
Flink
認(rèn)識父類,但不一定認(rèn)識子類的一些獨特特性,因此需要單獨注冊子類型。StreamExecutionEnvironment
和 ExecutionEnvironment
提供registerType()
方法用來向Flink
注冊子類信息。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerType(typeClass);
在registerType()
方法內(nèi)部,會使用TypeExtractor
來提取類型信息,如上所示,獲取到的類型信息屬于PojoTypeInfo
及其子類,那么需要將其注冊到一起,否則統(tǒng)一交給Kryo
去處理,Flink
并不過問 ( 這種情況下性能會變差 )。
實踐 - Kryo 序列化
對于Flink
無法序列化的類型(例如用戶自定義類型,沒有registerType
,也沒有自定義TypeInfo
和TypeInfoFactory
),默認(rèn)會交給 Kryo
處理,如果Kryo
仍然無法處理(例如Guava
、Thrift
、Protobuf
等第三方庫的一些類),有兩種解決方案:
【1】強制使用Avro
來代替Kryo
env.getConfig().enableForceAvro();
【2】為Kryo
增加自定義的Serializer
以增強Kryo
的功能
env.getConfig().addDefaultKryoSerializer(clazz, serializer);
注:如果希望完全禁用Kryo
(100%
使用Flink
的序列化機制),可以通過Kryoenv.getConfig().disableGenericTypes()
的方式完成,但注意一切無法處理的類都將導(dǎo)致異常,這種對于調(diào)試非常有效。
Flink 通信層的序列化
Flink
的Task
之間如果需要跨網(wǎng)絡(luò)傳輸數(shù)據(jù)記錄, 那么就需要將數(shù)據(jù)序列化之后寫入NetworkBufferPool
,然后下層的Task
讀出之后再進(jìn)行反序列化操作,最后進(jìn)行邏輯處理。為了使得記錄以及事件能夠被寫入 Buffer,隨后在消費時再從Buffer
中讀出,Flink
提供了數(shù)據(jù)記錄序列化器RecordSerializer
與反序列化器RecordDeserializer
以及事件序列化器EventSerializer
。
Function
發(fā)送的數(shù)據(jù)被封裝成SerializationDelegate
,它將任意元素公開為IOReadableWritable
以進(jìn)行序列化,通過setInstance()
來傳入要序列化的數(shù)據(jù)。在Flink
通信層的序列化中,有幾個問題值得關(guān)注,具體如下:
【1】何時確定Function
的輸入輸出類型?
在構(gòu)建StreamTransformation
的時候通過TypeExtractor
工具確定Function
的輸入輸出類型。TypeExtractor
類可以根據(jù)方法簽名、子類信息等蛛絲馬跡自動提取或恢復(fù)類型信息。
【2】何時確定Function
的序列化 / 反序列化器?
構(gòu)造StreamGraph
時, 通過TypeInfomation
的createSerializer()
方法獲取對應(yīng)類型的序列化器TypeSerializer
,并在addOperator()
的過程中執(zhí)行setSerializers() 操作,設(shè)置StreamConfig
的TYPESERIALIZERIN1
、TYPESERIALIZERIN2
、 TYPESERIALIZEROUT_1
屬性。
【3】何時進(jìn)行真正的序列化 / 反序列化操作? 這個過程與TypeSerializer
又是怎么聯(lián)系在一起的呢?
構(gòu)造StreamGraph
時, 通過TypeInfomation
的createSerializer()
方法獲取對應(yīng)類型的序列化器TypeSerializer
,并在addOperator()
的過程中執(zhí)行setSerializers()
操作,設(shè)置StreamConfig
的TYPESERIALIZERIN1
、 TYPESERIALIZERIN2
、 TYPESERIALIZEROUT_1
屬性。
【4】何時進(jìn)行真正的序列化 / 反序列化操作? 這個過程與TypeSerializer
又是怎么聯(lián)系在一起的呢?文章來源:http://www.zghlxwxcb.cn/news/detail-760366.html
大家都應(yīng)該清楚Task
和StreamTask
兩個概念,Task
是直接受TaskManager
管理和調(diào)度的,而Task
又會調(diào)用StreamTask
,而StreamTask
中真正封裝了算子的處理邏輯。在run()
方法中,首先將反序列化后的數(shù)據(jù)封裝成StreamRecord
交給算子處理;然后將處理結(jié)果通過Collector
發(fā)送給下游 ( 在構(gòu)建Collector
時已經(jīng)確定了SerializtionDelegate
),并通過RecordWriter
寫入器將序列化后的結(jié)果寫入DataOutput
;最后序列化的操作交給SerializerDelegate
處理,實際還是通過TypeSerializer
的serialize()
方法完成。文章來源地址http://www.zghlxwxcb.cn/news/detail-760366.html
到了這里,關(guān)于Flink 數(shù)據(jù)序列化的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!