国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink 內(nèi)容分享(四):Fink原理、實(shí)戰(zhàn)與性能優(yōu)化(四)

這篇具有很好參考價(jià)值的文章主要介紹了Flink 內(nèi)容分享(四):Fink原理、實(shí)戰(zhàn)與性能優(yōu)化(四)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

目錄

Transformations

Sink

分區(qū)策略


Transformations

Transformations算子可以將一個(gè)或者多個(gè)算子轉(zhuǎn)換成一個(gè)新的數(shù)據(jù)流,使用Transformations算子組合可以處理復(fù)雜的業(yè)務(wù)處理。

Map

DataStream → DataStream

遍歷數(shù)據(jù)流中的每一個(gè)元素,產(chǎn)生一個(gè)新的元素。

FlatMap

DataStream → DataStream

遍歷數(shù)據(jù)流中的每一個(gè)元素,產(chǎn)生N個(gè)元素 N=0,1,2......。

Filter

DataStream → DataStream

過(guò)濾算子,根據(jù)數(shù)據(jù)流的元素計(jì)算出一個(gè)boolean類型的值,true代表保留,false代表過(guò)濾掉。

KeyBy

DataStream → KeyedStream

根據(jù)數(shù)據(jù)流中指定的字段來(lái)分區(qū),相同指定字段值的數(shù)據(jù)一定是在同一個(gè)分區(qū)中,內(nèi)部分區(qū)使用的是HashPartitioner。

指定分區(qū)字段的方式有三種:

  • 根據(jù)索引號(hào)指定。

  • 通過(guò)匿名函數(shù)來(lái)指定。

  • 通過(guò)實(shí)現(xiàn)KeySelector接口 ?指定分區(qū)字段。

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStreamSource<Long>?stream?=?env.fromSequence(1,?100);
????????stream.map((MapFunction<Long,?Tuple2<Long,?Integer>>)?(Long?x)?->?new?Tuple2<>(x?%?3,?1),?TypeInformation.of(new?TypeHint<Tuple2<Long,?Integer>>()?{}))
????????????????//根據(jù)索引號(hào)來(lái)指定分區(qū)字段:.keyBy(0)
????????????????//通過(guò)傳入匿名函數(shù)?指定分區(qū)字段:.keyBy(x=>x._1)
????????????????//通過(guò)實(shí)現(xiàn)KeySelector接口??指定分區(qū)字段????????????????
????????????????.keyBy((KeySelector<Tuple2<Long,?Integer>,?Long>)?(Tuple2<Long,?Integer>?value)?->?value.f0,?BasicTypeInfo.LONG_TYPE_INFO)
????????????????.sum(1).print();
????????env.execute("Flink?Job");
????}

Reduce

適用于KeyedStream

KeyedStream:根據(jù)key分組 → DataStream

注意,reduce是基于分區(qū)后的流對(duì)象進(jìn)行聚合,也就是說(shuō),DataStream類型的對(duì)象無(wú)法調(diào)用reduce方法

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<Tuple2<String,?Integer>>?dataStream?=?env.fromElements(
????????????????new?Tuple2<>("apple",?3),
????????????????new?Tuple2<>("banana",?1),
????????????????new?Tuple2<>("apple",?5),
????????????????new?Tuple2<>("banana",?2),
????????????????new?Tuple2<>("apple",?4)
????????);
????????//?使用reduce操作,將input中的所有元素合并到一起
????????DataStream<Tuple2<String,?Integer>>?result?=?dataStream
????????????????.keyBy(0)
????????????????.reduce((ReduceFunction<Tuple2<String,?Integer>>)?(value1,?value2)?->?new?Tuple2<>(value1.f0,?value1.f1?+?value2.f1));
????????result.print();
????????env.execute();
????}

Aggregations

KeyedStream → DataStream

Aggregations代表的是一類聚合算子,上面說(shuō)的reduce就屬于Aggregations,以下是一些常用的:

  • sum(): 計(jì)算數(shù)字類型字段的總和。

  • min(): 計(jì)算最小值。

  • max(): 計(jì)算最大值。

  • count(): 計(jì)數(shù)元素個(gè)數(shù)。

  • avg(): 計(jì)算平均值。

另外,F(xiàn)link 還支持自定義聚合函數(shù),即使用?AggregateFunction?接口實(shí)現(xiàn)更復(fù)雜的聚合邏輯。

Union 真合并

DataStream → DataStream

Union of two or more data streams creating a new stream containing all the elements from all the streams

合并兩個(gè)或者更多的數(shù)據(jù)流產(chǎn)生一個(gè)新的數(shù)據(jù)流,這個(gè)新的數(shù)據(jù)流中包含了所合并的數(shù)據(jù)流的元素

注意:需要保證數(shù)據(jù)流中元素類型一致

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<Tuple2<String,?Integer>>?ds1?=?env.fromCollection(Arrays.asList(Tuple2.of("a",1),Tuple2.of("b",2),Tuple2.of("c",3)));
????????DataStream<Tuple2<String,?Integer>>?ds2?=?env.fromCollection(Arrays.asList(Tuple2.of("d",4),Tuple2.of("e",5),Tuple2.of("f",6)));
????????DataStream<Tuple2<String,?Integer>>?ds3?=?env.fromCollection(Arrays.asList(Tuple2.of("g",7),Tuple2.of("h",8)));
????????DataStream<Tuple2<String,?Integer>>?unionStream?=?ds1.union(ds2,ds3);
????????unionStream.print();
????????env.execute();
????}

在 Flink 中,Union 操作被稱為 "真合并" 是因?yàn)樗鼘蓚€(gè)或多個(gè)數(shù)據(jù)流完全融合在一起,沒(méi)有特定的順序,并且不會(huì)去除重復(fù)項(xiàng)。這種操作方式類似于在數(shù)學(xué)概念中的集合聯(lián)合(Union)操作,所以被稱為 "真合并"。

請(qǐng)注意,與其他一些數(shù)據(jù)處理框架中的 Union 操作相比,例如 Spark 中的 Union 會(huì)根據(jù)某些條件去除重復(fù)的元素,F(xiàn)link 的 Union 行為更接近于數(shù)學(xué)上的集合聯(lián)合理論。

Connect 假合并

DataStream,DataStream → ConnectedStreams

合并兩個(gè)數(shù)據(jù)流并且保留兩個(gè)數(shù)據(jù)流的數(shù)據(jù)類型,能夠共享兩個(gè)流的狀態(tài)

