国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考

這篇具有很好參考價(jià)值的文章主要介紹了關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

前言

????提到WebSocket相信大家都聽說過,它的初衷是為了解決客戶端瀏覽器與服務(wù)端進(jìn)行雙向通信,是在單個(gè)TCP連接上進(jìn)行全雙工通訊的協(xié)議。在沒有WebSocket之前只能通過瀏覽器到服務(wù)端的請(qǐng)求應(yīng)答模式比如輪詢,來實(shí)現(xiàn)服務(wù)端的變更響應(yīng)到客戶端,現(xiàn)在服務(wù)端也可以主動(dòng)發(fā)送數(shù)據(jù)到客戶端瀏覽器。WebSocket協(xié)議和Http協(xié)議平行,都屬于TCP/IP四層模型中的第四層應(yīng)用層。由于WebSocket握手階段采用HTTP協(xié)議,所以也需要進(jìn)行跨域處理。它的協(xié)議標(biāo)識(shí)是wswss對(duì)應(yīng)了常規(guī)標(biāo)識(shí)和安全通信協(xié)議標(biāo)識(shí)。本文重點(diǎn)并不是介紹WebSocket協(xié)議相關(guān),而是提供一種基于ASP.NET Core原生WebSocket的方式實(shí)現(xiàn)集群的實(shí)現(xiàn)思路。關(guān)于這套思路其實(shí)很早之前我就構(gòu)思過了,只是之前一直沒有系統(tǒng)的整理出來,本篇文章就來和大家分享一下,由于主要是提供一種思路,所以涉及到具體細(xì)節(jié)或者業(yè)務(wù)相關(guān)的可能沒有體現(xiàn)出來,還望大家理解。

實(shí)現(xiàn)

咱們的重點(diǎn)關(guān)鍵字就是兩個(gè)WebSocket集群,實(shí)現(xiàn)的框架便是基于ASP.NET Core,我也基于golang實(shí)現(xiàn)了一套,本文涉及到的相關(guān)源碼和golang版本的實(shí)現(xiàn)都已上傳至我的github,具體倉(cāng)庫(kù)地址可以轉(zhuǎn)到文末自行跳轉(zhuǎn)到#示例源碼中查看。既然涉及到集群,這里咱們就用nginx作為反向代理,來搭建一個(gè)集群實(shí)例。大致的示例結(jié)構(gòu)如下圖所示關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考,asp.net core,asp.net,websocket,后端`

