在 Golang 語言標準庫之中提供了,對于TCP/IP鏈接、偵聽器的高級封裝支持,這易于上層開發(fā)人員輕松基于這些BCL(基礎類庫)實現(xiàn)期望的功能。
TCP/IP鏈接(客戶端)
net.Conn 接口
TCP/IP偵聽器(服務器)
net.Listener
Golang 提供了易用的寫入數(shù)據(jù)到遠程(對端)實現(xiàn),而不比像 C/C++?這類傳統(tǒng)的編程語言,人們需要自行處理發(fā)送的字節(jié)數(shù)。
例如:
原生:send、WSASend、WSPSend 等函數(shù)
ASIO:stream::async_write_some? ? 等函數(shù)
它與 Microsoft .NET 提供的 System.Net.Socket 類的發(fā)送函數(shù)功能是類似的,調(diào)用該函數(shù)發(fā)送數(shù)據(jù),它會確保數(shù)據(jù)全部發(fā)送到對端(遠端),否則視為失敗。
在實際生產(chǎn)環(huán)境之中,絕大多數(shù)的場景上面,人們的確不需要調(diào)用一次發(fā)送函數(shù),但不保證本次期望傳送數(shù)據(jù)全部發(fā)送成功,而是潛在的可能只發(fā)送一部分,還需要開發(fā)人員自行處理,這樣繁瑣的TCP網(wǎng)絡程序發(fā)送實現(xiàn)的。
但這在一些特定場景的網(wǎng)絡程序上面是有意義的,例如我們需要知道已用掉了多少的流量,因為這一次緩沖區(qū)發(fā)送并沒有全部傳送到遠端,但已經(jīng)傳送了一部分也生產(chǎn)了網(wǎng)絡帶寬資源的浪費,所以,像這種問題,Golang 不提供類似接口,它這塊的不自由,是會有一些問題的。
較為慶幸的是:
net.Conn 接口提供的?Read 函數(shù)并非是保證一定讀入期望BUF大小的,否則這個在很多類型的網(wǎng)絡程序上面就很坑人了。
它就相當于傳統(tǒng)阻塞的?recv,不會出現(xiàn)非阻塞的EAGIN要求開發(fā)人員重試的操作的問題,所有它只有返回已收到的字節(jié)數(shù),或發(fā)生錯誤。
當然人們?nèi)孕杼幚硪粋€特殊的情況,recv 可能返回FIN 0字節(jié),但并非錯誤,這是因為對端正確的關閉了TCP鏈接時產(chǎn)生的。
但遇到類似這類型的場景還是用 C/C++、或者CGO調(diào)用原生C API來實現(xiàn)把,功能上面都可以解決,只是用GO語言整會很麻煩就是了。
本文提供一個簡單的網(wǎng)絡傳輸協(xié)議,適用四個字節(jié)來表示長度,一個字節(jié)來表示關鍵幀字,不考慮對于流的效驗合(checksum)的計算及驗證,人們?nèi)粲行枨罂梢宰孕行薷模诖蠖鄶?shù)的TCP應用協(xié)議服務器上面,它都可以經(jīng)過少量修改集成到解決方案之中。(Go 語言之中或許該稱為集成到 Package 程序包之中)
四個字節(jié)長度,可以描述到一幀最大?INT32_MAX(2147483647)字節(jié)封裝傳送,其實絕大多數(shù)情況傳遞大包是沒有太大意義的,人們可以自行評估調(diào)整。
值得一提,在絕大多數(shù)的場景之中,如若產(chǎn)生大包,三個字節(jié)來表示長度,人們自行位運算即可,這是因為過大的幀長,可能會導致網(wǎng)絡程序在接受這些大數(shù)據(jù)幀時,產(chǎn)生嚴重的內(nèi)存恐慌問題。
個人一個好的建議是,對于追求網(wǎng)絡吞吐性能的TCP應用協(xié)議,人們在適用 Golang 應該直接廢棄掉,沒有任何意義的各種接口及封裝實現(xiàn),如返回??io.Reader,并且應當適用固定緩沖區(qū)的最大幀對齊,如:4K,即用戶不要發(fā)送超過最大對齊(4K)的單幀報文。
隨機內(nèi)存分配會導致碎片化的產(chǎn)生,影響網(wǎng)絡程序的吞吐能力,同時頻繁的內(nèi)存復制也會導致內(nèi)存、及CPU計算資源負載升高。
但在大多數(shù)場景的網(wǎng)絡程序來說,并不需要在意這塊的優(yōu)化,因為沒有太大意義,但對于純網(wǎng)絡IO密集型應用來說,這是有很大必要的。
本文提供的實現(xiàn)不適用上述場景,但可以適用于略微帶一些大包處理(即用戶不愿意在業(yè)務層分片、組片的場景),但本人更希望大家趨近于共同學習目的。
運行測試:
go run -race test.go
服務器及客戶端實現(xiàn)及封裝:(含測試用例)文章來源:http://www.zghlxwxcb.cn/news/detail-820029.html
main.go文章來源地址http://www.zghlxwxcb.cn/news/detail-820029.html
package main
import (
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"math/rand"
"net"
"strconv"
"sync"
"time"
)
type _ConnectionReader struct {
owner *Connection
length int
offset int
checksum uint32
header_recv []byte
lock_recv sync.Mutex
}
type Connection struct {
disposed bool
connection net.Conn
header_send []byte
lock_sent sync.Mutex
reader *_ConnectionReader
listener *Listener
}
type Listener struct {
sync.Mutex
disposed bool
listener net.Listener
connections map[*Connection]bool
}
/*
#pragma pack(push, 1)
typedef struct {
BYTE bKf; // 關鍵幀字
DWORD dwLength; // 載荷長度
} PACKET_HEADER;
#pragma pack(pop)
static constexpr int PACKET_HEADER_SIZE = sizeof(PACKET_HEADER); // 4 + 1 = 5 BYTE
*/
const (
_CONNECTION_PACKET_HEADER_KF = 0x2A // 置關鍵幀字
_CONNECTION_PACKET_HEADER_SIZE = 5
CONNECTION_MIN_PORT = 0
CONNECTION_MAX_PORT = math.MaxUint16
)
var ErrConnectionClosed = errors.New("connection has been closed")
var ErrConnectionArgP = errors.New("the parameter p cannot be incorrectly null or array length 0")
var ErrConnectionProtocolKf = errors.New("network protocol error, kf check error")
var ErrConnectionProtocolLength = errors.New("network protocol error, length check error")
var ErrConnectionArgAcceptor = errors.New("the acceptor parameter cannot be null")
var ErrConnectionDisconnect = errors.New("connection has been disconnect")
// 功能名:發(fā)送數(shù)據(jù)
// 返回值:
// < 0 發(fā)送錯誤(ERR)
// == 0 鏈接斷開(FIN)
// > 0 已發(fā)送字節(jié)數(shù)
func (my *Connection) Send(buffer []byte, offset int, length int) int {
// 對于欲發(fā)送數(shù)據(jù)的參數(shù)檢查
if buffer == nil || offset < 0 || length < 1 {
return -1
}
// 檢查是否溢出BUFF緩存大小
len := len(buffer)
if offset+length > len {
return -1
}
// 檢查鏈接是否存在
connection := my.connection
if connection == nil {
return -1
}
// 預備環(huán)境及變量
bytes_transferred := 0
sync := &my.lock_sent
header := my.header_send
payload := buffer[offset : offset+length]
// 如果可以直接獲取到信號,否則其它協(xié)同程序就等待發(fā)送結束,不要用管道這些莫名其妙的東西。
sync.Lock()
defer sync.Unlock()
// 檢查當前鏈接是否已經(jīng)釋放
if my.disposed {
return -1
}
// 先發(fā)送協(xié)議幀頭
header[0] = _CONNECTION_PACKET_HEADER_KF
binary.BigEndian.PutUint32(header[1:], uint32(length))
written_size, err := connection.Write(header)
if err != nil {
return -1
} else {
bytes_transferred += written_size
}
// 在發(fā)送協(xié)議載荷
written_size, err = connection.Write(payload)
if err != nil {
return -1
}
// 加上已傳送的字節(jié)數(shù)
bytes_transferred += written_size
return bytes_transferred
}
// 功能名:收取數(shù)據(jù)
// 上個 Reader 未完成之前一直阻塞當前協(xié)程直到對方結束后返回
func (my *Connection) Receive() io.Reader {
// 檢查當前鏈接是否已經(jīng)釋放
if my.disposed {
return nil
}
// 檢查鏈接是否存在
connection := my.connection
if connection == nil {
return nil
}
// 返回幀讀入器
reader := my.reader
reader.lock_recv.Lock()
return reader
}
// 功能名:實例化一個鏈接對象
func NewConnection(conn net.Conn, listener *Listener) *Connection {
var connection *Connection
if conn != nil {
connection = &Connection{
disposed: false,
connection: conn,
listener: listener,
header_send: make([]byte, _CONNECTION_PACKET_HEADER_SIZE),
}
connection.reader = &_ConnectionReader{
owner: connection,
length: 0,
offset: 0,
checksum: 0,
header_recv: make([]byte, _CONNECTION_PACKET_HEADER_SIZE),
}
}
return connection
}
// 功能名:鏈接主機
func Connect(host string, port int) *Connection {
// 檢查端口參數(shù)的有效性
if port <= CONNECTION_MIN_PORT || port > CONNECTION_MAX_PORT {
return nil
}
// 服務器主機地址不可為空
if len(host) < 1 {
return nil
}
// 服務器地址并且嘗試鏈接
address := host + ":" + strconv.Itoa(port)
conn, err := net.Dial("tcp", address)
if err != nil {
return nil
}
// 返回TCP鏈接的封裝對象
return NewConnection(conn, nil)
}
// 功能名:關閉鏈接(網(wǎng)絡)
func (my *Connection) close(connection net.Conn) error {
// 強制關閉鏈接,但可能會失敗
if my.disposed {
return nil
}
my.disposed = true
return connection.Close()
}
// 功能名:關閉鏈接
func (my *Connection) Close(await bool) (err error) {
// 檢查鏈接是否存在
connection := my.connection
if connection == nil {
return
}
// 如果可以直接獲取到信號,否則其它協(xié)同程序就等待發(fā)送結束,不要用管道這些莫名其妙的東西。
sync := &my.lock_sent
if await {
sync.Lock()
sync.Unlock()
// 檢查當前鏈接是否已經(jīng)釋放
err = my.close(connection)
} else {
err = my.close(connection)
}
// 如果是服務器接受的鏈接對象,就從服務器列表之中刪除這個鏈接實例。
listener := my.listener
if listener != nil {
listener.Lock()
delete(listener.connections, my)
listener.Unlock()
}
return
}
func (my *Connection) connection_get_ip_end_point(remote bool) string {
connection := my.connection
if connection == nil {
return ""
}
var address net.Addr
if remote {
address = connection.RemoteAddr()
} else {
address = connection.LocalAddr()
}
if address == nil {
return ""
}
return address.String()
}
// 功能名:獲取遠程地址
func (my *Connection) GetRemoteEndPoint() string {
return my.connection_get_ip_end_point(true)
}
// 功能名:獲取本地地址
func (my *Connection) GetLocalEndPoint() string {
return my.connection_get_ip_end_point(false)
}
// 功能名:讀入幀數(shù)據(jù)
func (my *_ConnectionReader) Read(p []byte) (n int, err error) {
// 檢查當前鏈接是否已經(jīng)釋放
owner := my.owner
if owner.disposed {
return 0, ErrConnectionClosed
}
// 檢查參數(shù)P不可以為NUL或數(shù)組長度為0
length := len(p)
if length < 1 {
return 0, ErrConnectionArgP
}
// 幀已經(jīng)被全部收取完成
if my.length < 0 {
my.length = 0
my.lock_recv.Unlock()
return 0, io.EOF
}
// 收取協(xié)議報文的頭部
if my.length == 0 {
header := my.header_recv
n, err := io.ReadFull(owner.connection, header)
if err != nil {
return n, err
}
// 判斷協(xié)議關鍵幀字
kf := header[0]
if kf != _CONNECTION_PACKET_HEADER_KF {
return 0, ErrConnectionProtocolKf
}
// 檢查載荷的總長度
my.length = int(binary.BigEndian.Uint32(header[1:]))
my.offset = 0
my.checksum = 0
if my.length < 1 {
return 0, ErrConnectionProtocolLength
}
}
// 循環(huán)收取數(shù)據(jù)到緩存區(qū)P之中
remain := my.length - my.offset
if length <= remain {
n, err = owner.connection.Read(p)
} else {
n, err = owner.connection.Read(p[:remain])
}
// 從鏈接之中讀入數(shù)據(jù)出現(xiàn)錯誤
if err != nil {
return n, err
}
// 是否收取到FIN字節(jié)(0)
if n < 1 {
return n, ErrConnectionDisconnect
}
// 計算當前幀是否已經(jīng)收取完畢
my.offset += n
if my.offset < my.length {
return n, nil
} else {
my.offset = 0
my.length = -1
my.checksum = 0
return n, nil
}
}
// 功能名:實例化一個偵聽器
func NewListener(host string, port int) *Listener {
// 檢查端口參數(shù)的有效性
if port <= CONNECTION_MIN_PORT || port > CONNECTION_MAX_PORT {
return nil
}
// 服務器主機地址不可為空
if len(host) < 1 {
return nil
}
// 服務器地址并且嘗試綁定
address := host + ":" + strconv.Itoa(port)
listener, err := net.Listen("tcp", address)
if err != nil {
return nil
}
return &Listener{
disposed: false,
listener: listener,
connections: make(map[*Connection]bool),
}
}
// 功能名:偵聽服務器
func (my *Listener) ListenAndServe(acceptor func(*Connection)) error {
// 接收器參數(shù)不可以為空
if acceptor == nil {
return ErrConnectionArgAcceptor
}
// 網(wǎng)絡偵聽器已經(jīng)關閉
if my.disposed {
return ErrConnectionClosed
}
any := false
listener := my.listener
for {
// 網(wǎng)絡如果已經(jīng)被關閉了
if my.disposed {
return nil
}
// 嘗試接收一個網(wǎng)絡鏈接
conn, err := listener.Accept()
if err != nil {
if any {
return nil
} else {
return err
}
}
// 如果沒有獲取到鏈接的引用則迭代到下個鏈接接受
if conn == nil {
continue
}
// 構建一個封裝的網(wǎng)絡鏈接對象
connection := NewConnection(conn, my)
my.Lock()
my.connections[connection] = true
my.Unlock()
// 啟動對于鏈接處理的協(xié)同程序
go acceptor(connection)
}
}
// 功能名:關閉全部鏈接
func (my *Listener) Close() {
// 強制關閉服務器的偵聽器
listener := my.listener
if listener != nil {
listener.Close()
}
// 釋放全部持有的托管資源
my.Lock()
my.disposed = true
connections := my.connections
my.connections = make(map[*Connection]bool)
my.Unlock()
// 強制關閉全部的網(wǎng)絡鏈接
for connection := range connections {
connection.Close(false)
}
}
func test() {
rand.Seed(time.Now().UnixNano())
// 鏈接服務器
packet := 0
connection := Connect("127.0.0.1", 11111)
for i, c := 0, rand.Intn(100)+1; i < c; i++ {
length := rand.Intn(128) + 1
buffer := make([]byte, length)
for j := 0; j < length; j++ {
buffer[j] = byte(rand.Intn(26)) + 97
}
// 發(fā)送數(shù)據(jù)
transferred := connection.Send(buffer, 0, length)
if transferred < 1 {
break
} else {
// 接受數(shù)據(jù)
r := connection.Receive()
if r == nil {
break
}
// 讀取全部數(shù)據(jù)(一幀)
buf, err := io.ReadAll(r)
if err != nil {
break
} else if len(buf) < 1 {
break
}
// 打印收到的幀數(shù)據(jù)
packet++
fmt.Printf("[%s]: client packet=%d length=%d string:%s\r\n", time.Now().Format("2006-01-02 15:04:05"), packet, len(buf), string(buf))
}
}
// 關閉鏈接
connection.Close(true)
// 客戶端關閉網(wǎng)絡鏈接
fmt.Printf("[%s]: %s\r\n", time.Now().Format("2006-01-02 15:04:05"), "client connection closed")
}
func main() {
// 運行客戶端測試協(xié)程
go test()
// 打開服務器偵聽器喲
listener := NewListener("127.0.0.1", 11111)
listener.ListenAndServe(func(c *Connection) {
packet := 0
remoteEP := c.GetRemoteEndPoint()
for {
// 獲取網(wǎng)絡接收器
r := c.Receive()
if r == nil {
break
}
// 讀取全部數(shù)據(jù)(一幀)
buf, err := io.ReadAll(r)
if err != nil {
break
} else if len(buf) < 1 {
break
}
// 打印收到的幀數(shù)據(jù)
packet++
fmt.Printf("[%s]: server packet=%d length=%d remote=%s string:%s\r\n", time.Now().Format("2006-01-02 15:04:05"), packet, len(buf), remoteEP, string(buf))
// 回顯客戶端的數(shù)據(jù)
transferred := c.Send(buf, 0, len(buf))
if transferred < 1 {
break
}
}
// 關閉客戶端鏈接
c.Close(true)
// 服務器關閉網(wǎng)絡鏈接
fmt.Printf("[%s]: %s\r\n", time.Now().Format("2006-01-02 15:04:05"), "server connection closed")
})
}
到了這里,關于Golang TCP/IP服務器/客戶端應用程序,設計一個簡單可靠幀傳送通信協(xié)議。(并且正確處理基于流式控制協(xié)議,帶來的應用層沾幀[沾包]問題)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!