public?static?void?main(String[]?args)?throws?Exception?{
????????StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();

????????DataStream<String>?ds1?=?env.socketTextStream("localhost",?8888);
????????DataStream<String>?ds2?=?env.socketTextStream("localhost",?9999);

????????DataStream<Tuple2<String,?Integer>>?wcStream1?=?ds1
????????????????.flatMap(new?Tokenizer())
????????????????.keyBy(value?->?value.f0)
????????????????.sum(1);

????????DataStream<Tuple2<String,?Integer>>?wcStream2?=?ds2
????????????????.flatMap(new?Tokenizer())
????????????????.keyBy(value?->?value.f0)
????????????????.sum(1);

????????ConnectedStreams<Tuple2<String,?Integer>,?Tuple2<String,?Integer>>?connectedStreams?=?wcStream1.connect(wcStream2);
????}

union不同,connect只能連接兩個(gè)流,并且這兩個(gè)流的類型可以不同。connect后的兩個(gè)流會(huì)被看作是兩個(gè)不同的流,可以使用CoMap或者CoFlatMap函數(shù)分別處理這兩個(gè)流。

CoMap, CoFlatMap

ConnectedStreams → DataStream

CoMap, CoFlatMap并不是具體算子名字,而是一類操作的名稱

  • 凡是基于ConnectedStreams數(shù)據(jù)流做map遍歷,這類操作叫做CoMap。

  • 凡是基于ConnectedStreams數(shù)據(jù)流做flatMap遍歷,這類操作叫做CoFlatMap。

CoMap實(shí)現(xiàn):

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????//?創(chuàng)建兩個(gè)不同的數(shù)據(jù)流
????????DataStream<Integer>?nums?=?env.fromElements(1,?2,?3,?4,?5);
????????DataStream<String>?text?=?env.fromElements("a",?"b",?"c");
????????//?連接兩個(gè)數(shù)據(jù)流
????????ConnectedStreams<Integer,?String>?connected?=?nums.connect(text);
????????//?使用?CoMap?處理連接的流
????????DataStream<String>?result?=?connected.map(new?CoMapFunction<Integer,?String,?String>()?{
????????????@Override
????????????public?String?map1(Integer?value)?{
????????????????return?String.valueOf(value*2);
????????????}
????????????@Override
????????????public?String?map2(String?value)?{
????????????????return?"hello?"?+?value;
????????????}
????????});
????????result.print();
????????env.execute("CoMap?example");
????}

CoFlatMap實(shí)現(xiàn)方式:

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<Integer>?nums?=?env.fromElements(1,?2,?3,?4,?5);
????????DataStream<String>?text?=?env.fromElements("a",?"b",?"c");
????????ConnectedStreams<Integer,?String>?connected?=?nums.connect(text);
????????DataStream<String>?result?=?connected.flatMap(new?CoFlatMapFunction<Integer,?String,?String>()?{
????????????@Override
????????????public?void?flatMap1(Integer?value,?Collector<String>?out)?{
????????????????out.collect(String.valueOf(value*2));
????????????????out.collect(String.valueOf(value*3));
????????????}
????????????@Override
????????????public?void?flatMap2(String?value,?Collector<String>?out)?{
????????????????out.collect("hello?"?+?value);
????????????????out.collect("hi?"?+?value);
????????????}
????????});
????????result.print();
????????env.execute("CoFlatMap?example");
????}

Split/Select

DataStream → SplitStream

根據(jù)條件將一個(gè)流分成多個(gè)流,示例代碼如下:

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStreamSource<Long>?data?=?env.generateSequence(0,?10);
????????SplitStream<Long>?split?=?data.split((OutputSelector<Long>)?value?->?{
????????????List<String>?output?=?new?ArrayList<>();
????????????if?(value?%?2?==?0)?{
????????????????output.add("even");
????????????}?else?{
????????????????output.add("odd");
????????????}
????????????return?output;
????????});
????????split.select("odd").print();
????????env.execute("Flink?SplitStream?Example");
????}

select()用于從SplitStream中選擇一個(gè)或者多個(gè)數(shù)據(jù)流。

split.select("odd").print();

SideOutput

注意:在Flink 1.12 及之后的版本中,SplitStream 已經(jīng)被棄用并移除,一般推薦使用 Side Outputs(側(cè)輸出流)來(lái)替代 Split和Select

示例代碼如下:

private?static?final?OutputTag<String>?evenOutput?=?new?OutputTag<String>("even"){};
private?static?final?OutputTag<String>?oddOutput?=?new?OutputTag<String>("odd"){};

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<String>?input?=?env.fromElements("1",?"2",?"3",?"4",?"5");
????????SingleOutputStreamOperator<String>?processed?=?input.process(new?ProcessFunction<String,?String>()?{
????????????@Override
????????????public?void?processElement(String?value,?Context?ctx,?Collector<String>?out){
????????????????int?i?=?Integer.parseInt(value);
????????????????if?(i?%?2?==?0)?{
????????????????????ctx.output(evenOutput,?value);
????????????????}?else?{
????????????????????ctx.output(oddOutput,?value);
????????????????}
????????????}
????????});
????????DataStream<String>?evenStream?=?processed.getSideOutput(evenOutput);
????????DataStream<String>?oddStream?=?processed.getSideOutput(oddOutput);
????????evenStream.print("even");
????????oddStream.print("odd");
????????env.execute("Side?Output?Example");
????}

Iterate

DataStream → IterativeStream → DataStream

Iterate算子提供了對(duì)數(shù)據(jù)流迭代的支持

一個(gè)數(shù)據(jù)集通過(guò)迭代運(yùn)算符被劃分為兩部分:“反饋”部分(feedback)和“輸出”部分(output)。反饋部分被反饋到迭代頭(iteration head),從而形成下一次迭代。輸出部分則構(gòu)成該迭代的結(jié)果:

public?static?void?main(String[]?args)?throws?Exception?{
????????StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<Long>?input?=?env.fromElements(10L);
????????//?定義迭代流,最大迭代10次
????????IterativeStream<Long>?iteration?=?input.iterate(10000L);
????????//?定義迭代邏輯
????????DataStream<Long>?minusOne?=?iteration.map((MapFunction<Long,?Long>)?value?->?value?-?1);
????????//?定義反饋流(滿足條件繼續(xù)迭代)和輸出流(不滿足條件的結(jié)果)
????????DataStream<Long>?stillGreaterThanZero?=?minusOne.filter(value?->?value?>?0).setParallelism(1);;
????????DataStream<Long>?lessThanZero?=?minusOne.filter(value?->?value?<=?0);
????????//?關(guān)閉迭代,定義反饋流
????????iteration.closeWith(stillGreaterThanZero);
????????//?打印結(jié)果
????????lessThanZero.print();
????????env.execute("Iterative?Stream?Example");
????}

普通函數(shù) & 富函數(shù)

