2015年12月22日火曜日

Protocol Buffersを使ってC++でRPCをやってみた

Googleが作っているProtocolBuffersは、異なるPC間やプロセス間で関数コールやデータのやり取りをするライブラリです。いろいろな言語で動くので、サーバ間で分散処理を行うのにとても便利です。Googleのサーバ側の分散処理の最も重要な中核な技術といっても過言ではないと思います。ProtocolBuffersを制するものはサーバを制します。

しかしC++でこのProtocol Buffersを使ってRPCをやろうとするとちょっとめんどくさいからなのか、GoogleのProtocolBuffersの公式ドキュメントにもC++のRPCの完全なサンプルがありません。
みんなC++でProtocolBuffers使わないのかなぁ。ネットで検索しても世の中にあまりいいサンプルがありません。
なので、C++でProtocol Buffersを使ってTCP/IPでRPCをするサンプルを作ってみました。



まず、person.protoを定義します。
オプションにcc_generic_servicesをつけないとRPC用のServiceのヘッダーを生成してくれないので注意が必要です。


person.proto
-----------------------------

option cc_generic_services = true;

message Person {
  required int32 id = 1;
  required string name = 2;
}

message HelloResponse {
  required string message = 1;
}

service HelloService {
  rpc hello(Person) returns (HelloResponse);
}

-----------------------------
このファイルをprotocにかけて、person.pb.hとperson.pb.ccを生成します。




次にサーバ
main_server.cc
-----------------------------
#include "person.pb.h"
//#include "HelloRpcChannel.h"

#include "sock2_local.h"

static int myrecv(SOCKET sc,char* buf,int sz)
{
int ret = 0;
int readed = 0;
while (sz > 0){
ret = recv(sc, buf, sz, 0);
if (ret < 1)break;
sz -= ret;
buf += sz;
readed += sz;
}
return readed;
}

void hello(int id, const char* name, char* ret, int sz)
{
printf("id=%d\n", id);
printf("name=%s\n", name);
strcpy(ret, "hagehagehagehage");
}

int main(int argc, char* argv[]) {

SOCKET s=-1;
char buf[1024];
int sz = 0;
char*p;
int ret;
SOCKADDR_IN sa;
memset(&sa, 0, sizeof(sa));

winsock_init();

sa.sin_addr.s_addr = htonl(INADDR_ANY);
sa.sin_port = htons(12340);
sa.sin_family = PF_INET;

s = socket(PF_INET, SOCK_STREAM, 0);
if (s == -1)goto error;
ret = bind(s, (SOCKADDR*)&sa, sizeof(sa));
if (ret == -1){
printf("biund error\n");
goto error;
}
ret = listen(s, 20);
if (ret == -1){
printf("listen error\n");
goto error;
}
while (1){
char resbuf[256];
Person person;
HelloResponse res;
SOCKET sc;
SOCKADDR_IN sac;
memset(&sac, 0, sizeof(sac));
sz = sizeof(sac);
printf("before accept\n");
sc = accept(s,(SOCKADDR*)&sac,&sz);
printf("after accept\n");
if (sc == -1){
printf("accept error\n");
break;
}

ret = myrecv(sc, (char*)&sz, 4);
sz = ntohl(sz);
if (sz<1 || sz>1024)goto next;
printf("server recv data size=%d\n",sz);
myrecv(sc, buf, sz);
person.ParseFromArray(buf, sz);

resbuf[0] = 0;
hello(person.id(), person.name().c_str(), resbuf, sizeof(resbuf));
res.set_message(resbuf);
sz=res.ByteSize();
if (sz<1 || sz>1024)goto next;
printf("server send response %d\n",sz);
ret = htonl(sz);
send(sc, (char*)&ret, 4, 0);
res.SerializeToArray(buf, sz);
send(sc, buf, sz, 0);
next:
closesocket(sc);
}

error:
if(s!=-1)closesocket(s);
winsock_done();




return 0;
}
-----------------------------




最後にクライアント
main_client.cc
-----------------------------
#include "person.pb.h"

#include "sock2_local.h"


static int myrecv(SOCKET sc, char* buf, int sz)
{
int ret = 0;
int readed = 0;
while (sz > 0){
ret = recv(sc, buf, sz, 0);
if (ret < 1)break;
sz -= ret;
buf += sz;
readed += sz;
}
return readed;
}


class HelloRpcChannel :public google::protobuf::RpcChannel{
public:
HelloRpcChannel()
{
}
virtual ~HelloRpcChannel()
{
}
virtual void CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done)
{
SOCKET s;
SOCKADDR_IN sa;
char buf[1024];
int sz;
int ret;

winsock_init();
s = socket(PF_INET, SOCK_STREAM, 0);
if (s == -1)goto error;

memset(&sa, 0, sizeof(sa));
sa.sin_port = htons(12340);
sa.sin_addr.s_addr = htonl(0x7f000001);
sa.sin_family = PF_INET;

ret = connect(s, (SOCKADDR*)&sa, sizeof(sa));
if (ret == -1){
printf("client connect NG\n");
goto error;
}
else{
printf("client connect OK\n");
}
sz = request->ByteSize();
if (sz<1 || sz>1000)goto error;

request->SerializeToArray(buf, sz);

printf("client request data size=%d\n", sz);
ret = htonl(sz);
ret = send(s, (char*)&ret, 4, 0);
ret = send(s, (char*)buf, sz, 0);
ret = myrecv(s, (char*)&sz, 4);
sz = ntohl(sz);
if (sz<1 || sz>1000)goto error;

ret = myrecv(s, buf, sz);
printf("client response data size=%d\n", sz);
response->ParseFromArray(buf, sz);
//printf("response Message=%s\n",esponse.message().c_str());
error:
if (s != -1)closesocket(s);
done->Run();
winsock_done();
}

};


class MyRpcController : public google::protobuf::RpcController{
public:
MyRpcController()
{
}
virtual ~MyRpcController()
{
}
virtual void Reset()
{
}
virtual bool Failed () const
{
return false;
}
virtual std::string ErrorText() const
{
return "";
}
virtual void StartCancel()
{
}
virtual void SetFailed(const std::string& reason)
{
}
virtual bool IsCanceled() const
{
return false;
}
virtual void NotifyOnCancel(google::protobuf::Closure* callback){
}
};

class MyClosure : public google::protobuf::Closure{
public:
MyClosure()
{
}
virtual ~MyClosure()
{
}
virtual void Run()
{
}
};


void hello(int id, const char* name, char* buf, int sz)
{
HelloRpcChannel hrc;

HelloService_Stub hss(&hrc);
Person req;
HelloResponse res;
MyRpcController con;
MyClosure clo;


if (name == NULL || buf == NULL || sz < 1)return;
buf[0] = 0;

req.set_id(123);
req.set_name("hogehoge");
hss.hello(&con, &req, &res, &clo);

strncpy(buf,res.message().c_str(),sz);
buf[sz - 1] = 0;
}


int main(int argc, char* argv[]) {
char buf[256];
hello(123, "hoge", buf, sizeof(buf));
printf("hello() return=>>%s<<\n ",buf);
return 0;
}
-----------------------------

DeepLearningのライブラリTensorFlowとかもこうやって分散処理させているのね。
多言語間のプロセス間通信って意外と簡単。
これで誰でも分散処理ができる。


0 件のコメント:

コメントを投稿