前言
ok,那么今天的話也是帶來(lái)這個(gè)非常常用的一個(gè)技術(shù),那就是咱們完成nutty的一個(gè)應(yīng)用,今天的話,我會(huì)介紹地很詳細(xì),這樣的話,拿到這個(gè)博文的代碼就基本上可以按照自己的想法去構(gòu)建自己的一個(gè)在線應(yīng)用了。比如聊天,在線消息推送之類(lèi)的。其實(shí)一開(kāi)始我原來(lái)的想法做在線消息推送是直接mq走起,但是想了想對(duì)mq的依賴(lài)太高了。而且總感覺(jué)不安全,況且還有實(shí)時(shí)在線處理的一些要求,所以的話才覺(jué)得切換nutty來(lái)做。我的構(gòu)想是這樣的:
在我的構(gòu)想里面的話,基本上除了和客戶端建立的連接之外,會(huì)暴露出我們的一個(gè)服務(wù)器地址和接口。
其他的業(yè)務(wù)服務(wù),都是通過(guò)其他的服務(wù)進(jìn)行調(diào)用后返回的,客戶端和nutty服務(wù)器只是建立長(zhǎng)連接,負(fù)責(zé)接收消息,確認(rèn)消息。具體的業(yè)務(wù)消息是如何發(fā)送的都是通過(guò)其他微服務(wù)的,好處就是確保安全,例如限制用戶的聊天評(píng)率(因?yàn)榭赡苁菒阂饽_本)。
不過(guò)的話,我們今天的部分是在這里:
就是紫色框起來(lái)的地方。這部分是基礎(chǔ),也是毛坯房,后面你們可以根據(jù)本文去造自己的房子。
后端
首先是我們的服務(wù)后端的搭建,這部分的話其實(shí)可以參考我的這篇文章:實(shí)用水文篇–SpringBoot整合Netty實(shí)現(xiàn)消息推送服務(wù)器
那么我們這邊只是說(shuō)說(shuō)不同的地方,核心的主要的地方。
項(xiàng)目結(jié)構(gòu)
這里的話,可以看到我們的這邊的話其實(shí)是和先前的一樣的,其實(shí)沒(méi)什么變化,區(qū)別在里面:
這里面我重寫(xiě)了一下方法,對(duì)上次的一些內(nèi)容進(jìn)行了修改,因?yàn)樯洗问敲髦械拿髀铩?/p>
初始化器
首先是我們的初始化器,那么在這里的話,我增加了這個(gè)心跳在線的一個(gè)處理。主要是因?yàn)?,?shí)際上,就是說(shuō),避免我們的一個(gè)資源的浪費(fèi)嘛。
public class ServerHandler extends ChannelInitializer<SocketChannel> {
/**
* 初始化通道以及配置對(duì)應(yīng)管道的處理器
* @param channel
* @throws Exception
*/
@Override
protected void initChannel(SocketChannel channel) throws Exception{
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(1024*64));
//===========================增加心跳支持==============================
/**
* 針對(duì)客戶端,如果在1分鐘時(shí)間內(nèi)沒(méi)有向服務(wù)端發(fā)送讀寫(xiě)心跳(ALL),則主動(dòng)斷開(kāi)連接
* 如果有讀空閑和寫(xiě)空閑,則不做任何處理
*/
pipeline.addLast(new IdleStateHandler(8,10,12));
//自定義的空閑狀態(tài)檢測(cè)的handler
pipeline.addLast(new HeartBeatHandler());
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//自定義的handler
pipeline.addLast(new ServerListenerHandler());
}
}
對(duì)應(yīng)的心跳檢測(cè)的實(shí)現(xiàn)類(lèi)在這里:
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;//強(qiáng)制類(lèi)型轉(zhuǎn)化
if(event.state()== IdleState.READER_IDLE){
System.out.println("進(jìn)入讀空閑......");
}else if(event.state() == IdleState.WRITER_IDLE) {
System.out.println("進(jìn)入寫(xiě)空閑......");
}else if(event.state()== IdleState.ALL_IDLE){
System.out.println("channel 關(guān)閉之前:users 的數(shù)量為:"+ UserConnectPool.getChannelGroup().size());
Channel channel = ctx.channel();
//資源釋放
channel.close();
System.out.println("channel 關(guān)閉之后:users 的數(shù)量為:"+UserConnectPool.getChannelGroup().size());
}
}
}
}
服務(wù)類(lèi)
之后的話就是我們具體的消息推送,服務(wù)之類(lèi)的了。
public class ServerListenerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger log = LoggerFactory.getLogger(ServerBoot.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//獲取客戶端所傳輸?shù)南?/span>
String content = msg.text();
//1.獲取客戶端發(fā)來(lái)的消息
DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
assert dataContent != null;
System.out.println("----->"+dataContent);
Integer action = dataContent.getAction();
Channel channel = ctx.channel();
//2.判斷消息類(lèi)型,根據(jù)不同的類(lèi)型來(lái)處理不同的業(yè)務(wù)
if(Objects.equals(action, MessageActionEnum.CONNECT.type)){
//2.1 當(dāng)websocket 第一次open的時(shí)候,初始化channel,把用的channel 和 userid 關(guān)聯(lián)起來(lái)
String senderId = dataContent.getChatMsg().getSenderId();
UserConnectPool.getChannelMap().put(senderId,channel);
//這里是輸出一個(gè)用戶關(guān)系
UserConnectPool.output();
}
} else if(Objects.equals(action, MessageActionEnum.KEEPALIVE.type)){
//2.4 心跳類(lèi)型的消息
System.out.println("收到來(lái)自channel 為["+channel+"]的心跳包");
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//接收到請(qǐng)求
log.info("有新的客戶端鏈接:[{}]", ctx.channel().id().asLongText());
UserConnectPool.getChannelGroup().add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String chanelId = ctx.channel().id().asShortText();
log.info("客戶端被移除:channel id 為:"+chanelId);
UserConnectPool.getChannelGroup().remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
//發(fā)生了異常后關(guān)閉連接,同時(shí)從channelgroup移除
ctx.channel().close();
UserConnectPool.getChannelGroup().remove(ctx.channel());
}
}
可以看到這里只保留了兩個(gè)玩意,一個(gè)是把用戶注冊(cè)到咱們的這個(gè)nutty服務(wù)器的內(nèi)存里面。還有一個(gè)是心跳包。
那么其他的數(shù)據(jù)類(lèi)型什么的,都在整合nutty的那篇博文里面。
那么我們的聊天怎么處理,很簡(jiǎn)單,在Controller接受到消息,然后在那里面調(diào)用Channel完成消息的轉(zhuǎn)發(fā)。
具體的案例也在那篇nutty的整合里面。
前端
那么之后的話,是我們的一個(gè)前端 。
封裝websocket
這邊的話對(duì)這個(gè)websocket做了一個(gè)封裝,可以在vue、uniapp當(dāng)中使用。我這邊還用到了element-ui主要是來(lái)做消息提醒的,你可以選擇刪掉。
// 導(dǎo)出socket對(duì)象
export {
socket
}
import { Message } from 'element-ui'
// socket主要對(duì)象
var socket = {
websock: null,
/**
* 這個(gè)是我們的ws的地址
* */
ws_url: "ws://localhost:9000/ws",
/**
* 開(kāi)啟標(biāo)識(shí)
* */
socket_open: false,
/**
* 心跳timer
* */
hearbeat_timer: null,
/**
* 心跳發(fā)送頻率
* */
hearbeat_interval: 5000,
/**
* 是否開(kāi)啟重連
* */
is_reonnect: true,
/**
* 重新連接的次數(shù)
* */
reconnect_count: 3,
/**
* 當(dāng)前重新連接的次數(shù),默認(rèn)為:1
* */
reconnect_current: 1,
/**
* 重新連接的時(shí)間類(lèi)型
* */
reconnect_timer: null,
/**
* 重新連接的間隔
* */
reconnect_interval: 3000,
/**
* 初始化連接
*/
init: () => {
if (!("WebSocket" in window)) {
Message({
message: '當(dāng)前瀏覽器與網(wǎng)站不兼容丫',
type: 'error',
});
console.log('瀏覽器不支持WebSocket')
return null
}
// 已經(jīng)創(chuàng)建過(guò)連接不再重復(fù)創(chuàng)建
if (socket.websock) {
return socket.websock
}
socket.websock = new WebSocket(socket.ws_url)
socket.websock.onmessage = function (e) {
socket.receive(e)
}
// 關(guān)閉連接
socket.websock.onclose = function (e) {
console.log('連接已斷開(kāi)')
console.log('connection closed (' + e.code + ')')
clearInterval(socket.hearbeat_interval)
socket.socket_open = false
// 需要重新連接
if (socket.is_reonnect) {
socket.reconnect_timer = setTimeout(() => {
// 超過(guò)重連次數(shù)
if (socket.reconnect_current > socket.reconnect_count) {
clearTimeout(socket.reconnect_timer)
return
}
// 記錄重連次數(shù)
socket.reconnect_current++
socket.reconnect()
}, socket.reconnect_interval)
}
}
// 連接成功
socket.websock.onopen = function () {
Message({
message: '連接成功,歡迎來(lái)到WhiteHole',
type: 'success',
});
console.log('連接成功')
socket.socket_open = true
socket.is_reonnect = true
// 開(kāi)啟心跳
socket.heartbeat()
}
// 連接發(fā)生錯(cuò)誤
socket.websock.onerror = function (err) {
Message({
message: '服務(wù)連接發(fā)送錯(cuò)誤!',
type: 'error',
});
console.log('WebSocket連接發(fā)生錯(cuò)誤')
}
},
/**
* 獲取websocket對(duì)象
* */
getSocket:()=>{
//創(chuàng)建了直接返回,反之重來(lái)
if (socket.websock) {
return socket.websock
}else {
socket.init();
}
},
getStatus:()=> {
if (socket.websock.readyState === 0) {
return "未連接";
} else if (socket.websock.readyState === 1) {
return "已連接";
} else if (socket.websock.readyState === 2) {
return "連接正在關(guān)閉";
} else if (socket.websock.readyState === 3) {
return "連接已關(guān)閉";
}
},
/**
* 發(fā)送消息
* @param {*} data 發(fā)送數(shù)據(jù)
* @param {*} callback 發(fā)送后的自定義回調(diào)函數(shù)
*/
send: (data, callback = null) => {
// 開(kāi)啟狀態(tài)直接發(fā)送
if (socket.websock.readyState === socket.websock.OPEN) {
socket.websock.send(JSON.stringify(data))
if (callback) {
callback()
}
// 正在開(kāi)啟狀態(tài),則等待1s后重新調(diào)用
} else if (socket.websock.readyState === socket.websock.CONNECTING) {
setTimeout(function () {
socket.send(data, callback)
}, 1000)
// 未開(kāi)啟,則等待1s后重新調(diào)用
} else {
socket.init()
setTimeout(function () {
socket.send(data, callback)
}, 1000)
}
},
/**
* 接收消息
* @param {*} message 接收到的消息
*/
receive: (message) => {
var recData = JSON.parse(message.data)
/**
*這部分是我們具體的對(duì)消息的處理
* */
// 自行擴(kuò)展其他業(yè)務(wù)處理...
},
/**
* 心跳
*/
heartbeat: () => {
console.log('socket', 'ping')
if (socket.hearbeat_timer) {
clearInterval(socket.hearbeat_timer)
}
socket.hearbeat_timer = setInterval(() => {
//發(fā)送心跳包
let data = {
"action": 4,
"chatMsg": null,
"extend": null,
}
socket.send(data)
}, socket.hearbeat_interval)
},
/**
* 主動(dòng)關(guān)閉連接
*/
close: () => {
console.log('主動(dòng)斷開(kāi)連接')
clearInterval(socket.hearbeat_interval)
socket.is_reonnect = false
socket.websock.close()
},
/**
* 重新連接
*/
reconnect: () => {
console.log('發(fā)起重新連接', socket.reconnect_current)
if (socket.websock && socket.socket_open) {
socket.websock.close()
}
socket.init()
},
}
使用
這個(gè)使用其實(shí)很簡(jiǎn)單,我們這邊的話是Vue所以在開(kāi)啟的時(shí)候就用上了,在我們的這個(gè)App.vue或者是其他的主頁(yè)面里面,我這邊是home作為主頁(yè)面(App.vue直接展示了home.vue(這個(gè)是你自己編寫(xiě)的))
效果
剛剛的連接效果看到了,那么就來(lái)看到這個(gè),我們后端的一個(gè)心跳:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-437847.html
可以看到以前正常。
之后的話,拿著這套毛坯房就可以happy了。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-437847.html
到了這里,關(guān)于SpringBoot+Netty+Vue+Websocket實(shí)現(xiàn)在線推送/聊天系統(tǒng)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!