Apache Flink 中有兩種類型的函數(shù): 「普通函數(shù)(Regular Functions)」和 「富函數(shù)(Rich Functions)」。主要區(qū)別在于富函數(shù)相比普通函數(shù)提供了更多生命周期方法和上下文信息。

  • 普通函數(shù):這些函數(shù)只需要覆蓋一個(gè)或幾個(gè)特定方法,如?MapFunction?需要實(shí)現(xiàn)?map()?方法。它們沒(méi)有生命周期方法,也不能訪問(wèn)執(zhí)行環(huán)境的上下文。

  • 富函數(shù):除了覆蓋特定函數(shù)外,富函數(shù)還提供了對(duì) Flink API 更多的控制和操作,包括:

    • 生命周期管理:可以覆蓋?open()?和?close()?方法以便在函數(shù)啟動(dòng)前和關(guān)閉后做一些設(shè)置或清理工作。

    • 獲取運(yùn)行時(shí)上下文信息:例如,通過(guò)?getRuntimeContext()?方法獲取并行任務(wù)的信息,如當(dāng)前子任務(wù)的索引等。

    • 狀態(tài)管理和容錯(cuò):可以定義和使用托管狀態(tài)(Managed State),這在構(gòu)建容錯(cuò)系統(tǒng)時(shí)非常重要。

簡(jiǎn)而言之,如果你需要在函數(shù)中使用 Flink 的高級(jí)功能,如狀態(tài)管理或訪問(wèn)運(yùn)行時(shí)上下文,則需要使用富函數(shù)。如果不需要這些功能,使用普通函數(shù)即可。

普通函數(shù)類 富函數(shù)類
MapFunction RichMapFunction
FlatMapFunction RichFlatMapFunction
FilterFunction RichFilterFunction
...... ......

普通函數(shù):

public?static?void?main(String[]?args)?throws?Exception?{
????????StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????List<String>?words?=?Arrays.asList("hello",?"world",?"flink",?"hello",?"world");
????????env.fromCollection(words)
????????????????.map(new?MapFunction<String,?Tuple2<String,?Integer>>()?{
????????????????????@Override
????????????????????public?Tuple2<String,?Integer>?map(String?value)?{
????????????????????????return?new?Tuple2<>(value,?1);
????????????????????}
????????????????})
????????????????.keyBy(0)
????????????????.sum(1)
????????????????.print();

????????env.execute("Word?Count?Example");
????}

富函數(shù):

public?static?void?main(String[]?args)?throws?Exception?{
????????StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????List<String>?words?=?Arrays.asList("hello",?"world",?"flink",?"hello",?"world");
????????env.fromCollection(words)
????????????????.map(new?RichMapFunction<String,?Tuple2<String,?Integer>>()?{
????????????????????@Override
????????????????????public?void?open(Configuration?parameters)?throws?Exception?{
????????????????????????super.open(parameters);
????????????????????????//?可以在這里設(shè)置相關(guān)的配置或者資源,如數(shù)據(jù)庫(kù)連接等
????????????????????}
????????????????????@Override
????????????????????public?Tuple2<String,?Integer>?map(String?value)?throws?Exception?{
????????????????????????return?new?Tuple2<>(value,?1);
????????????????????}
????????????????????@Override
????????????????????public?void?close()?throws?Exception?{
????????????????????????super.close();
????????????????????????//?可以在這里完成資源的清理工作
????????????????????}
????????????????})
????????????????.keyBy(0)
????????????????.sum(1)
????????????????.print();
????????env.execute("Word?Count?Example");
????}

ProcessFunction(處理函數(shù))

ProcessFunction屬于低層次的API,在類繼承關(guān)系上屬于富函數(shù)。

我們前面講的map、filterflatMap等算子都是基于這層封裝出來(lái)的。

越低層次的API,功能越強(qiáng)大,用戶能夠獲取的信息越多,比如可以拿到元素狀態(tài)信息、事件時(shí)間、設(shè)置定時(shí)器等

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<String>?dataStream?=?env.socketTextStream("localhost",?9999)
????????????????.map(new?MapFunction<String,?Tuple2<String,?Integer>>()?{
????????????????????@Override
????????????????????public?Tuple2<String,?Integer>?map(String?value)?{
????????????????????????return?new?Tuple2<>(value,?1);
????????????????????}
????????????????})
????????????????.keyBy(0)
????????????????.process(new?AlertFunction());
????????dataStream.print();
????????env.execute("Process?Function?Example");
????}

????public?static?class?AlertFunction?extends?KeyedProcessFunction<Tuple,?Tuple2<String,?Integer>,?String>?{
????????private?transient?ValueState<Integer>?countState;
????????@Override
????????public?void?open(Configuration?config)?{
????????????ValueStateDescriptor<Integer>?descriptor?=
????????????????????new?ValueStateDescriptor<>(
????????????????????????????"countState",?//?state?name
????????????????????????????TypeInformation.of(new?TypeHint<Integer>()?{}),?//?type?information
????????????????????????????0);?//?default?value
????????????countState?=?getRuntimeContext().getState(descriptor);
????????}
????????@Override
????????public?void?processElement(Tuple2<String,?Integer>?value,?Context?ctx,?Collector<String>?out)?throws?Exception?{
????????????Integer?currentCount?=?countState.value();
????????????currentCount?+=?1;
????????????countState.update(currentCount);
????????????if?(currentCount?>=?3)?{
????????????????out.collect("Warning!?The?key?'"?+?value.f0?+?"'?has?been?seen?"?+?currentCount?+?"?times.");
????????????}
????????}
????}

這里,我們創(chuàng)建一個(gè)名為AlertFunction的處理函數(shù)類,并繼承KeyedProcessFunction。其中,ValueState用于保存狀態(tài)信息,每個(gè)鍵會(huì)有其自己的狀態(tài)實(shí)例。當(dāng)計(jì)數(shù)達(dá)到或超過(guò)三次時(shí),該系統(tǒng)將發(fā)出警告。這個(gè)例子主要展示了處理函數(shù)與其他運(yùn)算符相比的兩個(gè)優(yōu)點(diǎn):訪問(wèn)鍵控狀態(tài)和生命周期管理方法(例如open())。

注意:上述示例假設(shè)你已經(jīng)在本地的9999端口上設(shè)置了一個(gè)socket服務(wù)器,用于流式傳輸文本數(shù)據(jù)。如果沒(méi)有,你需要替換這部分以適應(yīng)你的輸入源。

Sink

在Flink中,"Sink"是數(shù)據(jù)流計(jì)算的最后一步。它代表了一個(gè)輸出端點(diǎn),在那里計(jì)算結(jié)果被發(fā)送或存儲(chǔ)。換句話說(shuō),Sink是數(shù)據(jù)流處理過(guò)程中的結(jié)束節(jié)點(diǎn),負(fù)責(zé)將處理后的數(shù)據(jù)輸出到外部系統(tǒng),如數(shù)據(jù)庫(kù)、文件、消息隊(duì)列等。

