国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink 數(shù)據(jù)序列化

這篇具有很好參考價值的文章主要介紹了Flink 數(shù)據(jù)序列化。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

為 Flink 量身定制的序列化框架

大家都知道現(xiàn)在大數(shù)據(jù)生態(tài)非常火,大多數(shù)技術(shù)組件都是運行在JVM上的,Flink也是運行在JVM上,基于JVM的數(shù)據(jù)分析引擎都需要將大量的數(shù)據(jù)存儲在內(nèi)存中,這就不得不面臨JVM的一些問題,比如Java對象存儲密度較低等。針對這些問題,最常用的方法就是實現(xiàn)一個顯式的內(nèi)存管理,也就是說用自定義的內(nèi)存池來進(jìn)行內(nèi)存的分配回收,接著將序列化后的對象存儲到內(nèi)存塊中。
現(xiàn)在Java生態(tài)圈中已經(jīng)有許多序列化框架,比如說Java serialization, Kryo,Apache Avro等等。但是Flink依然是選擇了自己定制的序列化框架,那么到底有什么意義呢?若Flink選擇自己定制的序列化框架,對類型信息了解越多,可以在早期完成類型檢查,更好的選取序列化方式,進(jìn)行數(shù)據(jù)布局,節(jié)省數(shù)據(jù)的存儲空間,直接操作二進(jìn)制數(shù)據(jù)。
Flink 數(shù)據(jù)序列化,Flink,flink,大數(shù)據(jù),職場和發(fā)展,java,后端,算法,開發(fā)語言Flink在其內(nèi)部構(gòu)建了一套自己的類型系統(tǒng),Flink現(xiàn)階段支持的類型分類如圖所示,從圖中可以看到Flink類型可以分為基礎(chǔ)類型Basic、數(shù)組Arrays、復(fù)合類型Composite、輔助類型Auxiliary、泛型和其它類型Generic。Flink支持任意的Java或是Scala類型。不需要像Hadoop一樣去實現(xiàn)一個特定的接口org.apache.hadoop.io.Writable,Flink能夠自動識別數(shù)據(jù)類型。
Flink 數(shù)據(jù)序列化,Flink,flink,大數(shù)據(jù),職場和發(fā)展,java,后端,算法,開發(fā)語言
TypeInformation的思維導(dǎo)圖如圖所示,從圖中可以看出,在Flink中每一個具體的類型都對應(yīng)了一個具體的TypeInformation實現(xiàn)類,例如BasicTypeInformation中的IntegerTypeInformationFractionalTypeInformation都具體的對應(yīng)了一個TypeInformation。然后還有BasicArrayTypeInformationCompositeType以及一些其它類型,也都具體對應(yīng)了一個TypeInformation

TypeInformationFlink類型系統(tǒng)的核心類。對于用戶自定義的Function來說,Flink需要一個類型信息來作為該函數(shù)的輸入輸出類型,即TypeInfomation。該類型信息類作為一個工具來生成對應(yīng)類型的序列化器TypeSerializer,并用于執(zhí)行語義檢查,比如當(dāng)一些字段在作為joingrouping的鍵時,檢查這些字段是否在該類型中存在。

Flink 的序列化過程

Flink序列化過程中,進(jìn)行序列化操作必須要有序列化器,那么序列化器從何而來?
每一個具體的數(shù)據(jù)類型都對應(yīng)一個TypeInformation的具體實現(xiàn),每一個TypeInformation都會為對應(yīng)的具體數(shù)據(jù)類型提供一個專屬的序列化器。通過 Flink的序列化過程圖可以看到TypeInformation會提供一個createSerialize()方法,通過這個方法就可以得到該類型進(jìn)行數(shù)據(jù)序列化操作與反序化操作的對象TypeSerializer。
Flink 數(shù)據(jù)序列化,Flink,flink,大數(shù)據(jù),職場和發(fā)展,java,后端,算法,開發(fā)語言對于大多數(shù)數(shù)據(jù)類型 Flink可以自動生成對應(yīng)的序列化器,能非常高效地對數(shù)據(jù)集進(jìn)行序列化和反序列化,比如,BasicTypeInfoWritableTypeInfo等,但針對GenericTypeInfo類型,Flink會使用Kyro進(jìn)行序列化和反序列化。其中,Tuple、PojoCaseClass類型是復(fù)合類型,它們可能嵌套一個或者多個數(shù)據(jù)類型。在這種情況下,它們的序列化器同樣是復(fù)合的。它們會將內(nèi)嵌類型的序列化委托給對應(yīng)類型的序列化器。

