国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RPC教程 2.支持并發(fā)與異步的客戶端

這篇具有很好參考價(jià)值的文章主要介紹了RPC教程 2.支持并發(fā)與異步的客戶端。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

1.客戶端的使用例子

func main(){
	//1. 建立連接
	client, err := rpc.Dial("tcp", "localhost:1234")

	//2.調(diào)用調(diào)用指定的RPC方法
    var reply string //string有默認(rèn)值
	err = client.Call("HelloService.Hello", "hi", &reply)    //即是一次請(qǐng)求
}

對(duì)?net/rpc?而言,一個(gè)函數(shù)需要能夠被遠(yuǎn)程調(diào)用,它必須滿足一定的條件,否則其會(huì)被忽略。

這些條件是:

  • 方法的類(lèi)型是可輸出的 (the method’s type is exported)
  • 方法本身也是可輸出的 (the method is exported)
  • 方法必須由兩個(gè)參數(shù),必須是輸出類(lèi)型或者是內(nèi)建類(lèi)型 (the method has two arguments, both exported or builtin types)
  • 方法的第二個(gè)參數(shù)必須是指針類(lèi)型 (the method’s second argument is a pointer)
  • 方法返回類(lèi)型為 error (the method has return type error)

一個(gè)輸出方法的格式如下:?

func (t *T) MethodName(argType T1, replyType *T2) error

這個(gè)方法的第一個(gè)參數(shù)代表調(diào)用者(client)提供的參數(shù),第二個(gè)參數(shù)代表要返回給調(diào)用者的計(jì)算結(jié)果。

2.定義一個(gè)請(qǐng)求

封裝結(jié)構(gòu)體 Call 來(lái)承載一次 RPC 調(diào)用所需要的信息。

type Call struct {
	ServiceMethod string      // The name of the service and method to call.
	Args          interface{} // The argument to the function (*struct).
	Reply         interface{} // The reply from the function (*struct).
	Error         error       // After completion, the error status.
	Done          chan *Call  // Receives *Call when Go is complete.
	Seq           uint64
}

func (call *Call) done() {
	call.Done <- call
}

請(qǐng)求內(nèi)容至少需要包括:

  • 請(qǐng)求的服務(wù)以及方法名
  • 請(qǐng)求參數(shù)和請(qǐng)求的回復(fù)
  • 請(qǐng)求出錯(cuò)時(shí)返回的錯(cuò)誤信息

為了支持異步調(diào)用,Call 結(jié)構(gòu)體中添加了一個(gè)字段 Done,Done 的類(lèi)型是?chan *Call,當(dāng)調(diào)用結(jié)束時(shí),會(huì)調(diào)用?call.done()?通知調(diào)用方。?

3.實(shí)現(xiàn) Client

type Client struct {
	code     codec.Codec
	opt      *Option
	sending  sync.Mutex
	header   codec.Header
	mutex    sync.Mutex    //保護(hù)下面的變量
	seq      uint64
	pending  map[uint64]*Call
	closing  bool //user has called Close
	shutdown bool // server has told us to stop
}

var ErrShutdown = errors.New("connection is shut down")

func (client *Client) Close() error {
	client.mutex.Lock()
	defer client.mutex.Unlock()
	if client.closing {
		return ErrShutdown
	}
	client.closing = true
	return client.code.Close()
}

func (client *Client) IsAvailable() bool {
	client.mutex.Lock()
	defer client.mutex.Unlock()
	return !client.closing && !client.shutdown
}
  • code是消息的編解碼器,和服務(wù)端類(lèi)似的。
  • sending是互斥鎖,和服務(wù)端類(lèi)似,為了保證請(qǐng)求的有序發(fā)送,即防止出現(xiàn)多個(gè)請(qǐng)求報(bào)文混淆。
  • header 是每個(gè)請(qǐng)求的消息頭,header 只有在請(qǐng)求發(fā)送時(shí)才需要,而請(qǐng)求發(fā)送是互斥的,因此每個(gè)客戶端只需要一個(gè),聲明在 Client 結(jié)構(gòu)體中可以復(fù)用。
  • seq是用于給請(qǐng)求進(jìn)行編號(hào),從1開(kāi)始編號(hào)自增,每個(gè)請(qǐng)求有唯一的編號(hào)。
  • pending是存儲(chǔ)未完成的請(qǐng)求,鍵是編號(hào),值是 Call 實(shí)例。
  • closing 和 shutdown 任意一個(gè)值置為 true,則表示 Client 處于不可用的狀態(tài),但有些許的差別,closing 是用戶主動(dòng)關(guān)閉的,即調(diào)用?Close?方法,而 shutdown 置為 true 一般是有錯(cuò)誤發(fā)生。