Flink內(nèi)置了大量Sink,可以將Flink處理后的數(shù)據(jù)輸出到HDFS、kafka、Redis、ES、MySQL等。

Redis Sink

Flink處理的數(shù)據(jù)可以存儲(chǔ)到Redis中,以便實(shí)時(shí)查詢。

首先,需要導(dǎo)入Flink和Redis的連接器依賴:

<!--?Flink?Redis?connector?-->
????????<dependency>
????????????<groupId>org.apache.bahir</groupId>
????????????<artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
????????????<version>1.1.0</version>
????????</dependency>

下面的代碼展示了"Word Count"(詞頻統(tǒng)計(jì))操作,并將結(jié)果存儲(chǔ)到Redis數(shù)據(jù)庫(kù)中:

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<String>?text?=?env.fromElements(
????????????????"Hello?World",
????????????????"Hello?Flink",
????????????????"Hello?Java");
????????DataStream<Tuple2<String,?Integer>>?counts?=
????????????????text.flatMap(new?Tokenizer())
????????????????????????.keyBy(value?->?value.f0)
????????????????????????.sum(1);
????????FlinkJedisPoolConfig?conf?=?new?FlinkJedisPoolConfig.Builder().setHost("localhost").build();
????????counts.addSink(new?RedisSink<>(conf,?new?RedisExampleMapper()));
????????env.execute("Word?Count?Example");
????}

????public?static?final?class?Tokenizer?implements?FlatMapFunction<String,?Tuple2<String,?Integer>>?{
????????@Override
????????public?void?flatMap(String?value,?Collector<Tuple2<String,?Integer>>?out)?{
????????????String[]?words?=?value.toLowerCase().split("\\W+");

????????????for?(String?word?:?words)?{
????????????????if?(word.length()?>?0)?{
????????????????????out.collect(new?Tuple2<>(word,?1));
????????????????}
????????????}
????????}
????}

????public?static?final?class?RedisExampleMapper?implements?RedisMapper<Tuple2<String,?Integer>>?{
????????@Override
????????public?RedisCommandDescription?getCommandDescription()?{
????????????return?new?RedisCommandDescription(RedisCommand.HSET);
????????}
????????@Override
????????public?String?getKeyFromData(Tuple2<String,?Integer>?data)?{
????????????return?data.f0;
????????}
????????@Override
????????public?String?getValueFromData(Tuple2<String,?Integer>?data)?{
????????????return?data.f1.toString();
????????}
????}

Kafka Sink

處理結(jié)果寫(xiě)入到kafka topic中,F(xiàn)link也是支持的,需要添加連接器依賴,跟讀取kafka數(shù)據(jù)用的連接器依賴相同,之前添加過(guò)就不需要再添加了。

<dependency>
????<groupId>org.apache.flink</groupId>
????<artifactId>flink-connector-kafka_2.12</artifactId>
????<version>1.13.6</version>
</dependency>

還是用上面詞頻統(tǒng)計(jì)的例子:

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<String>?text?=?env.fromElements(
????????????????"Hello?World",
????????????????"Hello?Flink",
????????????????"Hello?Java");
????????DataStream<Tuple2<String,?Integer>>?counts?=
????????????????text.flatMap(new?Tokenizer())
????????????????????????.keyBy(value?->?value.f0)
????????????????????????.sum(1);
????????//?Define?Kafka?properties
????????Properties?properties?=?new?Properties();
????????properties.setProperty("bootstrap.servers",?"localhost:9092");
????????//?Write?the?data?stream?to?Kafka
????????counts.map(new?MapFunction<Tuple2<String,Integer>,?String>()?{
????????????????????@Override
????????????????????public?String?map(Tuple2<String,Integer>?value)?throws?Exception?{
????????????????????????return?value.f0?+?","?+?value.f1.toString();
????????????????????}
????????????????})
????????????????.addSink(new?FlinkKafkaProducer<>("my-topic",?new?SimpleStringSchema(),?properties));
????????env.execute("Word?Count?Example");
????}

MySQL Sink

Flink處理結(jié)果寫(xiě)入到MySQL中,這并不是Flink默認(rèn)支持的,需要添加MySQL的驅(qū)動(dòng)依賴:

<!--?https://mvnrepository.com/artifact/mysql/mysql-connector-java?-->
<dependency>
????<groupId>mysql</groupId>
????<artifactId>mysql-connector-java</artifactId>
????<version>8.0.28</version>
</dependency>

因?yàn)椴皇莾?nèi)嵌支持的,所以需要基于SinkFunction自定義Sink。

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<String>?text?=?env.fromElements(
????????????????"Hello?World",
????????????????"Hello?Flink",
????????????????"Hello?Java");
????????DataStream<Tuple2<String,?Integer>>?counts?=
????????????????text.flatMap(new?Tokenizer())
????????????????????????.keyBy(value?->?value.f0)
????????????????????????.sum(1);
????????//?Transform?the?Tuple2<String,?Integer>?to?a?format?acceptable?by?MySQL
????????DataStream<String>?mysqlData?=?counts.map(new?MapFunction<Tuple2<String,?Integer>,?String>()?{
????????????@Override
????????????public?String?map(Tuple2<String,?Integer>?value)?throws?Exception?{
????????????????return?"'"?+?value.f0?+?"',"?+?value.f1.toString();
????????????}
????????});
????????//?Write?the?data?stream?to?MySQL
????????mysqlData.addSink(new?MySqlSink());
????????env.execute("Word?Count?Example");
????}

????public?static?class?MySqlSink?implements?SinkFunction<String>?{
????????private?Connection?connection;
????????private?PreparedStatement?preparedStatement;

????????@Override
????????public?void?invoke(String?value,?Context?context)?throws?Exception?{
????????????if(connection?==?null)?{
????????????????connection?=?DriverManager.getConnection("jdbc:mysql://localhost:3306/test",?"username",?"password");
????????????????preparedStatement?=?connection.prepareStatement("INSERT?INTO?my_table(word,?count)?VALUES("+?value?+")");
????????????}
????????????preparedStatement.executeUpdate();
????????}
????}
}

HBase Sink

需要導(dǎo)入HBase的依賴:

????????<!--?https://mvnrepository.com/artifact/org.apache.hbase/hbase-client?-->
????????<dependency>
????????????<groupId>org.apache.hbase</groupId>
????????????<artifactId>hbase-client</artifactId>
????????????<version>2.5.2</version>
????????</dependency>
public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<String>?text?=?env.fromElements(
????????????????"Hello?World",
????????????????"Hello?Flink",
????????????????"Hello?Java");
????????DataStream<Tuple2<String,?Integer>>?counts?=
????????????????text.flatMap(new?Tokenizer())
????????????????????????.keyBy(value?->?value.f0)
????????????????????????.sum(1);
????????counts.addSink(new?HBaseSink());
????????env.execute("Word?Count?Example");
????}