簡單的介紹下Pojo的類型規(guī)則,即在滿足一些條件的情況下,才會選用Pojo的序列化進(jìn)行相應(yīng)的序列化與反序列化的一個操作。即類必須是Public的,且類有一個public的無參數(shù)構(gòu)造函數(shù),該類(以及所有超類)中的所有非靜態(tài)no-static、非瞬態(tài)no-transient字段都是public的(和非最終的final)或者具有公共gettersetter方法,該方法遵循gettersetterJava bean命名約定。當(dāng)用戶定義的數(shù)據(jù)類型無法識別為POJO類型時,必須將其作為GenericType處理并使用Kryo進(jìn)行序列化。

Flink自帶了很多TypeSerializer子類,大多數(shù)情況下各種自定義類型都是常用類型的排列組合,因而可以直接復(fù)用,如果內(nèi)建的數(shù)據(jù)類型和序列化方式不能滿足你的需求,Flink的類型信息系統(tǒng)也支持用戶拓展。若用戶有一些特殊的需求,只需要實現(xiàn) TypeInformation、TypeSerializerTypeComparator即可定制自己類型的序列化和比較大小方式,來提升數(shù)據(jù)類型在序列化和比較時的性能。

序列化就是將數(shù)據(jù)結(jié)構(gòu)或者對象轉(zhuǎn)換成一個二進(jìn)制串的過程,在Java里面可以簡單地理解成一個byte數(shù)組。而反序列化恰恰相反,就是將序列化過程中所生成的二進(jìn)制串轉(zhuǎn)換成數(shù)據(jù)結(jié)構(gòu)或者對象的過程。下面就以內(nèi)嵌型的Tuple3這個對象為例,簡述一下它的序列化過程。
Flink 數(shù)據(jù)序列化,Flink,flink,大數(shù)據(jù),職場和發(fā)展,java,后端,算法,開發(fā)語言

Tuple3包含三個層面,一是int類型,一是double類型,還有一個是PersonPerson包含兩個字段,一是int型的ID,另一個是 String 類型的name,它在序列化操作時,會委托相應(yīng)具體序列化的序列化器進(jìn)行相應(yīng)的序列化操作。從圖中可以看到Tuple3 會把 int類型通過IntSerializer進(jìn)行序列化操作,此時int只需要占用四個字節(jié)就可以了。根據(jù)int占用四個字節(jié),這個能夠體現(xiàn)出Flink可序列化過程中的一個優(yōu)勢,即在知道數(shù)據(jù)類型的前提下,可以更好的進(jìn)行相應(yīng)的序列化與反序列化操作。相反,如果采用Java的序列化,雖然能夠存儲更多的屬性信息,但一次占據(jù)的存儲空間會受到一定的損耗。Person類會被當(dāng)成一個Pojo對象來進(jìn)行處理,PojoSerializer序列化器會把一些屬性信息使用一個字節(jié)存儲起來。同樣,其字段則采取相對應(yīng)的序列化器進(jìn)行相應(yīng)序列化,在序列化完的結(jié)果中,可以看到所有的數(shù)據(jù)都是由MemorySegment去支持。

MemorySegment具有什么作用呢? MemorySegmentFlink中會將對象序列化到預(yù)分配的內(nèi)存塊上,它代表1個固定長度的內(nèi)存,默認(rèn)大小為32 kb。MemorySegment代表Flink中的一個最小的內(nèi)存分配單元,相當(dāng)于是Java的一個byte數(shù)組。 每條記錄都會以序列化的形式存儲在一個或多個MemorySegment中。

Flink 序列化的最佳實踐

