一、前言
1、如果我們的app類似于股票這種,數(shù)據(jù)很多很快,之前用的tomcat自帶的websocket
又或者 spring-boot-starter-websocke
集成,但是性能在數(shù)據(jù)并發(fā)很大時就會存在問題。
2、我前面寫的一篇關(guān)于 springBoot+webosket的,沒有使用netty的文章 springBoot使用webSocket的幾種方式以及在高并發(fā)出現(xiàn)的問題及解決 ,其中就包含了 以下者兩種方式,都有說明,大家如果量不大,下面這兩種方式也是可以的。
- tomcat自帶的
websocket
-
spring-boot-starter-websocke
集成
二、使用Netty 完成 webSocket
1、如何使用 ,可以參考 netty + webSocket + SpringBott 是參考文章 SpringBoot整合Netty處理WebSocket(支持url參數(shù)) 這篇文章是,說的已經(jīng)很ok了
2、但是上面那篇文章還是有所不足,因為我需要加上token認(rèn)證,只有認(rèn)證了,才可以建立鏈接,上面那篇的文章,只是獲取參數(shù),在認(rèn)證方面,還是有所不足,滿足不了這個條件。后續(xù)我可以把我的方式,寫一篇文章放出來
2.1、RequestUriUtils 的 getBasePath 方法
2、比如你的鏈接是 ws://192.168.172.139:1234/ws/id=1
,使用它文章中的獲取后得到 /ws/
,建議改成如下,獲取之后是 /ws
/**
* 獲取URI中參數(shù)以外部分路徑
*
* @param uri
* @return
*/
public static String getBasePath(String uriStr) {
String pathWithSlash ="";
try {
// 使用URI解析URL字符串
URI uri = new URI(uriStr);
// 獲取路徑部分
pathWithSlash = uri.getPath();
// 去掉末尾的斜杠
return pathWithSlash.replaceAll("/$", "");
} catch (URISyntaxException e) {
log.error("解析path錯誤", e);
}
return pathWithSlash;
}
2.2、WebSocketChannelInitializer 中的 ChannelPipeline 說明
在WebSocket服務(wù)器的構(gòu)建中添加.addLast(new HttpServerCodec())
的主要原因是WebSocket握手是基于HTTP協(xié)議的,WebSocket連接的建立需要經(jīng)過以下步驟:
- 客戶端向服務(wù)器發(fā)送一個HTTP請求,請求升級到WebSocket協(xié)議。
- 服務(wù)器收到這個請求后,需要進(jìn)行協(xié)議升級處理,將HTTP協(xié)議切換到WebSocket協(xié)議。
- 一旦升級成功,WebSocket連接建立,客戶端和服務(wù)器之間可以通過WebSocket協(xié)議進(jìn)行雙向通信。
因此,WebSocket握手的開始階段仍然是HTTP請求和響應(yīng)。為了處理這個初始的HTTP請求,需要在Netty的ChannelPipeline中添加.addLast(new HttpServerCodec())
,以確保能夠解析和處理這個HTTP請求,并在需要時將其升級為WebSocket連接。簡而言之,.addLast(new HttpServerCodec())
的作用是為了使WebSocket服務(wù)器能夠正確地處理WebSocket握手之前的HTTP請求和響應(yīng),確保WebSocket連接能夠成功建立。一旦WebSocket連接建立,就可以通過WebSocket協(xié)議進(jìn)行實(shí)時雙向通信。
這是WebSocket服務(wù)器構(gòu)建中的一個標(biāo)準(zhǔn)操作。websocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http解編碼器
2.3、addLast(new ChunkedWriteHandler())
.addLast(new ChunkedWriteHandler())
是 Netty 中的一個 ChannelHandler,它的主要作用是支持異步寫大數(shù)據(jù)流(例如文件傳輸)。
在某些情況下,你可能需要向客戶端發(fā)送大量的數(shù)據(jù),例如文件的內(nèi)容,而不是一次性將整個數(shù)據(jù)寫入緩沖區(qū),因為這可能會導(dǎo)致內(nèi)存占用過高。相反,你可以將數(shù)據(jù)分成小塊(chunk)并逐塊寫入客戶端,以避免內(nèi)存問題。
ChunkedWriteHandler
的作用如下:
- 支持大數(shù)據(jù)流的異步寫入: 它允許你將數(shù)據(jù)切割成小塊并異步地將這些塊寫入客戶端。這對于傳輸大型文件或大量數(shù)據(jù)非常有用,因為它可以避免將整個數(shù)據(jù)加載到內(nèi)存中。
- 維護(hù)寫入順序: 它確保數(shù)據(jù)塊按照它們添加到 Channel 的順序進(jìn)行寫入。這有助于保持?jǐn)?shù)據(jù)的有序性。
- 提高性能: 通過異步寫入數(shù)據(jù)塊,
ChunkedWriteHandler
可以提高網(wǎng)絡(luò)性能,因為它不會阻塞線程等待數(shù)據(jù)傳輸完成。
這個處理器通常與其他處理器一起使用,以完成完整的數(shù)據(jù)傳輸過程。例如,如果你要實(shí)現(xiàn)文件傳輸,通常會使用 ChunkedWriteHandler
將文件數(shù)據(jù)切割成小塊,然后使用其他處理器來處理文件的傳輸,例如文件塊的編碼和解碼。
總之,.addLast(new ChunkedWriteHandler())
的作用是支持異步寫大數(shù)據(jù)流,以提高性能并降低內(nèi)存使用,尤其在需要傳輸大量數(shù)據(jù)時非常有用。
2.4、addLast(new HttpObjectAggregator(1024 * 64))
將HttpMessage和HttpContents聚合到一個完成的 FullHttpRequest或FullHttpResponse中,具體是FullHttpRequest對象還是FullHttpResponse對象取決于是請求還是響應(yīng)
.addLast(new HttpObjectAggregator(1024 * 64))
是 Netty 中的一個 ChannelHandler,主要用于將HTTP請求或響應(yīng)的多個部分聚合成一個完整的HTTP消息。這對于處理HTTP消息非常有用,特別是當(dāng)你需要處理大量的HTTP數(shù)據(jù)時。
以下是.addLast(new HttpObjectAggregator(1024 * 64))
的主要作用:
- 消息聚合: 在HTTP通信中,請求或響應(yīng)可能會分成多個部分(例如,HTTP請求頭和HTTP請求體)。
HttpObjectAggregator
負(fù)責(zé)將這些部分聚合成一個完整的FullHttpRequest
或FullHttpResponse
,以便更容易處理和操作。- 內(nèi)存管理: 這個處理器還具有內(nèi)存管理功能。你可以在構(gòu)造函數(shù)中指定一個最大的聚合字節(jié)數(shù)(在示例中是64 KB)。如果接收到的HTTP數(shù)據(jù)超過了這個大小,
HttpObjectAggregator
將拋出異常以防止內(nèi)存泄漏。- 簡化HTTP消息處理: 聚合HTTP消息使得你可以更容易地處理完整的HTTP請求和響應(yīng),而不必手動處理每個部分。這對于構(gòu)建Web服務(wù)器或HTTP代理非常有用。
示例使用:
pipeline.addLast(new HttpServerCodec()); // 添加HTTP編解碼器
pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合HTTP消息,最大64KB
pipeline.addLast(new MyHttpRequestHandler()); // 自定義的HTTP請求處理器
在上面的示例中,首先使用
HttpServerCodec
添加了HTTP編解碼器,然后使用HttpObjectAggregator
聚合HTTP消息,最后添加了一個自定義的HTTP請求處理器。
總之,.addLast(new HttpObjectAggregator(1024 * 64))
的作用是將HTTP請求或響應(yīng)的多個部分聚合成一個完整的HTTP消息,以簡化和改善處理HTTP消息的流程,并提供內(nèi)存管理功能。這在構(gòu)建支持HTTP的應(yīng)用程序中非常有用。
2.5、addLast(new WebSocketServerCompressionHandler())
webSocket 數(shù)據(jù)壓縮擴(kuò)展,當(dāng)添加這個的時候WebSocketServerProtocolHandler的第三個參數(shù)需要設(shè)置成true.addLast(new WebSocketServerCompressionHandler())
是 Netty 中的一個 ChannelHandler,用于支持 WebSocket 消息的壓縮和解壓縮。WebSocket 消息壓縮可以減小消息的大小,提高網(wǎng)絡(luò)傳輸效率,尤其在低帶寬環(huán)境下非常有用。
以下是 .addLast(new WebSocketServerCompressionHandler())
的主要作用:
- WebSocket 消息壓縮: 當(dāng)客戶端和服務(wù)器之間通過 WebSocket 協(xié)議傳輸大量數(shù)據(jù)時,可以使用壓縮技術(shù)將消息壓縮為更小的尺寸,以減少網(wǎng)絡(luò)帶寬的使用。
WebSocketServerCompressionHandler
負(fù)責(zé)處理消息的壓縮。- WebSocket 消息解壓縮: 對于接收到的已壓縮的 WebSocket 消息,服務(wù)器需要將其解壓縮以獲取原始消息。
WebSocketServerCompressionHandler
也負(fù)責(zé)解壓縮已壓縮的消息。- 支持多種壓縮算法:
WebSocketServerCompressionHandler
支持多種壓縮算法,包括通常的 DEFLATE 和 GZIP 壓縮算法,以及自定義的壓縮算法。
在WebSocket應(yīng)用程序中,通常需要在WebSocket連接建立時協(xié)商是否啟用壓縮,以及使用哪種壓縮算法。如果客戶端和服務(wù)器都支持壓縮,那么它們可以在消息傳輸過程中啟用壓縮。
要使用 .addLast(new WebSocketServerCompressionHandler())
,你需要在 WebSocket 服務(wù)器的處理管道中添加該處理器。例如:
pipeline.addLast(new HttpServerCodec()); // 添加HTTP編解碼器
pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合HTTP消息,最大64KB
pipeline.addLast(new WebSocketServerCompressionHandler()); // 添加WebSocket消息壓縮處理器
pipeline.addLast(new MyWebSocketHandler()); // 自定義的WebSocket處理器
在上面的示例中,首先使用 HttpServerCodec
添加了HTTP編解碼器,然后使用 HttpObjectAggregator
聚合HTTP消息,接下來添加了 WebSocketServerCompressionHandler
以支持WebSocket消息壓縮,最后添加了一個自定義的WebSocket處理器。
總之,.addLast(new WebSocketServerCompressionHandler())
的作用是為WebSocket服務(wù)器添加消息壓縮和解壓縮的功能,以減小消息大小并提高網(wǎng)絡(luò)傳輸效率。這在需要傳輸大量數(shù)據(jù)的WebSocket應(yīng)用中非常有用。
2.6、.addLast(new MyWebSocketHandler())
自定義處理器 - 處理 web socket 消息(消息的父類是WebSocketFrame,旗下有很多子類,比如BinaryWebSocketFrame TextWebSocketFrame 等等)
如果你使用的是 父類是WebSocketFrame,則需要在其內(nèi)部,判斷是什么類型的數(shù)據(jù),如果你使用的具體的子類,那么只有具體的消息類型會到哪里
2.7、 .addLast(new WebSocketServerProtocolHandler(WebSocketProperties.path, null, true, 10485760));
服務(wù)器端向外暴露的 web socket 端點(diǎn),當(dāng)客戶端傳遞比較大的對象時,maxFrameSize參數(shù)的值需要調(diào)大
WebSocketServerProtocolHandler
是 Netty 中的一個關(guān)鍵組件,用于處理 WebSocket 握手和協(xié)議升級,以及管理 WebSocket 連接的生命周期。它的主要作用如下:
- WebSocket 握手處理: 當(dāng)客戶端通過 HTTP 請求發(fā)起 WebSocket 握手時,
WebSocketServerProtocolHandler
負(fù)責(zé)識別并處理這些握手請求。它可以檢查HTTP請求中的升級標(biāo)頭和協(xié)議頭,以確定是否需要升級到 WebSocket 協(xié)議。- WebSocket 握手協(xié)議升級: 如果客戶端發(fā)送了符合 WebSocket 握手規(guī)范的請求,
WebSocketServerProtocolHandler
會處理協(xié)議升級,將連接從 HTTP 協(xié)議切換到 WebSocket 協(xié)議。這個過程包括升級響應(yīng)的構(gòu)建和升級握手的處理。- WebSocket 生命周期管理: 一旦 WebSocket 握手成功,
WebSocketServerProtocolHandler
管理 WebSocket 連接的生命周期。它會處理連接的打開、關(guān)閉、異常和消息傳遞等事件。- Ping/Pong 處理: WebSocket 協(xié)議支持 Ping 和 Pong 消息,用于保持連接的活動狀態(tài)。
WebSocketServerProtocolHandler
會自動處理這些心跳消息,以確保連接保持活動狀態(tài)。
以下是一個示例,展示了如何在 Netty 中使用 WebSocketServerProtocolHandler
:
pipeline.addLast(new HttpServerCodec()); // 添加HTTP編解碼器
pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合HTTP消息,最大64KB
pipeline.addLast(new WebSocketServerProtocolHandler("/websocket")); // 添加WebSocket握手處理器
pipeline.addLast(new MyWebSocketHandler()); // 自定義的WebSocket處理器
在上面的示例中,WebSocketServerProtocolHandler
被添加到處理管道中,并指定了 WebSocket 的路徑(在示例中是"/websocket")。一旦握手成功,連接將切換到 WebSocket 協(xié)議,并且可以在 MyWebSocketHandler
中處理 WebSocket 消息。
總之,WebSocketServerProtocolHandler
是用于處理 WebSocket 握手和協(xié)議升級的關(guān)鍵組件,它使得在 Netty 中創(chuàng)建 WebSocket 服務(wù)器變得更加容易。
三、Web Socket 性能對比——Spring Boot vs Tomcat vs Netty
參考文章 Web Socket 性能對比——Spring Boot vs Tomcat vs Netty 說的很ok了。
四、使用四種框架分別實(shí)現(xiàn)百萬websocket常連接的服務(wù)器(寫的很好,必看)
1、文章包含了一些線上的參數(shù)調(diào)整,都是干活
原文地址: https://colobu.com/2015/05/22/implement-C1000K-servers-by-spray-netty-undertow-and-node-js/
五、七種WebSocket框架的性能比較
原文地址: https://colobu.com/2015/07/14/performance-comparison-of-7-websocket-frameworks/
六、使用python 腳本測試
1、主要測試兩部分
- 大量客戶端同時在線,查看性能,內(nèi)存消耗問題
- 大量客戶端同時在線,數(shù)據(jù)發(fā)送效率
2、python 安裝這里就不再說了
3、本文的 第四節(jié) 和第五節(jié) 請務(wù)必了解,需要修改對應(yīng)的 服務(wù)器 tcp鏈接數(shù)等等參數(shù)。
4、我的webSocket 鏈接格式是 ws://192.168.172.226:7081/ws/token
最后的那個token
,用于線上的認(rèn)證,只有認(rèn)證了的用戶,才會建立通道,這里為了方便測試,直接用數(shù)值代替,如下,這樣,就代表用戶id好了,畢竟后續(xù)我要是測試50w個客戶端,總不能先生成50w個token吧。
ws://192.168.172.226:7081/ws/1
ws://192.168.172.226:7081/ws/2
6.1、python 腳本
1、腳本內(nèi)容
import threading
import time
import websocket
# 定義帶有順序編號的 WebSocket URL
url_base = "ws://192.168.172.226:7081/ws/"
num_connections = 10000 # 要模擬的連接數(shù)
running_connections = 0 # 跟蹤當(dāng)前正在運(yùn)行的連接數(shù)
# 創(chuàng)建線程本地存儲對象來存儲每個線程的文件名
local = threading.local()
# 建立 WebSocket 連接的函數(shù)
def connect_websocket():
global running_connections
try:
# 使用順序編號生成 URL
url = url_base + str(running_connections)
# 為當(dāng)前線程創(chuàng)建文件名
local.filename = f"{running_connections}.txt"
while True:
# 創(chuàng)建 WebSocket 連接
ws = websocket.create_connection(url)
while True:
# 接收來自服務(wù)端的消息
message = ws.recv()
# 保存消息到文件
with open(local.filename, "a") as file:
file.write(message + "\n")
except Exception as e:
print(f"WebSocket 連接失敗: {e}")
running_connections -= 1
# 開始模擬 WebSocket 連接
while running_connections < num_connections:
t = threading.Thread(target=connect_websocket)
t.start()
running_connections += 1
# 等待所有連接完成
while running_connections > 0:
time.sleep(1)
print("所有 WebSocket 連接完成。")
2、運(yùn)行
# 安裝 websocket-client
pip install websocket-client
# 運(yùn)行test.py 文件
python test.py
3、說明
腳本作用是生成指定 num_connections
的webSocket 連接數(shù),并一直監(jiān)聽服務(wù)端返回的消息,如果服務(wù)端有消息就會保存到對應(yīng)鏈接的文件夾下面,包含其服務(wù)端返回的內(nèi)容。
6.2、netty 服務(wù)端
1、具體的鏈接的代碼我這里就不說了
2、主要需要寫兩個接口,一個接口是向所有在線的客戶端發(fā)送一條消息,另一個接口是向所有在線的客戶端發(fā)送指定數(shù)量mockCount
的消息
package cn.jt.thermalapi.common.controller;
import cn.jt.thermalapi.response.Response;
import cn.jt.thermalapi.websocket.session.SessionFactory;
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import springfox.documentation.annotations.ApiIgnore;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author GXM
* @version 1.0.0
* @Description TODO
* @createTime 2023年10月13日
*/
@ApiIgnore
@Api(tags = "測試api")
@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {
@GetMapping("mockOne")
public Response<String> mockOne() {
AtomicInteger count = new AtomicInteger(0);
SessionFactory.getSession().broadcast(count.getAndIncrement() + "");
return Response.buildSuccess();
}
@GetMapping("mockMany/{mockCount}")
public Response<String> mockMany(@PathVariable("mockCount") int mockCount) {
AtomicInteger count = new AtomicInteger(0);
while (count.getAndIncrement() <= mockCount) {
SessionFactory.getSession().broadcast(count.getAndIncrement() + "");
}
return Response.buildSuccess();
}
}
6.3、演示
1、啟動你的netty 服務(wù)端
2、啟動測試腳本
python test.py
3、服務(wù)端日志輸出,我測試出來,1w 鏈接大約30s左右,看自己機(jī)器吧,我這還是在idea里面跑的。
4、請求test/mockOne
接口,大于1s
文章來源:http://www.zghlxwxcb.cn/news/detail-758365.html
5、在 test.py
文件下,生成了對應(yīng)的1w客戶端的文件,其內(nèi)容就是服務(wù)端發(fā)送的。
5、請求test/mockMany/100
接口,這個大家可以自己測試下,或者等我后續(xù)在服務(wù)器測試結(jié)束后,再把這篇文章整理,一下,因為本次測試都是在我本機(jī)上測試的,只是初步了解。但是腳本已經(jīng)可以使用,后續(xù)大家測試服務(wù)器上面,步驟是一樣的文章來源地址http://www.zghlxwxcb.cn/news/detail-758365.html
到了這里,關(guān)于springBoot + netty搭建高性能 websocket 服務(wù) & 性能測試(包含python 測試腳本)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!