????public?static?final?class?Tokenizer?implements?FlatMapFunction<String,?Tuple2<String,?Integer>>?{
????????@Override
????????public?void?flatMap(String?value,?Collector<Tuple2<String,?Integer>>?out)?{
????????????String[]?words?=?value.toLowerCase().split("\\W+");
????????????for?(String?word?:?words)?{
????????????????if?(word.length()?>?0)?{
????????????????????out.collect(new?Tuple2<>(word,?1));
????????????????}
????????????}
????????}
????}

????public?static?class?HBaseSink?extends?RichSinkFunction<Tuple2<String,?Integer>>?{
????????private?org.apache.hadoop.conf.Configuration?config;
????????private?org.apache.hadoop.hbase.client.Connection?connection;
????????private?Table?table;
????????@Override
????????public?void?invoke(Tuple2<String,?Integer>?value,?Context?context)?throws?IOException?{
????????????Put?put?=?new?Put(Bytes.toBytes(value.f0));
????????????put.addColumn(Bytes.toBytes("cf"),?Bytes.toBytes("count"),?Bytes.toBytes(value.f1));
????????????table.put(put);
????????}

????????@Override
????????public?void?open(Configuration?parameters)?throws?Exception?{
????????????config?=?HBaseConfiguration.create();
????????????config.set("hbase.zookeeper.quorum",?"localhost");
????????????config.set("hbase.zookeeper.property.clientPort",?"2181");
????????????connection?=?ConnectionFactory.createConnection(config);
????????????table?=?connection.getTable(TableName.valueOf("my-table"));
????????}

????????@Override
????????public?void?close()?throws?Exception?{
????????????table.close();
????????????connection.close();
????????}
????}

HBaseSink類是RichSinkFunction的實(shí)現(xiàn),用于將結(jié)果寫(xiě)入HBase數(shù)據(jù)庫(kù)。在invoke方法中,它將接收到的每個(gè)二元組(單詞和計(jì)數(shù))寫(xiě)入HBase。在open方法中,它創(chuàng)建了與HBase的連接,并指定了要寫(xiě)入的表。在close方法中,它關(guān)閉了與HBase的連接和表。

分區(qū)策略

在 Apache Flink 中,分區(qū)(Partitioning)是將數(shù)據(jù)流按照一定的規(guī)則劃分成多個(gè)子數(shù)據(jù)流或分片,以便在不同的并行任務(wù)或算子中并行處理數(shù)據(jù)。分區(qū)是實(shí)現(xiàn)并行計(jì)算和數(shù)據(jù)流處理的基礎(chǔ)機(jī)制。Flink 的分區(qū)決定了數(shù)據(jù)在作業(yè)中的流動(dòng)方式,以及在并行任務(wù)之間如何分配和處理數(shù)據(jù)。

在 Flink 中,數(shù)據(jù)流可以看作是一個(gè)有向圖,圖中的節(jié)點(diǎn)代表算子(Operators),邊代表數(shù)據(jù)流(Data Streams)。數(shù)據(jù)從源算子流向下游算子,這些算子可能并行地處理輸入數(shù)據(jù),而分區(qū)就是決定數(shù)據(jù)如何從一個(gè)算子傳遞到另一個(gè)算子的機(jī)制。

下面介紹Flink中常用的幾種分區(qū)策略。

shuffle

場(chǎng)景:增大分區(qū)、提高并行度,解決數(shù)據(jù)傾斜。

DataStream → DataStream

分區(qū)元素隨機(jī)均勻分發(fā)到下游分區(qū),網(wǎng)絡(luò)開(kāi)銷比較大

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<Long>?stream?=?env.fromSequence(1,?10).setParallelism(1);
????????System.out.println(stream.getParallelism());
????????stream.shuffle().print();
????????env.execute();
????}

輸出結(jié)果:上游數(shù)據(jù)比較隨意地分發(fā)到下游

1>?7
7>?1
2>?8
4>?5
8>?3
1>?9
8>?4
8>?10
6>?2
6>?6

rebalance

場(chǎng)景:增大分區(qū)、提高并行度,解決數(shù)據(jù)傾斜

DataStream → DataStream

輪詢分區(qū)元素,均勻的將元素分發(fā)到下游分區(qū),下游每個(gè)分區(qū)的數(shù)據(jù)比較均勻,在發(fā)生數(shù)據(jù)傾斜時(shí)非常有用,網(wǎng)絡(luò)開(kāi)銷比較大

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<Long>?stream?=?env.fromSequence(1,?10).setParallelism(1);
????????System.out.println(stream.getParallelism());
????????stream.rebalance().print();
????????env.execute();
????}

輸出:上游數(shù)據(jù)比較均勻的分發(fā)到下游

2>?2
1>?1
8>?8
5>?5
7>?7
4>?4
3>?3
6>?6
1>?9
2>?10

rescale

場(chǎng)景:減少分區(qū),防止發(fā)生大量的網(wǎng)絡(luò)傳輸,不會(huì)發(fā)生全量的重分區(qū)

DataStream → DataStream

通過(guò)輪詢分區(qū)元素,將一個(gè)元素集合從上游分區(qū)發(fā)送給下游分區(qū),發(fā)送單位是集合,而不是一個(gè)個(gè)元素

和其他重分區(qū)策略(如 rebalance、forward、broadcast 等)不同的是,rescale 在運(yùn)行時(shí)不會(huì)改變并行度,而且它只在本地(同一個(gè) TaskManager 內(nèi))進(jìn)行數(shù)據(jù)交換,所以它比其他重分區(qū)策略更加高效

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();

????????DataStream<String>?dataStream?=?env.fromElements("1",?"2",?"3",?"4",?"5");

????????//?使用MapFunction將元素轉(zhuǎn)換為整數(shù)類型
????????DataStream<Integer>?intStream?=?dataStream.map(new?MapFunction<String,?Integer>()?{
????????????@Override
????????????public?Integer?map(String?value)?{
????????????????return?Integer.parseInt(value);
????????????}
????????});
????????//?使用rescale()進(jìn)行重分區(qū)
????????DataStream<Integer>?rescaledStream?=?intStream.rescale();
????????rescaledStream.print();
????????env.execute("Rescale?Example");
????}

在這個(gè)例子中,我們創(chuàng)建了一個(gè)字符串類型的DataStream然后通過(guò)map()將每一個(gè)元素轉(zhuǎn)換為整數(shù)。然后,我們對(duì)結(jié)果DataStream應(yīng)用rescale()操作來(lái)重分區(qū)數(shù)據(jù)。