Flink常見的應(yīng)用場景有四種,即注冊子類型、注冊自定義序列化器、添加類型提示、手動創(chuàng)建TypeInformation,具體如下:
【1】注冊子類型: 如果函數(shù)簽名只描述了超類型,但是它們實際上在執(zhí)行期間使用了超類型的子類型,那么讓Flink了解這些子類型會大大提高性能??梢栽?code>StreamExecutionEnvironment或ExecutionEnvironment中調(diào)用.registertype (clazz) 注冊子類型信息。
【2】注冊自定義序列化器: 對于不適用于自己的序列化框架的數(shù)據(jù)類型,Flink會使用Kryo來進(jìn)行序列化,并不是所有的類型都與Kryo無縫連接,具體注冊方法在下文介紹。
【3】添加類型提示: 有時,當(dāng)Flink用盡各種手段都無法推測出泛型信息時,用戶需要傳入一個類型提示TypeHint,這個通常只在Java API中需要。
【4】手動創(chuàng)建TypeInformation 在某些API調(diào)用中,這可能是必需的,因為Java的泛型類型擦除導(dǎo)致Flink無法推斷數(shù)據(jù)類型。
其實在大多數(shù)情況下,用戶不必?fù)?dān)心序列化框架和注冊類型,因為Flink已經(jīng)提供了大量的序列化操作,不需要去定義自己的一些序列化器,但是在一些特殊場景下,需要去做一些相應(yīng)的處理。

實踐 - 類型聲明: 類型聲明去創(chuàng)建一個類型信息的對象是通過哪種方式?通常是用TypeInformation.of()方法來創(chuàng)建一個類型信息的對象,具體說明如下:
【1】對于非泛型類,直接傳入class對象即可

PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);

【2】對于泛型類,需要通過TypeHint來保存泛型類型信息

final TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});

【3】預(yù)定義常量:BasicTypeInfo,這個類定義了一系列常用類型的快捷方式,對于String、Boolean、ByteShort、Integer、Long、Float、Double、Char等基本類型的類型聲明,可以直接使用。而且Flink還提供了完全等價的Typesorg.apache.flink.api.common.typeinfo.Types。特別需要注意的是,flink-table模塊也有一個Typesorg.apache.flink.table.api.Types,用于table模塊內(nèi)部的類型定義信息,用法稍有不同。使用IDE 的自動import時一定要小心。
【4】自定義TypeInfoTypeInfoFactory 通過自定義TypeInfo為任意類提供Flink原生內(nèi)存管理(而非Kryo),使存
儲更緊湊,運行時也更高效。需要注意在自定義類上使用@TypeInfo注解,隨后創(chuàng)建相應(yīng)的TypeInfoFactory并覆蓋createTypeInfo()方法。

@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0,T1>{
    public T0 myfield0;
    public T1 myfield1;
}

public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple>{

    @Override
    public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInfomation<?>> genericParameters){
        return new MyTupleTypeInfo(genericParameters.get("T0").genericParameters.get("T1"));
    }
}

實踐 - 注冊子類型

Flink認(rèn)識父類,但不一定認(rèn)識子類的一些獨特特性,因此需要單獨注冊子類型。StreamExecutionEnvironmentExecutionEnvironment提供registerType()方法用來向Flink注冊子類信息。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerType(typeClass);

registerType()方法內(nèi)部,會使用TypeExtractor來提取類型信息,如上所示,獲取到的類型信息屬于PojoTypeInfo及其子類,那么需要將其注冊到一起,否則統(tǒng)一交給Kryo去處理,Flink并不過問 ( 這種情況下性能會變差 )。

實踐 - Kryo 序列化

對于Flink無法序列化的類型(例如用戶自定義類型,沒有registerType,也沒有自定義TypeInfoTypeInfoFactory),默認(rèn)會交給 Kryo處理,如果Kryo仍然無法處理(例如Guava、Thrift、Protobuf等第三方庫的一些類),有兩種解決方案:
【1】強制使用Avro來代替Kryo

env.getConfig().enableForceAvro();

【2】為Kryo增加自定義的Serializer以增強Kryo的功能

env.getConfig().addDefaultKryoSerializer(clazz, serializer);

注:如果希望完全禁用Kryo100%使用Flink的序列化機制),可以通過Kryoenv.getConfig().disableGenericTypes()的方式完成,但注意一切無法處理的類都將導(dǎo)致異常,這種對于調(diào)試非常有效。

Flink 通信層的序列化

FlinkTask之間如果需要跨網(wǎng)絡(luò)傳輸數(shù)據(jù)記錄, 那么就需要將數(shù)據(jù)序列化之后寫入NetworkBufferPool,然后下層的Task讀出之后再進(jìn)行反序列化操作,最后進(jìn)行邏輯處理。為了使得記錄以及事件能夠被寫入 Buffer,隨后在消費時再從Buffer中讀出,
Flink提供了數(shù)據(jù)記錄序列化器RecordSerializer與反序列化器RecordDeserializer以及事件序列化器EventSerializer。

