-
前提須知:websocket基本使用
-
業(yè)務場景,每秒推送統(tǒng)計數(shù)據(jù)給前端頁面,分別顯示前天,昨天,今天的前十名客戶數(shù)據(jù)文章來源:http://www.zghlxwxcb.cn/news/detail-551371.html
連接觸發(fā)業(yè)務定義
-
@ServerEndpoint("/smsMCustomerStaTop10Ws")
定義推送數(shù)據(jù)給到具體的連接標識
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xyc.monitor.service.SmsMonitorService;
import com.xyc.monitor.service.impl.SmsMonitorServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* Top10客戶發(fā)送量推送
*/
@Component
@ServerEndpoint(value = "/smsMCustomerStaTop10Ws")
public class SmsMCustomerStaTop10Ws {
private static final Logger LOGGER = LoggerFactory.getLogger(SmsMCustomerStaTop10Ws.class);
//靜態(tài)變量,用來記錄當前在線連接數(shù)。應該把它設計成線程安全的。
private static int onlineCount = 0;
//concurrent包的線程安全Set,用來存放每個客戶端對應的當前對象。
private static Set<SmsMCustomerStaTop10Ws> webSocketSet = new CopyOnWriteArraySet<SmsMCustomerStaTop10Ws>();
private static Map<Session,Integer> map = new ConcurrentHashMap<>();
//與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
private Session session;
//客戶端傳過來的參數(shù) ,ip:port
private String parameter;
private static SmsMonitorService smsMonitorService;
@Autowired
public void setSmsMonitorService(SmsMonitorService smsMonitorService) {
SmsMCustomerStaTop10Ws.smsMonitorService = smsMonitorService;
}
/**
* 連接建立成功調(diào)用的方法
* @param session
*/
@OnOpen
public void onOpen(Session session) {
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在線數(shù)加1
LOGGER.info("SmsMCustomerStaTop10Ws ===> 有新連接加入!當前在線人數(shù)為" + getOnlineCount());
}
/**
* 連接關閉調(diào)用的方法
*/
@OnClose
public void onClose() {
map.remove(this.session);
//UDPMsgQueueStatusClient.removeUdpClient(parameter);
webSocketSet.remove(this); //從set中刪除
subOnlineCount(); //在線數(shù)減1
LOGGER.info("SmsMCustomerStaTop10Ws ===> 有一連接關閉!當前在線人數(shù)為" + getOnlineCount());
}
/**
* 收到客戶端消息后調(diào)用的方法
*
* @param message 客戶端發(fā)送過來的消息
*
* */
@OnMessage
public void onMessage(String message, Session session) {
//關閉前一個udp
//UDPMsgQueueStatusClient.removeUdpClient(parameter);
//this.parameter = message;
JSONArray objects = JSON.parseArray(message);
//JSONObject jso = JSON.parseObject(message);
Integer status= objects.getJSONObject(0).getInteger("status");
//SmsMonitorServiceImpl.statusSCSSo = status;
map.put(session,status);
LOGGER.info("SmsMCustomerStaTop10Ws ===> 來自客戶端的消息:" + message);
}
/**
* 發(fā)生錯誤時調(diào)用
*/
@OnError
public void onError(Session session, Throwable error) {
LOGGER.info("SmsMCustomerStaTop10Ws 發(fā)生錯誤");
LOGGER.error("onError ===> ", error);
}
/**
* 推送觸發(fā)
*/
private static void pushTrigger() {
if(SmsMCustomerStaTop10Ws.onlineCount >= 1) {
if(!SmsMonitorServiceImpl.isPushSCS)
SmsMonitorServiceImpl.isPushSCS = true;
smsMonitorService.pushSmsMCustomerStaTop10();
}
if(SmsMCustomerStaTop10Ws.onlineCount == 0) {
SmsMonitorServiceImpl.isPushSCS = false;
}
}
/**
* 發(fā)送文本類型消息 192.168.3.146:10086
* @throws IOException
*/
public static void sendInfo(Integer status,Object obj) throws IOException {
for (SmsMCustomerStaTop10Ws item : webSocketSet) {
try {
Session session = item.session;
Integer integer = map.get(session);
if(integer == status){
item.session.getBasicRemote().sendText(JSON.toJSONString(obj));
}else{
continue;
}
} catch (Exception e) {
LOGGER.info("SmsMCustomerStaTop10Ws ===> 發(fā)送文本消息錯誤", e);
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
SmsMCustomerStaTop10Ws.onlineCount++;
pushTrigger();
}
public static synchronized void subOnlineCount() {
SmsMCustomerStaTop10Ws.onlineCount--;
pushTrigger();
}
}
使用到的業(yè)務方法
- 以上
onOpen()
方法最終觸發(fā)的業(yè)務方法smsMonitorService.pushSmsMCustomerStaTop10();
/**
* 記錄線程數(shù),保證只創(chuàng)建一個線程
*/
private volatile static int threadCountSCS = 0;
/**
* 數(shù)據(jù)推送開關,是否推送數(shù)據(jù)到前端,默認關閉
*/
public static boolean isPushSCS = false;
/**
* 今日1,昨天2,前天3 數(shù)據(jù) 默認今日
*/
private static ImmutableSet<Integer> statusSCS = ImmutableSet.of(1, 2, 3);
@Override
public void pushSmsMCustomerStaTop10() {
if (threadCountSCS >= 1)
return;
threadCountSCS++;
new Thread() {
public void run() {
Thread.currentThread().setName("pushSmsMCustomerStaTop10-" + threadCountSCS);
System.out.println("top10客戶發(fā)送量推送服務線程啟動");
while (isPushSCS) {
for (Integer statusSC : statusSCS) {
try {
List<SmsMCustomerStaVo> smsMCustomerStaTop10 = smsMonitorMapper.findSmsMCustomerStaTop10(statusSC);
List<SmsMCustomerStaView> smsMCustomerStaViewList = new ArrayList<>();
int i = 1;
for (SmsMCustomerStaVo sm : smsMCustomerStaTop10) {
SmsMCustomerStaView smsMCustomerStaView = new SmsMCustomerStaView();
smsMCustomerStaView.setId(i);
smsMCustomerStaView.setShortName(SmsSymbol.customerMap.get(sm.getCustomerNo()).getShortName());
smsMCustomerStaView.setCustomerName(SmsSymbol.customerMap.get(sm.getCustomerNo()).getName());
smsMCustomerStaView.setSubmitCount(sm.getAllCount());
Integer issueCount = sm.getSendSuccessCount() + sm.getSendFailCount() + sm.getSubmitSuccessCount(); //下發(fā)總量 = 總量除開提交失敗
smsMCustomerStaView.setIssueCount(issueCount);
//成功率(前30分鐘成功率)
Double sendSuccessCountRate = 0D;
if(sm != null && sm.getAllCountThirty() != null && sm.getAllCountThirty() != 0){
sendSuccessCountRate = sm.getSendSuccessCountThirty() / sm.getAllCountThirty().doubleValue();
}
//未知率
Double unknownRateRate = 0D;
if(sm != null && sm.getAllCount() != null && sm.getAllCount() != 0){
//sendSuccessCountRate = sm.getSendSuccessCount() / issueCount.doubleValue(); //發(fā)送成功率
unknownRateRate = sm.getSubmitSuccessCount() / sm.getAllCount().doubleValue(); //未知率 = 提交成功 / 提交總量
}
smsMCustomerStaView.setSuccessRate(sendSuccessCountRate == 0 ?"0%":new DecimalFormat("#0.00").format(sendSuccessCountRate * 100) + "%");
smsMCustomerStaView.setUnknownRate(unknownRateRate == 0 ?"0%":new DecimalFormat("#0.00").format(unknownRateRate * 100) + "%");
i++;
smsMCustomerStaViewList.add(smsMCustomerStaView);
}
//List<SmsMCustomerStaView> collect = smsMCustomerStaViewList.stream().sorted(Comparator.comparing(SmsMCustomerStaView::getSubmitCount).reversed()).collect(Collectors.toList());
SmsMCustomerStaTop10Ws.sendInfo(statusSC,smsMCustomerStaViewList);
} catch (Exception e) {
// TODO Auto-generated catch block
log.error("top10客戶發(fā)送量推送服務報錯",e );
}
}
try {
Thread.sleep(1000);
}catch (Exception e){
System.out.println(e);
}
}
System.out.println("top10客戶發(fā)送量推送服務線程停止");
threadCountSCS--;
}
}.start();
}
涉及的查詢sql
- 以上
smsMonitorMapper.findSmsMCustomerStaTop10(statusSC);
(傳參區(qū)分查前日/昨日/今日
數(shù)據(jù)) - oracle數(shù)據(jù)庫查詢語句
<select id="findSmsMCustomerStaTop10" resultMap="SmsMCustomerStaResultMap">
select t.* from (
SELECT SUM(ALL_COUNT) ALL_COUNT,SUM(SUBMIT_SUCCESS_COUNT) SUBMIT_SUCCESS_COUNT,SUM(SUBMIT_FAIL_COUNT) SUBMIT_FAIL_COUNT,
SUM(SEND_SUCCESS_COUNT) SEND_SUCCESS_COUNT,SUM(SEND_FAIL_COUNT) SEND_FAIL_COUNT,SUM(ALL_COUNT_THIRTY) ALL_COUNT_THIRTY,
SUM(SEND_SUCCESS_COUNT_THIRTY) SEND_SUCCESS_COUNT_THIRTY,CUSTOMER_NO
FROM SMS_M_CUSTOMER_STA
<if test="statusSCS == 1">
WHERE STA_DATE_TIME >= trunc(sysdate)
</if>
<if test="statusSCS == 2">
WHERE STA_DATE_TIME >= trunc(sysdate-1) AND STA_DATE_TIME <![CDATA[ < ]]> trunc(sysdate)
</if>
<if test="statusSCS == 3">
WHERE STA_DATE_TIME >= trunc(sysdate-2) AND STA_DATE_TIME <![CDATA[ < ]]> trunc(sysdate-1)
</if>
GROUP BY CUSTOMER_NO ORDER BY ALL_COUNT DESC)t where rownum <= 20
</select>
推送數(shù)據(jù)給前端演示圖
文章來源地址http://www.zghlxwxcb.cn/news/detail-551371.html
到了這里,關于websocket實時推送統(tǒng)計數(shù)據(jù)給前端頁面的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!