值得注意的是,rescale()的實(shí)際影響取決于你的并行度和集群環(huán)境,如果不同的并行實(shí)例都在同一臺(tái)機(jī)器上,或者并行度只有1,那么可能不會(huì)看到rescale()的效果。而在大規(guī)模并行處理的情況下,使用rescale()操作可以提高數(shù)據(jù)處理的效率。

此外,我們不能直接在打印結(jié)果中看到rescale的影響,因?yàn)樗淖兊氖莾?nèi)部數(shù)據(jù)分布和處理方式,而不是輸出的結(jié)果。如果想觀察rescale的作用,需要通過(guò)Flink的Web UI或者日志來(lái)查看任務(wù)執(zhí)行情況,如數(shù)據(jù)流的分布、各個(gè)子任務(wù)的運(yùn)行狀態(tài)等信息。

broadcast

場(chǎng)景:需要使用映射表、并且映射表會(huì)經(jīng)常發(fā)生變動(dòng)的場(chǎng)景

DataStream → DataStream

上游中每一個(gè)元素內(nèi)容廣播到下游每一個(gè)分區(qū)中

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<Integer>?dataStream?=?env.fromElements(1,?2,?3,?4,?5);
????????DataStream<String>?broadcastStream?=?env.fromElements("2",?"4");
????????MapStateDescriptor<String,?String>?descriptor?=?new?MapStateDescriptor<>(
????????????????"RulesBroadcastState",
????????????????BasicTypeInfo.STRING_TYPE_INFO,
????????????????BasicTypeInfo.STRING_TYPE_INFO);
????????BroadcastStream<String>?broadcastData?=?broadcastStream.broadcast(descriptor);
????????dataStream.connect(broadcastData)
????????????????.process(new?BroadcastProcessFunction<Integer,?String,?String>()?{
????????????????????@Override
????????????????????public?void?processElement(Integer?value,?ReadOnlyContext?ctx,?Collector<String>?out)?throws?Exception?{
????????????????????????if?(ctx.getBroadcastState(descriptor).contains(String.valueOf(value)))?{
????????????????????????????out.collect("Value?"?+?value?+?"?matches?with?a?broadcasted?rule");
????????????????????????}
????????????????????}
????????????????????@Override
????????????????????public?void?processBroadcastElement(String?rule,?Context?ctx,?Collector<String>?out)?throws?Exception?{
????????????????????????ctx.getBroadcastState(descriptor).put(rule,?rule);
????????????????????}
????????????????}).print();
????????env.execute("Broadcast?State?Example");
????}

上述代碼首先定義了一個(gè)主流和一個(gè)要廣播的流。然后,我們創(chuàng)建了一個(gè)MapStateDescriptor,用于存儲(chǔ)廣播數(shù)據(jù)。接著,我們將廣播流轉(zhuǎn)換為BroadcastStream。

最后,我們使用connect()方法連接主流和廣播流,并執(zhí)行process()方法。在這個(gè)process()方法中,我們定義了兩個(gè)處理函數(shù):processElement()processBroadcastElement()。processElement()用于處理主流中的每個(gè)元素,并檢查該元素是否存在于廣播狀態(tài)中。如果是,則輸出一個(gè)字符串,表明匹配成功。而processBroadcastElement()則用于處理廣播流中的每個(gè)元素,并將其添加到廣播狀態(tài)中。

注意:在分布式計(jì)算環(huán)境中,每個(gè)并行實(shí)例都會(huì)接收廣播流中的所有元素。因此,廣播狀態(tài)對(duì)于所有的并行實(shí)例都是一樣的。不過(guò),在Flink 1.13版本中,廣播狀態(tài)尚未在故障恢復(fù)中提供完全的保障。所以在事件出現(xiàn)故障時(shí),廣播狀態(tài)可能會(huì)丟失數(shù)據(jù)。

global

場(chǎng)景:并行度降為1

DataStream → DataStream

在 Apache Flink 中,Global 分區(qū)策略意味著所有數(shù)據(jù)都被發(fā)送到下游算子的同一個(gè)分區(qū)中。這種情況下,下游算子只有一個(gè)任務(wù)處理全部數(shù)據(jù)。這是一種特殊的分區(qū)策略,只有在下游算子能夠很快地處理所有數(shù)據(jù),或者需要全局排序或全局聚合時(shí)才會(huì)使用。

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????//?創(chuàng)建一個(gè)從1到100的數(shù)字流
????????DataStream<Long>?numberStream?=?env.fromSequence(1,?100);
????????//?對(duì)流應(yīng)用?map?function
????????DataStream<Long>?result?=?numberStream.global()
????????????????.map(new?MapFunction<Long,?Long>()?{
????????????????????@Override
????????????????????public?Long?map(Long?value)?{
????????????????????????System.out.println("Processing?"?+?value);
????????????????????????return?value?*?2;
????????????????????}
????????????????});
????????result.print();
????????env.execute("Global?Partition?Example");
????}

以上代碼創(chuàng)建了一個(gè)順序生成 1-100 的數(shù)字流,并應(yīng)用了 Global Partition,然后對(duì)每個(gè)數(shù)字進(jìn)行乘2的操作。實(shí)際運(yùn)行此代碼時(shí),你會(huì)觀察到所有的數(shù)字都由同一任務(wù)處理,打印出來(lái)的處理順序是連續(xù)的。這就是 Global Partition 的作用:所有數(shù)據(jù)都被發(fā)送到下游算子的同一實(shí)例進(jìn)行處理。

需要注意的是,此示例只是為了演示 Global Partition 的工作原理,實(shí)際上并不推薦在負(fù)載均衡很重要的應(yīng)用場(chǎng)景中使用這種分區(qū)策略,因?yàn)樗赡軐?dǎo)致嚴(yán)重的性能問(wèn)題。

forward

場(chǎng)景:一對(duì)一的數(shù)據(jù)分發(fā),默認(rèn)的分區(qū)策略,數(shù)據(jù)在各個(gè)算子之間不會(huì)重新分配。map、flatMap、filter 等都是這種分區(qū)策略

DataStream → DataStream

上游分區(qū)數(shù)據(jù)分發(fā)到下游對(duì)應(yīng)分區(qū)中

partition1->partition1;partition2->partition2

注意:必須保證上下游分區(qū)數(shù)(并行度)一致,不然會(huì)有如下異常:

