国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink cep數(shù)據(jù)源keyby union后 keybe失效

這篇具有很好參考價值的文章主要介紹了flink cep數(shù)據(jù)源keyby union后 keybe失效。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

問題背景:cep模板 對數(shù)據(jù)源設置分組條件后,告警的數(shù)據(jù),和分組條件對不上, 摻雜了,其他的不同組的數(shù)據(jù),產生了告警

策略條件:

選擇了兩個kafka的的topic的數(shù)據(jù)作為數(shù)據(jù)源,

對A 數(shù)據(jù)源 test-topic1, 進行條件過濾, 過濾條件為:login_type? = 1

對B 數(shù)據(jù)源 test-topic2,進行條件過濾,過濾條件為:login_type =? 2

分組條件 為? ?src_ip,hostname兩個字段進行分組

進行followby 關聯(lián)。時間關聯(lián)的最大時間間隔為? 60秒

運行并行度設置為3

通過SourceStream打印的原始數(shù)據(jù):

2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
1> {"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type?":"2"}

經過cep處理后,產了告警

產生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"2"}]}

經過src_ip,和hostname分組后, 理論上應該只分組后的相同的 scr_ip,hostname進行事件關聯(lián)告警

結果其他的分組數(shù)據(jù)也參和進來關聯(lián)告警了。?

期望的是? login_type = 1 出現(xiàn)至少兩次, 接著login_type=2的至少出現(xiàn)1次,且相同的src_ip和hostname

然后結果是下面數(shù)據(jù)也產生了告警。

{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":2}

懷疑是分組沒生效。

然后debug數(shù)據(jù)源那塊的方法kafkaStreamSource() 里面有進行分組,debug后發(fā)現(xiàn)確實也進行了keyby

后來找不到其他問題,糾結了下, 懷疑是不是 KeyedSteam.union(KeyedStream)后得到的就不是一個KeyedSteam了。 所以

出現(xiàn)問題的原始代碼數(shù)據(jù)源代碼:

//程序具體執(zhí)行流程
 DataStream<JSONObject> sourceStream = SourceProcess.getKafkaStream(env, rule);
            DataStream<JSONObject> resultStream = TransformProcess.process(sourceStream, rule);
            SinkProcess.sink(resultStream, rule);



public static DataStream<JSONObject> getKafkaStream(StreamExecutionEnvironment env, Rule rule) {
        DataStream<JSONObject> inputStream = null;
        List<Event> events = rule.getEvents();
        if (events.size() > SharingConstant.NUMBER_ZERO) {
            for (Event event : events) {
                FlinkKafkaConsumer<JSONObject> kafkaConsumer =
                        new KafkaSourceFunction(rule, event).init();
                if (inputStream != null) {
                    // 多條 stream 合成一條 stream
                    inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));
                } else {
                    // 只有一條 stream
                    inputStream = kafkaStreamSource(env, event, rule, kafkaConsumer);
                }
            }
        }
        return inputStream;
    }



private static DataStream<JSONObject> kafkaStreamSource(
            StreamExecutionEnvironment env,
            Event event,
            Rule rule,
            FlinkKafkaConsumer<JSONObject> kafkaConsumer) {
        DataStream<JSONObject> inputStream = env.addSource(kafkaConsumer);

        // 對多個黑白名單查詢進行循環(huán)
        String conditions = event.getConditions();
        while (conditions.contains(SharingConstant.ARGS_NAME)) {
            // 使用新的redis 數(shù)據(jù)結構,進行 s.include 過濾
            inputStream = AsyncDataStream.orderedWait(inputStream,new RedisNameListFilterSourceFunction(s,rule.getSettings().getRedis()),30,TimeUnit.SECONDS,2000);

            conditions = conditions.replace(s, "");
        }
        // 一般過濾處理
        inputStream = AsyncDataStream.orderedWait(inputStream,
                new Redis3SourceFunction(event, rule.getSettings().getRedis()), 30, TimeUnit.SECONDS, 2000);

        // kafka source 進行 keyBy 處理
        return KeyedByStream.keyedBy(inputStream, rule.getGroupBy());
    }

public static DataStream<JSONObject> keyedBy(
            DataStream<JSONObject> input, Map<String, String> groupBy) {
        if (null == groupBy || groupBy.isEmpty() ||"".equals(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO])){
            return input;
        }
        return input.keyBy(
                new TwoEventKeySelector(
                        groupBy.values().toArray()[SharingConstant.NUMBER_ZERO].toString()));
    }

public class TwoEventKeySelector implements KeySelector<JSONObject, String> {
    private static final long serialVersionUID = 8534968406068735616L;
    private final String groupBy;