?需要存儲(chǔ)未完成的請(qǐng)求,可以想象,一個(gè)用戶發(fā)出10個(gè)不同的請(qǐng)求,要是客戶端不存儲(chǔ)這些請(qǐng)求,那收到回復(fù)的時(shí)候,就難知道如何處理了。

所以在發(fā)起請(qǐng)求的時(shí)候,需要注冊(cè)這個(gè)請(qǐng)求(往pending中添加),得到回復(fù)后需要?jiǎng)h除(從pending中delete)

由此,需要實(shí)現(xiàn)和Call相關(guān)的注冊(cè)和刪除方法。

而terminateCalls方法是服務(wù)端或客戶端發(fā)生錯(cuò)誤時(shí)調(diào)用,將 shutdown 設(shè)置為 true,且將錯(cuò)誤信息通知所有 pending 狀態(tài)的 call。該方法需要都獲得sending鎖和mutex鎖。該方法的使用地方后面會(huì)講到的。

func (client *Client) RegisterCall(call *Call) (uint64, error) {
	client.mutex.Lock()
	defer client.mutex.Unlock()

	if client.closing || client.shutdown {
		return 0, ErrShutdown
	}

	call.Seq = client.seq //設(shè)置Call的序號(hào)
	client.pending[call.Seq] = call
	client.seq++
	return call.Seq, nil
}

func (client *Client) removeCall(seq uint64) *Call {
	client.mutex.Lock()
	defer client.mutex.Unlock()
	call := client.pending[seq]
	delete(client.pending, seq)
	return call
}

func (client *Client) terminateCalls(err error) {
	client.sending.Lock()
	defer client.sending.Unlock()
	client.mutex.Lock()
	defer client.mutex.Unlock()

	client.shutdown = true
	for _, call := range client.pending {
		call.Error = err
		call.done()
	}
}

創(chuàng)建客戶端

按照前面的例子,創(chuàng)建客戶端就

client, err := rpc.Dial("tcp", "localhost:1234")

那我們也按照這樣來(lái)。

Dail函數(shù)通過(guò)?...*Option?將 Option 實(shí)現(xiàn)為可選參數(shù)(...表示可以0個(gè)參數(shù)或多個(gè)參數(shù)),可以不填寫(xiě)opts參數(shù),使用默認(rèn)的option(即是gob編解碼)

//使用例子 client, err := rpc.Dial("tcp", "localhost:1234")
func Dail(network, address string, opts ...*Option) (client *Client, err error) {
	opt, err := parseOptions(opts...)
	if err != nil {
		return nil, err
	}
	conn, err := net.Dial(network, address)
	if err != nil {
		return nil, err
	}
	return NewClient(conn, opt)
}

parseOption函數(shù)就是解析Option,判斷其Option是否符合要求等。

NewClient函數(shù),創(chuàng)建 Client 實(shí)例,首先需要完成一開(kāi)始的協(xié)議交換,即發(fā)送?Option?信息給服務(wù)端,協(xié)商好消息的編解碼方式。

func NewClient(conn net.Conn, opt *Option) (*Client, error) {
	// send options with server
	if err := json.NewEncoder(conn).Encode(opt); err != nil {
		log.Println("rpc client: options error: ", err)
		conn.Close()
		return nil, err
	}
	f := codec.NewCodeFuncMap[opt.CodecType]
	if f == nil { //沒(méi)有符合條件的編解碼器
		err := fmt.Errorf("invalid codec type %s", opt.CodecType)
		log.Println("rpc client: codec error:", err)
		return nil, err
	}

	return &Client{
		seq:     1,    //序號(hào)從1開(kāi)始,序號(hào)0表示可以表示錯(cuò)誤
		code:    f(conn),
		opt:     opt,
		pending: make(map[uint64]*Call),
	}, nil
}

func parseOptions(opts ...*Option) (*Option, error) {
	if len(opts) == 0 || opts[0] == nil {
		return DefaultOption, nil
	}
	if len(opts) != 1 {
		return nil, errors.New("number of options is more than 1")
	}
	opt := opts[0]
	opt.MagicNumber = DefaultOption.MagicNumber

	if opt.CodecType == "" {
		opt.CodecType = DefaultOption.CodecType
	}
	if _, ok := codec.NewCodeFuncMap[opt.CodecType]; !ok {
		return nil, fmt.Errorf("invalid codec type %s", opt.CodecType)
	}
	return opt, nil
}

請(qǐng)求和創(chuàng)建客戶端完成后,那就是到關(guān)鍵的接收和發(fā)送請(qǐng)求了。

