Contents
Transformations
Sink
Partitioning Strategies
Transformations
A transformation operator turns one or more data streams into a new data stream; by combining transformations you can implement complex processing logic.
Map
DataStream → DataStream
Visits every element of the stream and produces exactly one new element for each.
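A minimal sketch (the values and job name are illustrative): doubling every element with a MapFunction.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> doubled = env.fromSequence(1, 5)
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return value * 2; // exactly one output element per input element
                    }
                });
        doubled.print();
        env.execute("Map Example");
    }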
FlatMap
DataStream → DataStream
Visits every element of the stream and produces N output elements per input, where N = 0, 1, 2, ...
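A minimal sketch (input strings are illustrative): splitting each line into words, so a single input element can produce several output elements.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello world", "hello flink")
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) {
                        for (String word : value.split(" ")) {
                            out.collect(word); // 0..N outputs per input element
                        }
                    }
                })
                .print();
        env.execute("FlatMap Example");
    }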
Filter
DataStream → DataStream
Filter operator: computes a boolean for each element of the stream; true keeps the element, false drops it.
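A minimal sketch: keeping only the even numbers.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(1, 10)
                .filter(new FilterFunction<Long>() {
                    @Override
                    public boolean filter(Long value) {
                        return value % 2 == 0; // true -> keep, false -> drop
                    }
                })
                .print();
        env.execute("Filter Example");
    }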
KeyBy
DataStream → KeyedStream
Partitions the stream by the specified key field: records with the same key value are guaranteed to end up in the same partition. Internally a hash-based partitioner is used.
There are three ways to specify the key field:
- By field index.
- With an anonymous function (lambda).
- By implementing the KeySelector interface.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> stream = env.fromSequence(1, 100);
        stream.map((MapFunction<Long, Tuple2<Long, Integer>>) (Long x) -> new Tuple2<>(x % 3, 1), TypeInformation.of(new TypeHint<Tuple2<Long, Integer>>() {}))
                // by field index: .keyBy(0)
                // with an anonymous function: .keyBy(x -> x.f0)
                // by implementing the KeySelector interface:
                .keyBy((KeySelector<Tuple2<Long, Integer>, Long>) (Tuple2<Long, Integer> value) -> value.f0, BasicTypeInfo.LONG_TYPE_INFO)
                .sum(1).print();
        env.execute("Flink Job");
    }
Reduce
Applies to KeyedStream.
KeyedStream (grouped by key) → DataStream
Note that reduce aggregates over a keyed (partitioned) stream, which means a plain DataStream object cannot call reduce.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
                new Tuple2<>("apple", 3),
                new Tuple2<>("banana", 1),
                new Tuple2<>("apple", 5),
                new Tuple2<>("banana", 2),
                new Tuple2<>("apple", 4)
        );
        // use reduce to merge all elements that share a key
        DataStream<Tuple2<String, Integer>> result = dataStream
                .keyBy(0)
                .reduce((ReduceFunction<Tuple2<String, Integer>>) (value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1));
        result.print();
        env.execute();
    }
Aggregations
KeyedStream → DataStream
Aggregations is a family of aggregation operators; the reduce operator above belongs to it. Commonly used ones include:
- sum(): sums a numeric field.
- min(): computes the minimum of a field.
- max(): computes the maximum of a field.
- minBy() / maxBy(): return the entire element that holds the minimum/maximum field value.
In addition, Flink supports custom aggregation logic through the AggregateFunction interface for more complex cases. A short sketch of the built-in aggregations follows this list.
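A minimal sketch reusing the fruit-count tuples from the reduce example above: max(1) emits the running maximum of the count field per key.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
                new Tuple2<>("apple", 3),
                new Tuple2<>("banana", 1),
                new Tuple2<>("apple", 5));
        dataStream.keyBy(value -> value.f0)
                .max(1) // running maximum of field 1 for each key
                .print();
        env.execute("Aggregations Example");
    }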
Union (true merge)
DataStream → DataStream
Union of two or more data streams creates a new stream containing all the elements from all the input streams.
Note: all input streams must have elements of the same type.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> ds1 = env.fromCollection(Arrays.asList(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("c", 3)));
        DataStream<Tuple2<String, Integer>> ds2 = env.fromCollection(Arrays.asList(Tuple2.of("d", 4), Tuple2.of("e", 5), Tuple2.of("f", 6)));
        DataStream<Tuple2<String, Integer>> ds3 = env.fromCollection(Arrays.asList(Tuple2.of("g", 7), Tuple2.of("h", 8)));
        DataStream<Tuple2<String, Integer>> unionStream = ds1.union(ds2, ds3);
        unionStream.print();
        env.execute();
    }
In Flink, the union operation is called a "true merge" because it fully blends two or more streams into one: elements arrive in no particular order and no duplicates are removed, much like a multiset union in mathematics.
Note that, unlike the SQL UNION operator, which deduplicates its inputs, Flink's union behaves like UNION ALL: every element of every input stream appears in the output.
Connect (pseudo merge)
DataStream,DataStream → ConnectedStreams
Connects two streams while preserving each stream's element type; the two streams can also share state.
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> ds1 = env.socketTextStream("localhost", 8888);
        DataStream<String> ds2 = env.socketTextStream("localhost", 9999);
        DataStream<Tuple2<String, Integer>> wcStream1 = ds1
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);
        DataStream<Tuple2<String, Integer>> wcStream2 = ds2
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);
        ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectedStreams = wcStream1.connect(wcStream2);
    }
Unlike union, connect can only join two streams, and the two streams may have different element types. After connect, the two streams are still treated as distinct; you process them separately with a CoMap or CoFlatMap function.
CoMap, CoFlatMap
ConnectedStreams → DataStream
CoMap and CoFlatMap are not concrete operator names but names for two families of operations:
- A map-style traversal over a ConnectedStreams is called a CoMap.
- A flatMap-style traversal over a ConnectedStreams is called a CoFlatMap.
A CoMap implementation:
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // create two streams of different types
        DataStream<Integer> nums = env.fromElements(1, 2, 3, 4, 5);
        DataStream<String> text = env.fromElements("a", "b", "c");
        // connect the two streams
        ConnectedStreams<Integer, String> connected = nums.connect(text);
        // process the connected streams with a CoMap
        DataStream<String> result = connected.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) {
                return String.valueOf(value * 2);
            }
            @Override
            public String map2(String value) {
                return "hello " + value;
            }
        });
        result.print();
        env.execute("CoMap example");
    }
A CoFlatMap implementation:
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> nums = env.fromElements(1, 2, 3, 4, 5);
        DataStream<String> text = env.fromElements("a", "b", "c");
        ConnectedStreams<Integer, String> connected = nums.connect(text);
        DataStream<String> result = connected.flatMap(new CoFlatMapFunction<Integer, String, String>() {
            @Override
            public void flatMap1(Integer value, Collector<String> out) {
                out.collect(String.valueOf(value * 2));
                out.collect(String.valueOf(value * 3));
            }
            @Override
            public void flatMap2(String value, Collector<String> out) {
                out.collect("hello " + value);
                out.collect("hi " + value);
            }
        });
        result.print();
        env.execute("CoFlatMap example");
    }
Split/Select
DataStream → SplitStream
Splits one stream into several streams based on a condition, as in the following example:
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> data = env.generateSequence(0, 10);
        SplitStream<Long> split = data.split((OutputSelector<Long>) value -> {
            List<String> output = new ArrayList<>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        });
        split.select("odd").print();
        env.execute("Flink SplitStream Example");
    }
select() is used to pick one or more named streams from a SplitStream:
split.select("odd").print();
SideOutput
Note: as of Flink 1.12, SplitStream has been deprecated and removed; Side Outputs are the recommended replacement for split/select.
Example:
private static final OutputTag<String> evenOutput = new OutputTag<String>("even"){};
private static final OutputTag<String> oddOutput = new OutputTag<String>("odd"){};
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("1", "2", "3", "4", "5");
        SingleOutputStreamOperator<String> processed = input.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                int i = Integer.parseInt(value);
                if (i % 2 == 0) {
                    ctx.output(evenOutput, value);
                } else {
                    ctx.output(oddOutput, value);
                }
            }
        });
        DataStream<String> evenStream = processed.getSideOutput(evenOutput);
        DataStream<String> oddStream = processed.getSideOutput(oddOutput);
        evenStream.print("even");
        oddStream.print("odd");
        env.execute("Side Output Example");
    }
Iterate
DataStream → IterativeStream → DataStream
The iterate operator adds iteration support to data streams.
An iteration splits the stream into two parts: a feedback part, which is routed back to the iteration head to drive the next iteration, and an output part, which forms the result of the iteration:
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> input = env.fromElements(10L);
        // define the iterative stream; 10000 is the max wait time (ms) for feedback data, not an iteration count
        IterativeStream<Long> iteration = input.iterate(10000L);
        // define the iteration step
        DataStream<Long> minusOne = iteration.map((MapFunction<Long, Long>) value -> value - 1);
        // split into the feedback stream (keeps iterating) and the output stream (done)
        DataStream<Long> stillGreaterThanZero = minusOne.filter(value -> value > 0).setParallelism(1);
        DataStream<Long> lessThanZero = minusOne.filter(value -> value <= 0);
        // close the iteration by wiring up the feedback stream
        iteration.closeWith(stillGreaterThanZero);
        // print the result
        lessThanZero.print();
        env.execute("Iterative Stream Example");
    }
Regular Functions & Rich Functions
Apache Flink has two kinds of functions: regular functions and rich functions. The main difference is that rich functions provide extra lifecycle methods and access to runtime context.
- Regular functions: these only override one or a few specific methods; for example, MapFunction requires implementing map(). They have no lifecycle methods and cannot access the execution context.
- Rich functions: besides the specific method, rich functions give more control over the Flink API, including:
  - Lifecycle management: override open() and close() to do setup before the function starts and cleanup after it finishes.
  - Runtime context: for example, getRuntimeContext() exposes information about the parallel task, such as the index of the current subtask.
  - State management and fault tolerance: you can define and use managed state, which is essential for building fault-tolerant applications.
In short, use a rich function when you need Flink's advanced features such as state management or runtime context; otherwise a regular function is enough.
Regular function class | Rich function class |
---|---|
MapFunction | RichMapFunction |
FlatMapFunction | RichFlatMapFunction |
FilterFunction | RichFilterFunction |
...... | ...... |
A regular function:
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> words = Arrays.asList("hello", "world", "flink", "hello", "world");
        env.fromCollection(words)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) {
                        return new Tuple2<>(value, 1);
                    }
                })
                .keyBy(0)
                .sum(1)
                .print();
        env.execute("Word Count Example");
    }
A rich function:
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> words = Arrays.asList("hello", "world", "flink", "hello", "world");
        env.fromCollection(words)
                .map(new RichMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // set up configuration or resources here, e.g. database connections
                    }
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return new Tuple2<>(value, 1);
                    }
                    @Override
                    public void close() throws Exception {
                        super.close();
                        // clean up resources here
                    }
                })
                .keyBy(0)
                .sum(1)
                .print();
        env.execute("Word Count Example");
    }
ProcessFunction
ProcessFunction belongs to the low-level API; in the class hierarchy it is a rich function.
The operators covered earlier, such as map, filter, and flatMap, are built on top of this layer.
The lower the API level, the more powerful it is and the more information it exposes: you can access element state and event time, register timers, and more.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) {
                        return new Tuple2<>(value, 1);
                    }
                })
                .keyBy(0)
                .process(new AlertFunction());
        dataStream.print();
        env.execute("Process Function Example");
    }
    public static class AlertFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, String> {
        private transient ValueState<Integer> countState;
        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Integer> descriptor =
                    new ValueStateDescriptor<>(
                            "countState", // state name
                            TypeInformation.of(new TypeHint<Integer>() {}), // type information
                            0); // default value
            countState = getRuntimeContext().getState(descriptor);
        }
        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
            Integer currentCount = countState.value();
            currentCount += 1;
            countState.update(currentCount);
            if (currentCount >= 3) {
                out.collect("Warning! The key '" + value.f0 + "' has been seen " + currentCount + " times.");
            }
        }
    }
Here we create a process function class named AlertFunction that extends KeyedProcessFunction. The ValueState holds the count, and each key gets its own state instance. Once the count for a key reaches three or more, a warning is emitted. The example highlights two advantages that process functions have over other operators: access to keyed state and lifecycle methods such as open().
Note: the example above assumes a socket server streaming text on local port 9999 (for instance one started with nc -lk 9999). If you don't have one, replace this part with an input source of your own.
Sink
In Flink, a sink is the final stage of a streaming job. It is the output endpoint where results are emitted or stored: the terminal node of the dataflow, responsible for writing processed data to external systems such as databases, files, or message queues.
Flink ships with many built-in sinks that write processed data to HDFS, Kafka, Redis, Elasticsearch, MySQL, and more.
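Before looking at concrete connectors, a minimal sketch: print() is itself a sink that writes every element to stdout, which is handy for local testing.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 10;
                    }
                })
                .print(); // print() terminates the dataflow just like any other sink
        env.execute("Print Sink Example");
    }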
Redis Sink
Data processed by Flink can be stored in Redis for real-time queries.
First, add the Flink Redis connector dependency:
<!-- Flink Redis connector -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
            <version>1.1.0</version>
        </dependency>
The following code runs a word count and stores the result in Redis:
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.fromElements(
                "Hello World",
                "Hello Flink",
                "Hello Java");
        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer())
                        .keyBy(value -> value.f0)
                        .sum(1);
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build();
        counts.addSink(new RedisSink<>(conf, new RedisExampleMapper()));
        env.execute("Word Count Example");
    }
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] words = value.toLowerCase().split("\\W+");
            for (String word : words) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
    public static final class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // HSET requires an additional key, the hash name; "wordcount" is illustrative
            return new RedisCommandDescription(RedisCommand.HSET, "wordcount");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
Kafka Sink
Flink can also write results to a Kafka topic. This needs the same connector dependency used for reading from Kafka, so if you added it earlier there is nothing more to add.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.6</version>
</dependency>
Reusing the word-count example from above:
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.fromElements(
                "Hello World",
                "Hello Flink",
                "Hello Java");
        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer())
                        .keyBy(value -> value.f0)
                        .sum(1);
        // Define Kafka properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // Write the data stream to Kafka
        counts.map(new MapFunction<Tuple2<String, Integer>, String>() {
                    @Override
                    public String map(Tuple2<String, Integer> value) throws Exception {
                        return value.f0 + "," + value.f1.toString();
                    }
                })
                .addSink(new FlinkKafkaProducer<>("my-topic", new SimpleStringSchema(), properties));
        env.execute("Word Count Example");
    }
MySQL Sink
Writing Flink results to MySQL is not supported out of the box, so the MySQL driver dependency must be added:
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>
因?yàn)椴皇莾?nèi)嵌支持的,所以需要基于SinkFunction自定義Sink。
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.fromElements(
                "Hello World",
                "Hello Flink",
                "Hello Java");
        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer())
                        .keyBy(value -> value.f0)
                        .sum(1);
        // Transform the Tuple2<String, Integer> into a simple "word,count" line for the sink
        DataStream<String> mysqlData = counts.map(new MapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) throws Exception {
                return value.f0 + "," + value.f1;
            }
        });
        // Write the data stream to MySQL
        mysqlData.addSink(new MySqlSink());
        env.execute("Word Count Example");
    }
    public static class MySqlSink implements SinkFunction<String> {
        private Connection connection;
        private PreparedStatement preparedStatement;
        @Override
        public void invoke(String value, Context context) throws Exception {
            if (connection == null) {
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "username", "password");
                // use placeholders instead of string concatenation, so each record is bound safely
                preparedStatement = connection.prepareStatement("INSERT INTO my_table(word, count) VALUES(?, ?)");
            }
            String[] fields = value.split(",");
            preparedStatement.setString(1, fields[0]);
            preparedStatement.setInt(2, Integer.parseInt(fields[1]));
            preparedStatement.executeUpdate();
        }
    }
HBase Sink
Add the HBase client dependency:
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.5.2</version>
        </dependency>
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.fromElements(
                "Hello World",
                "Hello Flink",
                "Hello Java");
        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer())
                        .keyBy(value -> value.f0)
                        .sum(1);
        counts.addSink(new HBaseSink());
        env.execute("Word Count Example");
    }
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] words = value.toLowerCase().split("\\W+");
            for (String word : words) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
    public static class HBaseSink extends RichSinkFunction<Tuple2<String, Integer>> {
        private org.apache.hadoop.conf.Configuration config;
        private org.apache.hadoop.hbase.client.Connection connection;
        private Table table;
        @Override
        public void invoke(Tuple2<String, Integer> value, Context context) throws IOException {
            Put put = new Put(Bytes.toBytes(value.f0));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(value.f1));
            table.put(put);
        }
        @Override
        public void open(Configuration parameters) throws Exception {
            config = HBaseConfiguration.create();
            config.set("hbase.zookeeper.quorum", "localhost");
            config.set("hbase.zookeeper.property.clientPort", "2181");
            connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf("my-table"));
        }
        @Override
        public void close() throws Exception {
            table.close();
            connection.close();
        }
    }
The HBaseSink class extends RichSinkFunction and writes results to HBase. In invoke, each incoming tuple (word and count) is written to HBase. In open, it creates the HBase connection and resolves the target table. In close, it closes the table and the connection.
Partitioning Strategies
In Apache Flink, partitioning divides a data stream into multiple sub-streams (shards) according to some rule so that different parallel tasks or operators can process the data concurrently. Partitioning is the foundation of parallel computation in stream processing: it determines how data flows through a job and how it is distributed among parallel tasks.
A Flink dataflow can be viewed as a directed graph whose nodes are operators and whose edges are data streams. Data flows from source operators to downstream operators, which may process their input in parallel; partitioning is the mechanism that decides how data moves from one operator to the next.
The commonly used partitioning strategies are described below.
shuffle
Use case: increasing the number of partitions / the parallelism, and mitigating data skew.
DataStream → DataStream
Elements are distributed uniformly at random to the downstream partitions; the network overhead is relatively high.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> stream = env.fromSequence(1, 10).setParallelism(1);
        System.out.println(stream.getParallelism());
        stream.shuffle().print();
        env.execute();
    }
Output: upstream elements land in downstream partitions essentially at random:
1> 7
7> 1
2> 8
4> 5
8> 3
1> 9
8> 4
8> 10
6> 2
6> 6
rebalance
Use case: increasing the number of partitions / the parallelism, and mitigating data skew.
DataStream → DataStream
Elements are distributed to downstream partitions round-robin, so each downstream partition receives a fairly even share; very useful when data skew occurs, at the cost of noticeable network overhead.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> stream = env.fromSequence(1, 10).setParallelism(1);
        System.out.println(stream.getParallelism());
        stream.rebalance().print();
        env.execute();
    }
Output: upstream elements are distributed evenly to the downstream partitions:
2> 2
1> 1
8> 8
5> 5
7> 7
4> 4
3> 3
6> 6
1> 9
2> 10
rescale
Use case: reducing the number of partitions while avoiding heavy network transfer; no full repartitioning takes place.
DataStream → DataStream
Elements are round-robined from each upstream partition to a fixed subset of downstream partitions, so data is exchanged between groups of tasks rather than across the whole job.
Unlike other repartitioning strategies (such as rebalance, forward, and broadcast), rescale does not change the parallelism at runtime, and it exchanges data only locally (typically within the same TaskManager), which makes it more efficient than a full repartition.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.fromElements("1", "2", "3", "4", "5");
        // use a MapFunction to convert the elements to integers
        DataStream<Integer> intStream = dataStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return Integer.parseInt(value);
            }
        });
        // repartition with rescale()
        DataStream<Integer> rescaledStream = intStream.rescale();
        rescaledStream.print();
        env.execute("Rescale Example");
    }
In this example we create a DataStream of strings, convert each element to an integer with map(), and then apply rescale() to repartition the result.
Note that the actual effect of rescale() depends on your parallelism and cluster setup: if all parallel instances run on the same machine, or the parallelism is 1, you may not observe any effect. In large-scale parallel processing, rescale() can improve throughput.
Also, rescale's impact is not visible in the printed output, because it changes the internal data distribution rather than the results. To observe it, inspect the job through Flink's Web UI or logs: the data distribution across channels, the state of each subtask, and so on.
broadcast
Use case: a mapping/lookup table that changes frequently and must be visible to all parallel tasks.
DataStream → DataStream
Every upstream element is broadcast to every downstream partition.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5);
        DataStream<String> broadcastStream = env.fromElements("2", "4");
        MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(
                "RulesBroadcastState",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);
        BroadcastStream<String> broadcastData = broadcastStream.broadcast(descriptor);
        dataStream.connect(broadcastData)
                .process(new BroadcastProcessFunction<Integer, String, String>() {
                    @Override
                    public void processElement(Integer value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        if (ctx.getBroadcastState(descriptor).contains(String.valueOf(value))) {
                            out.collect("Value " + value + " matches with a broadcasted rule");
                        }
                    }
                    @Override
                    public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
                        ctx.getBroadcastState(descriptor).put(rule, rule);
                    }
                }).print();
        env.execute("Broadcast State Example");
    }
The code above first defines a main stream and a stream to broadcast. We then create a MapStateDescriptor to store the broadcast data and turn the broadcast stream into a BroadcastStream.
Finally, connect() joins the main stream with the broadcast stream, and process() supplies two handlers: processElement() processes each element of the main stream and checks whether it exists in the broadcast state, emitting a match message if it does; processBroadcastElement() processes each element of the broadcast stream and adds it to the broadcast state.
Note: in a distributed setting, every parallel instance receives all elements of the broadcast stream, so the broadcast state is identical across all parallel instances. Keep in mind that broadcast state is held in memory at runtime (it has no RocksDB state backend support), so it should be kept reasonably small.
global
Use case: effectively reducing the parallelism to 1.
DataStream → DataStream
In Apache Flink, the global partitioning strategy sends all elements to the same partition of the downstream operator, so a single task instance processes the entire stream. This special strategy is only appropriate when the downstream operator can process all the data quickly, or when a global ordering or global aggregation is required.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // create a stream of the numbers 1 to 100
        DataStream<Long> numberStream = env.fromSequence(1, 100);
        // apply a map function after the global partitioning
        DataStream<Long> result = numberStream.global()
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        System.out.println("Processing " + value);
                        return value * 2;
                    }
                });
        result.print();
        env.execute("Global Partition Example");
    }
The code above creates a stream of the numbers 1 to 100, applies global partitioning, and then doubles each number. When you run it, you will see that all numbers are processed by the same task and printed in consecutive order. That is exactly what global partitioning does: all data is sent to the same downstream instance.
Note that this example only demonstrates how global partitioning works; it is not recommended where load balancing matters, because it can cause severe performance problems.
forward
Use case: one-to-one data forwarding. This is the default strategy where possible; data is not redistributed between operators. map, flatMap, filter, and similar operators use it.
DataStream → DataStream
Each upstream partition forwards its data to the corresponding downstream partition:
partition1 -> partition1; partition2 -> partition2
Note: the upstream and downstream partition counts (parallelism) must match, otherwise you get an exception like this:
Forward partitioning does not allow change of parallelism. Upstream operation: Source: Socket Stream-1 parallelism: 1, downstream operation: Map-3 parallelism: 8 You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).setParallelism(1);
        DataStream<Integer> forwardStream = dataStream.forward().map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value * value;
            }
        }).setParallelism(1);
        forwardStream.print();
        env.execute("Flink Forward Example");
    }
This code creates a stream of 1 to 10, sends it through a MapFunction that squares each number using the forward strategy, and prints the result. Note that the forward() call does not actually change any partitioning here, because forward is already the default strategy; it is included only to show that it exists and how it is used.
keyBy
Use case: grouping data by key as dictated by the business logic.
DataStream → DataStream
The downstream partition for an element is computed from the hash of its key modulo the number of downstream key groups:
MathUtils.murmurHash(keyHash) % maxParallelism
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<Integer, Integer>> dataStream = env.fromElements(
                new Tuple2<>(1, 3),
                new Tuple2<>(1, 5),
                new Tuple2<>(2, 4),
                new Tuple2<>(2, 6),
                new Tuple2<>(3, 7)
        );
        // partition the stream with keyBy
        DataStream<Tuple2<Integer, Integer>> keyedStream = dataStream
                .keyBy(0) // partition by the first field of the tuple
                .sum(1);  // sum the second field for each key
        keyedStream.print();
        env.execute("KeyBy example");
    }
The program above creates a stream of five tuples, uses keyBy to partition by the first tuple field, and sums the second field per key. In the output, each key's values are folded into a new tuple whose first field is the key and whose second field is the running sum.
Note: keyBy(0) partitions by the first tuple field (indices start at 0). As always, make sure your Flink cluster is up and running, or the program will fail to execute.
PartitionCustom
DataStream → DataStream
A custom partitioner decides how elements are routed from upstream partitions to downstream partitions.
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // partition with the custom partitioner
        data.partitionCustom(new MyPartitioner(), i -> i).print();
        env.execute("Custom partition example");
    }
    public static class MyPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            return key % numPartitions;
        }
    }
This program creates a stream of the integers 1 to 10 and partitions it with a custom partitioner, MyPartitioner, which routes each element to the partition given by its value modulo numPartitions.