/\*\*
* 設備ID
*/
private Integer deviceId;
/\*\*
* 監(jiān)控的變量名稱
*/
private String varName;
/\*\*
* 最小值
*/
private Double min;
/\*\*
* 最大值
*/
private Double max;
}
##### 報警事件
/**
* 報警消息
*/
@Data
public class AlarmMessage {
/\*\*
* 設備
*/
private Integer deviceId;
/\*\*
* 報警時間
*/
private Long timestamp;
/**
* 觸發(fā)報警的采集變量名稱
*/
private String alarmVar;
/\*\*
* 觸發(fā)報警的采集值
*/
private Number alarmValue;
}
#### 開始實現(xiàn)
public class IotMonitorJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
// 采集數(shù)據Stream
DataStreamSource<IotData> iotDataStream = getIotStream(environment);
// 報警規(guī)則Stream
DataStreamSource<AlarmRule> ruleConfig = getRuleConfig(environment);
// 緩存報警規(guī)則 并監(jiān)控報警數(shù)據
SingleOutputStreamOperator<AlarmMessage> alarmStream = iotDataStream.connect(ruleConfig)
.keyBy(IotData::getDeviceId, AlarmRule::getDeviceId)
.process(new CoProcessFunction<IotData, AlarmRule, AlarmMessage>() {
// 用臨時保存設備的報警規(guī)則 ,這里的狀態(tài)交由flink維護
private MapState<Integer, AlarmRule> alarmRuleValueState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 初始化 ValueState
alarmRuleValueState = getRuntimeContext().getMapState(new MapStateDescriptor<>("alarm-rule-state", Integer.class, AlarmRule.class));
}
@Override
public void processElement1(IotData iotData, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception {
Map<String, Double> data = iotData.getData();
// 遍歷每個規(guī)則
alarmRuleValueState.values().forEach(rule -> {
String varName = rule.getVarName();
// 獲取變量值
Double val = data.get(varName);
if (val == null) {
// 變量里沒有值
return;
}
if (val <= rule.getMin() || val > rule.getMax()) {
// 超過限制,輸出報警信息
AlarmMessage alarmMessage = new AlarmMessage();
alarmMessage.setDeviceId(iotData.getDeviceId());
alarmMessage.setTimestamp(iotData.getTimestamp());
alarmMessage.setAlarmVar(varName);
alarmMessage.setAlarmValue(val);
collector.collect(alarmMessage);
}
});
}
@Override
public void processElement2(AlarmRule alarmRule, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception {
// 接收到AlarmRule, 僅更新 alarmRuleValueState
alarmRuleValueState.put(alarmRule.getId(), alarmRule);
}
});
alarmStream.print();
environment.execute();
}
/\*\*
* 獲取物聯(lián)采集數(shù)據
*
* @param environment
* @return
*/
private static DataStreamSource getIotStream(StreamExecutionEnvironment environment) {
return environment.addSource(new SourceFunction<>() {
private boolean running = true;
@Override
public void run(SourceContext<IotData> sourceContext) throws Exception {
while (running) {
// 模擬100個設備 每秒一次上報數(shù)據
long ts = System.currentTimeMillis();
ts = ts - ts % 1000;
for (int i = 0; i < 100; i++) {
IotData iotData = new IotData();
iotData.setTimestamp(ts);
iotData.setDeviceId(i);
Map<String, Double> data = new HashMap<>();
data.put("var1", RandomUtils.nextDouble());
data.put("var2", RandomUtils.nextDouble());
iotData.setData(data);
sourceContext.collect(iotData);
}
Thread.sleep(1000 - ts % 1000);
}
}
@Override
public void cancel() {
running = false;
}
});
}
/\*\*
* 獲取規(guī)則配置
*/
public static DataStreamSource getRuleConfig(StreamExecutionEnvironment environment) {
// 僅針對部分設備監(jiān)控
List<AlarmRule> ruleList = new ArrayList<>();
for (int i = 0; i < 20; i++) {
AlarmRule alarmRule1 = new AlarmRule();
alarmRule1.setDeviceId(i);
alarmRule1.setVarName("var1");
alarmRule1.setMax(20.0);
alarmRule1.setMin(0.0);
ruleList.add(alarmRule1);
AlarmRule alarmRule2 = new AlarmRule();
alarmRule2.setDeviceId(i);
alarmRule2.setVarName("var2");
alarmRule2.setMax(10.0);
alarmRule2.setMin(0.0);
ruleList.add(alarmRule2);
}
return environment.fromCollection(ruleList);
}
}
### 啟動job
>
> 實際運行基于 java 11 , flink 1.18.1
>
>
>
輸出結果:
AlarmMessage(deviceId=0, timestamp=1709732511000, alarmVar=var2, alarmValue=1.0408785873261203E308)
AlarmMessage(deviceId=1, timestamp=1709732511000, alarmVar=var2, alarmValue=8.409717342118433E306)
AlarmMessage(deviceId=2, timestamp=1709732511000, alarmVar=var2, alarmValue=6.955367711979709E307)
AlarmMessage(deviceId=3, timestamp=1709732511000, alarmVar=var2, alarmValue=2.5403069646236554E307)
AlarmMessage(deviceId=4, timestamp=1709732511000, alarmVar=var2, alarmValue=7.629789041713245E307)
AlarmMessage(deviceId=5, timestamp=1709732511000, alarmVar=var2, alarmValue=6.918664964996954E307)
AlarmMessage(deviceId=6, timestamp=1709732511000, alarmVar=var2, alarmValue=1.1660434456728436E308)
AlarmMessage(deviceId=7, timestamp=1709732511000, alarmVar=var2, alarmValue=2.1272561368179368E307)
AlarmMessage(deviceId=8, timestamp=1709732511000, alarmVar=var2, alarmValue=2.8693117885744695E307)
AlarmMessage(deviceId=9, timestamp=1709732511000, alarmVar=var2, alarmValue=1.1232501067396574E308)
AlarmMessage(deviceId=10, timestamp=1709732511000, alarmVar=var2, alarmValue=1.6192738031099514E308)
AlarmMessage(deviceId=11, timestamp=1709732511000, alarmVar=var2, alarmValue=7.515829766654446E307)
AlarmMessage(deviceId=12, timestamp=1709732511000, alarmVar=var2, alarmValue=1.6409410780574847E308)
AlarmMessage(deviceId=13, timestamp=1709732511000, alarmVar=var2, alarmValue=7.372363635115241E307)
AlarmMessage(deviceId=14, timestamp=1709732511000, alarmVar=var2, alarmValue=5.269385013806783E306)
AlarmMessage(deviceId=15, timestamp=1709732511000, alarmVar=var2, alarmValue=9.736804956554577E307)
AlarmMessage(deviceId=16, timestamp=1709732511000, alarmVar=var2, alarmValue=5.403962718372102E307)
AlarmMessage(deviceId=17, timestamp=1709732511000, alarmVar=var2, alarmValue=1.7957965318588386E308)
AlarmMessage(deviceId=18, timestamp=1709732511000, alarmVar=var2, alarmValue=6.546384330721207E307)
AlarmMessage(deviceId=19, timestamp=1709732511000, alarmVar=var2, alarmValue=1.2797848722222382E308)
AlarmMessage(deviceId=0, timestamp=1709732512000, alarmVar=var2, alarmValue=8.096850966966417E307)
AlarmMessage(deviceId=1, timestamp=1709732512000, alarmVar=var2, alarmValue=1.1459880504481993E308)
AlarmMessage(deviceId=2, timestamp=1709732512000, alarmVar=var2, alarmValue=1.6878563127635106E308)
AlarmMessage(deviceId=3, timestamp=1709732512000, alarmVar=var2, alarmValue=1.3431398337246118E308)
自我介紹一下,小編13年上海交大畢業(yè),曾經在小公司待過,也去過華為、OPPO等大廠,18年進入阿里一直到現(xiàn)在。
深知大多數(shù)嵌入式工程師,想要提升技能,往往是自己摸索成長或者是報班學習,但對于培訓機構動則幾千的學費,著實壓力不小。自己不成體系的自學效果低效又漫長,而且極易碰到天花板技術停滯不前!
因此收集整理了一份《2024年嵌入式&物聯(lián)網開發(fā)全套學習資料》,初衷也很簡單,就是希望能夠幫助到想自學提升又不知道該從何學起的朋友,同時減輕大家的負擔。
既有適合小白學習的零基礎資料,也有適合3年以上經驗的小伙伴深入學習提升的進階課程,基本涵蓋了95%以上嵌入式&物聯(lián)網開發(fā)知識點,真正體系化!
由于文件比較大,這里只是將部分目錄大綱截圖出來,每個節(jié)點里面都包含大廠面經、學習筆記、源碼講義、實戰(zhàn)項目、講解視頻,并且后續(xù)會持續(xù)更新
如果你覺得這些內容對你有幫助,可以+V:Vip1104z獲?。。?! (備注:嵌入式)

最后
資料整理不易,覺得有幫助的朋友可以幫忙點贊分享支持一下小編~
你的支持,我的動力;祝各位前程似錦,offer不斷,步步高升?。?!文章來源:http://www.zghlxwxcb.cn/news/detail-848193.html
項目、講解視頻,并且后續(xù)會持續(xù)更新**
如果你覺得這些內容對你有幫助,可以+V:Vip1104z獲?。。。?(備注:嵌入式)

最后
資料整理不易,覺得有幫助的朋友可以幫忙點贊分享支持一下小編~
你的支持,我的動力;祝各位前程似錦,offer不斷,步步高升!??!
更多資料點擊此處獲qu!!文章來源地址http://www.zghlxwxcb.cn/news/detail-848193.html
到了這里,關于從零開始搭建flink流式計算項目-2小試牛刀-物聯(lián)網場景下,如何實現(xiàn)設備采集參數(shù)監(jiān)控報警功能的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!