參考如下文檔,主要為了方面查找,因此把參考內(nèi)容都在此文章中重新寫(xiě)一遍:
SpringBoot整合WebScoket顯示進(jìn)度條 - 鐘小嘿 - 博客園
1.問(wèn)題描述
對(duì)于大文件上傳解析,若直接上傳,會(huì)超時(shí),可使用WebSocket長(zhǎng)鏈接方式實(shí)時(shí)顯示文件的上傳狀態(tài),實(shí)際上是從文件上傳到內(nèi)容解析完成存入數(shù)據(jù)庫(kù)的過(guò)程,各個(gè)階段的進(jìn)度可自定義。
本文使用SpringBoot+WebSocket+vue2.0+Element+nginx實(shí)現(xiàn)文件實(shí)時(shí)上傳顯示進(jìn)度條,上傳的截圖如下:
2.解決方案
?1)導(dǎo)入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
?2)開(kāi)啟WebSocket的支持,并把該類注入到spring容器中
package com.zxh.example.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
//開(kāi)啟WebSocket的支持,并把該類注入到spring容器中
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
3)編寫(xiě)WebSocket服務(wù)
package com.zxh.example.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author zhengkai.blog.csdn.net
*/
@ServerEndpoint("/wsServer/{userId}")
@Component
@Slf4j
public class WebSocketServer {
/**
* 靜態(tài)變量,用來(lái)記錄當(dāng)前在線連接數(shù)。應(yīng)該把它設(shè)計(jì)成線程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的線程安全Set,用來(lái)存放每個(gè)客戶端對(duì)應(yīng)的MyWebSocket對(duì)象。
*/
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 與某個(gè)客戶端的連接會(huì)話,需要通過(guò)它來(lái)給客戶端發(fā)送數(shù)據(jù)
*/
private Session session;
/**
* 接收userId
*/
private String userId = "";
/**
* 連接建立成功調(diào)用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
this.userId = userId;
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
webSocketMap.put(userId, this);
//加入set中
} else {
webSocketMap.put(userId, this);
//加入set中
addOnlineCount();
//在線數(shù)加1
}
log.info("用戶連接:" + userId + ",當(dāng)前在線人數(shù)為:" + getOnlineCount());
try {
sendMessage("連接成功");
} catch (IOException e) {
log.error("用戶:" + userId + ",網(wǎng)絡(luò)異常!!!!!!");
}
}
/**
* 連接關(guān)閉調(diào)用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
//從set中刪除
subOnlineCount();
}
log.info("用戶退出:" + userId + ",當(dāng)前在線人數(shù)為:" + getOnlineCount());
}
/**
* 收到客戶端消息后調(diào)用的方法
*
* @param message 客戶端發(fā)送過(guò)來(lái)的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("用戶消息:" + userId + ",報(bào)文:" + message);
//可以群發(fā)消息
//消息保存到數(shù)據(jù)庫(kù)、redis
if (StringUtils.isNotBlank(message)) {
try {
//解析發(fā)送的報(bào)文
JSONObject jsonObject = JSON.parseObject(message);
//追加發(fā)送人(防止串改)
jsonObject.put("fromUserId", this.userId);
String toUserId = jsonObject.getString("toUserId");
//傳送給對(duì)應(yīng)toUserId用戶的websocket
if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) {
webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
} else {
log.error("請(qǐng)求的userId:" + toUserId + "不在該服務(wù)器上");
//否則不在這個(gè)服務(wù)器上,發(fā)送到mysql或者redis
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 出現(xiàn)錯(cuò)誤
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用戶錯(cuò)誤:" + this.userId + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 實(shí)現(xiàn)服務(wù)器主動(dòng)推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 發(fā)送自定義消息
*/
public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
log.info("發(fā)送消息到:" + userId + ",報(bào)文:" + message);
if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
webSocketMap.get(userId).sendMessage(message);
} else {
log.error("用戶" + userId + ",不在線!");
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
4)編寫(xiě)文件上傳的controller
package com.zxh.example.controller;
import com.zxh.example.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
@RestController
@RequestMapping("/api")
@Slf4j
public class TestController {
@Autowired
private TestService testService;
@PostMapping("/upload")
public String upload(MultipartFile file) {
return testService.upload(file);
}
}
5)編寫(xiě)文件上傳的實(shí)現(xiàn)類,實(shí)時(shí)解析文件并發(fā)送通知
package com.zxh.example.service;
import cn.afterturn.easypoi.handler.inter.IReadHandler;
import com.zxh.example.entity.User;
import com.zxh.example.util.ExcelUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Service
@Slf4j
public class TestService {
public String upload(MultipartFile file) {
Integer[] percent = {1};
sendMessage(percent[0]);
Integer percentMax1 = 20;
Integer percentMax2 = 100;
// 讀取Excel中的數(shù)據(jù)到list集合中
List<User> list = new ArrayList<>();
//解析excel,解析1%~20%
ExcelUtils.importExcelBySax(file, User.class, 2, new IReadHandler<User>() {
@Override
public void handler(User o) {
list.add(o);
//每讀取指定行,推送1
if (list.size() % 10000 == 0 && percent[0] < percentMax1) {
percent[0]++;
sendMessage(percent[0]);
}
}
@Override
public void doAfterAll() {
//解析成功
percent[0] = percentMax1;
sendMessage(percent[0]);
}
});
//模擬數(shù)據(jù)插入,每1000條發(fā)送一次消息 21%~100%
Integer maxSize = 1000;
Integer queryCnt = list.size() % maxSize == 0 ? list.size() / maxSize : (list.size() / maxSize) + 1;
Integer sendCnt = 10;
for (int i = 0; i < queryCnt; i++) {
Integer endIndex = (i + 1) * maxSize;
if (endIndex > list.size()) {
endIndex = list.size();
}
//集合截取
List<User> tempList = new ArrayList<>(list.subList(i * maxSize, endIndex));
//模擬數(shù)據(jù)查詢
if (queryCnt % sendCnt == 0 && percent[0] < percentMax2) {
percent[0]++;
sendMessage(percent[0]);
}
}
percent[0] = percentMax2;
sendMessage(percent[0]);
return "success";
}
/**
* 自定義封裝的發(fā)送方法
* @param msg
*/
private void sendMessage(Integer msg) {
try {
WebSocketServer.sendInfo(msg.toString(), "111");
} catch (IOException e) {
log.error("消息發(fā)送異常:" + e.getMessage());
e.printStackTrace();
}
}
}
6)編寫(xiě)全局的global.js,可在全局使用,方便各個(gè)頁(yè)面都能獲取到消息
export default {
//websocket
webSocket: {},
setWs: function (ws) {
this.webSocket = ws
},
wsUrl: `${location.protocol === 'https:' ? 'wss' : 'ws'}://${location.host}/wsServer/`,
}
7)在main.js中注入global.js中的方法
import global from './global'
Vue.prototype.global = global
8)在Vue的App.vue創(chuàng)建webscoketd對(duì)象,并注冊(cè)到全局
<template>
<div id="app">
<router-view />
</div>
</template>
<script>
export default {
name: 'App',
data() {
return {
socket: null
}
},
mounted() {
this.initWs()
},
methods: {
//初始化
initWs() {
if (typeof (WebSocket) === "undefined") {
alert("您的瀏覽器不支持socket")
} else {
// 實(shí)例化socket 111是固定的用戶id,正式環(huán)境直接獲取當(dāng)前登錄用戶id
this.socket = new WebSocket(this.global.wsUrl + '111')
this.global.setWs(this.socket)
// 監(jiān)聽(tīng)socket連接
this.socket.onopen = () => {
console.log("socket連接成功")
}
// 監(jiān)聽(tīng)socket錯(cuò)誤信息
this.socket.onerror = () => {
console.error("連接錯(cuò)誤")
}
//監(jiān)聽(tīng)socket消息
this.socket.onmessage = (msg) => {
// console.log(msg)
}
// 監(jiān)聽(tīng)socket關(guān)閉信息
this.socket.onclose = (e) => {
console.error("socket已經(jīng)關(guān)閉")
console.error(e)
}
}
},
},
}
</script>
<style>
#app {
height: 100%;
}
</style>
9)在vue.config.js配置協(xié)議,轉(zhuǎn)發(fā)到后臺(tái)服務(wù)(本地開(kāi)發(fā))
module.exports = {
devServer: {
host: '0.0.0.0',
// //設(shè)置端口號(hào)
port: 8006,
//自動(dòng)打開(kāi)瀏覽器
open: true,
proxy: {
'/api': {
target: 'http://localhost:8080',
},
//websocket配置,正式環(huán)境設(shè)置nginx代理
'/wsServer': {
target: 'http://localhost:8080'
},
},
},
}
10)編寫(xiě)上傳文件的頁(yè)面
<template>
<div>
<el-button type="primary" icon="el-icon-upload" @click="handleUpload" style="margin-left: 10px;">導(dǎo)入
</el-button>
<el-upload ref="importUpload" :auto-upload="false" :show-file-list="false" :on-change="postFile"
style="display: inline" action="#">
<el-button id="uploadButton1" style="display: none" slot="trigger" />
</el-upload>
<el-dialog title="上傳進(jìn)度" :visible.sync="uploadDialog" width="30%" @close="closeDialog"
:close-on-click-modal="false">
<p>
<div class="time-content">已用時(shí)間:{{timesStr}}</div>
</p>
<el-progress :percentage="percentMsg" :text-inside="true" :stroke-width="23"></el-progress>
<div class="status-content">
<p v-if="importStatus == 1">
<span class="status-content-icon-span">上傳中,請(qǐng)稍后......</span>
</p>
<p v-if="importStatus == 2"><i class="el-icon-success"></i>
<span class="status-content-icon-span">上傳成功</span>
</p>
<p v-if="importStatus == 3"><i class="el-icon-error"></i>
<span class="status-content-icon-span">上傳失敗</span>
</p>
</div>
</el-dialog>
</div>
</template>
<script>
import {
user
} from "@/api/user";
let that
export default {
data() {
return {
uploadDialog: false,
websocket: "",
percentMsg: 0,
times: 0,
timesStr: '00:00',
timesId: null,
importStatus: 0, //上傳狀態(tài),0未上傳,1上傳中,2上傳成功,3上傳失敗
}
},
created() {
that = this
},
watch: {
'percentMsg': function (val) {
if (val === 100 && this.timesId) {
clearInterval(this.timesId)
}
},
'importStatus': function (val) {
if (val === 3 && this.timesId) {
clearInterval(this.timesId)
}
}
},
mounted() {
this.getSystemWs()
},
methods: {
getSystemWs() {
this.global.webSocket.onmessage = res => {
if (res && res.data) {
this.percentMsg = Number(res.data)
} else {
this.importStatus = 3
}
}
},
//上傳開(kāi)始計(jì)時(shí)
startUpload() {
this.timesId = setInterval(function () {
let timesStr = that.timesStr
that.times++
let m = parseInt(that.times / 60)
let s = that.times % 60
if (that.times != 0 && s % 60 == 0) {
m = that.times / 60
s = 0
}
if (m < 10) {
timesStr = '0' + m
} else {
timesStr = m
}
timesStr += ":"
if (s < 10) {
timesStr = timesStr + '0'
}
timesStr = timesStr + s
that.timesStr = timesStr
}, 1000);
},
handleUpload() {
const uploadObj1 = document.getElementById("uploadButton1");
uploadObj1.click();
},
beforeUpload(file) {
if (file.type == "" || file.type == null || file.type == undefined) {
const FileExt = file.name.replace(/.+\./, "").toLowerCase();
if (
FileExt == "xls" ||
FileExt == "xlsx" ||
FileExt == "XLS" ||
FileExt == "XLSX"
) {
return true;
} else {
this.$message.error("上傳文件必須是Excel格式!");
return false;
}
}
return true;
},
postFile(file) {
this.percentMsg = 0
this.startUpload()
var fileData = new FormData();
fileData.append("file", file.raw);
let headers = {
"Content-Type": "multipart/form-data"
};
this.uploadDialog = true;
user.upload(fileData).then(res => {
if (res == 'success') {
this.importStatus = 2
} else {
this.importStatus = 3
}
});
},
closeDialog() {
if (this.timesId) {
clearInterval(this.timesId)
}
this.percentMsg = 0
this.times = 0
this.timesStr = '00:00'
if (this.importStatus == 2) {
this.getList()
}
this.importStatus = 0
},
},
}
</script>
<style>
.time-content {
text-align: right;
width: 100%;
}
.status-content {
margin-top: 40px;
width: 100%;
text-align: center;
}
.status-content .el-icon-success {
font-size: 30px;
vertical-align: -20%;
color: #67C23A;
}
.status-content .el-icon-error {
font-size: 30px;
vertical-align: -20%;
color: #ee3838;
}
.status-content .el-icon-warning {
font-size: 30px;
vertical-align: -20%;
color: #E6A23C;
}
.status-content-icon-span {
margin-left: 10px;
}
</style>
3.注意事項(xiàng)
3.1nginx代理配置
11)在上線時(shí)是需要使用nginx代理的,故需使用nginx代理前端的WebSocket
在nginx.conf做如下配置:
...
#請(qǐng)求體大小
client_max_body_size 20M;
...
server {
listen 81;
server_name localhost;
location / {
root html;
try_files $uri $uri/ /index.html;
}
location ~^/api/ {
proxy_pass http://127.0.0.1:8080;
proxy_read_timeout 600s; #默認(rèn)是60s,若不配置則超過(guò)60s會(huì)出現(xiàn)504狀態(tài)碼
}
#websocket代理配置
location ~^/wsServer/ {
proxy_pass http://127.0.0.1:8080;
# 開(kāi)啟nginx對(duì)websocket的支持
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 36000s; #10小時(shí)未傳輸數(shù)據(jù)則關(guān)閉連接
}
...
默認(rèn)情況下,如果代理服務(wù)器在60秒內(nèi)未傳輸任何數(shù)據(jù),則連接將關(guān)閉。請(qǐng)求體的大小根據(jù)實(shí)際情況修改。若不配置,則上傳文件超過(guò)默認(rèn)值1MB時(shí)就會(huì)出現(xiàn)413錯(cuò)誤狀態(tài)碼。
3.2多節(jié)點(diǎn)問(wèn)題
在單節(jié)點(diǎn)服務(wù)時(shí),上述即可滿足需求,但多節(jié)點(diǎn)服務(wù)時(shí),通過(guò)nginx代理,若連接和請(qǐng)求都在同一臺(tái)服務(wù)器時(shí),可正常使用,但也會(huì)出現(xiàn)和A服務(wù)器連接了WebSocket,但在導(dǎo)入時(shí)請(qǐng)求的是B服務(wù)器的情況,此時(shí)B服務(wù)器并不會(huì)發(fā)送消息給前端,導(dǎo)致導(dǎo)入時(shí)不顯示進(jìn)度。此時(shí)就需要使用分布式的通知方式,下面使用redis的發(fā)布訂閱功能進(jìn)行消息的通知。
1)導(dǎo)入redis依賴
<!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2)創(chuàng)建redis消息實(shí)體,
package com.zxh.model;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
/**
* redis發(fā)布訂閱的消息實(shí)體
*/
@Data
@Accessors(chain = true)
public class RedisMessage {
//消息類型,1全部廣播,2個(gè)人信息
private Integer category;
//消息
private String message;
//要發(fā)送的用戶組
private List<String> userList;
}
?
方便消息的封裝。
2)創(chuàng)建業(yè)務(wù)處理類,監(jiān)聽(tīng)redis消息發(fā)布
主要用于監(jiān)聽(tīng)消息的發(fā)布,收到消息時(shí)進(jìn)行相關(guān)業(yè)務(wù)的處理。
package com.zxh.common.listener;
import com.alibaba.fastjson.JSON;
import com.zxh.common.util.CollectionUtil;
import com.zxh.model.RedisMessage;
import com.zxh.server.WebSocketServer;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* redis消息訂閱-業(yè)務(wù)處理
*/
@Component
@Slf4j
public class RedisMessageListener implements MessageListener {
//重寫(xiě)onMessage,處理相關(guān)發(fā)布訂閱的業(yè)務(wù)
@SneakyThrows
@Override
public void onMessage(Message message, byte[] bytes) {
String body = new String(message.getBody(), "UTF-8");
RedisMessage redisMessage = JSON.parseObject(body, RedisMessage.class);
if (redisMessage != null) {
Integer category = redisMessage.getCategory();
//個(gè)人信息
if (category == 2) {
//根據(jù)用戶id消息
if (CollectionUtil.isNotEmpty(redisMessage.getUserList())) {
redisMessage.getUserList().stream().forEach(userId -> {
try {
WebSocketServer.sendInfo(redisMessage.getMessage(),userId);
} catch (IOException e) {
e.printStackTrace();
}
});
} else {
log.warn("無(wú)用戶信息,發(fā)送信息失敗");
}
} else if (category == 1) {
}
}
}
}
3)配置redis發(fā)布訂閱
package com.zxh.configure;
import com.zxh.common.SystemConst;
import com.zxh.common.listener.RedisMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* redis發(fā)布訂閱配置
*/
@Configuration
@EnableCaching
public class RedisPubSubConfig {
Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 配置 交換機(jī)消息,添加多個(gè) messageListener參數(shù),配置不同的交換機(jī)
*
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("channel:test1"));
return container;
}
/**
* 消息監(jiān)聽(tīng)器適配器,綁定消息處理器,利用反射技術(shù)調(diào)用消息處理器的業(yè)務(wù)方法
*
* @param listener 業(yè)務(wù)處理類
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(RedisMessageListener listener) {
logger.info("redis消息監(jiān)聽(tīng)器加載成功--------->>>>>>");
// onMessage 就是方法名,基于反射調(diào)用
return new MessageListenerAdapter(listener, "onMessage");
}
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
}
4)調(diào)用redis的發(fā)布功能
修改TestService的sendMessage的方法,把使用WebSocket發(fā)送信息改為把消息發(fā)布到redis中。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-624051.html
@Service
@Slf4j
public class TestService {
.....
@Autowired
private StringRedisTemplate stringRedisTemplate;
private void sendMessage(Integer msg) {
List<String> userList = Arrays.asList("1111");//使用redis的發(fā)布訂閱發(fā)送消息
RedisMessage redisMessage = new RedisMessage().setCategory(2);
redisMessage.setMessage(msg.toString()).setUserList(userList);
stringRedisTemplate.convertAndSend("channel:test1", JSON.toJSONString(redisMessage));
}
}
redis發(fā)布后,監(jiān)聽(tīng)器監(jiān)聽(tīng)到有消息時(shí),使用WebSocket進(jìn)行消息推送。每臺(tái)服務(wù)器都會(huì)推送,只有服務(wù)連接成功的一臺(tái)服務(wù)器才能通知到前臺(tái)成功文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-624051.html
到了這里,關(guān)于WebSocket整合springboot顯示進(jìn)度條的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!