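Background: in a CEP template, after a grouping condition was set on the data sources, the records in the resulting alerts did not match the grouping condition. Records from other groups were mixed in and triggered alerts.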
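Rule conditions:
Two Kafka topics are used as data sources.
Data source A, test-topic1, is filtered with the condition: login_type = 1
Data source B, test-topic2, is filtered with the condition: login_type = 2
Grouping: by the two fields src_ip and hostname
A and B are correlated with followedBy; the maximum time interval for the correlation is 60 seconds
Parallelism is set to 3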
Raw data printed from SourceStream:
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
1> {"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"2"}
經過cep處理后,產了告警
產生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"2"}]}
經過src_ip,和hostname分組后, 理論上應該只分組后的相同的 scr_ip,hostname進行事件關聯(lián)告警
結果其他的分組數(shù)據(jù)也參和進來關聯(lián)告警了。?
期望的是? login_type = 1 出現(xiàn)至少兩次, 接著login_type=2的至少出現(xiàn)1次,且相同的src_ip和hostname
然后結果是下面數(shù)據(jù)也產生了告警。
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname2","login_type":2}
I suspected that the grouping was not taking effect.
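What "grouping taking effect" should mean can be shown with a small plain-Java model: split the events into one sequence per (src_ip, hostname) and only then look for a match inside each sequence. GroupingModel and its methods are hypothetical names for this sketch (not Flink code), and anyGroupCanMatch checks only a necessary condition, ignoring event order and the 60-second window.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keying by (src_ip, hostname) before matching (hypothetical, NOT Flink).
final class GroupingModel {
    static final class Ev {
        final String srcIp;
        final String hostname;
        final int loginType;

        Ev(String srcIp, String hostname, int loginType) {
            this.srcIp = srcIp;
            this.hostname = hostname;
            this.loginType = loginType;
        }
    }

    /** Splits events into per-(src_ip, hostname) sequences, keeping arrival order. */
    static Map<String, List<Ev>> groupBySrcIpAndHostname(List<Ev> events) {
        Map<String, List<Ev>> groups = new LinkedHashMap<>();
        for (Ev e : events) {
            String key = e.srcIp + "|" + e.hostname; // separator keeps field values apart
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    /**
     * Necessary condition for an alert under grouping: a single group holds at least
     * two login_type = 1 events and at least one login_type = 2 event.
     */
    static boolean anyGroupCanMatch(List<Ev> events) {
        for (List<Ev> group : groupBySrcIpAndHostname(events).values()) {
            long ones = group.stream().filter(e -> e.loginType == 1).count();
            boolean hasTwo = group.stream().anyMatch(e -> e.loginType == 2);
            if (ones >= 2 && hasTwo) {
                return true;
            }
        }
        return false;
    }
}
```

For the three records above, this yields two groups (hostname1 with only login_type = 1, hostname2 with only login_type = 2), so no group can satisfy the rule and a correctly keyed job should not alert.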
然后debug數(shù)據(jù)源那塊的方法kafkaStreamSource() 里面有進行分組,debug后發(fā)現(xiàn)確實也進行了keyby
后來找不到其他問題,糾結了下, 懷疑是不是 KeyedSteam.union(KeyedStream)后得到的就不是一個KeyedSteam了。 所以
出現(xiàn)問題的原始代碼數(shù)據(jù)源代碼:
//程序具體執(zhí)行流程
DataStream<JSONObject> sourceStream = SourceProcess.getKafkaStream(env, rule);
DataStream<JSONObject> resultStream = TransformProcess.process(sourceStream, rule);
SinkProcess.sink(resultStream, rule);
public static DataStream<JSONObject> getKafkaStream(StreamExecutionEnvironment env, Rule rule) {
    DataStream<JSONObject> inputStream = null;
    List<Event> events = rule.getEvents();
    if (events.size() > SharingConstant.NUMBER_ZERO) {
        for (Event event : events) {
            FlinkKafkaConsumer<JSONObject> kafkaConsumer =
                    new KafkaSourceFunction(rule, event).init();
            if (inputStream != null) {
                // merge multiple streams into one stream
                inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));
            } else {
                // first (or only) stream
                inputStream = kafkaStreamSource(env, event, rule, kafkaConsumer);
            }
        }
    }
    return inputStream;
}
private static DataStream<JSONObject> kafkaStreamSource(
        StreamExecutionEnvironment env,
        Event event,
        Rule rule,
        FlinkKafkaConsumer<JSONObject> kafkaConsumer) {
    DataStream<JSONObject> inputStream = env.addSource(kafkaConsumer);
    // loop over the multiple blacklist/whitelist lookups
    String conditions = event.getConditions();
    while (conditions.contains(SharingConstant.ARGS_NAME)) {
        // use the new Redis data structure to apply the s.include filter
        // (the declaration of `s` is not shown in the original excerpt)
        inputStream = AsyncDataStream.orderedWait(inputStream,
                new RedisNameListFilterSourceFunction(s, rule.getSettings().getRedis()), 30, TimeUnit.SECONDS, 2000);
        conditions = conditions.replace(s, "");
    }
    // general filtering
    inputStream = AsyncDataStream.orderedWait(inputStream,
            new Redis3SourceFunction(event, rule.getSettings().getRedis()), 30, TimeUnit.SECONDS, 2000);
    // apply keyBy to the kafka source (returns a KeyedStream when a group-by is configured)
    return KeyedByStream.keyedBy(inputStream, rule.getGroupBy());
}
public static DataStream<JSONObject> keyedBy(
        DataStream<JSONObject> input, Map<String, String> groupBy) {
    // no grouping configured: return the stream unchanged
    if (null == groupBy || groupBy.isEmpty() || "".equals(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO])) {
        return input;
    }
    // the declared return type is DataStream, but the runtime object is a KeyedStream
    return input.keyBy(
            new TwoEventKeySelector(
                    groupBy.values().toArray()[SharingConstant.NUMBER_ZERO].toString()));
}
public class TwoEventKeySelector implements KeySelector<JSONObject, String> {
    private static final long serialVersionUID = 8534968406068735616L;
    // comma-separated group-by field names (here: src_ip,hostname)
    private final String groupBy;

    public TwoEventKeySelector(String groupBy) {
        this.groupBy = groupBy;
    }

    @Override
    public String getKey(JSONObject event) {
        // concatenate the values of all group-by fields into a single key
        StringBuilder keys = new StringBuilder();
        for (String key : groupBy.split(SharingConstant.DELIMITER_COMMA)) {
            keys.append(event.getString(key));
        }
        return keys.toString();
    }
}
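A side note independent of the union issue: getKey above appends the field values with no separator, so two different (src_ip, hostname) pairs can collapse into the same key, for example ("10.0.0.1", "1web") and ("10.0.0.11", "web") both become "10.0.0.11web". The sketch below contrasts that scheme with a delimited one. GroupKeys is a hypothetical helper, and the \u0001 separator is an assumption: it only works if that character never appears in the field values.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper contrasting two ways of building a composite group key.
final class GroupKeys {
    /** Same scheme as TwoEventKeySelector.getKey: values appended back to back. */
    static String concatKey(List<String> values) {
        StringBuilder sb = new StringBuilder();
        for (String v : values) {
            sb.append(v);
        }
        return sb.toString();
    }

    /** Joins values with a separator assumed never to occur inside them. */
    static String delimitedKey(List<String> values) {
        StringJoiner joiner = new StringJoiner("\u0001");
        for (String v : values) {
            joiner.add(v);
        }
        return joiner.toString();
    }
}
```

With plain concatenation the two example pairs share one key and would be matched as a single group; the delimited version keeps them apart. This does not change the behaviour seen in this article, where the keys "172.11.11.1hostname1" and "172.11.11.1hostname2" are already distinct.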
問題出現(xiàn)在這里:
// merge multiple streams into one stream
inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));
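kafkaStreamSource() actually returns a KeyedStream (its declared return type is DataStream).
I had assumed that unioning two KeyedStreams would give back a KeyedStream, but union() returns a plain DataStream.
As a result, the CEP grouping did not take effect, and a single alert contained data from other groups.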
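The mechanism can be reproduced with a toy type model. In Flink, KeyedStream extends DataStream and DataStream.union is declared as returning DataStream<T>, so the "keyed" information is lost at the type level. As far as I can tell from the Flink CEP source, the pattern operator is built per key only when its input is an instance of KeyedStream; otherwise everything goes through one global, non-parallel operator, which is why events from different hostnames end up in the same match. The classes below are hypothetical stand-ins written for this sketch, not Flink's real classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model (hypothetical, NOT Flink's classes) of how union() drops keying.
final class UnionModel {
    static class Stream<T> {
        final List<T> elements;

        Stream(List<T> elements) {
            this.elements = elements;
        }

        /** Declared on the base type and returns a plain Stream, even for two KeyedStreams. */
        Stream<T> union(Stream<T> other) {
            List<T> all = new ArrayList<>(elements);
            all.addAll(other.elements);
            return new Stream<>(all);
        }

        <K> KeyedStream<T, K> keyBy(Function<T, K> keySelector) {
            return new KeyedStream<>(elements, keySelector);
        }
    }

    static final class KeyedStream<T, K> extends Stream<T> {
        final Function<T, K> keySelector;

        KeyedStream(List<T> elements, Function<T, K> keySelector) {
            super(elements);
            this.keySelector = keySelector;
        }
    }

    /** Mimics a CEP builder's choice: per-key matching only for keyed input. */
    static String cepMode(Stream<?> input) {
        return (input instanceof KeyedStream) ? "per-key" : "global";
    }
}
```

Keying each source and then calling union gives "global"; keying once more after the union gives "per-key" again, which is the same idea as the fix applied before CEP.pattern.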
The fix: right before applying the CEP pattern, keyBy once more if a grouping condition is configured.
private static DataStream<JSONObject> patternProcess(DataStream<JSONObject> inputStream, Rule rule) {
    PatternGen patternGenerator = new PatternGen(rule.getPatterns(), rule.getWindow().getSize());
    Pattern<JSONObject, JSONObject> pattern = patternGenerator.getPattern();
    // union() upstream returned a plain DataStream, so key the stream again before CEP
    if (rule.getGroupBy() != null && !rule.getGroupBy().isEmpty()) {
        inputStream = KeyedByStream.keyedBy(inputStream, rule.getGroupBy());
    }
    PatternStream<JSONObject> patternStream = CEP.pattern(inputStream, pattern);
    return patternStream.inProcessingTime().select(new RuleSelectFunction(rule.getAlarmInfo(), rule.getSelects()));
}
輸入數(shù)據(jù):
{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860300012,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}
{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860299272,"create_time_desc":"2022-10-27 16:44:59","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}
{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666860300196,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"2"}
No alert is produced, as expected.
再次輸入同分組的數(shù)據(jù):文章來源:http://www.zghlxwxcb.cn/news/detail-427346.html
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}
Alert: {A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}]}
Alert output: {"org_log_id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","event_category_id":1,"event_technique_type":"無","event_description":"1","alarm_first_time":1666860368471,"src_ip":"172.11.11.1","hostname":"hostname1","intelligence_id":"","strategy_category_id":"422596451785379862","intelligence_type":"","id":"cc1cd8cd-a626-4916-bdd3-539ea57e898f","event_nums":3,"event_category_label":"資源開發","severity":"info","create_time":1666860369647,"strategy_category_name":"網絡威脅分析","rule_name":"ceptest","risk_score":1,"data_center":"guo-sen","baseline":[],"sop_id":"","event_device_type":"無","rule_id":214,"policy_type":"pattern","strategy_category":"/NetThreatAnalysis","internal_event":"1","event_name":"ceptest","event_model_source":"/RuleEngine/OnLine","alarm_last_time":1666860369478}
The alert is produced as expected.
到了這里,關于flink cep數(shù)據(jù)源keyby union后 keybe失效的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!