Flink中的分流
在Flink中將數(shù)據(jù)流切分為多個(gè)子數(shù)據(jù)流,子數(shù)據(jù)流稱(chēng)為”旁路輸出數(shù)據(jù)流“。
拆分流數(shù)據(jù)的方式
- Split,已經(jīng)廢棄,不推薦使用
- Fliter
- SideOut,推薦使用
Fliter分流的Java實(shí)現(xiàn)
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指標(biāo)明細(xì)
DataStream<String> detailMessage = KafkaConfigUtil.buildSource(env)
.map((MapFunction<String, String>) kafkaMessage -> {
JSONObject jsonobject = null;
try {
jsonobject = JSONObject.parseObject(kafkaMessage);
} catch (Exception e) {
LOG.warn("報(bào)文格式錯(cuò)誤:{}", kafkaMessage);
}
if (null == jsonobject || jsonobject.isEmpty()) {
LOG.warn("報(bào)文內(nèi)容不合法:{}", JSONObject.toJSONString(jsonobject));
} else {
if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))
&& !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {
LOG.warn("報(bào)文所屬服務(wù)不存在:{}", JSONObject.toJSONString(jsonobject));
}
}
return JSONObject.toJSONString(jsonobject);
});
// 將原始流中包含demo的數(shù)據(jù)篩選出來(lái)
DataStream<String> diagnosisMessages = detailMessage
.filter((FilterFunction<String>) kafkaMessage -> (kafkaMessage.contains("demo")))
.map((MapFunction<String, String>) sparkMessage -> {
// 為達(dá)到實(shí)驗(yàn)效果,進(jìn)行日志輸出
LOG.info("[is demo message]:{}", sparkMessage);
return sparkMessage;
});
env.execute("Flink Streaming Java API Skeleton");
}
SideOut分流的Java實(shí)現(xiàn)
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
System.out.println("【SideOutputDemo】");
// 指標(biāo)明細(xì)
DataStream<String> mainMessage = KafkaConfigUtil.buildSource(env)
.map((MapFunction<String, String>) kafkaMessage -> {
JSONObject jsonobject = null;
try {
jsonobject = JSONObject.parseObject(kafkaMessage);
} catch (Exception e) {
LOG.warn("報(bào)文格式錯(cuò)誤:{}", kafkaMessage);
}
if (null == jsonobject || jsonobject.isEmpty()) {
LOG.warn("報(bào)文內(nèi)容不合法:{}", JSONObject.toJSONString(jsonobject));
} else {
if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))
&& !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {
LOG.warn("報(bào)文所屬服務(wù)不存在:{}", JSONObject.toJSONString(jsonobject));
}
}
return JSONObject.toJSONString(jsonobject);
});
// 定義一個(gè)切分(旁路輸出)
final OutputTag<String> outputTag = new OutputTag<String>("Spark_END") {
};
SingleOutputStreamOperator<String> sp = mainMessage
.process(new ProcessFunction<String, String>() {
@Override
public void processElement(
String s
, Context context
, Collector<String> collector) throws Exception {
// 向常規(guī)流(主流)中添加數(shù)據(jù)
collector.collect(s);
// 向旁路輸出流中添加數(shù)據(jù)
if (s.contains(AppPhaseEnum.Spark_APP_End.getValue())) {
context.output(outputTag, s);
}
}
});
sp.map((MapFunction<String, String>) sparkMessage -> {
LOG.info("主流的數(shù)據(jù): {}", sparkMessage);
return sparkMessage;
});
DataStream<String> tag = sp.getSideOutput(outputTag);
tag.map((MapFunction<String, String>) sparkMessage -> {
LOG.info("旁路[{}]的數(shù)據(jù): {}", outputTag.getId(), sparkMessage);
return sparkMessage;
});
env.execute("Flink Streaming Java API Skeleton");
}
SideOutPut 是 Flink 框架推薦的分流方法,在使用 SideOutPut 時(shí),需要按照以下步驟進(jìn)行:
-
為每個(gè)分支流定義一個(gè) SideOutPut。
-
為定義好的 SideOutPut發(fā)出數(shù)據(jù)。只有以下特定的函數(shù)才能通過(guò)Context上下文對(duì)象,向旁路輸出的SideOutPut發(fā)送數(shù)據(jù)。
- ProcessFunction:處理函數(shù),單流輸入函數(shù)
- KeyedProcessFunction:處理函數(shù),單流輸入函數(shù)
- CoProcessFunction:處理函數(shù),雙流流輸入函數(shù)
- KeyedCoProcessFunction:處理函數(shù),雙流流輸入函數(shù)
- ProcessWindowFunction:窗口函數(shù),全量計(jì)算函數(shù)
- ProcessAllWindowFunction:窗口函數(shù),全量計(jì)算函數(shù),它與 ProcessWindowFunction 類(lèi)似,但是它會(huì)對(duì)窗口中的所有數(shù)據(jù)進(jìn)行處理,而不是僅處理觸發(fā)窗口計(jì)算的數(shù)據(jù)。
例子中使用ProcessFunction實(shí)現(xiàn)流拆分。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-687699.html
-
根據(jù)SideOutPut 的ID標(biāo)識(shí)獲取旁路輸出流,進(jìn)行數(shù)據(jù)繼續(xù)處理。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-687699.html
拆分方式 | 對(duì)比 |
---|---|
Split | 不支持鏈?zhǔn)讲鸱郑蟹值玫降牧?,是不能進(jìn)行再次切分的 |
Fliter | 多分支流,需要多次遍歷原始流進(jìn)行篩選。浪費(fèi)集群的資源 |
SideOut | 以多次進(jìn)行拆分的,支持鏈?zhǔn)讲鸱帧?/td> |
到了這里,關(guān)于【Flink實(shí)戰(zhàn)】Flink中的分流的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!