国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

從零開始搭建flink流式計算項目-2小試牛刀-物聯(lián)網場景下,如何實現(xiàn)設備采集參數(shù)監(jiān)控報警功能

這篇具有很好參考價值的文章主要介紹了從零開始搭建flink流式計算項目-2小試牛刀-物聯(lián)網場景下,如何實現(xiàn)設備采集參數(shù)監(jiān)控報警功能。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

/\*\*

* 設備ID
*/
private Integer deviceId;

/\*\*

* 監(jiān)控的變量名稱
*/
private String varName;

/\*\*

* 最小值
*/
private Double min;

/\*\*

* 最大值
*/
private Double max;

}


##### 報警事件



/**
* 報警消息
*/
@Data
public class AlarmMessage {

/\*\*

* 設備
*/
private Integer deviceId;

/\*\*

* 報警時間
*/
private Long timestamp;
/**
* 觸發(fā)報警的采集變量名稱
*/
private String alarmVar;

/\*\*

* 觸發(fā)報警的采集值
*/
private Number alarmValue;
}


#### 開始實現(xiàn)



public class IotMonitorJob {

public static void main(String[] args) throws Exception {


    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);

    // 采集數(shù)據Stream
    DataStreamSource<IotData> iotDataStream = getIotStream(environment);
    // 報警規(guī)則Stream
    DataStreamSource<AlarmRule> ruleConfig = getRuleConfig(environment);
    // 緩存報警規(guī)則 并監(jiān)控報警數(shù)據
    SingleOutputStreamOperator<AlarmMessage> alarmStream = iotDataStream.connect(ruleConfig)
            .keyBy(IotData::getDeviceId, AlarmRule::getDeviceId)
            .process(new CoProcessFunction<IotData, AlarmRule, AlarmMessage>() {

                // 用臨時保存設備的報警規(guī)則 ,這里的狀態(tài)交由flink維護
                private MapState<Integer, AlarmRule> alarmRuleValueState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    // 初始化 ValueState
                    alarmRuleValueState = getRuntimeContext().getMapState(new MapStateDescriptor<>("alarm-rule-state", Integer.class, AlarmRule.class));
                }

                @Override
                public void processElement1(IotData iotData, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception {
                    Map<String, Double> data = iotData.getData();

                    // 遍歷每個規(guī)則
                    alarmRuleValueState.values().forEach(rule -> {

                        String varName = rule.getVarName();
                        // 獲取變量值
                        Double val = data.get(varName);
                        if (val == null) {
                            // 變量里沒有值
                            return;
                        }

                        if (val <= rule.getMin() || val > rule.getMax()) {
                            // 超過限制,輸出報警信息
                            AlarmMessage alarmMessage = new AlarmMessage();
                            alarmMessage.setDeviceId(iotData.getDeviceId());
                            alarmMessage.setTimestamp(iotData.getTimestamp());
                            alarmMessage.setAlarmVar(varName);
                            alarmMessage.setAlarmValue(val);
                            collector.collect(alarmMessage);
                        }
                    });


                }

                @Override
                public void processElement2(AlarmRule alarmRule, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception {
                    // 接收到AlarmRule, 僅更新 alarmRuleValueState
                    alarmRuleValueState.put(alarmRule.getId(), alarmRule);
                }
            });


    alarmStream.print();
    environment.execute();
}

/\*\*

* 獲取物聯(lián)采集數(shù)據
*
* @param environment
* @return
*/
private static DataStreamSource getIotStream(StreamExecutionEnvironment environment) {
return environment.addSource(new SourceFunction<>() {
private boolean running = true;

        @Override
        public void run(SourceContext<IotData> sourceContext) throws Exception {
            while (running) {

                // 模擬100個設備 每秒一次上報數(shù)據

                long ts = System.currentTimeMillis();
                ts = ts - ts % 1000;

                for (int i = 0; i < 100; i++) {
                    IotData iotData = new IotData();
                    iotData.setTimestamp(ts);
                    iotData.setDeviceId(i);

                    Map<String, Double> data = new HashMap<>();
                    data.put("var1", RandomUtils.nextDouble());
                    data.put("var2", RandomUtils.nextDouble());
                    iotData.setData(data);

                    sourceContext.collect(iotData);
                }

                Thread.sleep(1000 - ts % 1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    });
}


/\*\*

* 獲取規(guī)則配置
*/
public static DataStreamSource getRuleConfig(StreamExecutionEnvironment environment) {
// 僅針對部分設備監(jiān)控

    List<AlarmRule> ruleList = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
        AlarmRule alarmRule1 = new AlarmRule();
        alarmRule1.setDeviceId(i);
        alarmRule1.setVarName("var1");
        alarmRule1.setMax(20.0);
        alarmRule1.setMin(0.0);
        ruleList.add(alarmRule1);

        AlarmRule alarmRule2 = new AlarmRule();
        alarmRule2.setDeviceId(i);
        alarmRule2.setVarName("var2");
        alarmRule2.setMax(10.0);
        alarmRule2.setMin(0.0);
        ruleList.add(alarmRule2);

    }
    return environment.fromCollection(ruleList);
}

}


### 啟動job



> 
> 實際運行基于 java 11 , flink 1.18.1
> 
> 
> 


輸出結果:



AlarmMessage(deviceId=0, timestamp=1709732511000, alarmVar=var2, alarmValue=1.0408785873261203E308)
AlarmMessage(deviceId=1, timestamp=1709732511000, alarmVar=var2, alarmValue=8.409717342118433E306)
AlarmMessage(deviceId=2, timestamp=1709732511000, alarmVar=var2, alarmValue=6.955367711979709E307)
AlarmMessage(deviceId=3, timestamp=1709732511000, alarmVar=var2, alarmValue=2.5403069646236554E307)
AlarmMessage(deviceId=4, timestamp=1709732511000, alarmVar=var2, alarmValue=7.629789041713245E307)
AlarmMessage(deviceId=5, timestamp=1709732511000, alarmVar=var2, alarmValue=6.918664964996954E307)
AlarmMessage(deviceId=6, timestamp=1709732511000, alarmVar=var2, alarmValue=1.1660434456728436E308)
AlarmMessage(deviceId=7, timestamp=1709732511000, alarmVar=var2, alarmValue=2.1272561368179368E307)
AlarmMessage(deviceId=8, timestamp=1709732511000, alarmVar=var2, alarmValue=2.8693117885744695E307)
AlarmMessage(deviceId=9, timestamp=1709732511000, alarmVar=var2, alarmValue=1.1232501067396574E308)
AlarmMessage(deviceId=10, timestamp=1709732511000, alarmVar=var2, alarmValue=1.6192738031099514E308)
AlarmMessage(deviceId=11, timestamp=1709732511000, alarmVar=var2, alarmValue=7.515829766654446E307)
AlarmMessage(deviceId=12, timestamp=1709732511000, alarmVar=var2, alarmValue=1.6409410780574847E308)
AlarmMessage(deviceId=13, timestamp=1709732511000, alarmVar=var2, alarmValue=7.372363635115241E307)
AlarmMessage(deviceId=14, timestamp=1709732511000, alarmVar=var2, alarmValue=5.269385013806783E306)
AlarmMessage(deviceId=15, timestamp=1709732511000, alarmVar=var2, alarmValue=9.736804956554577E307)
AlarmMessage(deviceId=16, timestamp=1709732511000, alarmVar=var2, alarmValue=5.403962718372102E307)
AlarmMessage(deviceId=17, timestamp=1709732511000, alarmVar=var2, alarmValue=1.7957965318588386E308)
AlarmMessage(deviceId=18, timestamp=1709732511000, alarmVar=var2, alarmValue=6.546384330721207E307)
AlarmMessage(deviceId=19, timestamp=1709732511000, alarmVar=var2, alarmValue=1.2797848722222382E308)
AlarmMessage(deviceId=0, timestamp=1709732512000, alarmVar=var2, alarmValue=8.096850966966417E307)
AlarmMessage(deviceId=1, timestamp=1709732512000, alarmVar=var2, alarmValue=1.1459880504481993E308)
AlarmMessage(deviceId=2, timestamp=1709732512000, alarmVar=var2, alarmValue=1.6878563127635106E308)
AlarmMessage(deviceId=3, timestamp=1709732512000, alarmVar=var2, alarmValue=1.3431398337246118E308)
自我介紹一下,小編13年上海交大畢業(yè),曾經在小公司待過,也去過華為、OPPO等大廠,18年進入阿里一直到現(xiàn)在。

深知大多數(shù)嵌入式工程師,想要提升技能,往往是自己摸索成長或者是報班學習,但對于培訓機構動則幾千的學費,著實壓力不小。自己不成體系的自學效果低效又漫長,而且極易碰到天花板技術停滯不前!

因此收集整理了一份《2024年嵌入式&物聯(lián)網開發(fā)全套學習資料》,初衷也很簡單,就是希望能夠幫助到想自學提升又不知道該從何學起的朋友,同時減輕大家的負擔。

從零開始搭建flink流式計算項目-2小試牛刀-物聯(lián)網場景下,如何實現(xiàn)設備采集參數(shù)監(jiān)控報警功能,程序員,嵌入式

從零開始搭建flink流式計算項目-2小試牛刀-物聯(lián)網場景下,如何實現(xiàn)設備采集參數(shù)監(jiān)控報警功能,程序員,嵌入式

從零開始搭建flink流式計算項目-2小試牛刀-物聯(lián)網場景下,如何實現(xiàn)設備采集參數(shù)監(jiān)控報警功能,程序員,嵌入式

既有適合小白學習的零基礎資料,也有適合3年以上經驗的小伙伴深入學習提升的進階課程,基本涵蓋了95%以上嵌入式&物聯(lián)網開發(fā)知識點,真正體系化!

從零開始搭建flink流式計算項目-2小試牛刀-物聯(lián)網場景下,如何實現(xiàn)設備采集參數(shù)監(jiān)控報警功能,程序員,嵌入式

從零開始搭建flink流式計算項目-2小試牛刀-物聯(lián)網場景下,如何實現(xiàn)設備采集參數(shù)監(jiān)控報警功能,程序員,嵌入式

由于文件比較大,這里只是將部分目錄大綱截圖出來,每個節(jié)點里面都包含大廠面經、學習筆記、源碼講義、實戰(zhàn)項目、講解視頻,并且后續(xù)會持續(xù)更新

如果你覺得這些內容對你有幫助,可以+V:Vip1104z獲?。。?! (備注:嵌入式)

從零開始搭建flink流式計算項目-2小試牛刀-物聯(lián)網場景下,如何實現(xiàn)設備采集參數(shù)監(jiān)控報警功能,程序員,嵌入式

最后

資料整理不易,覺得有幫助的朋友可以幫忙點贊分享支持一下小編~

你的支持,我的動力;祝各位前程似錦,offer不斷,步步高升?。?!

項目、講解視頻,并且后續(xù)會持續(xù)更新**

如果你覺得這些內容對你有幫助,可以+V:Vip1104z獲?。。。?(備注:嵌入式)

從零開始搭建flink流式計算項目-2小試牛刀-物聯(lián)網場景下,如何實現(xiàn)設備采集參數(shù)監(jiān)控報警功能,程序員,嵌入式

最后

資料整理不易,覺得有幫助的朋友可以幫忙點贊分享支持一下小編~

你的支持,我的動力;祝各位前程似錦,offer不斷,步步高升!??!

更多資料點擊此處獲qu!!文章來源地址http://www.zghlxwxcb.cn/news/detail-848193.html

到了這里,關于從零開始搭建flink流式計算項目-2小試牛刀-物聯(lián)網場景下,如何實現(xiàn)設備采集參數(shù)監(jiān)控報警功能的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯(lián)網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Flink + MySQL 流式計算數(shù)據分析

    作者:禪與計算機程序設計藝術 大數(shù)據時代,海量的數(shù)據源源不斷涌入到互聯(lián)網、移動應用、企業(yè)數(shù)據庫等各個領域,同時這些數(shù)據也逐漸成為各種業(yè)務場景中的主要輸入數(shù)據。如何在短時間內對海量數(shù)據進行處理、分析并得出有價值的信息,已經成為當今社會越來越關注的

    2024年02月06日
    瀏覽(25)
  • 【Elasticsearch】從零開始搭建ES8集群并且集成到Springboot,更好的服務電商類等需要全文索引的項目(一)

    【Elasticsearch】從零開始搭建ES8集群并且集成到Springboot,更好的服務電商類等需要全文索引的項目(一)

    最近公司的電商項目越來越龐大,功能需求點也越來越多,各種C端對查詢和檢索的要求也越來越高,是時候在項目中引入全文檢索了。 ElasticSearch 是一個基于 Lucene 的搜索服務器,它提供了一個分布式多用戶能力的全文搜索引擎,并且是基于Java 開發(fā)的,我記得很久之前ES還不

    2024年02月15日
    瀏覽(23)
  • Flink流式計算狀態(tài)檢查點與恢復

    Flink流式計算狀態(tài)檢查點與恢復 Apache Flink是一個流處理框架,用于實時數(shù)據處理和分析。Flink可以處理大規(guī)模數(shù)據流,并提供一種高效、可靠的方法來處理和分析這些數(shù)據。Flink流式計算狀態(tài)檢查點與恢復是流處理的關鍵組件,它們確保Flink應用程序在故障時能夠恢復并繼續(xù)處

    2024年02月19日
    瀏覽(25)
  • 從零開始快速構建自己的Flink應用

    從零開始快速構建自己的Flink應用

    本文介紹如何在 mac 下快速構建屬于自己的 Flink 應用。 在 mac 上使用homebrew安裝 flink: 查看安裝的位置: 進入安裝目錄,啟動 flink 集群: 進入 web 頁面:http://localhost:8081/ 基于模板直接構建一個項目: 在項目的 DataStreamJob 類實現(xiàn)如下計數(shù)的功能: 在上面的例子中,我們使用

    2024年02月20日
    瀏覽(23)
  • 從零開始搭建web組態(tài)

    從零開始搭建web組態(tài)

    成果展示:by組態(tài)[web組態(tài)插件] 目前只有兩種選擇,canvas和svg Canvas: 是一個基于像素的渲染引擎,使用JavaScript API在畫布上繪制圖像,它的優(yōu)點包括: Canvas渲染速度快,適合處理大量圖像和高度動態(tài)的圖像。 可以直接操作像素,能夠創(chuàng)建高質量、流暢的動畫效果。 Canvas可用于

    2024年04月23日
    瀏覽(19)
  • 從零開始搭建群眾權益平臺(一)

    本次的平臺我們名為群眾權益維護平臺,我們將講解整體的思路,涉及到很多內容,我將給出一份簡化的示例,包含了網頁的基本結構、前端和后端代碼,以及部署的基本步驟。 技術棧使用:HTML,CSS,JavaScript(前端),Node.js(后端),MongoDB(數(shù)據庫),Heroku(部署)。 這

    2024年02月09日
    瀏覽(21)
  • 從零開始搭建群眾權益平臺(五)

    本篇博客我們將實現(xiàn) 驗證新的用戶名或電子郵件,文件上傳,支付,通知等內容 驗證新的用戶名或電子郵件: 在更新用戶信息的路由中,我們需要確保新的用戶名或電子郵件還沒有被其他用戶使用: 輸入驗證: 對用戶輸入進行驗證非常重要,以

    2024年02月09日
    瀏覽(16)
  • 從零開始學架構-計算高性能

    從零開始學架構-計算高性能

    ????????高性能是每個程序員的追求,無論做一個系統(tǒng)、還是寫一組代碼,都希望能夠達到高性能的效果。而高性能又是最復雜的一環(huán),磁盤、操作系統(tǒng)、CPU、內存、緩存、網絡、編程語言、數(shù)據庫、架構等,每個都可能影響系統(tǒng)的高性能,一行不恰當?shù)?debug 日志,一個

    2023年04月24日
    瀏覽(34)
  • 從零開始搭建STM32CubeMX開發(fā)環(huán)境

    從零開始搭建STM32CubeMX開發(fā)環(huán)境

    本文記錄一下如何從零開始使用STM32CubeMX,包括軟件的安裝,環(huán)境的搭建,配置代碼的生成等; 本文以STM32G030C8T6為例,如果你的單片機不是以STM32G030C8T6為例,換成你的單片機類型即可,過程都是通用的; STM32CubeMX 是意法半導體推出的針對STM32 系列芯片的圖形化配置工具,通

    2024年02月12日
    瀏覽(33)
  • 從零開始搭建家庭網絡:軟路由實戰(zhàn)經驗分享(一)

    從零開始搭建家庭網絡:軟路由實戰(zhàn)經驗分享(一)

    最近入門了軟路由,研究了半個月,一步一步從網絡小白到最后自己搭建了家庭局域網絡,現(xiàn)在給大家分享一下我搭建軟路由的經驗。 既然有軟路由,那么相對的肯定有硬路由:目前我們網上買到的路由器,就是硬路由,這種從一開始就是 按照路由器設計規(guī)范設計出來的硬

    2024年02月02日
    瀏覽(93)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包