Function發(fā)送的數(shù)據(jù)被封裝成SerializationDelegate,它將任意元素公開為IOReadableWritable以進(jìn)行序列化,通過setInstance()來傳入要序列化的數(shù)據(jù)。在Flink通信層的序列化中,有幾個問題值得關(guān)注,具體如下:
【1】何時確定Function的輸入輸出類型?
Flink 數(shù)據(jù)序列化,Flink,flink,大數(shù)據(jù),職場和發(fā)展,java,后端,算法,開發(fā)語言

在構(gòu)建StreamTransformation的時候通過TypeExtractor工具確定Function的輸入輸出類型。TypeExtractor類可以根據(jù)方法簽名、子類信息等蛛絲馬跡自動提取或恢復(fù)類型信息。
【2】何時確定Function的序列化 / 反序列化器?
構(gòu)造StreamGraph時, 通過TypeInfomationcreateSerializer()方法獲取對應(yīng)類型的序列化器TypeSerializer,并在addOperator()的過程中執(zhí)行setSerializers() 操作,設(shè)置StreamConfigTYPESERIALIZERIN1、TYPESERIALIZERIN2TYPESERIALIZEROUT_1屬性。
【3】何時進(jìn)行真正的序列化 / 反序列化操作? 這個過程與TypeSerializer又是怎么聯(lián)系在一起的呢?
構(gòu)造StreamGraph時, 通過TypeInfomationcreateSerializer()方法獲取對應(yīng)類型的序列化器TypeSerializer,并在addOperator()的過程中執(zhí)行setSerializers()操作,設(shè)置StreamConfigTYPESERIALIZERIN1 、 TYPESERIALIZERIN2、 TYPESERIALIZEROUT_1屬性。
【4】何時進(jìn)行真正的序列化 / 反序列化操作? 這個過程與TypeSerializer又是怎么聯(lián)系在一起的呢?
Flink 數(shù)據(jù)序列化,Flink,flink,大數(shù)據(jù),職場和發(fā)展,java,后端,算法,開發(fā)語言

大家都應(yīng)該清楚TaskStreamTask兩個概念,Task是直接受TaskManager管理和調(diào)度的,而Task又會調(diào)用StreamTask,而StreamTask中真正封裝了算子的處理邏輯。在run()方法中,首先將反序列化后的數(shù)據(jù)封裝成StreamRecord交給算子處理;然后將處理結(jié)果通過Collector發(fā)送給下游 ( 在構(gòu)建Collector時已經(jīng)確定了SerializtionDelegate),并通過RecordWriter寫入器將序列化后的結(jié)果寫入DataOutput;最后序列化的操作交給SerializerDelegate處理,實際還是通過TypeSerializerserialize()方法完成。文章來源地址http://www.zghlxwxcb.cn/news/detail-760366.html