實(shí)現(xiàn)接收回復(fù)和發(fā)送請(qǐng)求

那先來(lái)看看發(fā)送請(qǐng)求。

    var reply string //string有默認(rèn)值
	err = client.Call("HelloService.Hello", "hi", &reply) 

?先實(shí)現(xiàn)個(gè)send方法,其參數(shù)是*Call。內(nèi)容是注冊(cè)該Call,進(jìn)行編碼并發(fā)送給服務(wù)端。

func (client *Client) send(call *Call) {
    // make sure that the client will send a complete request
	client.sending.Lock()
	defer client.sending.Unlock()

	//注冊(cè),添加到pending中
	seq, err := client.RegisterCall(call)
	if err != nil {
		call.Error = err
		call.done()
		return
	}
    
    //復(fù)用同一個(gè)header
	client.header.ServiceMethod = call.ServiceMethod
	client.header.Seq = seq
	client.header.Error = ""

	// encode and send the request
	if err := client.code.WriteResponse(&client.header, call.Args); err != nil {
		call := client.removeCall(seq)
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}

代碼中經(jīng)常出現(xiàn)call.done(),done方法是為了支持異步調(diào)用的,當(dāng)調(diào)用結(jié)束時(shí),會(huì)調(diào)用?call.done()?通知調(diào)用方。?那就會(huì)有個(gè)異步調(diào)用的Go方法。

異步調(diào)用的Go方法中,會(huì)先判斷chan是否符合條件,之后根據(jù)函數(shù)參數(shù)來(lái)創(chuàng)建Call,之后調(diào)用send方法。

func (client *Client) Go(serviceMethod string, args, reply any, done chan *Call) *Call {
	if done == nil {
		done = make(chan *Call, 10) //10或1或其他的也可以的,大于0即可
	} else if cap(done) == 0 {
		log.Panic("rpc client: done channel is unbuffered")
	}

	call := &Call{
		ServiceMethod: serviceMethod,
		Args:          args,
		Reply:         reply,
		Done:          done,
	}
	client.send(call)
	return call
}

func (client *Client) Call(serviceMethod string, args, reply any) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}

而Call方法中,其是對(duì)?Go?的封裝,阻塞 call.Done,等待響應(yīng)返回,是一個(gè)同步接口。

發(fā)送解決后,如何進(jìn)行接收信息呢?

調(diào)用Call方法,這是個(gè)同步接口,會(huì)一直阻塞在call := <-client.Go(...).Done這里,之后當(dāng)使用call.done()時(shí)候,才會(huì)解除阻塞。但是按照目前的正常情況,是不會(huì)調(diào)用call.done()的。這時(shí)我們可以新啟一個(gè)協(xié)程去接收信息,處理完信息后就調(diào)用call.done()即可。

接收功能,接收到的響應(yīng)有三種情況:

  • call 不存在,可能是請(qǐng)求沒(méi)有發(fā)送完整,或者因?yàn)槠渌虮蝗∠?,但是服?wù)端仍舊處理了。
  • call 存在,但服務(wù)端處理出錯(cuò),即 h.Error 不為空。
  • call 存在,服務(wù)端處理正常,那么需要從 body 中讀取 Reply 的值。
func (client *Client) receive() {
	var err error
	for err == nil {
		var h codec.Header
		if err = client.code.ReadHeader(&h); err != nil {
			break
		}

		call := client.removeCall(h.Seq)
		switch {
		case call == nil:
			err = client.code.ReadBody(nil)
		case h.Error != "":
			call.Error = fmt.Errorf(h.Error)
			err = client.code.ReadBody(nil)
			call.done()
		default:
			err = client.code.ReadBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()
		}
	}
	client.terminateCalls(err)
}

在recieve中就使用了terminateCalls方法。在讀取Header失敗break,就執(zhí)行該方法。

那么這個(gè)新的協(xié)程在哪里開(kāi)啟好呢?那可以在創(chuàng)建客戶端的時(shí)候就開(kāi)啟這個(gè)協(xié)程。

func NewClient(conn net.Conn, opt *Option) (*Client, error) {
    //......
	f := codec.NewCodeFuncMap[opt.CodecType]

    //前面代碼沒(méi)有變化,就下面封裝成一個(gè)函數(shù),其內(nèi)部就使用go client.receive()
	return newClientCodec(f(conn), opt), nil
}

func newClientCodec(code codec.Codec, opt *Option) *Client {
	client := &Client{
		seq:     1,
		code:    code,
		opt:     opt,
		pending: make(map[uint64]*Call),
	}
	go client.receive()
	return client
}