Forward?partitioning?does?not?allow?change?of?parallelism.?Upstream?operation:?Source:?Socket?Stream-1?parallelism:?1,?downstream?operation:?Map-3?parallelism:?8?You?must?use?another?partitioning?strategy,?such?as?broadcast,?rebalance,?shuffle?or?global.
public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<Integer>?dataStream?=?env.fromElements(1,?2,?3,?4,?5,?6,?7,?8,?9,?10).setParallelism(1);
????????DataStream<Integer>?forwardStream?=?dataStream.forward().map(new?MapFunction<Integer,?Integer>()?{
????????????@Override
????????????public?Integer?map(Integer?value)?throws?Exception?{
????????????????return?value?*?value;
????????????}
????????}).setParallelism(1);
????????forwardStream.print();
????????env.execute("Flink?Forward?Example");
????}

此代碼首先創(chuàng)建一個(gè)從1到10的數(shù)據(jù)流。然后,它使用 Forward 策略將這個(gè)數(shù)據(jù)流送入一個(gè) MapFunction 中,該函數(shù)將每個(gè)數(shù)字平方。然后,它打印出結(jié)果。注意:以上代碼中的forward調(diào)用實(shí)際上并沒(méi)有改變?nèi)魏畏謪^(qū)策略,因?yàn)閒orward是默認(rèn)分區(qū)策略。這里添加forward調(diào)用主要是為了說(shuō)明其存在和使用方法。

keyBy

場(chǎng)景:與業(yè)務(wù)場(chǎng)景匹配

DataStream → DataStream

根據(jù)上游分區(qū)元素的Hash值與下游分區(qū)數(shù)取模計(jì)算出,將當(dāng)前元素分發(fā)到下游哪一個(gè)分區(qū)

MathUtils.murmurHash(keyHash)(每個(gè)元素的Hash值)?%?maxParallelism(下游分區(qū)數(shù))
public?static?void?main(String[]?args)?throws?Exception?{
????????StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<Tuple2<Integer,?Integer>>?dataStream?=?env.fromElements(
????????????????new?Tuple2<>(1,?3),
????????????????new?Tuple2<>(1,?5),
????????????????new?Tuple2<>(2,?4),
????????????????new?Tuple2<>(2,?6),
????????????????new?Tuple2<>(3,?7)
????????);
????????//?使用?keyBy?對(duì)流進(jìn)行分區(qū)操作
????????DataStream<Tuple2<Integer,?Integer>>?keyedStream?=?dataStream
????????????????.keyBy(0)?//?根據(jù)元組的第一個(gè)字段進(jìn)行分區(qū)
????????????????.sum(1);??//?對(duì)每個(gè)鍵對(duì)應(yīng)的第二個(gè)字段求和
????????keyedStream.print();
????????env.execute("KeyBy?example");
????}

以上程序首先創(chuàng)建了一個(gè)包含五個(gè)元組的流,然后使用?keyBy?方法根據(jù)元組的第一個(gè)字段進(jìn)行分區(qū),并對(duì)每個(gè)鍵對(duì)應(yīng)的第二個(gè)字段求和。執(zhí)行結(jié)果中,每個(gè)鍵的值集合都被映射成了一個(gè)新的元組,其第一個(gè)字段是鍵,第二個(gè)字段是相應(yīng)的和。

注意:在以上代碼中,keyBy(0)?表示根據(jù)元組的第一個(gè)字段(索引從0開(kāi)始)進(jìn)行分區(qū)操作。另外,無(wú)論什么情況,都需要確保你的 Flink 集群是正常運(yùn)行的,否則程序可能無(wú)法執(zhí)行成功。

PartitionCustom

DataStream → DataStream

通過(guò)自定義的分區(qū)器,來(lái)決定元素是如何從上游分區(qū)分發(fā)到下游分區(qū)

public?static?void?main(String[]?args)?throws?Exception?{
????????final?StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????DataStream<Integer>?data?=?env.fromElements(1,2,3,4,5,6,7,8,9,10);
????????//?使用自定義分區(qū)器進(jìn)行分區(qū)
????????data.partitionCustom(new?MyPartitioner(),?i?->?i).print();
????????env.execute("Custom?partition?example");
????}

????public?static?class?MyPartitioner?implements?Partitioner<Integer>?{
????????@Override
????????public?int?partition(Integer?key,?int?numPartitions)?{
????????????return?key?%?numPartitions;
????????}
????}

這個(gè)程序?qū)?chuàng)建一個(gè)數(shù)據(jù)流,其中包含從1到10的整數(shù)。然后,它使用了一個(gè)自定義的分區(qū)器MyPartitioner來(lái)對(duì)這個(gè)數(shù)據(jù)流進(jìn)行分區(qū)。這個(gè)分區(qū)器根據(jù)元素的值對(duì)numPartitions取模來(lái)決定數(shù)據(jù)去到哪個(gè)分區(qū)。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-774492.html