到了這里,關(guān)于Flink 數(shù)據(jù)序列化的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 【Java 基礎(chǔ)篇】Java序列化與反序列化詳解

    在Java中,序列化和反序列化是一種將對象轉(zhuǎn)換為字節(jié)流和將字節(jié)流轉(zhuǎn)換為對象的機制。通過序列化,可以將對象存儲到文件中、傳輸?shù)骄W(wǎng)絡(luò)上,或者在分布式系統(tǒng)中進(jìn)行對象的傳遞。本文將詳細(xì)介紹Java序列化和反序列化的原理、使用方法和常見應(yīng)用場景,并提供一些示例代

    2024年02月09日
    瀏覽(24)
  • Java安全基礎(chǔ)之Java序列化與反序列化

    目錄 ObjectInputStream 和 ObjectOutputStream java.io.Serializable 自定義序列化和反序列化 Java 的序列化(Serialization)是指將對象轉(zhuǎn)換為字節(jié)序列的過程,而反序列化(Deserialization)則是將字節(jié)序列轉(zhuǎn)換回對象的過程。 序列化和反序列化通常用于在網(wǎng)絡(luò)上傳輸對象或者將對象持久化到文

    2024年04月22日
    瀏覽(18)
  • Java序列化和反序列化機制

    在閱讀 ArrayList 源碼的時候,注意到,其內(nèi)部的成員變量動態(tài)數(shù)組 elementData 被Java中的 transient 修飾 transient 意味著Java在序列化時會跳過該字段(不序列化該字段) 而Java在默認(rèn)情況下會序列化類(實現(xiàn)了 Java.io.Serializable 接口的類)的所有非瞬態(tài)(未被 transient 修飾

    2024年03月15日
    瀏覽(27)
  • 小迪安全47WEB 攻防-通用漏洞&Java 反序列化&EXP 生成&數(shù)據(jù)提取&組件安全

    小迪安全47WEB 攻防-通用漏洞&Java 反序列化&EXP 生成&數(shù)據(jù)提取&組件安全

    # 知識點: 1 、 Java 反序列化演示 - 原生 API 接口 2 、 Java 反序列化漏洞利用 -Ysoserial 使用 3 、 Java 反序列化漏洞發(fā)現(xiàn)利用點 - 函數(shù) 數(shù)據(jù) 4 、 Java 反序列化考點 - 真實 CTF 賽題 - 審計分析 # 內(nèi)容點: 1 、明白 -Java 反序列化原理 2 、判斷 -Java 反序列化漏洞 3 、學(xué)會 -Ysoserial 工具

    2024年04月10日
    瀏覽(60)
  • java中的序列化和反序列化

    objectOutputStream 對象的序列化,以流的形式將對象寫入文件 構(gòu)造方法: objectOutputStream(OutputStream out) 傳入一個字節(jié)輸入流創(chuàng)建objectOutputStream對象 成員方法: void writeObject(object obj) 將指定的對象寫入objectOutputStream 使用步驟: 創(chuàng)建一個類,這個類實現(xiàn)Serializable接口,Serializable是一

    2024年02月14日
    瀏覽(19)
  • Java中序列化和反序列化解釋

    在Java中,序列化(Serialization)是指將對象的狀態(tài)轉(zhuǎn)換為字節(jié)流的過程,以便將其保存到文件、在網(wǎng)絡(luò)中傳輸或持久化到數(shù)據(jù)庫中。而反序列化(Deserialization)則是將字節(jié)流轉(zhuǎn)換回對象的過程,恢復(fù)對象的狀態(tài)。 序列化和反序列化主要用于以下場景: 1. 對象持久化:通過序列

    2024年02月07日
    瀏覽(23)
  • Java 反序列化之 XStream 反序列化

    XStream 是一個簡單的基于 Java 庫,Java 對象序列化到 XML,反之亦然(即:可以輕易的將 Java 對象和 XML 文檔相互轉(zhuǎn)換)。 下面看下如何使用 XStream 進(jìn)行序列化和反序列化操作的。 先定義接口類 IPerson.java 接著定義 Person 類實現(xiàn)前面的接口: XStream 序列化是調(diào)用? XStream.toXML() ?來實

    2024年02月10日
    瀏覽(17)
  • [Java反序列化]—Shiro反序列化(一)

    [Java反序列化]—Shiro反序列化(一)

    IDEA搭建shiro550復(fù)現(xiàn)環(huán)境_普通網(wǎng)友的博客-CSDN博客 Apache Shiro框架提供了記住密碼的功能(RememberMe),用戶登錄成功后會生成經(jīng)過加密并編碼的cookie。在服務(wù)端對rememberMe的cookie值,先base64解碼然后AES解密再反序列化,就導(dǎo)致了反序列化RCE漏洞。 那么,Payload產(chǎn)生的過程: 命令

    2024年02月06日
    瀏覽(25)
  • Java反序列化和PHP反序列化的區(qū)別

    Java反序列化和PHP反序列化的區(qū)別

    反序列化存在的意義是為了數(shù)據(jù)傳輸,類是無法直接進(jìn)行傳輸?shù)摹Mㄟ^序列化后轉(zhuǎn)換為字符串格式或者JSON格式進(jìn)行傳輸 。 序列化與反序列化 seriallization 序列化 : 將對象轉(zhuǎn)化為便于傳輸?shù)母袷剑?常見的序列化格式:二進(jìn)制格式,字節(jié)數(shù)組,json字符串,xml字符串。 deseriall

    2024年02月07日
    瀏覽(19)
  • Java反序列化:URLDNS的反序列化調(diào)試分析

    URLDNS鏈子是Java反序列化分析的第0課,網(wǎng)上也有很多優(yōu)質(zhì)的分析文章。 筆者作為Java安全初學(xué)者,也從0到1調(diào)試了一遍,現(xiàn)在給出調(diào)試筆記。 Java原生鏈序列化:利用Java.io.ObjectInputStream對象輸出流的writerObject方法實現(xiàn)Serializable接口,將對象轉(zhuǎn)化成字節(jié)序列。 Java原生鏈反序列化

    2024年02月15日
    瀏覽(14)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包