這樣,接收和發(fā)送也都處理好了。至此,一個(gè)支持異步和并發(fā)的 GeeRPC 客戶端已經(jīng)完成。

4.測(cè)試

上一章節(jié)只實(shí)現(xiàn)了服務(wù)端,我們?cè)?main 函數(shù)中手動(dòng)模擬了整個(gè)通信過(guò)程。因此,這一章節(jié)我們就將 main 函數(shù)中通信部分替換為今天的客戶端。

startServer 沒(méi)有發(fā)生變化。

func main() {
	addr := make(chan string)
	go startServer(addr)

	// in fact, following code is like a simple geerpc client
	client, _ := geerpc.Dail("tcp", <-addr) //上一節(jié)是使用net.Dail
	defer client.Close()
	time.Sleep(time.Second * 1)
	num := 3
	var wg sync.WaitGroup
	wg.Add(num)

	for i := 0; i < num; i++ {
		go func(i int) {
			defer wg.Done()
			args := uint64(i)
			var reply string
			if err := client.Call("foo.sum", args, &reply); err != nil {
				log.Fatal("call Foo.Sum error:", err)
			}
			log.Println("reply: ", reply)
		}(i)
	}
	wg.Wait()
}

func startServer(addr chan string) {
	l, err := net.Listen("tcp", "localhost:10000")
	if err != nil {
		log.Fatal("network error:", err)
	}
	log.Println("start rpc server on", l.Addr())
	addr <- l.Addr().String()
	geerpc.Accept(l)
}

完整代碼:https://github.com/liwook/Go-projects/tree/main/geerpc/2-client文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-821418.html