到了這里,關(guān)于Flink 內(nèi)容分享(四):Fink原理、實(shí)戰(zhàn)與性能優(yōu)化(四)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink 內(nèi)容分享(二十七):Hadoop vs Spark vs Flink——大數(shù)據(jù)框架比較

    大數(shù)據(jù)開(kāi)發(fā)離不開(kāi)各種框架,我們通過(guò)學(xué)習(xí) Apache Hadoop、Spark 和 Flink 之間的特征比較,可以從側(cè)面了解要學(xué)習(xí)的內(nèi)容。眾所周知,Hadoop vs Spark vs Flink是快速占領(lǐng) IT 市場(chǎng)的三大大數(shù)據(jù)技術(shù),大數(shù)據(jù)崗位幾乎都是圍繞它們展開(kāi)。 本文,將詳細(xì)介紹三種框架之間的區(qū)別。 Hadoop:為

    2024年02月01日
    瀏覽(39)
  • Flink 內(nèi)容分享(十九):理想汽車基于Flink on K8s的數(shù)據(jù)集成實(shí)踐

    Flink 內(nèi)容分享(十九):理想汽車基于Flink on K8s的數(shù)據(jù)集成實(shí)踐

    目錄 數(shù)據(jù)集成的發(fā)展與現(xiàn)狀 數(shù)據(jù)集成的落地實(shí)踐 1. 數(shù)據(jù)集成平臺(tái)架構(gòu) 2. 設(shè)計(jì)模型 3. 典型場(chǎng)景 4. 異構(gòu)數(shù)據(jù)源 5. SQL 形式的過(guò)濾條件 數(shù)據(jù)集成云原生的落地實(shí)踐 1. 方案選型 2. 狀態(tài)判斷及日志采集 3. 監(jiān)控告警 4. 共享存儲(chǔ) 未來(lái)規(guī)劃 理想汽車數(shù)據(jù)集成的發(fā)展經(jīng)歷了四個(gè)階段:

    2024年02月01日
    瀏覽(24)
  • 數(shù)據(jù)倉(cāng)庫(kù)內(nèi)容分享(十七):Doris實(shí)踐分享:它做了哪些架構(gòu)優(yōu)化和場(chǎng)景優(yōu)化?

    數(shù)據(jù)倉(cāng)庫(kù)內(nèi)容分享(十七):Doris實(shí)踐分享:它做了哪些架構(gòu)優(yōu)化和場(chǎng)景優(yōu)化?

    Apache Doris是一款開(kāi)源的實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù),由百度旗下的技術(shù)團(tuán)隊(duì)開(kāi)發(fā)。它具有高性能、高可靠性、易擴(kuò)展等特點(diǎn),能夠滿足大規(guī)模數(shù)據(jù)實(shí)時(shí)查詢和分析的需求。目前,Apache Doris已經(jīng)成為國(guó)內(nèi)外眾多企業(yè)的首選數(shù)據(jù)倉(cāng)庫(kù)解決方案,包括阿里巴巴、美團(tuán)、京東、滴滴等知名企業(yè)。

    2024年02月21日
    瀏覽(23)
  • Flink:處理大規(guī)模復(fù)雜數(shù)據(jù)集的最佳實(shí)踐深入探究Flink的數(shù)據(jù)處理和性能優(yōu)化技術(shù)

    作者:禪與計(jì)算機(jī)程序設(shè)計(jì)藝術(shù) 隨著互聯(lián)網(wǎng)、移動(dòng)互聯(lián)網(wǎng)、物聯(lián)網(wǎng)等新型網(wǎng)絡(luò)技術(shù)的不斷發(fā)展,企業(yè)對(duì)海量數(shù)據(jù)的處理日益依賴,而大數(shù)據(jù)分析、決策支持、風(fēng)險(xiǎn)控制等領(lǐng)域都需要海量的數(shù)據(jù)處理能力。如何高效、快速地處理海量數(shù)據(jù)、提升處理效率、降低成本,是當(dāng)下處理

    2024年02月13日
    瀏覽(27)
  • 7個(gè)工程應(yīng)用中數(shù)據(jù)庫(kù)性能優(yōu)化經(jīng)驗(yàn)分享

    摘要: 此篇文章分別從sql執(zhí)行過(guò)程、執(zhí)行計(jì)劃、索引數(shù)據(jù)結(jié)構(gòu)、索引查詢提速原理、聚焦索引、左前綴優(yōu)化原則、自增主鍵索引這些角度談一談我們對(duì)數(shù)據(jù)庫(kù)優(yōu)化的理解。 本文分享自華為云社區(qū)《工程應(yīng)用中數(shù)據(jù)庫(kù)性能優(yōu)化經(jīng)驗(yàn)小結(jié)》,作者: 葉工 。 現(xiàn)階段交付的算法產(chǎn)

    2024年02月06日
    瀏覽(27)
  • Kafka+Fink 實(shí)戰(zhàn)+工具類

    LogServiceImpl DwdShortLinkLogApp KafkaUtil TimeUtil

    2024年02月12日
    瀏覽(10)
  • 滌生大數(shù)據(jù)實(shí)戰(zhàn):基于Flink+ODPS歷史累計(jì)計(jì)算項(xiàng)目分析與優(yōu)化(下)

    滌生大數(shù)據(jù)實(shí)戰(zhàn):基于Flink+ODPS歷史累計(jì)計(jì)算項(xiàng)目分析與優(yōu)化(下)

    滌生大數(shù)據(jù)實(shí)戰(zhàn):基于Flink+ODPS歷史累計(jì)計(jì)算項(xiàng)目分析與優(yōu)化(二) 問(wèn)題分析 在 ODPS計(jì)算期間 或者 odps表同步到hbase表期間,發(fā)生了查詢,會(huì)導(dǎo)致數(shù)據(jù)錯(cuò)誤。出現(xiàn)問(wèn)題的地方就是這兩個(gè)時(shí)間窗口:ODPS計(jì)算期間 和 odps表同步到hbase表期間。那就針對(duì)性分析,各個(gè)擊破。? 解決方案

    2024年03月27日
    瀏覽(37)
  • Flink 內(nèi)容分享(二十):這三種場(chǎng)景,建議使用Flink

    Flink 內(nèi)容分享(二十):這三種場(chǎng)景,建議使用Flink

    目錄 01 事件驅(qū)動(dòng)型應(yīng)用 02 數(shù)據(jù)分析型應(yīng)用 03 數(shù)據(jù)管道型應(yīng)用 Flink的應(yīng)用場(chǎng)景十分廣泛,下面介紹3種常見(jiàn)的應(yīng)用。 在許多場(chǎng)景中,需要處理的數(shù)據(jù)往往來(lái)自事件。小到一些交互式的用戶行為,大到一些復(fù)雜的業(yè)務(wù)操作,它們都會(huì)被轉(zhuǎn)化成一條條數(shù)據(jù),進(jìn)而形成數(shù)據(jù)流(事件

    2024年01月16日
    瀏覽(20)
  • E往無(wú)前|騰訊云大數(shù)據(jù)ES索引原理剖析及寫(xiě)入性能優(yōu)化最佳實(shí)踐

    E往無(wú)前|騰訊云大數(shù)據(jù)ES索引原理剖析及寫(xiě)入性能優(yōu)化最佳實(shí)踐

    導(dǎo)讀 本文經(jīng)過(guò)大量案例總結(jié)和踩坑復(fù)盤(pán),歸納整理了Elastisearch集群在寫(xiě)入性能優(yōu)化方面一些常用的優(yōu)化技巧和避坑指南。 在我們服務(wù)騰訊云ES的客戶過(guò)程中,經(jīng)常會(huì)收到一些客戶對(duì)云上ES集群讀寫(xiě)性能未能達(dá)到預(yù)期的反饋,并希望我們能夠配合做一些性能壓測(cè)及調(diào)優(yōu)的工作。

    2024年02月09日
    瀏覽(21)
  • Flink性能優(yōu)化小結(jié)

    Flink性能優(yōu)化小結(jié)

    jvm內(nèi)存優(yōu)化 內(nèi)存優(yōu)化 netty優(yōu)化 akka優(yōu)化 并行度優(yōu)化 對(duì)象重用 checkpoint優(yōu)化 網(wǎng)絡(luò)內(nèi)存調(diào)優(yōu) 狀態(tài)優(yōu)化 flink數(shù)據(jù)傾斜優(yōu)化 flink背壓 jvm內(nèi)存參數(shù)調(diào)優(yōu) Flink是依賴內(nèi)存計(jì)算,計(jì)算過(guò)程中內(nèi)存不夠?qū)link的執(zhí)行效率影響很大。可以通過(guò)監(jiān)控GC(Garbage Collection),評(píng)估內(nèi)存使用及剩余情況

    2024年02月01日
    瀏覽(17)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包