    public TwoEventKeySelector(String groupBy) {
        this.groupBy = groupBy;
    }

    @Override
    public String getKey(JSONObject event) {
        StringBuilder keys = new StringBuilder();
        for (String key : groupBy.split(SharingConstant.DELIMITER_COMMA)) {
            keys.append(event.getString(key));
        }
        return keys.toString();
    }
}

問題出現(xiàn)在這里:

// 多條 stream 合成一條 stream
? ? ? ? ? ? ? ? ? ? inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));

kafkaStreamSource()這個方法返回的是 KeyedStream ,

兩個KeyedStream unio合并后,? 本來以為返回時KeyedStream,結果確是DataStream類型,

結果導致cep分組不生效,一個告警中出現(xiàn)了其他分組的數(shù)據(jù)。

解決方法, 就是在cep pattern前 根據(jù)是否有分組條件再KeyedBy一次

  private static DataStream<JSONObject> patternProcess(DataStream<JSONObject> inputStream, Rule rule) {
        PatternGen patternGenerator = new PatternGen(rule.getPatterns(), rule.getWindow().getSize());
        Pattern<JSONObject, JSONObject> pattern = patternGenerator.getPattern();

        if (!rule.getGroupBy().isEmpty()){
            inputStream = KeyedByStream.keyedBy(inputStream, rule.getGroupBy());
        }

        PatternStream<JSONObject> patternStream = CEP.pattern(inputStream, pattern);
        return patternStream.inProcessingTime().select(new RuleSelectFunction(rule.getAlarmInfo(), rule.getSelects()));

輸入數(shù)據(jù):

?{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860300012,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}
?{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860299272,"create_time_desc":"2022-10-27 16:44:59","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}
?{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666860300196,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"2"}

不產生告警,符合預期

再次輸入同分組的數(shù)據(jù):

2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}
產生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}]}
告警輸出:{"org_log_id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","event_category_id":1,"event_technique_type":"無","event_description":"1","alarm_first_time":1666860368471,"src_ip":"172.11.11.1","hostname":"hostname1","intelligence_id":"","strategy_category_id":"422596451785379862","intelligence_type":"","id":"cc1cd8cd-a626-4916-bdd3-539ea57e898f","event_nums":3,"event_category_label":"資源開發(fā)","severity":"info","create_time":1666860369647,"strategy_category_name":"網絡威脅分析","rule_name":"ceptest","risk_score":1,"data_center":"guo-sen","baseline":[],"sop_id":"","event_device_type":"無","rule_id":214,"policy_type":"pattern","strategy_category":"/NetThreatAnalysis","internal_event":"1","event_name":"ceptest","event_model_source":"/RuleEngine/OnLine","alarm_last_time":1666860369478}

產生告警符合預期文章來源地址http://www.zghlxwxcb.cn/news/detail-427346.html

