1.客戶端的使用例子
func main(){
//1. 建立連接
client, err := rpc.Dial("tcp", "localhost:1234")
//2.調(diào)用調(diào)用指定的RPC方法
var reply string //string有默認(rèn)值
err = client.Call("HelloService.Hello", "hi", &reply) //即是一次請(qǐng)求
}
對(duì)?net/rpc
?而言,一個(gè)函數(shù)需要能夠被遠(yuǎn)程調(diào)用,它必須滿足一定的條件,否則其會(huì)被忽略。
這些條件是:
- 方法的類(lèi)型是可輸出的 (the method’s type is exported)
- 方法本身也是可輸出的 (the method is exported)
- 方法必須由兩個(gè)參數(shù),必須是輸出類(lèi)型或者是內(nèi)建類(lèi)型 (the method has two arguments, both exported or builtin types)
- 方法的第二個(gè)參數(shù)必須是指針類(lèi)型 (the method’s second argument is a pointer)
- 方法返回類(lèi)型為 error (the method has return type error)
一個(gè)輸出方法的格式如下:?
func (t *T) MethodName(argType T1, replyType *T2) error
這個(gè)方法的第一個(gè)參數(shù)代表調(diào)用者(client)提供的參數(shù),第二個(gè)參數(shù)代表要返回給調(diào)用者的計(jì)算結(jié)果。
2.定義一個(gè)請(qǐng)求
封裝結(jié)構(gòu)體 Call 來(lái)承載一次 RPC 調(diào)用所需要的信息。
type Call struct {
ServiceMethod string // The name of the service and method to call.
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct).
Error error // After completion, the error status.
Done chan *Call // Receives *Call when Go is complete.
Seq uint64
}
func (call *Call) done() {
call.Done <- call
}
請(qǐng)求內(nèi)容至少需要包括:
- 請(qǐng)求的服務(wù)以及方法名
- 請(qǐng)求參數(shù)和請(qǐng)求的回復(fù)
- 請(qǐng)求出錯(cuò)時(shí)返回的錯(cuò)誤信息
為了支持異步調(diào)用,Call 結(jié)構(gòu)體中添加了一個(gè)字段 Done,Done 的類(lèi)型是?chan *Call
,當(dāng)調(diào)用結(jié)束時(shí),會(huì)調(diào)用?call.done()
?通知調(diào)用方。?
3.實(shí)現(xiàn) Client
type Client struct {
code codec.Codec
opt *Option
sending sync.Mutex
header codec.Header
mutex sync.Mutex //保護(hù)下面的變量
seq uint64
pending map[uint64]*Call
closing bool //user has called Close
shutdown bool // server has told us to stop
}
var ErrShutdown = errors.New("connection is shut down")
func (client *Client) Close() error {
client.mutex.Lock()
defer client.mutex.Unlock()
if client.closing {
return ErrShutdown
}
client.closing = true
return client.code.Close()
}
func (client *Client) IsAvailable() bool {
client.mutex.Lock()
defer client.mutex.Unlock()
return !client.closing && !client.shutdown
}
- code是消息的編解碼器,和服務(wù)端類(lèi)似的。
- sending是互斥鎖,和服務(wù)端類(lèi)似,為了保證請(qǐng)求的有序發(fā)送,即防止出現(xiàn)多個(gè)請(qǐng)求報(bào)文混淆。
- header 是每個(gè)請(qǐng)求的消息頭,header 只有在請(qǐng)求發(fā)送時(shí)才需要,而請(qǐng)求發(fā)送是互斥的,因此每個(gè)客戶端只需要一個(gè),聲明在 Client 結(jié)構(gòu)體中可以復(fù)用。
- seq是用于給請(qǐng)求進(jìn)行編號(hào),從1開(kāi)始編號(hào)自增,每個(gè)請(qǐng)求有唯一的編號(hào)。
- pending是存儲(chǔ)未完成的請(qǐng)求,鍵是編號(hào),值是 Call 實(shí)例。
- closing 和 shutdown 任意一個(gè)值置為 true,則表示 Client 處于不可用的狀態(tài),但有些許的差別,closing 是用戶主動(dòng)關(guān)閉的,即調(diào)用?
Close
?方法,而 shutdown 置為 true 一般是有錯(cuò)誤發(fā)生。
?需要存儲(chǔ)未完成的請(qǐng)求,可以想象,一個(gè)用戶發(fā)出10個(gè)不同的請(qǐng)求,要是客戶端不存儲(chǔ)這些請(qǐng)求,那收到回復(fù)的時(shí)候,就難知道如何處理了。
所以在發(fā)起請(qǐng)求的時(shí)候,需要注冊(cè)這個(gè)請(qǐng)求(往pending中添加),得到回復(fù)后需要?jiǎng)h除(從pending中delete)。
由此,需要實(shí)現(xiàn)和Call相關(guān)的注冊(cè)和刪除方法。
而terminateCalls方法是服務(wù)端或客戶端發(fā)生錯(cuò)誤時(shí)調(diào)用,將 shutdown 設(shè)置為 true,且將錯(cuò)誤信息通知所有 pending 狀態(tài)的 call。該方法需要都獲得sending鎖和mutex鎖。該方法的使用地方后面會(huì)講到的。
func (client *Client) RegisterCall(call *Call) (uint64, error) {
client.mutex.Lock()
defer client.mutex.Unlock()
if client.closing || client.shutdown {
return 0, ErrShutdown
}
call.Seq = client.seq //設(shè)置Call的序號(hào)
client.pending[call.Seq] = call
client.seq++
return call.Seq, nil
}
func (client *Client) removeCall(seq uint64) *Call {
client.mutex.Lock()
defer client.mutex.Unlock()
call := client.pending[seq]
delete(client.pending, seq)
return call
}
func (client *Client) terminateCalls(err error) {
client.sending.Lock()
defer client.sending.Unlock()
client.mutex.Lock()
defer client.mutex.Unlock()
client.shutdown = true
for _, call := range client.pending {
call.Error = err
call.done()
}
}
創(chuàng)建客戶端
按照前面的例子,創(chuàng)建客戶端就
client, err := rpc.Dial("tcp", "localhost:1234")
那我們也按照這樣來(lái)。
Dail函數(shù)通過(guò)?...*Option
?將 Option 實(shí)現(xiàn)為可選參數(shù)(...表示可以0個(gè)參數(shù)或多個(gè)參數(shù)),可以不填寫(xiě)opts參數(shù),使用默認(rèn)的option(即是gob編解碼)
//使用例子 client, err := rpc.Dial("tcp", "localhost:1234")
func Dail(network, address string, opts ...*Option) (client *Client, err error) {
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn, opt)
}
parseOption函數(shù)就是解析Option,判斷其Option是否符合要求等。
NewClient函數(shù),創(chuàng)建 Client 實(shí)例,首先需要完成一開(kāi)始的協(xié)議交換,即發(fā)送?Option
?信息給服務(wù)端,協(xié)商好消息的編解碼方式。
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
// send options with server
if err := json.NewEncoder(conn).Encode(opt); err != nil {
log.Println("rpc client: options error: ", err)
conn.Close()
return nil, err
}
f := codec.NewCodeFuncMap[opt.CodecType]
if f == nil { //沒(méi)有符合條件的編解碼器
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
log.Println("rpc client: codec error:", err)
return nil, err
}
return &Client{
seq: 1, //序號(hào)從1開(kāi)始,序號(hào)0表示可以表示錯(cuò)誤
code: f(conn),
opt: opt,
pending: make(map[uint64]*Call),
}, nil
}
func parseOptions(opts ...*Option) (*Option, error) {
if len(opts) == 0 || opts[0] == nil {
return DefaultOption, nil
}
if len(opts) != 1 {
return nil, errors.New("number of options is more than 1")
}
opt := opts[0]
opt.MagicNumber = DefaultOption.MagicNumber
if opt.CodecType == "" {
opt.CodecType = DefaultOption.CodecType
}
if _, ok := codec.NewCodeFuncMap[opt.CodecType]; !ok {
return nil, fmt.Errorf("invalid codec type %s", opt.CodecType)
}
return opt, nil
}
請(qǐng)求和創(chuàng)建客戶端完成后,那就是到關(guān)鍵的接收和發(fā)送請(qǐng)求了。
實(shí)現(xiàn)接收回復(fù)和發(fā)送請(qǐng)求
那先來(lái)看看發(fā)送請(qǐng)求。
var reply string //string有默認(rèn)值
err = client.Call("HelloService.Hello", "hi", &reply)
?先實(shí)現(xiàn)個(gè)send方法,其參數(shù)是*Call。內(nèi)容是注冊(cè)該Call,進(jìn)行編碼并發(fā)送給服務(wù)端。
func (client *Client) send(call *Call) {
// make sure that the client will send a complete request
client.sending.Lock()
defer client.sending.Unlock()
//注冊(cè),添加到pending中
seq, err := client.RegisterCall(call)
if err != nil {
call.Error = err
call.done()
return
}
//復(fù)用同一個(gè)header
client.header.ServiceMethod = call.ServiceMethod
client.header.Seq = seq
client.header.Error = ""
// encode and send the request
if err := client.code.WriteResponse(&client.header, call.Args); err != nil {
call := client.removeCall(seq)
if call != nil {
call.Error = err
call.done()
}
}
}
代碼中經(jīng)常出現(xiàn)call.done(),done方法是為了支持異步調(diào)用的,當(dāng)調(diào)用結(jié)束時(shí),會(huì)調(diào)用?call.done()
?通知調(diào)用方。?那就會(huì)有個(gè)異步調(diào)用的Go方法。
異步調(diào)用的Go方法中,會(huì)先判斷chan是否符合條件,之后根據(jù)函數(shù)參數(shù)來(lái)創(chuàng)建Call,之后調(diào)用send方法。
func (client *Client) Go(serviceMethod string, args, reply any, done chan *Call) *Call {
if done == nil {
done = make(chan *Call, 10) //10或1或其他的也可以的,大于0即可
} else if cap(done) == 0 {
log.Panic("rpc client: done channel is unbuffered")
}
call := &Call{
ServiceMethod: serviceMethod,
Args: args,
Reply: reply,
Done: done,
}
client.send(call)
return call
}
func (client *Client) Call(serviceMethod string, args, reply any) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
而Call方法中,其是對(duì)?Go
?的封裝,阻塞 call.Done,等待響應(yīng)返回,是一個(gè)同步接口。
發(fā)送解決后,如何進(jìn)行接收信息呢?
調(diào)用Call方法,這是個(gè)同步接口,會(huì)一直阻塞在call := <-client.Go(...).Done這里,之后當(dāng)使用call.done()時(shí)候,才會(huì)解除阻塞。但是按照目前的正常情況,是不會(huì)調(diào)用call.done()的。這時(shí)我們可以新啟一個(gè)協(xié)程去接收信息,處理完信息后就調(diào)用call.done()即可。
接收功能,接收到的響應(yīng)有三種情況:
- call 不存在,可能是請(qǐng)求沒(méi)有發(fā)送完整,或者因?yàn)槠渌虮蝗∠?,但是服?wù)端仍舊處理了。
- call 存在,但服務(wù)端處理出錯(cuò),即 h.Error 不為空。
- call 存在,服務(wù)端處理正常,那么需要從 body 中讀取 Reply 的值。
func (client *Client) receive() {
var err error
for err == nil {
var h codec.Header
if err = client.code.ReadHeader(&h); err != nil {
break
}
call := client.removeCall(h.Seq)
switch {
case call == nil:
err = client.code.ReadBody(nil)
case h.Error != "":
call.Error = fmt.Errorf(h.Error)
err = client.code.ReadBody(nil)
call.done()
default:
err = client.code.ReadBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
client.terminateCalls(err)
}
在recieve中就使用了terminateCalls方法。在讀取Header失敗break,就執(zhí)行該方法。
那么這個(gè)新的協(xié)程在哪里開(kāi)啟好呢?那可以在創(chuàng)建客戶端的時(shí)候就開(kāi)啟這個(gè)協(xié)程。
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
//......
f := codec.NewCodeFuncMap[opt.CodecType]
//前面代碼沒(méi)有變化,就下面封裝成一個(gè)函數(shù),其內(nèi)部就使用go client.receive()
return newClientCodec(f(conn), opt), nil
}
func newClientCodec(code codec.Codec, opt *Option) *Client {
client := &Client{
seq: 1,
code: code,
opt: opt,
pending: make(map[uint64]*Call),
}
go client.receive()
return client
}
這樣,接收和發(fā)送也都處理好了。至此,一個(gè)支持異步和并發(fā)的 GeeRPC 客戶端已經(jīng)完成。
4.測(cè)試
上一章節(jié)只實(shí)現(xiàn)了服務(wù)端,我們?cè)?main 函數(shù)中手動(dòng)模擬了整個(gè)通信過(guò)程。因此,這一章節(jié)我們就將 main 函數(shù)中通信部分替換為今天的客戶端。
startServer 沒(méi)有發(fā)生變化。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-821418.html
func main() {
addr := make(chan string)
go startServer(addr)
// in fact, following code is like a simple geerpc client
client, _ := geerpc.Dail("tcp", <-addr) //上一節(jié)是使用net.Dail
defer client.Close()
time.Sleep(time.Second * 1)
num := 3
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
go func(i int) {
defer wg.Done()
args := uint64(i)
var reply string
if err := client.Call("foo.sum", args, &reply); err != nil {
log.Fatal("call Foo.Sum error:", err)
}
log.Println("reply: ", reply)
}(i)
}
wg.Wait()
}
func startServer(addr chan string) {
l, err := net.Listen("tcp", "localhost:10000")
if err != nil {
log.Fatal("network error:", err)
}
log.Println("start rpc server on", l.Addr())
addr <- l.Addr().String()
geerpc.Accept(l)
}
完整代碼:https://github.com/liwook/Go-projects/tree/main/geerpc/2-client文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-821418.html
到了這里,關(guān)于RPC教程 2.支持并發(fā)與異步的客戶端的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!