Rpc服務(wù)消費(fèi)者(Rpc服務(wù)調(diào)用者)實(shí)現(xiàn)思路
前面幾節(jié)說到Rpc消費(fèi)者主要通過UserServiceRPc_Stub這個(gè)protobuf幫我們生成的類來實(shí)現(xiàn),上代碼回顧一下
class UserServiceRpc_Stub : public UserServiceRpc {
public:
UserServiceRpc_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel);
UserServiceRpc_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel,
::PROTOBUF_NAMESPACE_ID::Service::ChannelOwnership ownership);
~UserServiceRpc_Stub();
inline ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel() { return channel_; }
// implements UserServiceRpc ------------------------------------------
void Login(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::fixbug::LoginRequest* request,
::fixbug::LoginResponse* response,
::google::protobuf::Closure* done);
private:
::PROTOBUF_NAMESPACE_ID::RpcChannel* channel_;
bool owns_channel_;
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(UserServiceRpc_Stub);
//xxx.pb.cc
void UserServiceRpc_Stub::Login(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::fixbug::LoginRequest* request,
::fixbug::LoginResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}
};
UserServiceRpc_Stub可以看做是一個(gè)給用戶提供rpc遠(yuǎn)程調(diào)用的代理類,這里面有rpcclient和rpcserver約定好的遠(yuǎn)程方法Login,Login方法是調(diào)用了一個(gè)channel_的callMethod方法,那么聯(lián)想到其他服務(wù)方法應(yīng)該也是底層調(diào)用了這個(gè)函數(shù)(底層根據(jù)method的索引來區(qū)分具體的method),那么這個(gè)方法具體如何工作的?這個(gè)類需要接受一個(gè)RpcChannel類作為參數(shù),看看RpcChannel類:
class PROTOBUF_EXPORT RpcChannel {
public:
inline RpcChannel() {}
virtual ~RpcChannel();
virtual void CallMethod(const MethodDescriptor* method,
RpcController* controller, const Message* request,
Message* response, Closure* done) = 0;
private:
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};
很簡單就是一個(gè)抽象類,有個(gè)純虛函數(shù)CallMethod需要rpc_stub來實(shí)現(xiàn)。要實(shí)現(xiàn)rpcService_Stub,顯然先需要具體化一個(gè)MyRpcChannel類,這個(gè)MyRpcChannel類繼承自RpcChannel 類,最后通過多態(tài)調(diào)用在callMethod方法里面完成rpc遠(yuǎn)程調(diào)用的過程。那么具體怎么進(jìn)行?
void UserServiceRpc_Stub::Login(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::fixbug::LoginRequest* request,
::fixbug::LoginResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}
Login方法接收4個(gè)參數(shù),第一個(gè)controller先不用管,顯然第四個(gè)參數(shù)done也不需要管因?yàn)樽鳛閞pc消費(fèi)者不需要在將什么結(jié)果反饋給rpc提供者。那么就只需要傳遞兩個(gè)參數(shù),request即是rpc_stub封裝用戶端請求的參數(shù),response即是在遠(yuǎn)程調(diào)用完成之后rpcserver幫rpcclient填寫的,然后rpc_stub只需要反序列化出來結(jié)果反饋給用戶即可,那么基本實(shí)現(xiàn)思路就有了:
step1:實(shí)現(xiàn)MyRpcchannel類,繼承自RpcChannel ,并重寫其CallMethod方法,CallMethod實(shí)現(xiàn)有如下幾小步:
step1.1:獲取具體服務(wù)名和方法名以及將用戶傳入的request序列化,最終封裝成一個(gè)字符流(header_size + service_name method_name args_size + args_str)
//獲取服務(wù)名和方法名
const google::protobuf::ServiceDescriptor* service = method->service();
std::string service_name = service->name(); //service_name
std::string method_name = method->name(); //method_name
//獲取參數(shù)的序列化字符串長度 args_size
std::string args_str;
uint32_t args_size = 0;
if(request->SerializeToString(&args_str))
{
args_size = args_str.size();
}
else
{
std::cout << "request serlize error" << std::endl;
return;
}
//定義rpc請求的Header
RpcHeader rpcHeader;
rpcHeader.set_service_name(service_name);
rpcHeader.set_method_name(method_name);
rpcHeader.set_args_size(args_size);
std::string rpc_header_str;
uint32_t header_size = 0;
if(rpcHeader.SerializeToString(&rpc_header_str))
{
header_size = rpc_header_str.size();
}
else
{
std::cout << "rpcheader_str serlize error" << std::endl;
return;
}
//組織待發(fā)送的rpc請求的字符串
std::string send_rpc_str;
send_rpc_str.insert(0, std::string((char*)&header_size, 4)); // header_size
send_rpc_str += rpc_header_str; //rpcHeader
send_rpc_str += args_str; //args
step1.2:連接遠(yuǎn)程rpcServer服務(wù)器(可以是不同機(jī)器上的不同進(jìn)程,也可以是在不同的機(jī)器上)connect to rpcServer
step1.2:將封裝的rpc字符流通過socket發(fā)送給RpcServer(遠(yuǎn)程rpc調(diào)用請求)send----->send_rpc_str
step1.3:阻塞等待rpcServer的響應(yīng),并將響應(yīng)飯序列化出來給用戶端。recv------>recv_buf
//反序列化rpc調(diào)用的響應(yīng)數(shù)據(jù)
std::string response_str(recv_buf, 0, recv_size);
if(!response->ParseFromString(response_str))
{
std::cout<< "parse error ! response_str:" << response_str << std::endl;
return;
}
step2:用戶創(chuàng)建UserServiceRpc_Stub類對象,并將MyRpcchannel實(shí)例對象傳入進(jìn)行構(gòu)造。最后通過UserServiceRpc_Stub對象實(shí)例調(diào)用對應(yīng)的rpc服務(wù)。(實(shí)際調(diào)用的就是上述實(shí)現(xiàn)的CallMethod方法)
fixbug::UserServiceRpc_Stub stub(new MyRpcChannel());
//rpc方法的請求參數(shù)
fixbug::LoginRequest request;
request.set_name("zhang san");
request.set_pwd("123");
//rpc方法的響應(yīng),同步的rpc調(diào)用過程
fixbug::LoginResponse response;
stub.Login(nullptr, &request, &response, nullptr); //RpcChannel-->Rpchannel::callMethod 集中來做所有rpc方法調(diào)用的參數(shù)序列化和網(wǎng)絡(luò)發(fā)送
step3:處理阻塞的響應(yīng)消息response做相應(yīng)的業(yè)務(wù)處理。文章來源:http://www.zghlxwxcb.cn/news/detail-624287.html
//一次rpc調(diào)用完成,讀調(diào)用的結(jié)果
if(0 == response.result().errcode())
{
std::cout << "rpc login response success: " << response.success() << std::endl;
}
else
{
std::cout << "rpc login error msg : " << response.result().errmsg() << std::endl;
}
至此RpcConsumer基本實(shí)現(xiàn)。文章來源地址http://www.zghlxwxcb.cn/news/detail-624287.html
到了這里,關(guān)于Rpc服務(wù)消費(fèi)者(Rpc服務(wù)調(diào)用者)實(shí)現(xiàn)思路的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!