到了這里,關于flink cep數(shù)據(jù)源keyby union后 keybe失效的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯(lián)網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Flink學習之旅:(三)Flink源算子(數(shù)據(jù)源)

    Flink學習之旅:(三)Flink源算子(數(shù)據(jù)源)

    ? ? ? ? Flink可以從各種數(shù)據(jù)源獲取數(shù)據(jù),然后構建DataStream 進行處理轉換。source就是整個數(shù)據(jù)處理程序的輸入端。 數(shù)據(jù)集合 數(shù)據(jù)文件 Socket數(shù)據(jù) kafka數(shù)據(jù) 自定義Source ? ? ? ? 創(chuàng)建 FlinkSource_List 類,再創(chuàng)建個 Student 類(姓名、年齡、性別三個屬性就行,反正測試用) 運行結果

    2024年02月06日
    瀏覽(28)
  • 【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進行實時數(shù)據(jù)同步

    【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進行實時數(shù)據(jù)同步

    CDC是Change Data Capture的縮寫,中文意思是 變更數(shù)據(jù)獲取 ,flink-cdc的作用是,通過flink捕獲數(shù)據(jù)源的事務變動操作記錄,包括數(shù)據(jù)的增刪改操作等,根據(jù)這些記錄可作用于對目標端進行實時數(shù)據(jù)同步。 下圖是flink-cdc最新支持的數(shù)據(jù)源類型: kafka的數(shù)據(jù)源要通過flink-cdc進行實時數(shù)

    2024年02月12日
    瀏覽(36)
  • flink重溫筆記(二):Flink 流批一體 API 開發(fā)——Source 數(shù)據(jù)源操作

    前言:今天是第二天啦!開始學習 Flink 流批一體化開發(fā)知識點,重點學習了各類數(shù)據(jù)源的導入操作,我發(fā)現(xiàn)學習編程需要分類記憶,一次一次地猜想 api 作用,然后通過敲代碼印證自己的想法,以此理解知識點,加深對api的理解和應用。 Tips:我覺得學習 Flink 還是挺有意思的

    2024年02月19日
    瀏覽(24)
  • flink如何初始化kafka數(shù)據(jù)源的消費偏移

    我們知道在日常非flink場景中消費kafka主題時,我們只要指定了消費者組,下次程序重新消費時是可以從上次消費停止時的消費偏移開始繼續(xù)消費的,這得益于kafka的_offset_主題保存的關于消費者組和topic偏移位置的具體偏移信息,那么flink應用中重啟flink應用時,flink是從topic的什

    2024年02月16日
    瀏覽(31)
  • Flink讀取數(shù)據(jù)的5種方式(文件,Socket,Kafka,MySQL,自定義數(shù)據(jù)源)

    這是最簡單的數(shù)據(jù)讀取方式。當需要進行功能測試時,可以將數(shù)據(jù)保存在文件中,讀取后驗證流處理的邏輯是否符合預期。 程序代碼: 輸出結果 用于驗證一些通過Socket傳輸數(shù)據(jù)的場景非常方便。 程序代碼: 測試時,需要先在 172.16.3.6 的服務器上啟動 nc ,然后再啟動Flink讀

    2024年02月16日
    瀏覽(21)
  • 基于大數(shù)據(jù)平臺(XSailboat)的計算管道實現(xiàn)MySQL數(shù)據(jù)源的CDC同步--flink CDC

    基于大數(shù)據(jù)平臺(XSailboat)的計算管道實現(xiàn)MySQL數(shù)據(jù)源的CDC同步--flink CDC

    筆者在先前的一篇文檔《數(shù)據(jù)標簽設計 – 大數(shù)據(jù)平臺(XSailboat)的數(shù)據(jù)標簽模塊》 提到了關于數(shù)據(jù)標簽的模塊,現(xiàn)已實現(xiàn)并應用于項目中。在項目中遇到這樣一種情形: 如果打標信息和業(yè)務數(shù)據(jù)是在一個數(shù)據(jù)庫實例中,那么只需要連接兩張表進行查詢即可。但是數(shù)據(jù)標簽作為

    2024年01月17日
    瀏覽(35)
  • 基于Flume+Kafka+Hbase+Flink+FineBI的實時綜合案例(二)數(shù)據(jù)源

    基于Flume+Kafka+Hbase+Flink+FineBI的實時綜合案例(二)數(shù)據(jù)源

    目標 : 了解數(shù)據(jù)源的格式及實現(xiàn)模擬數(shù)據(jù)的生成 路徑 step1:數(shù)據(jù)格式 step2:數(shù)據(jù)生成 實施 數(shù)據(jù)格式 消息時間 發(fā)件人昵稱 發(fā)件人賬號 發(fā)件人性別 發(fā)件人IP 發(fā)件人系統(tǒng) 發(fā)件人手機型號 發(fā)件人網絡制式 發(fā)件人GPS 收件人昵稱 收件人IP 收件人賬號 收件人系統(tǒng) 收件人手機型號

    2024年02月04日
    瀏覽(47)
  • 【flink番外篇】15、Flink維表實戰(zhàn)之6種實現(xiàn)方式-維表來源于第三方數(shù)據(jù)源

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年01月21日
    瀏覽(27)
  • Flink CDC 2.4 正式發(fā)布,新增 Vitess 數(shù)據(jù)源,更多連接器支持增量快照,升級 Debezium 版本

    Flink CDC 2.4 正式發(fā)布,新增 Vitess 數(shù)據(jù)源,更多連接器支持增量快照,升級 Debezium 版本

    Flink CDC [1] 是基于數(shù)據(jù)庫的日志 CDC 技術,實現(xiàn)了全增量一體化讀取的數(shù)據(jù)集成框架。配合 Flink 優(yōu)秀的管道能力和豐富的上下游生態(tài),F(xiàn)link CDC 可以高效實現(xiàn)海量數(shù)據(jù)的實時集成。 作為新一代的實時數(shù)據(jù)集成框架,F(xiàn)link CDC 具有全增量一體化、無鎖讀取、并行讀取、表結構變更

    2024年02月12日
    瀏覽(23)
  • Flink(七)Flink四大基石之Time和WaterMaker詳解與詳細示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實現(xiàn))

    Flink(七)Flink四大基石之Time和WaterMaker詳解與詳細示例(watermaker基本使用、kafka作為數(shù)據(jù)源的watermaker使用示例以及超出最大允許延遲數(shù)據(jù)的接收實現(xiàn))

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年02月15日
    瀏覽(43)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包