redis在這里扮演的角色呢,是用來處理Server端的消息相互傳遞用的,主要是使用的redis的pub/sub`功能來實(shí)現(xiàn)的,這里便涉及到幾個(gè)核心問題

  • 首先,集群狀態(tài)每個(gè)用戶被分發(fā)到具體的哪臺(tái)服務(wù)器上是不得而知的
  • 其次,處在不同Server端的不同用戶間的相互通信是需要一個(gè)傳遞媒介
  • 最后,針對(duì)不同的場(chǎng)景比如單發(fā)消息、分組消息、全部通知等要有不同的處理策略

這里需要考慮的是,如果需要搭建實(shí)時(shí)通信服務(wù)器的話,需要注意集群的隔離性,主要是和核心業(yè)務(wù)進(jìn)行隔離,畢竟WebSocket需要保持長(zhǎng)鏈接、且消息的大小需要評(píng)估。

上面提到了redis的主要功能就是用來傳遞消息用的,畢竟每個(gè)server服務(wù)器是無狀態(tài)的。這當(dāng)然不是必須的,任何可以進(jìn)行消息分發(fā)的中間件都可以,比如消息隊(duì)列rabbitmq、kafka、rocketmq、mqtt等,甚至只要能把要處理的消息存儲(chǔ)起來都可以比如緩存甚至是關(guān)系型數(shù)據(jù)庫(kù)等等。這壓力使用redis主要是因?yàn)椴僮髌饋砗?jiǎn)單、輕量級(jí)、靈活,讓大家關(guān)注點(diǎn)在思路上,而不是使用中案件的代碼上。

nginx配置

通過上面的圖我們可以看到,我們這里構(gòu)建集群示例使用的nginx,如果讓nginx支持WebSocket的話,需要額外的配置,這個(gè)在網(wǎng)上有很多相關(guān)的文章介紹,這里就來列一下咱們示例的nginx配置,在配置文件nginx.conf

//上游服務(wù)器地址也就是websocket服務(wù)的真實(shí)地址
upstream wsbackend {
    server 127.0.0.1:5001;
    server 127.0.0.1:5678;
}

server {
    listen       5000;
    server_name  localhost;

    location ~/chat/{
        //upstream地址
        proxy_pass http://wsbackend;
        proxy_connect_timeout 60s; 
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
        //記得轉(zhuǎn)發(fā)避免踩坑
        proxy_set_header Host $host;
        proxy_http_version 1.1; 
        //http升級(jí)成websocket協(xié)議的頭標(biāo)識(shí)
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
    }
}

這套配置呢,在搜索引擎上能收到很多,不過不妨礙我把使用的粘貼出來。這一套親測(cè)有效,也是我使用的配置,請(qǐng)放心使用。個(gè)人認(rèn)為如果是線上環(huán)境采用的負(fù)載均衡策略可以選擇ip_hash的方式,保證同一個(gè)ip的客戶端用戶可以分發(fā)到一臺(tái)WebSocket實(shí)例中去,這樣的話能盡量避免使用redis的用戶頻道做消息傳遞。好了,接下來準(zhǔn)備開始展示具體實(shí)現(xiàn)的代碼了。

一對(duì)一發(fā)送

首先介紹的就是一對(duì)一發(fā)送的情況,也就是我把消息發(fā)給你,聊天的時(shí)候私聊的情況。這里呢涉及到兩種情況

  • 如果你需要通信的客戶端和你連接在一個(gè)Server端里,這樣的話可以直接在鏈接里找到這個(gè)端的通信實(shí)例直接發(fā)送。
  • 如果你需要通信的客戶端和你不在一個(gè)Server端里,這個(gè)時(shí)候咱們就需要借助redis的pub/sub的功能,把消息傳遞給另一個(gè)Server端。

咱們通過一張圖大致的展示一下它的工作方式
關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考,asp.net core,asp.net,websocket,后端
解釋一下,每個(gè)客戶端注冊(cè)到WebSocket服務(wù)里的時(shí)候會(huì)在redis里訂閱一個(gè)user:用戶唯一標(biāo)識(shí)的頻道,這個(gè)頻道用于接收和當(dāng)前WebSocket連接不在一個(gè)服務(wù)端的其他WebSocket發(fā)送過來的消息。每次發(fā)送消息的時(shí)候你會(huì)知道你要發(fā)送給誰,不在當(dāng)前服務(wù)器的話則發(fā)送到redis的user:用戶唯一標(biāo)識(shí)頻道,這樣的話目標(biāo)WebSocket就能收到消息了。首先是注入相關(guān)的依賴項(xiàng),這里我使用的redis客戶端是freeredis,主要是因?yàn)椴僮髌饋砗?jiǎn)單,具體實(shí)現(xiàn)代碼如下

var builder = WebApplication.CreateBuilder(args);
//注冊(cè)freeredis
builder.Services.AddSingleton(provider => {
    var logger = provider.GetService<ILogger<WebSocketChannelHandler>>();
    RedisClient cli = new RedisClient("127.0.0.1:6379");
    cli.Notice += (s, e) => logger?.LogInformation(e.Log);
    return cli;
});
//注冊(cè)WebSocket具體操作的類
builder.Services.AddSingleton<WebSocketHandler>();
builder.Services.AddControllers();

var app = builder.Build();

var webSocketOptions = new WebSocketOptions
{
    KeepAliveInterval = TimeSpan.FromMinutes(2)
};
//注冊(cè)WebSocket中間件
app.UseWebSockets(webSocketOptions);

app.MapGet("/", () => "Hello World!");
app.MapControllers();

app.Run();

接下來我們定義一個(gè)Controller用來處理WebSocket請(qǐng)求

public class WebSocketController : ControllerBase
{
    private readonly ILogger<WebSocketController> _logger;
    private readonly WebSocketHandler _socketHandler;

    public WebSocketController(ILogger<WebSocketController> logger, WebSocketHandler socketHandler, WebSocketChannelHandler webSocketChannelHandler)
    {
        _logger = logger;
        _socketHandler = socketHandler;
    }
    
    //這里的id代表當(dāng)前連接的客戶端唯一標(biāo)識(shí)比如用戶唯一標(biāo)識(shí)
    [HttpGet("/chat/user/{id}")]
    public async Task ChatUser(string id)
    {
        //判斷是否是WebSocket請(qǐng)求
        if (HttpContext.WebSockets.IsWebSocketRequest)
        {
            _logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");

            var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
            //處理請(qǐng)求相關(guān)
            await _socketHandler.Handle(id, webSocket);
        }
        else
        {
            HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
        }
    }
}

這里的WebSocketHandler是用來處理具體邏輯用的,咱們看一下相關(guān)代碼

public class WebSocketHandler:IDisposable
{
    //存儲(chǔ)當(dāng)前服務(wù)用戶的集合
    private readonly UserConnection UserConnection = new();
    //redis頻道前綴
    private readonly string userPrefix = "user:";
    //用戶對(duì)應(yīng)的redis頻道
    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();

    private readonly ILogger<WebSocketHandler> _logger;
    //redis客戶端
    private readonly RedisClient _redisClient;

    public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
    {
        _logger = logger;
        _redisClient = redisClient;
    }

    public async Task Handle(string id, WebSocket webSocket)
    {
        //把當(dāng)前用戶連接存儲(chǔ)起來
        _ = UserConnection.GetOrAdd(id, webSocket);
        //訂閱一個(gè)當(dāng)前用戶的頻道
        await SubMsg($"{userPrefix}{id}");

        var buffer = new byte[1024 * 4];
        //接收發(fā)送過來的消息,這個(gè)方法是阻塞的,如果沒收到消息則一直阻塞
        var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
        //循環(huán)接收消息
        while (webSocket.State == WebSocketState.Open)
        {
            try
            {
                //因?yàn)榫彌_區(qū)長(zhǎng)度是固定的所以要獲取實(shí)際長(zhǎng)度
                string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');
                //接收的到消息轉(zhuǎn)換成實(shí)體
                MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(msg);
                //發(fā)送到其他客戶端的數(shù)據(jù)
                byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");
                _logger.LogInformation($"user {id} send:{msgBody.Msg}");
                 
                //判斷目標(biāo)客戶端是否在當(dāng)前當(dāng)前服務(wù),如果在當(dāng)前服務(wù)直接扎到目標(biāo)連接直接發(fā)送
                if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket))
                {
                    if (targetSocket.State == WebSocketState.Open)
                    {
                        await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);
                    }
                }
                else
                {
                    //如果要發(fā)送的目標(biāo)端不在當(dāng)前服務(wù),則發(fā)送給目標(biāo)redis端的頻道
                    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };
                    //目標(biāo)的redis頻道
                    _redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));
                }
                
                //繼續(xù)阻塞循環(huán)接收消息
                receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
                break;
            }
        }
        
        //循環(huán)結(jié)束意味著當(dāng)前端已經(jīng)退出
        //從當(dāng)前用戶的集合移除當(dāng)前用戶
        _ = UserConnection.TryRemove(id, out _);
        //關(guān)閉當(dāng)前WebSocket連接
        await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);
        //在當(dāng)前訂閱集合移除當(dāng)前用戶
        _disposables.TryRemove($"{userPrefix}{id}", out var disposable);
        //關(guān)閉當(dāng)前用戶的通道
        disposable.Dispose();
    }

    private async Task SubMsg(string channel)
    {
        //訂閱當(dāng)前用戶頻道
        var sub = _redisClient.Subscribe(channel,  async (channel, data) => {
            //接收過來當(dāng)前頻道數(shù)據(jù),說明發(fā)送端不在當(dāng)前服務(wù)
            ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
            byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");
            //在當(dāng)前服務(wù)找到目標(biāo)的WebSocket連接并發(fā)送消息
            if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket))
            {
                if (targetSocket.State == WebSocketState.Open)
                {
                    await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
                }
            }
        });
        //把redis訂閱頻道添加到集合中
        _disposables.TryAdd(channel, sub);
    }
    
    //程序退出的時(shí)候取消當(dāng)前服務(wù)訂閱的redis頻道
    public void Dispose()
    {
        foreach (var disposable in _disposables)
        {
            disposable.Value.Dispose();
        }

        _disposables.Clear();
    }
}

這里涉及到幾個(gè)輔助相關(guān)的類,其中UserConnection類是存儲(chǔ)注冊(cè)到當(dāng)前服務(wù)的連接,MsgBody類用來接受客戶端發(fā)送過來的消息,ChannelMsgBody是用來發(fā)送redis頻道的相關(guān)消息,因?yàn)橐严嚓P(guān)消息通過redis發(fā)布出去,咱們列一下這幾個(gè)類的相關(guān)代碼

//注冊(cè)到當(dāng)前服務(wù)的連接
public class UserConnection : IEnumerable<KeyValuePair<string, WebSocket>>
{
    //存儲(chǔ)用戶唯一標(biāo)識(shí)和WebSocket的對(duì)應(yīng)關(guān)系
    private ConcurrentDictionary<string, WebSocket> _users = new ConcurrentDictionary<string, WebSocket>();

    //當(dāng)前服務(wù)的用戶數(shù)量
    public int Count => _users.Count;

    public WebSocket GetOrAdd(string userId, WebSocket webSocket)
    {
        return _users.GetOrAdd(userId, webSocket);
    }

    public bool TryGetValue(string userId, out WebSocket webSocket)
    {
        return _users.TryGetValue(userId, out webSocket);
    }

    public bool TryRemove(string userId, out WebSocket webSocket)
    {
        return _users.TryRemove(userId, out webSocket);
    }

    public void Clear()
    {
        _users.Clear();
    }

    public IEnumerator<KeyValuePair<string, WebSocket>> GetEnumerator()
    {
        return _users.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }
}

//客戶端消息
public class MsgBody
{
    //目標(biāo)用戶標(biāo)識(shí)
    public string Id { get; set; }
    //要發(fā)送的消息
    public string Msg { get; set; }
}

//頻道訂閱消息
public class ChannelMsgBody
{
    //用戶標(biāo)識(shí)
    public string FromId { get; set; }
    //目標(biāo)用戶標(biāo)識(shí),也就是要發(fā)送給誰
    public string ToId { get; set; }
    //要發(fā)送的消息
    public string Msg { get; set; }
}

這樣的話關(guān)于一對(duì)一發(fā)送消息的相關(guān)邏輯就實(shí)現(xiàn)完成了,啟動(dòng)兩個(gè)Server端,由于nginx默認(rèn)的負(fù)載均衡策略是輪詢,所以注冊(cè)兩個(gè)用戶的話會(huì)被分發(fā)到不同的服務(wù)里去
關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考,asp.net core,asp.net,websocket,后端關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考,asp.net core,asp.net,websocket,后端
Postman連接三個(gè)連接唯一標(biāo)識(shí)分別是1、2、3,模擬一下消息發(fā)送,效果如下,發(fā)送效果
關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考,asp.net core,asp.net,websocket,后端
接收效果
關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考,asp.net core,asp.net,websocket,后端

群組發(fā)送

上面我們展示了一對(duì)一發(fā)送的情況,接下來我們來看一下,群組發(fā)送的情況。群組發(fā)送的話就是只要大家都加入一個(gè)群組,只要客戶端在群組里發(fā)送一條消息,則注冊(cè)到當(dāng)前群組內(nèi)的所有客戶端都可以收到消息。相對(duì)于一對(duì)一的情況就是如果當(dāng)前WebSocket服務(wù)端如果存在用戶加入某個(gè)群組,則當(dāng)前當(dāng)前WebSocket服務(wù)端則可以訂閱一個(gè)group:群組唯一標(biāo)識(shí)的redis頻道,集群中的其他WebSocket服務(wù)器通過這個(gè)redis頻道接收群組消息,通過一張圖描述一下關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考,asp.net core,asp.net,websocket,后端
群組的實(shí)現(xiàn)方式相對(duì)于一對(duì)一要簡(jiǎn)單一點(diǎn)

  • 發(fā)送端可以不用考慮當(dāng)前服務(wù)中的客戶端連接,一股腦的交給redis把消息發(fā)布出去
  • 如果有WebSocket服務(wù)中的用戶訂閱了當(dāng)前分組則可以接受消息,獲取組內(nèi)的用戶循環(huán)發(fā)送消息

展示一下代碼實(shí)現(xiàn)的方式,首先是定義一個(gè)action用于表示群組的相關(guān)場(chǎng)景

//包含兩個(gè)標(biāo)識(shí)一個(gè)是組別標(biāo)識(shí)一個(gè)是注冊(cè)到組別的用戶
[HttpGet("/chat/group/{groupId}/{userId}")]
public async Task ChatGroup(string groupId, string userId)
{
    if (HttpContext.WebSockets.IsWebSocketRequest)
    {
        _logger.LogInformation($"group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");

        var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
        //調(diào)用HandleGroup處理群組相關(guān)的消息
        await _socketHandler.HandleGroup(groupId, userId, webSocket);
    }
    else
    {
        HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
    }
}

接下來看一下HandleGroup的相關(guān)邏輯,還是在WebSocketHandler類中,看一下代碼實(shí)現(xiàn)

public class WebSocketHandler:IDisposable
{
    private readonly UserConnection UserConnection = new();
    private readonly GroupUser GroupUser = new();
    private readonly SemaphoreSlim _lock = new(1, 1);
    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();
    private readonly string groupPrefix = "group:";

    private readonly ILogger<WebSocketHandler> _logger;
    private readonly RedisClient _redisClient;

    public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
    {
        _logger = logger;
        _redisClient = redisClient;
    }

    public async Task HandleGroup(string groupId, string userId, WebSocket webSocket)
    {
        //因?yàn)槿航M的集合可能會(huì)存在很多用戶一起訪問所以限制訪問數(shù)量
        await _lock.WaitAsync();
        //初始化群組容器 群唯一標(biāo)識(shí)為key 群?jiǎn)T容器為value
        var currentGroup = GroupUser.Groups.GetOrAdd(groupId, new UserConnection { });
        //當(dāng)前用戶加入當(dāng)前群組
        _ = currentGroup.GetOrAdd(userId, webSocket);
        //只有有當(dāng)前WebSocket服務(wù)的第一個(gè)加入當(dāng)前組的時(shí)候才去訂閱群組頻道
        //如果不限制的話則會(huì)出現(xiàn)如果當(dāng)前WebSocket服務(wù)有多個(gè)用戶在一個(gè)組內(nèi)則會(huì)重復(fù)收到redis消息
        if (currentGroup.Count == 1)
        {
            //訂閱redis頻道
            await SubGroupMsg($"{groupPrefix}{groupId}");
        }

        _lock.Release();

        var buffer = new byte[1024 * 4];
        //阻塞接收WebSocket消息
        var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
        //服務(wù)不退出的話則一直等待接收
        while (webSocket.State == WebSocketState.Open)
        {
            try
            {
                string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');
                _logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msg}");

                //組裝redis頻道發(fā)布的消息,目標(biāo)為群組標(biāo)識(shí)
                ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msg };
                //通過redis發(fā)布消息
                _redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));

                receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
                break;
            }
        }
        //如果客戶端退出則在當(dāng)前群組集合刪除當(dāng)前用戶
        _ = currentGroup.TryRemove(userId, out _);
        await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);
    }

    private async Task SubGroupMsg(string channel)
    {
        var sub = _redisClient.Subscribe(channel, async (channel, data) => {
            ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
            byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");

            //在當(dāng)前WebSocket服務(wù)器找到當(dāng)前群組里的用戶
            GroupUser.Groups.TryGetValue(msgBody.ToId, out var currentGroup);
            //循環(huán)當(dāng)前WebSocket服務(wù)器里的用戶發(fā)送消息
            foreach (var user in currentGroup)
            {
                //不用給自己發(fā)送了
                if (user.Key == msgBody.FromId)
                {
                    continue;
                }

                if (user.Value.State == WebSocketState.Open)
                {
                    await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
                }
            }
        });
        //把當(dāng)前頻道加入訂閱集合
        _disposables.TryAdd(channel, sub);
    }
}

這里涉及到了GroupUser類,是來存儲(chǔ)群組和群組用戶的對(duì)應(yīng)關(guān)系的,定義如下

public class GroupUser
{
    //key為群組的唯一標(biāo)識(shí)
    public ConcurrentDictionary<string, UserConnection> Groups = new ConcurrentDictionary<string, UserConnection>();
}

演示一下把兩個(gè)用戶添加到一個(gè)群組內(nèi),然后發(fā)送接收消息的場(chǎng)景,用戶u1發(fā)送
關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考,asp.net core,asp.net,websocket,后端
用戶u2接收
關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考,asp.net core,asp.net,websocket,后端

發(fā)送所有人

發(fā)送給所有用戶的邏輯比較簡(jiǎn)單,不用考慮到用戶限制,只要用戶連接到了WebSocket集群則都可以接收到這個(gè)消息,大致工作方式如下圖所示
關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考,asp.net core,asp.net,websocket,后端
這個(gè)比較簡(jiǎn)單,咱們直接看實(shí)現(xiàn)代碼,首先是定義一個(gè)地址,用于發(fā)布消息

//把用戶注冊(cè)進(jìn)去
[HttpGet("/chat/all/{id}")]
public async Task ChatAll(string id)
{
    if (HttpContext.WebSockets.IsWebSocketRequest)
    {
        _logger.LogInformation($"all user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");

        var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
        await _socketHandler.HandleAll(id, webSocket);
    }
    else
    {
        HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
    }
}

具體的實(shí)現(xiàn)邏輯還是在HandleGroup類里,是HandleAll方法,看一下具體實(shí)現(xiàn)

public class WebSocketHandler:IDisposable
{
    private readonly UserConnection AllConnection = new();
    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();
    private readonly string all = "all";

    private readonly ILogger<WebSocketHandler> _logger;
    private readonly RedisClient _redisClient;

    public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
    {
        _logger = logger;
        _redisClient = redisClient;
    }

    public async Task HandleAll(string id, WebSocket webSocket)
    {
        await _lock.WaitAsync();
        //把用戶加入用戶集合
        _ = AllConnection.GetOrAdd(id, webSocket);
        //WebSocket集群中的每個(gè)服務(wù)只定義一次
        if (AllConnection.Count == 1)
        {
            await SubAllMsg(all);
        }

        _lock.Release();

        var buffer = new byte[1024 * 4];
        //阻塞接收信息
        var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
        while (webSocket.State == WebSocketState.Open)
        {
            try
            {
                string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');
                _logger.LogInformation($"user {id} send:{msg}");
                //獲取接收信息
                ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msg };
                //把消息通過redis發(fā)布到集群中的其他服務(wù)
                _redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));
  
                receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
                break;
            }
        }
        //用戶退出則刪除集合中的當(dāng)前用戶信息
        _ = AllConnection.TryRemove(id, out _);
        await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);
    }

    private async Task SubAllMsg(string channel)
    {
        var sub = _redisClient.Subscribe(channel, async (channel, data) => {
            ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
            byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");
            //接收到消息后遍歷用戶集合把消息發(fā)送給所有用戶
            foreach (var user in AllConnection)
            {   
                //如果包含當(dāng)前用戶跳過
                if (user.Key == msgBody.FromId)
                {
                    continue;
                }

                if (user.Value.State == WebSocketState.Open)
                {
                    await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
                }
            }
        });
        _disposables.TryAdd(channel, sub);
    }
}

效果在這里就不展示了,和群組的效果是類似的,只是一個(gè)是部分用戶,一個(gè)是全部的用戶。

整合到一起

上面我們分別展示了一對(duì)一、群組、所有人的場(chǎng)景,但是實(shí)際使用的時(shí)候,每個(gè)用戶只需要注冊(cè)到WebSocket集群一次也就是保持一個(gè)連接即可,而不是一對(duì)一一個(gè)連接、注冊(cè)群組一個(gè)連接、所有消息的時(shí)候一個(gè)連接。所以我們需要把上面的演示整合一下,一個(gè)用戶只需要連接到WebSocket集群一次即可,至于發(fā)送給誰,加入什么群組,接收全部消息等都是連接后通過一些標(biāo)識(shí)區(qū)分的,而不必每個(gè)類型的操作都注冊(cè)一次,就和微信和QQ一樣我只要登錄了即可,至于其他操作都是靠數(shù)據(jù)標(biāo)識(shí)區(qū)分的。接下來咱們就整合一下代碼達(dá)到這個(gè)效果,大致的思路是

  • 用戶連接到WebSocket集群,把用戶和連接保存到當(dāng)前WebSocket服務(wù)器的用戶集合中去。
  • 一對(duì)一發(fā)送的時(shí)候,只需要在具體的服務(wù)器中找到具體的客戶端發(fā)送消息
  • 群組的時(shí)候,先把當(dāng)前用戶標(biāo)識(shí)加入群組集合即可,接收消息的時(shí)候根據(jù)群組集合里的用戶標(biāo)識(shí)去用戶集合里去拿具體的WebSocket連接發(fā)送消息
  • 全員消息的時(shí)候,直接遍歷集群中的每個(gè)WebSocket服務(wù)里的用戶集合里的WebSocket連接訓(xùn)話發(fā)送消息

這樣的話就保證了每個(gè)客戶端用戶在集群中只會(huì)綁定一個(gè)連接,首先還是單獨(dú)定義一個(gè)action,用于讓客戶端用戶連接上來,具體實(shí)現(xiàn)代碼如下所示

public class WebSocketChannelController : ControllerBase
{
    private readonly ILogger<WebSocketController> _logger;
    private readonly WebSocketChannelHandler _webSocketChannelHandler;

    public WebSocketChannelController(ILogger<WebSocketController> logger, WebSocketChannelHandler webSocketChannelHandler)
    {
        _logger = logger;
        _webSocketChannelHandler = webSocketChannelHandler;
    }

    //只需要把當(dāng)前用戶連接到服務(wù)即可
    [HttpGet("/chat/channel/{id}")]
    public async Task Channel(string id)
    {
        if (HttpContext.WebSockets.IsWebSocketRequest)
        {
            _logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");

            var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
            await _webSocketChannelHandler.HandleChannel(id, webSocket);
        }
        else
        {
            HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
        }
    }
}

接下來看一下WebSocketChannelHandler類的HandleChannel方法實(shí)現(xiàn),用于處理不同的消息,比如一對(duì)一、群組、全員消息等不同類型的消息

public class WebSocketChannelHandler : IDisposable
{
    //用于存儲(chǔ)當(dāng)前WebSocket服務(wù)器鏈接上來的所有用戶對(duì)應(yīng)關(guān)系
    private readonly UserConnection UserConnection = new();
    //用于存儲(chǔ)群組和用戶關(guān)系,用戶集合采用HashSet保證每個(gè)用戶只加入一個(gè)群組一次
    private readonly ConcurrentDictionary<string, HashSet<string>> GroupUser = new ConcurrentDictionary<string, HashSet<string>>();
    private readonly SemaphoreSlim _lock = new(1, 1);
    //存放redis訂閱實(shí)例
    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();

    //一對(duì)一redis頻道前綴
    private readonly string userPrefix = "user:";
    //群組redis頻道前綴
    private readonly string groupPrefix = "group:";
    //全員redis頻道
    private readonly string all = "all";

    private readonly ILogger<WebSocketHandler> _logger;
    private readonly RedisClient _redisClient;

    public WebSocketChannelHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
    {
        _logger = logger;
        _redisClient = redisClient;
    }

    public async Task HandleChannel(string id, WebSocket webSocket)
    {
        await _lock.WaitAsync();

        //每次連接進(jìn)來就添加到用戶集合
        _ = UserConnection.GetOrAdd(id, webSocket);
        
        //每個(gè)WebSocket服務(wù)實(shí)例只需要訂閱一次全員消息頻道
        await SubMsg($"{userPrefix}{id}");
        if (UserConnection.Count == 1)
        {
            await SubAllMsg(all);
        }

        _lock.Release();
        var buffer = new byte[1024 * 4];
        //接收客戶端消息
        var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

        while (webSocket.State == WebSocketState.Open)
        {
            try
            {
                string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');
                //讀取客戶端消息
                ChannelData channelData = JsonConvert.DeserializeObject<ChannelData>(msg);
                //判斷消息類型
                switch (channelData.Method)
                {
                    //一對(duì)一
                    case "One":
                        await HandleOne(id, channelData.MsgBody, receiveResult);
                        break;
                    //把用戶加入群組
                    case "UserGroup":
                        await AddUserGroup(id, channelData.Group, webSocket);
                        break;
                    //處理群組消息
                    case "Group":
                        await HandleGroup(channelData.Group, id, webSocket, channelData.MsgBody);
                        break;
                    //處理全員消息
                    default:
                        await HandleAll(id, channelData.MsgBody);
                        break;
                }

                receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
                break;
            }
        }

        await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);

        //在群組中移除當(dāng)前用戶
        foreach (var users in GroupUser.Values)
        {
            lock (users)
            {
                users.Remove(id);
            }
        }
        //當(dāng)前客戶端用戶退出則移除連接
        _ = UserConnection.TryRemove(id, out _);
        //取消用戶頻道訂閱
        _disposables.Remove($"{userPrefix}{id}", out var sub);
        sub?.Dispose();
    }

    public void Dispose()
    {
        foreach (var disposable in _disposables)
        {
            disposable.Value.Dispose();
        }

        _disposables.Clear();
    }
}

這里涉及到了ChannelData類是用于接收客戶端消息的類模板,具體定義如下

public class ChannelData
{
    //消息類型 比如一對(duì)一 群組 全員
    public string Method { get; set; }
    //群組標(biāo)識(shí)
    public string Group { get; set; }
    //消息體
    public object MsgBody { get; set; }
}

類中并不會(huì)包含當(dāng)前用戶信息,因?yàn)檫B接到當(dāng)前服務(wù)的時(shí)候已經(jīng)提供了客戶端唯一標(biāo)識(shí)。結(jié)合上面的處理代碼我們可以看出,客戶端用戶連接到WebSocket實(shí)例之后,先注冊(cè)當(dāng)前用戶的redis訂閱頻道并且當(dāng)前實(shí)例僅注冊(cè)一次全員消息的redis頻道,用于處理非當(dāng)前實(shí)例注冊(cè)客戶端的一對(duì)一消息處理和全員消息處理,然后等待接收客戶端消息,根據(jù)客戶端消息的消息類型來判斷是進(jìn)行一對(duì)一、群組、或者全員的消息類型處理,它的工作流程入下圖所示關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考,asp.net core,asp.net,websocket,后端
由代碼和上面的流程圖可知,它根據(jù)不同的標(biāo)識(shí)去處理不同類型的消息,接下來我們可以看下每種消息類型的處理方式。

一對(duì)一處理

首先是一對(duì)一的消息處理情況,看一下具體的處理邏輯,首先是一對(duì)一發(fā)布消息

 private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult)
 {
    MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(JsonConvert.SerializeObject(msg));
    byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");
    _logger.LogInformation($"user {id} send:{msgBody.Msg}");

    //判斷目標(biāo)用戶是否在當(dāng)前WebSocket服務(wù)器
    if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket))
    {
        if (targetSocket.State == WebSocketState.Open)
        {
            await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);
        }
    }
    else
    {
        //如果不在當(dāng)前服務(wù)器,則直接把消息發(fā)布到具體的用戶頻道去,由具體用戶去訂閱
        ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };
        _redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));
    }
}

接下來是用于處理訂閱其他用戶發(fā)送過來消息的邏輯,這個(gè)和整合之前的邏輯是一致的,在當(dāng)前服務(wù)器中找到用戶對(duì)應(yīng)的連接,發(fā)送消息

private async Task SubMsg(string channel)
{
    var sub = _redisClient.Subscribe(channel, async (channel, data) =>
    {
        ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
        byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");
        if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket))
        {
            if (targetSocket.State == WebSocketState.Open)
            {
                await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
            }
            else
            {
                _ = UserConnection.TryRemove(msgBody.FromId, out _);
            }
        }
    });
    //把訂閱實(shí)例加入集合
    _disposables.TryAdd(channel, sub);
}

如果給某個(gè)用戶發(fā)送消息則可以使用如下的消息格式

{"Method":"One", "MsgBody":{"Id":"2","Msg":"Hello"}}

Method為One代表著是私聊一對(duì)一的情況,消息體內(nèi)Id為要發(fā)送給的具體用戶標(biāo)識(shí)和消息體。

群組處理

接下來看群組處理方式,這個(gè)和之前的邏輯是有出入的,首先是用戶要先加入到某個(gè)群組然后才能接收群組消息或者在群組中發(fā)送消息,之前是一個(gè)用戶對(duì)應(yīng)多個(gè)連接,整合了之后集群中每個(gè)用戶只關(guān)聯(lián)唯一的一個(gè)WebSocket連接,首先看用戶加入群組的邏輯

private async Task AddUserGroup(string user, string group, WebSocket webSocket)
{
    //獲取群組信息
    var currentGroup = GroupUser.GetOrAdd(group, new HashSet<string>());

    lock (currentGroup)
    {
       //把用戶標(biāo)識(shí)加入當(dāng)前組
        _ = currentGroup.Add(user);
    }

    //每個(gè)組的redis頻道,在每臺(tái)WebSocket服務(wù)器實(shí)例只注冊(cè)一次訂閱
    if (currentGroup.Count == 1)
    {
        //訂閱當(dāng)前組消息
        await SubGroupMsg($"{groupPrefix}{group}");
    }
    
    string addMsg = $"user 【{user}】 add  to group 【{group}】";
    byte[] sendByte = Encoding.UTF8.GetBytes(addMsg);
    await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
    //如果有用戶加入群組,則通知其他群成員
    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = user, ToId = group, Msg = addMsg };
    _redisClient.Publish($"{groupPrefix}{group}", JsonConvert.SerializeObject(channelMsgBody));
}

用戶想要在群組內(nèi)發(fā)消息,則必須先加入到一個(gè)具體的群組內(nèi),具體的加入群組的格式如下

{"Method":"UserGroup", "Group":"g1"}

Method為UserGroup代表著用戶加入群組的業(yè)務(wù)類型,Group代表著你要加入的群組唯一標(biāo)識(shí)。接下來就看下,用戶發(fā)送群組消息的邏輯了

private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody)
{
    //判斷群組是否存在
    var hasValue = GroupUser.TryGetValue(groupId, out var users);
    if (!hasValue)
    {
        byte[] sendByte = Encoding.UTF8.GetBytes($"group【{groupId}】 not exists");
        await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
        return;
    }

    //只有加入到當(dāng)前群組,才能在群組內(nèi)發(fā)送消息
    if (!users.Contains(userId))
    {
        byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{userId}】 not in 【{groupId}】");
        await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
        return;
    }

    _logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msgBody}");

    //發(fā)送群組消息
    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msgBody.ToString() };
    _redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));
}

加入群組之后則可以發(fā)送和接收群組內(nèi)的消息了,給群組發(fā)送消息的格式如下

{"Method":"Group", "Group":"g1", "MsgBody":"Hi All"}

Method為Group代表著用戶加入群組的業(yè)務(wù)類型,Group則代表你要發(fā)送到具體的群組的唯一標(biāo)識(shí),MsgBody則是發(fā)送到群組內(nèi)的消息。最后再來看下訂閱群組內(nèi)消息的情況,也就是處理群組消息的邏輯

private async Task SubGroupMsg(string channel)
{
    var sub = _redisClient.Subscribe(channel, async (channel, data) =>
    {
        //接收群組訂閱消息
        ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
        byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");
        
        //獲取當(dāng)前服務(wù)器實(shí)例中當(dāng)前群組的所有用戶連接
        GroupUser.TryGetValue(msgBody.ToId, out var currentGroup);
        foreach (var user in currentGroup)
        {
            if (user == msgBody.FromId)
            {
                continue;
            }
            
            //通過群組內(nèi)的用戶標(biāo)識(shí)去用戶集合獲取用戶集合里的用戶唯一連接發(fā)送消息
            if (UserConnection.TryGetValue(user, out var targetSocket) && targetSocket.State == WebSocketState.Open)
            {
                await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
            }
            else
            {
                currentGroup.Remove(user);
            }
        }
    });
    _disposables.TryAdd(channel, sub);
}
全員消息處理

全員消息處理相對(duì)來說思路比較簡(jiǎn)單,因?yàn)楫?dāng)服務(wù)啟動(dòng)的時(shí)候就會(huì)監(jiān)聽redis的全員消息頻道,這樣的話具體的實(shí)現(xiàn)也就只包含發(fā)送和接收全員消息了,首先看一下全員消息發(fā)送的邏輯

private async Task HandleAll(string id, object msgBody)
{
    _logger.LogInformation($"user {id} send:{msgBody}");

    //直接給redis的全員頻道發(fā)送消息
    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msgBody.ToString() };
    _redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));
}

全員消息的發(fā)送數(shù)據(jù)格式如下所示

{"Method":"All", "MsgBody":"Hello All"}

Method為All代表著全員消息類型,MsgBody則代表著具體消息。接收消息出里同樣很簡(jiǎn)單,訂閱redis全員消息頻道,然后遍歷當(dāng)前WebSocket服務(wù)器實(shí)例內(nèi)的所有用戶獲取連接發(fā)送消息,具體邏輯如下

private async Task SubAllMsg(string channel)
{
    var sub = _redisClient.Subscribe(channel, async (channel, data) =>
    {
        ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
        byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");
        //獲取當(dāng)前服務(wù)器實(shí)例內(nèi)所有用戶的連接
        foreach (var user in UserConnection)
        {
            //不給自己發(fā)送消息,因?yàn)榘l(fā)送的時(shí)候可以通過具體的業(yè)務(wù)代碼處理
            if (user.Key == msgBody.FromId)
            {
                continue;
            }
            
            //給每個(gè)用戶發(fā)送消息
            if (user.Value.State == WebSocketState.Open)
            {
                await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
            }
            else
            {
                _ = UserConnection.TryRemove(user.Key, out _);
            }
        }
    });
    _disposables.TryAdd(channel, sub);
}

示例源碼

由于篇幅有限,沒辦法設(shè)計(jì)到全部的相關(guān)源碼,因此在這里貼出來github相關(guān)的地址,方便大家查看和運(yùn)行源碼。相關(guān)的源碼我這里實(shí)現(xiàn)了兩個(gè)版本,一個(gè)是基于asp.net core的版本,一個(gè)是基于golang的版本。兩份源碼的實(shí)現(xiàn)思路是一致的,所以這兩份代碼可以運(yùn)行在一套集群示例里,配置在一套nginx里,并且連接到同一個(gè)redis實(shí)例里即可

  • asp.net core源碼示例 https://github.com/softlgl/WebsocketCluster
  • golang源碼示例 https://github.com/softlgl/websocket-cluster

倉(cāng)庫(kù)里還涉及到本人閑暇之余開源的其他倉(cāng)庫(kù),由于本人能力有限難登大雅之堂,就不做廣告了,有興趣的同學(xué)可以自行瀏覽一下。

總結(jié)

????本文基于ASP.NET Core框架提供了一個(gè)基于WebSocket做集群的示例,由于思想是通用的,所以基于這個(gè)思路樓主也實(shí)現(xiàn)了golang版本。其實(shí)在之前就想自己動(dòng)手搞一搞關(guān)于WebSocket集群方面的設(shè)計(jì),本篇文章算是對(duì)之前想法的一個(gè)落地操作。其核心思路文章已經(jīng)做了相關(guān)介紹,由于這些只是博主關(guān)于構(gòu)思的實(shí)現(xiàn),可能有很多細(xì)節(jié)尚未體現(xiàn)到,還希望大家多多理解。其核心思路總結(jié)一下

  • 首先是,利用可以構(gòu)建WebSocket服務(wù)的框架,在當(dāng)前服務(wù)實(shí)例中保存當(dāng)前客戶端用戶和WebSocket的連接關(guān)系
  • 如果消息的目標(biāo)客戶端不在當(dāng)前服務(wù)器,可以利用redis頻道、消息隊(duì)列相關(guān)、甚至是數(shù)據(jù)庫(kù)類的共享回話發(fā)送的消息,由目標(biāo)服務(wù)器獲取目標(biāo)是否屬于自己的ws會(huì)話
  • 本文設(shè)計(jì)的思路使用的是無狀態(tài)的方式,即WebSocket服務(wù)實(shí)例之間不存在直接的消息通信和相互的服務(wù)地址存儲(chǔ),當(dāng)然也可以利用redis等存儲(chǔ)在線用戶信息等,這個(gè)可以參考具體業(yè)務(wù)自行設(shè)計(jì)

讀萬卷書,行萬里路。在這個(gè)時(shí)刻都在變化點(diǎn)的環(huán)境里,唯有不斷的進(jìn)化自己,多接觸多嘗試不用的事物,多擴(kuò)展自己的認(rèn)知思維,方能構(gòu)建自己的底層邏輯。畢竟越底層越抽象,越通用越抽象。面對(duì)未知的挑戰(zhàn),自身作為自己堅(jiān)強(qiáng)的后盾,可能才會(huì)讓自己更踏實(shí)。文章來源地址http://www.zghlxwxcb.cn/news/detail-851187.html

到了這里,關(guān)于關(guān)于ASP.NET Core WebSocket實(shí)現(xiàn)集群的思考的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • ASP.NET Core + Jenkins實(shí)現(xiàn)自動(dòng)化發(fā)布

    ASP.NET Core + Jenkins實(shí)現(xiàn)自動(dòng)化發(fā)布

    ??作者:科技、互聯(lián)網(wǎng)行業(yè)優(yōu)質(zhì)創(chuàng)作者 ??專注領(lǐng)域:.Net技術(shù)、軟件架構(gòu)、人工智能、數(shù)字化轉(zhuǎn)型、DeveloperSharp、微服務(wù)、工業(yè)互聯(lián)網(wǎng)、智能制造 ??歡迎關(guān)注我(Net數(shù)字智慧化基地),里面有很多 高價(jià)值 技術(shù)文章, 是你刻苦努力也積累不到的經(jīng)驗(yàn) ,能助你快速成長(zhǎng)。升職

    2024年02月22日
    瀏覽(21)
  • ASP.NET Core+Vue3 實(shí)現(xiàn)SignalR通訊

    ASP.NET Core+Vue3 實(shí)現(xiàn)SignalR通訊

    從ASP.NET Core 3.0版本開始,SignalR的Hub已經(jīng)集成到了ASP.NET Core框架中。因此,在更高版本的ASP.NET Core中,不再需要單獨(dú)引用Microsoft.AspNetCore.SignalR包來使用Hub。 在項(xiàng)目創(chuàng)建一個(gè)類繼承Hub, 首先是寫一個(gè)CreateConnection方法 ConnectionId是SignalR中標(biāo)識(shí)的客戶端連接的唯一標(biāo)識(shí)符, 將userId和

    2024年02月06日
    瀏覽(24)
  • 2步輕松實(shí)現(xiàn)ASP.NET Core托管服務(wù)執(zhí)行定時(shí)任務(wù)

    2步輕松實(shí)現(xiàn)ASP.NET Core托管服務(wù)執(zhí)行定時(shí)任務(wù)

    最近接到一個(gè)新項(xiàng)目,需要在項(xiàng)目里添加一個(gè)后臺(tái)任務(wù),定時(shí)去發(fā)郵件通知客戶;由于是一個(gè)比較小型的項(xiàng)目,不希望引入Quartz.Net、Hangfire等太重的框架,同時(shí)也沒持久化要;尋覓了一下發(fā)現(xiàn)ASP.NET Core本身帶有托管服務(wù),可以執(zhí)行定時(shí)任務(wù)。ASP.NET Core提供了IHostedService接口,

    2024年02月06日
    瀏覽(22)
  • ASP.NET和ASP.NET Core的區(qū)別

    ASP.NET和ASP.NET Core是兩個(gè)不同的Web應(yīng)用程序框架,它們都是由Microsoft開發(fā)的。ASP.NET是Microsoft推出的第一個(gè)Web應(yīng)用程序框架,而ASP.NET Core是其最新版本。本文將介紹ASP.NET和ASP.NET Core的簡(jiǎn)介和區(qū)別。 ASP.NET的簡(jiǎn)介 ASP.NET是一個(gè)基于.NET框架的Web應(yīng)用程序框架,它是Microsoft推出的第一

    2024年02月16日
    瀏覽(96)
  • ASP.NET Core教程:ASP.NET Core 程序部署到Windows系統(tǒng)

    ASP.NET Core教程:ASP.NET Core 程序部署到Windows系統(tǒng)

    本篇文章介紹如何將一個(gè)ASP.NET Core Web程序部署到Windows系統(tǒng)上。這里以ASP.NET Core WebApi為例進(jìn)行講解。首先創(chuàng)建一個(gè)ASP.NET Core WebApi項(xiàng)目,使用默認(rèn)的Values控制器,這里使用Visual Studio 2019創(chuàng)建一個(gè)ASP.NET Core 3.1d的WebApi項(xiàng)目。 創(chuàng)建新項(xiàng)目的時(shí)候選項(xiàng)ASP.NET Core Web應(yīng)用程序,如下圖所

    2023年04月08日
    瀏覽(103)
  • Asp.Net VS ASP.NET Core 請(qǐng)求管道

    Asp.Net VS ASP.NET Core 請(qǐng)求管道

    參考鏈接 ASP.NET CORE 啟動(dòng)過程及源碼解讀 請(qǐng)求進(jìn)入Asp.Net工作進(jìn)程后,由進(jìn)程創(chuàng)建HttpWorkRequest對(duì)象,封裝此次請(qǐng)求有關(guān)的所有信息,然后進(jìn)入HttpRuntime類進(jìn)行進(jìn)一步處理。HttpRuntime通過請(qǐng)求信息創(chuàng)建HttpContext上下文對(duì)象,此對(duì)象將貫穿整個(gè)管道,直到響應(yīng)結(jié)束。同時(shí)創(chuàng)建或從應(yīng)用

    2024年02月04日
    瀏覽(100)
  • Asp .Net Core 系列: 集成 Consul 實(shí)現(xiàn) 服務(wù)注冊(cè)與健康檢查

    Asp .Net Core 系列: 集成 Consul 實(shí)現(xiàn) 服務(wù)注冊(cè)與健康檢查

    官網(wǎng):https://www.consul.io/ Consul 是一款開源的服務(wù)發(fā)現(xiàn)和配置管理工具,它能夠監(jiān)控應(yīng)用程序和服務(wù)之間的通信,并提供了一組 API 和 Web UI,用于管理服務(wù)和配置。 Consul 是分布式的、高可用的、可橫向擴(kuò)展的,具備以下特性: 服務(wù)發(fā)現(xiàn):Consul 通過 DNS 或者 HTTP 接口使服務(wù)注冊(cè)

    2024年01月21日
    瀏覽(21)
  • ASP.NET Core使用JWT+標(biāo)識(shí)框架(identity)實(shí)現(xiàn)登錄驗(yàn)證

    ASP.NET Core使用JWT+標(biāo)識(shí)框架(identity)實(shí)現(xiàn)登錄驗(yàn)證

    最近閱讀了《ASP.NET Core 技術(shù)內(nèi)幕與項(xiàng)目實(shí)戰(zhàn)——基于DDD與前后端分離》(作者楊中科)的第八章,對(duì)于Core入門的我來說體會(huì)頗深,整理相關(guān)筆記。 JWT:全稱“JSON web toke”,目前流行的跨域身份驗(yàn)證解決方案; 標(biāo)識(shí)框架(identity):由ASP.NET Core提供的框架,它采用RBAC(role

    2024年02月11日
    瀏覽(25)
  • ASP.NET Core MVC -- 將視圖添加到 ASP.NET Core MVC 應(yīng)用

    ASP.NET Core MVC -- 將視圖添加到 ASP.NET Core MVC 應(yīng)用

    右鍵單擊“視圖”文件夾,然后單擊“添加”“新文件夾”,并將文件夾命名為“HelloWorld”。 右鍵單擊“Views/HelloWorld”文件夾,然后單擊“添加”“新項(xiàng)”。 在“添加新項(xiàng) - MvcMovie”對(duì)話框中: 在右上角的搜索框中,輸入“視圖” 選擇“Razor 視圖 - 空” 保持“名稱”框的

    2024年02月13日
    瀏覽(127)
  • ASP.NET Core 的 Web Api 實(shí)現(xiàn)限流 中間件

    ASP.NET Core 的 Web Api 實(shí)現(xiàn)限流 中間件

    Microsoft.AspNetCore.RateLimiting ?中間件提供速率限制(限流)中間件。 它是.NET 7 以上版本才支持的中間件,剛看了一下,確實(shí)挺好用,下面給大家簡(jiǎn)單介紹一下: RateLimiterOptionsExtensions?類提供下列用于限制速率的擴(kuò)展方法:????? 固定窗口限制器 滑動(dòng)窗口限制器 令牌桶限

    2024年01月17日
    瀏覽(18)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包