到了這里,關(guān)于RPC教程 2.支持并發(fā)與異步的客戶端的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • elasaticsearch新版java客戶端ElasticsearchClient詳細(xì)教程,支持響應(yīng)式編程,Lambda表達(dá)式,兼容舊版High Level Rest Client

    elasaticsearch新版java客戶端詳細(xì)教程,支持響應(yīng)式編程,Lambda表達(dá)式。兼容舊版High Level Rest Client。網(wǎng)上相關(guān)教程少,我在這里出一個(gè)。elasaticsearch相關(guān)安裝這里不介紹了 有幾種方式,這里介紹兩種,如果不考慮之前舊版High Level Rest Client的客戶端采用第一種就行 阻塞和異步客戶

    2023年04月15日
    瀏覽(19)
  • php-golang-rpc jsonrpc和php客戶端tivoka/tivoka包實(shí)踐

    php-golang-rpc jsonrpc和php客戶端tivoka/tivoka包實(shí)踐

    golang 代碼: package main import ( ? ? \\\"fmt\\\" ? ? \\\"net\\\" ? ? \\\"net/rpc\\\" ? ? \\\"net/rpc/jsonrpc\\\" ) type App struct{} type Res struct { ? ? Code int ? ?`json:\\\"code\\\"` ? ? Msg ?string `json:\\\"msg\\\"` ? ? Data any ? ?`json:\\\"data\\\"` } func (*App) Hi(mp map[string]any, res *Res) error { ? ? res.Code = 200 ? ? res.Msg = \\\"成功\\\" ? ? var rmp = mak

    2024年02月15日
    瀏覽(24)
  • C/S客戶端核服務(wù)端-并發(fā)服務(wù)器

    1、新建兩個(gè)程序,分別引用兩個(gè)函數(shù),先執(zhí)行server端的程序,再執(zhí)行client端的程序 2、實(shí)現(xiàn)功能:當(dāng)client和sever連接成功后,從client輸入什么都會(huì)傳輸給server端,當(dāng)輸入第一個(gè)字母為q時(shí) 兩端程序都會(huì)退出 3、特別注意:需要修改SERVER_HOST 為自己主機(jī)地址 4、本程序編寫(xiě)的環(huán)境

    2024年02月09日
    瀏覽(23)
  • Python中websockets服務(wù)端從客戶端接收消息并發(fā)送給多線程

    目錄 一、消息隊(duì)列 二、服務(wù)端 三、設(shè)備功能 四、主線程 五、客戶端 六、更新 思路: 1.websockets需要從客戶端接收消息,由于websockets創(chuàng)建服務(wù)端只能綁定一個(gè)端口,所以需要單獨(dú)占用一個(gè)線程。收到的消息,我們需要共享給主線程,然后主線程根據(jù)設(shè)備(多線程)分發(fā)消息

    2024年04月25日
    瀏覽(32)
  • C# Socket通信從入門(mén)到精通(14)——多個(gè)異步UDP客戶端C#代碼實(shí)現(xiàn)

    C# Socket通信從入門(mén)到精通(14)——多個(gè)異步UDP客戶端C#代碼實(shí)現(xiàn)

    在之前的文章C# Socket通信從入門(mén)到精通(13)——單個(gè)異步UDP客戶端C#代碼實(shí)現(xiàn)我介紹了單個(gè)異步Udp客戶端的c#代碼實(shí)現(xiàn),但是有的時(shí)候,我們需要連接多個(gè)服務(wù)器,并且對(duì)于每個(gè)服務(wù)器,我們都有一些比如異步發(fā)送、異步接收的操作,那么這時(shí)候我們使用之前單個(gè)異步Udp客戶

    2024年02月03日
    瀏覽(232)
  • 使用Go語(yǔ)言的HTTP客戶端進(jìn)行負(fù)載均衡

    使用Go語(yǔ)言的HTTP客戶端進(jìn)行負(fù)載均衡

    負(fù)載均衡是分布式系統(tǒng)中的重要概念,它用于將流量分散到多個(gè)服務(wù)器或服務(wù)上,以實(shí)現(xiàn)更好的性能、可靠性和可擴(kuò)展性。在Go語(yǔ)言中,可以使用HTTP客戶端進(jìn)行負(fù)載均衡,確保請(qǐng)求被均勻地分配到多個(gè)服務(wù)器或服務(wù)上。 下面是一個(gè)使用Go語(yǔ)言HTTP客戶端進(jìn)行負(fù)載均衡的示例:

    2024年01月21日
    瀏覽(27)
  • client-go源碼結(jié)構(gòu)及客戶端對(duì)象

    client-go源碼結(jié)構(gòu)及客戶端對(duì)象

    G? Goup 資源組,包含一組資源操作的集合 V Version 資源版本,用于區(qū)分不同API的穩(wěn)定程度及兼容性 R Resource 資源信息,用于區(qū)分不同的資源API K Kind 資源對(duì)象類(lèi)型,每個(gè)資源對(duì)象都需要Kind來(lái)區(qū)分它自身代表的資源類(lèi)型 (1)通過(guò) GVR 可以構(gòu)造 REST Api ?進(jìn)行接口調(diào)用,而 GVK 可以

    2024年04月26日
    瀏覽(65)
  • SocketTools 11在所有HTTP客戶端組件支持

    SocketTools 11在所有HTTP客戶端組件支持

    在所有HTTP客戶端組件中添加了對(duì)HTTP/2.0協(xié)議的支持。 更新了TLS 1.2(及更高版本)和SSH 2.0的安全選項(xiàng),以使用Microsoft Windows 11和Windows Server 2022中提供的密碼套件。較舊、安全性較低的密碼套件已被棄用,在建立連接時(shí)將不會(huì)使用。回退選項(xiàng)可用于使用TLS 1.0連接到舊版服務(wù)器。

    2024年02月05日
    瀏覽(26)
  • MYSQL連接報(bào)錯(cuò):客戶端不支持服務(wù)器請(qǐng)求的身份驗(yàn)證協(xié)議;考慮升級(jí)MYSQL客戶端數(shù)據(jù)庫(kù)

    在進(jìn)行MYSQL數(shù)據(jù)庫(kù)連接時(shí),有時(shí)候可能會(huì)遇到如上所述的錯(cuò)誤:“客戶端不支持服務(wù)器請(qǐng)求的身份驗(yàn)證協(xié)議;考慮升級(jí)MYSQL客戶端數(shù)據(jù)庫(kù)”。這個(gè)錯(cuò)誤通常發(fā)生在客戶端使用的MYSQL版本與服務(wù)器所要求的身份驗(yàn)證協(xié)議不兼容的情況下。幸運(yùn)的是,您可以通過(guò)升級(jí)MYSQL客戶端來(lái)解

    2024年02月03日
    瀏覽(24)
  • 【六、http】go的http的客戶端重定向

    【六、http】go的http的客戶端重定向

    重定向過(guò)程 :客戶瀏覽器發(fā)送http請(qǐng)求----》web服務(wù)器接受后發(fā)送302狀態(tài)碼響應(yīng)及對(duì)應(yīng)新的location給客戶瀏覽器–》客戶瀏覽器發(fā)現(xiàn)是302響應(yīng),則自動(dòng)再發(fā)送一個(gè)新的http請(qǐng)求,請(qǐng)求url是新的location地址----》服務(wù)器根據(jù)此請(qǐng)求尋找資源并發(fā)送給客戶。在這里location可以重定向到任

    2024年02月05日
    瀏覽(21)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包