C++编程中使用框架protobuf

Posted hsy12342611

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++编程中使用框架protobuf相关的知识,希望对你有一定的参考价值。

protobuf作为protocol buffer协议使用最广泛的框架之一,在很多编程语言中都有对应实现,本篇文章介绍以下C++语言中如何使用protobuf。

目录

1.编译安装protobuf

2.Protobuf2和Protobuf3

3.定义proto接口文件

3.1Protobuf2定义

3.2Protobuf2定义

4.C++语言中使用protobuf

 4.1生成C++接口文件

4.2PB对象与字节流互转,PB对象与文件流

4.3CodedOutputStream与CodedInputStream使用 

4.4prototxt文本文件写和读

​编辑 4.5FileOutputStream和FileInputStream操作二进制文件 

 4.6PB对象与std::stringstream


1.编译安装protobuf

参考博客:

protocol buffer协议的两种常用框架protobuf和nanopb_nanopb repeated_hsy12342611的博客-CSDN博客

2.Protobuf2和Protobuf3

Protobuf有两个版本,Protobuf2和Protobuf3,目前使用最多是Protobuf3,关于Protobuf2和Protobuf3,有如下点需要说明

(1)在Protobuf2中,消息的字段可以加required和optional修饰符,也支持default修饰符指定默认值。默认一个optional字段如果没有设置,或者显式设置成了默认值,在序列化成二进制格式时,这个字段会被去掉,反序列化后,无法区分是当初没有设置还是设置成了默认值但序列化时被去掉了,虽然Protobuf2对于原始数据类型字段都有hasXxx()方法,在反序列化后,对于这个“缺失”字段,hasXxx()总是false,因此失去了其判定意义。

(2)在 Protobuf3中,去掉了required和optional修饰符,所有字段都是optional的,而且对于原始数据类型字段(包括repeated),不提供hasXxx()方法。

(3)缺失值和默认
缺失值或者显示设置默认值,效果是一样,序列化时字段会被去掉。
对于整数类型,默认值为0。
对于字符串,默认值为空字符串。
对于字节,默认值为空字节。
对于布尔值,默认值为false。
对于数字类型,默认值为零。
对于枚举,默认值为第一个定义的枚举值,必须为0。

3.定义proto接口文件

以表达人信息为例,对比Protobuf2和Protobuf3定义person.proto

3.1Protobuf2定义

syntax = "proto2";
package cunion.test;


// proto2
message Person
    required string name = 1;
    required int32 id = 2;
    optional string email = 3;
     
    enum PhoneType
        MOBILE = 0;
        HOME = 1;
        WORK = 2;
   
     
    message PhoneNumber
        required string number = 1;
        optional PhoneType type = 2 [default = HOME];
   
    optional repeated PhoneNumber phone = 4;
    message Addr
        optional string province = 1;
        optional string city = 2;
        optional string county = 3;
   
    optional Addr addr = 5;

3.2Protobuf2定义

syntax = "proto3";

package cunion.test;

// proto3

message Person

string name = 1;

int32 id = 2;

string email = 3;

enum PhoneType

MOBILE = 0;

HOME = 1;

WORK = 2;

message PhoneNumber

string number = 1;

PhoneType type = 2;

repeated PhoneNumber phone = 4;

message Addr

string province = 1;

string city = 2;

string county = 3;

Addr addr = 5;

4.C++语言中使用protobuf

本人安装是protobuf V3.6.1版本,所以就以protobuf3为例介绍C++中使用protobuf框架。

 4.1生成C++接口文件

/home/tiger/protobuf-3.6.1/protobuf/bin/protoc  -I=/home/tiger/cpp/protocalbuffer --cpp_out=/home/tiger/cpp/protocalbuffer  person.proto 

4.2PB对象与字节流互转,PB对象与文件流

#include <algorithm>
#include <iostream>
#include <string>
#include <iostream>
#include <sstream>
#include <fstream>  

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#include "person.pb.h"
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"  // ArrayOutputStream
#include "google/protobuf/io/zero_copy_stream_impl.h"   // FileOutputStream    OstreamOutputStream
#include "google/protobuf/text_format.h"
#include "google/protobuf/util/delimited_message_util.h"

// ArrayOutputStream, FileOutputStream, OstreamOutputStream  
// ArrayInputStream, FileInputStream, OstreamInputStream均继承ZeroCopyOutputStream

using namespace ::cunion::test;
using namespace std;


// PB对象与字节流互转,PB对象与文件流
int use_1() 
    Person person1;
    person1.set_name("hello tree");
    //person1.has_phone(); //error
    std::cout << person1.has_addr() << std::endl;
    std::cout << "person1 " << person1.DebugString() << std::endl;

    std::string strPerson1;
    //PB对象转字节流
    person1.SerializeToString(&strPerson1);  // person1.SerializeToArray(buffer, n);
    Person person2;
    //字节流转PB对象
    person2.ParseFromString(strPerson1);
    std::cout << person2.has_addr() << std::endl;
    std::cout << "person2 " << person2.DebugString() << std::endl;

    const char* fileName = "person.bin";
    // 写文件
    std::fstream output(fileName, ios::out | ios::binary);
    if (!output)
    
        cout<< "output" << fileName << " error"<<endl;
      
    //PB对象转文件流
    person1.SerializeToOstream(&output);
    output.close();

    // 读文件
    fstream input(fileName, ios::in | ios::binary);
    if (!input)
    
        cout<<"input "<< fileName << " error"<<endl;
        return -1;
    
    Person person3;
    //文件流转PB对象
    if (!person3.ParseFromIstream(&input))
    
        cerr << "person3.ParseFromIstream !"<< endl;
    
    std::cout << "person3 " << person2.DebugString() << std::endl;



int main() 

    GOOGLE_PROTOBUF_VERIFY_VERSION;

    use_1();

    google::protobuf::ShutdownProtobufLibrary();

    return 0;

编译:

//错误编译方法1
g++  person.pb.cc  main.cpp  -I /home/tiger/protobuf-3.6.1/protobuf/include -lpthread  -L /home/tiger/protobuf-3.6.1/protobuf/lib  -lprotobuf  


//错误编译方法2
g++   -I /home/tiger/protobuf-3.6.1/protobuf/include -L /home/tiger/protobuf-3.6.1/protobuf/lib  -lpthread -lprotobuf person.pb.cc  main.cpp  

// 正确编译方法
g++ -I /home/tiger/protobuf-3.6.1/protobuf/include person.pb.cc  main.cpp  -L /home/tiger/protobuf-3.6.1/protobuf/lib  -lprotobuf -lpthread


运行:

export LD_LIBRARY_PATH=/home/tiger/protobuf-3.6.1/protobuf/lib
./a.out

运行结果如下:

4.3CodedOutputStream与CodedInputStream使用 

#include <algorithm>
#include <iostream>
#include <string>
#include <iostream>
#include <sstream>
#include <fstream>  

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#include "person.pb.h"
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"  // ArrayOutputStream
#include "google/protobuf/io/zero_copy_stream_impl.h"   // FileOutputStream    OstreamOutputStream
#include "google/protobuf/text_format.h"
#include "google/protobuf/util/delimited_message_util.h"

// ArrayOutputStream, FileOutputStream, OstreamOutputStream  
// ArrayInputStream, FileInputStream, OstreamInputStream均继承ZeroCopyOutputStream

using namespace ::cunion::test;
using namespace std;


// CodedOutputStream与CodedInputStream使用
int use_2() 
    Person person;
    person.set_name("hello liudehua");

    int len = person.ByteSize();

    char *buffer = new char[len];
	memset(buffer, 0, len);
    std::shared_ptr<char> pBuf;
    pBuf.reset(buffer);

    //写入pBuf  
    google::protobuf::io::ArrayOutputStream arrayOut(pBuf.get(), len);
    google::protobuf::io::CodedOutputStream codedOut(&arrayOut);
    codedOut.WriteVarint32(len);
    person.SerializeToCodedStream(&codedOut);
    std::cout << "person " << person.DebugString() << std::endl;

    //读取pBuf
    Person person1;
	google::protobuf::io::ArrayInputStream array_in(pBuf.get(), len);
	google::protobuf::io::CodedInputStream coded_in(&array_in);
	google::protobuf::uint32 size;
	coded_in.ReadVarint32(&size);
	google::protobuf::io::CodedInputStream::Limit msg_limit = coded_in.PushLimit(size);
	person1.ParseFromCodedStream(&coded_in);
	coded_in.PopLimit(msg_limit);
    std::cout << "person1 " << person1.DebugString() << std::endl;


int main() 

    GOOGLE_PROTOBUF_VERIFY_VERSION;

    use_2();

    google::protobuf::ShutdownProtobufLibrary();

    return 0;

编译运行:

 g++ -I /home/tiger/protobuf-3.6.1/protobuf/include person.pb.cc  main.cpp  -L /home/tiger/protobuf-3.6.1/protobuf/lib  -lprotobuf -lpthread
./a.out 
运行结果如下:

通过运行结果看:

google::protobuf::io::ArrayOutputStream和google::protobuf::io::CodedOutputStream实现PB对象的序列化,google::protobuf::io::ArrayInputStream和google::protobuf::io::CodedInputStream实现反序列化。

4.4prototxt文本文件写和读

#include <algorithm>
#include <iostream>
#include <string>
#include <iostream>
#include <sstream>
#include <fstream>  

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#include "person.pb.h"
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"  // ArrayOutputStream
#include "google/protobuf/io/zero_copy_stream_impl.h"   // FileOutputStream    OstreamOutputStream
#include "google/protobuf/text_format.h"
#include "google/protobuf/util/delimited_message_util.h"

// ArrayOutputStream, FileOutputStream, OstreamOutputStream  
// ArrayInputStream, FileInputStream, OstreamInputStream均继承ZeroCopyOutputStream

using namespace ::cunion::test;
using namespace std;

// FileOutputStream FileInputStream 
// prototxt文本文件
int use_3() 
    const char* fileName = "person1.prototxt";

    // 写入
    int fd = open(fileName, O_WRONLY | O_CREAT | O_TRUNC, 0777);
    google::protobuf::io::FileOutputStream * output = new google::protobuf::io::FileOutputStream(fd);
 
    Person person;
    person.set_name("hello guofucheng");
    person.set_id(110);
    google::protobuf::TextFormat::Print(person, output);
    delete output;
    close(fd);
    std::cout << "person " << person.DebugString() << std::endl;

    // 读取
    fd = open(fileName, O_RDONLY);
    Person person1;
    google::protobuf::io::FileInputStream * input = new google::protobuf::io::FileInputStream(fd);
    bool success = google::protobuf::TextFormat::Parse(input, &person1);
    delete input;
    close(fd);
    std::cout << "person1 " << person1.DebugString() << std::endl;



int main() 

    GOOGLE_PROTOBUF_VERIFY_VERSION;

    use_3();

    google::protobuf::ShutdownProtobufLibrary();

    return 0;

运行结果如下:

 4.5FileOutputStream和FileInputStream操作二进制文件 

#include <algorithm>
#include <iostream>
#include <string>
#include <iostream>
#include <sstream>
#include <fstream>  

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#include "person.pb.h"
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"  // ArrayOutputStream
#include "google/protobuf/io/zero_copy_stream_impl.h"   // FileOutputStream    OstreamOutputStream
#include "google/protobuf/text_format.h"
#include "google/protobuf/util/delimited_message_util.h"

// ArrayOutputStream, FileOutputStream, OstreamOutputStream  
// ArrayInputStream, FileInputStream, OstreamInputStream均继承ZeroCopyOutputStream

using namespace ::cunion::test;
using namespace std;

// FileOutputStream和FileInputStream操作二进制文件
int use_4() 
    const char* fileName = "person3.bin";

    // 写入
    int fd = open(fileName, O_WRONLY | O_CREAT | O_TRUNC, 0777);
    
    google::protobuf::io::ZeroCopyOutputStream* raw_output = new google::protobuf::io::FileOutputStream(fd);
    google::protobuf::io::CodedOutputStream* coded_output = new google::protobuf::io::CodedOutputStream(raw_output);

    Person person;
    person.set_name("hello zhangxueyou");
    person.set_id(119);
    coded_output->WriteVarint32(person.ByteSize());
    person.SerializeToCodedStream(coded_output);

    Person person1;
    person1.set_name("hello liming");
    person1.set_id(120);
    coded_output->WriteVarint32(person1.ByteSize());
    person1.SerializeToCodedStream(coded_output);

    person1.set_name("hello xijinping");
    person1.set_id(121);
    coded_output->WriteVarint32(person1.ByteSize());
    person1.SerializeToCodedStream(coded_output);

    person1.set_name("hello liqiang");
    person1.set_id(122);
    coded_output->WriteVarint32(person1.ByteSize());
    person1.SerializeToCodedStream(coded_output);

    delete coded_output;
    delete raw_output;
    close(fd);

    std::cout << "person " << person.DebugString() << std::endl;
    std::cout << "person1 " << person1.DebugString() << std::endl;

    // 读取
    fd = open(fileName, O_RDONLY);
    google::protobuf::io::FileInputStream fin(fd);
    Person person2;
    
    google::protobuf::io::FileInputStream input(fd);
    google::protobuf::io::CodedInputStream codedIn(&input);

    google::protobuf::uint32 size;
    while (true) 
        if (!codedIn.ReadVarint32(&size)) 
            return -1;
        
        std::cout << "person2 " << size << std::endl;
	    google::protobuf::io::CodedInputStream::Limit msg_limit = codedIn.PushLimit(size);
	    person2.ParseFromCodedStream(&codedIn);
	    codedIn.PopLimit(msg_limit);
        std::cout << "person2 " << person2.DebugString() << std::endl;
    


int main() 

    GOOGLE_PROTOBUF_VERIFY_VERSION;

    use_4();

    google::protobuf::ShutdownProtobufLibrary();

    return 0;

运行结果如下:

 4.6PB对象与std::stringstream

#include <algorithm>
#include <iostream>
#include <string>
#include <iostream>
#include <sstream>
#include <fstream>  

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#include "person.pb.h"
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"  // ArrayOutputStream
#include "google/protobuf/io/zero_copy_stream_impl.h"   // FileOutputStream    OstreamOutputStream
#include "google/protobuf/text_format.h"
#include "google/protobuf/util/delimited_message_util.h"

// ArrayOutputStream, FileOutputStream, OstreamOutputStream  
// ArrayInputStream, FileInputStream, OstreamInputStream均继承ZeroCopyOutputStream

using namespace ::cunion::test;
using namespace std;

// std::stringstream
int use_5() 
    std::stringstream stream;

    Person person;
    
    
        person.set_name("name one");
        person.set_id(1);
        google::protobuf::util::SerializeDelimitedToOstream(person, &stream);
    

    
        person.set_name("name two");
        person.set_id(2);
        google::protobuf::util::SerializeDelimitedToOstream(person, &stream);
    

    
        person.set_name("name three");
        person.set_id(3);
        google::protobuf::util::SerializeDelimitedToOstream(person, &stream);
    

    
        person.set_name("name four");
        person.set_id(4);
        google::protobuf::util::SerializeDelimitedToOstream(person, &stream);
    


    bool keep = true;
    bool clean_eof = true;
    Person person1;
    google::protobuf::io::IstreamInputStream zstream(&stream);
    while (keep)
    
        clean_eof = true;
        keep = google::protobuf::util::ParseDelimitedFromZeroCopyStream(&person1, &zstream, &clean_eof);
        std::cout << person1.name() << " " << person1.id() << std::endl;
    
    
    return 0;



int main() 

    GOOGLE_PROTOBUF_VERIFY_VERSION;

    use_5();

    google::protobuf::ShutdownProtobufLibrary();

    return 0;

运行结果如下:

从上面的举例可以看出protobuf框架对C++提供了丰富的操作API,利用这些API可以很容易处理java,python等语言处理过后的PB文件流。 

基于Protobuf的Rpc实现原理

RPC(remote procedure call)远程过程调用

常见RPC框架

  • gRPC:Google公布的开源软件,支持众多编程语言
  • Thrift:Facebook的开源RPC框架
  • Dubbo:阿里集团开源的一个极为出名的RPC框架,在很多互联网公司和企业应用中广泛使用。
  • BRPC:百度开源的RPC框架,已在百度有上百万实例的应用,C++语言实现

一. Protobuf介绍

主流序列化协议

xml:可扩展标记语言,是一种通用和重量级的数据交换格式,以文本格式存储

json:一种通用和轻量级的数据交换格式,以文本格式进行存储

protocol buffer:Google一种独立和轻量级的数据交换格式,以二进制格式进行存储

protobuf协议在编码和空间效率都是上非常高效的(做了大量的优化),这也是很多公司采用protobuf作为数据序列化和通信协议的原因:

  • 是一种 语言无关平台无关可扩展的序列化结构数据的方法,它可用于(数据)通信协议、数据存储等
  • 是一种 灵活高效自动化机制的结构数据序列化方法-可类比 XML,但是比 XML 更小(3 ~ 10倍)、更快(20 ~ 100倍)、更为简单。
  • 可以 自定义数据结构,然后使用特殊生成的源代码轻松的在各种数据流中使用各种语言进行编写和读取结构数据。你甚至可以更新数据结构,而不破坏由旧数据结构编译的已部署程序。

protobuffer使用流程:

  1. 编写.proto文件,定义协议内容

  2. protoc .proto文件,生成对应文件(.cc .h) protoc xx.proto --cpp_out=.(可生成对应语言的pb文件)

  3. 将文件引入项目;g++ test_msg.cc msg.pb.cc -lprotobuf

google protobuf只负责消息的打包和解包,包含了RPC的定义(但不包括实现)。也不包括通讯机制。

二. 如何实现一个简单的RPC框架?

远程调用:将要远方执行的方法和相关参数通过网络传到对端,对端收到数据后,对其进行解析然后执行;

需要解决的问题

  1. 如何知道远端有哪些接口是暴露的?( 服务发现
  2. 如何高效的将数据序列化并发送至远端?( 序列化+网络传输
  3. 远端如何知道传输过来的数据rpc调用以及调用哪个方法?( 服务寻址

Protobuf RPC框架

  • 客户端/服务端  service_stub: 用于获取和调用远端rpc方法
  • 客户端/服务端  rpc_service:实现暴露给远端的rpc方法
  • 网络传输模块
protobuf_rpc_flow

三. Protobuf+Python实现Rpc框架

1. 编写game_service.proto文件,并生成对应py文件

syntax = "proto3"; // 指定使用proto3语法
import "google/protobuf/empty.proto"; // 用于使用empty类型
package pluto; // 可理解为python模块
option py_generic_services = true; // 这个必须要,不然无法生成两个完整的类
message RequestMessage
{
string msg = 1; // 1 可理解为 tag 或者 id
}
message ResponseMessage
{
string msg = 1;
}
// service会生成两个类 一个IEchoService(作为被调用端) 一个IEchoService_Stub(继承IEchoService 作为调用方)
// 客户端调用服务端和服务端调用客户端是两个单独的过程(因为各自暴露的给对方的接口是不一样的)
// 客户端发给服务端 IEchoSerice供服务端实现对应方法 IEchoService_Stub客户端调用rpc方法
service IEchoService
{
rpc echo(RequestMessage) returns(google.protobuf.Empty);
}
// 服务端发给客户端
service IEchoClient
{
rpc echo_reply(ResponseMessage) returns(google.protobuf.Empty);
}

通过protoc game_service.proto --python_out=. 生成相应一个game_service_pb2.py文件,针对每一个service会生成 RpcServiceStubRpcService类:

  • RpcService:定义了被调用端暴露给调用端的函数接口,具体实现需要用户自己 继承这个类来实现。
  • RpcServiceStub:定义了被调用端暴露的函数的描述,调用端通过其 调用远端rpc方法,它会把调用统一转换到 RpcChannel中的 CallMethod方法

如何解决服务发现和服务寻址问题?

  • 在RPC中,所有函数都必须有自己的一个 ID本地调用中,函数体是直接通过 函数指针来指定的,但 远程调用中,函数指针是不行的, 因为两个进程的地址空间完全不一样。
  • 客户端和服务端 分别维护一个函数和 rpc ID的对应表( descriptor),在 远程调用时,查表找到相应rpc的ID,然后传给服务端
  • 服务端收到数据后,也根据 rpc ID查表,来确定客户端需要调用的函数,然后执行相应函数的代码。
_IECHOSERVICE = _descriptor.ServiceDescriptor(
  name='IEchoService',
  full_name='pluto.IEchoService',
  file=DESCRIPTOR,
  index=0,
  options=None,
  serialized_start=100,
  serialized_end=158,
  methods=[
  _descriptor.MethodDescriptor(
    name='echo',
    full_name='pluto.IEchoService.echo',
    index=0,
    containing_service=None,
    input_type=_REQUESTMESSAGE,
    output_type=_VOID,
    options=None,
  ),
])

IEchoService = service_reflection.GeneratedServiceType('IEchoService', (_service.Service,), dict(
  DESCRIPTOR = _IECHOSERVICE,
  __module__ = 'game_service_pb2'
  ))

IEchoService_Stub = service_reflection.GeneratedServiceStubType('IEchoService_Stub', (IEchoService,), dict(
  DESCRIPTOR = _IECHOSERVICE,
  __module__ = 'game_service_pb2'
  ))

2.rpc接口定义的实现

# echo_server.py
# 被调用方的Service要自己实现具体的rpc处理逻辑
class MyEchoService(IEchoService):
    def echo(self, controller, request, done):
        rpc_channel = controller.rpc_channel
        msg = request.msg
        response = ResponseMessage()
        response.msg = "echo:"+msg
        print "response.msg", response.msg
        # 此时,服务器是调用方,就调用stub.rpc,客户端时被调用方,实现rpc方法。
        client_stub = IEchoClient_Stub(rpc_channel)
        client_stub.echo_reply(controller, response, None)
        
# echo_client.py
class MyEchoClientReply(IEchoClient):
    def echo_reply(self, rpc_controller, request, done):
        print "MyEchoClientReply:%s"%request.msg
    if __name__ == "__main__":
        request = RequestMessage()
        request.msg = "hello world"
        client = TcpClient(LISTEN_IP, LISTEN_PORT, IEchoService_Stub, MyEchoClientReply)
        client.sync_connect()
        client.stub.echo(None, request, None)
        asyncore.loop()  # 启动poll

3. 实现通信层

使用python的asyncore(使用select IO多路复用技术来处理socket连接),以下为python2.7源码中IO多路复用的部分代码:

# python/asyncore.py
def poll(timeout=0.0, map=None):
    # ...
        try:
            r, w, e = select.select(r, w, e, timeout)
        except select.error, err:
            if err.args[0] != EINTR:
                raise
            else:
                return
        for fd in r:
            obj = map.get(fd)
            if obj is None:
                continue
            read(obj)  # obj即为asyncore.dispatcher
        for fd in w:
            obj = map.get(fd)
            if obj is None:
                continue
            write(obj)

通信模块需要实现TcpConnection,TcpServer,TcpClient

# TcpConnection.py
# 底层连接,服务数据读和写
class TcpConnection(asyncore.dispatcher):
    def handle_close(self):
        asyncore.dispatcher.handle_close(self)
        self.disconnect()
    def handle_read(self):  # recv
        data = self.recv(self.recv_buff_size)
        self.rpc_channel.input_data(data)
    def handle_write(self):  # 如果writable为true,则send
        if self.writebuff:
            sizes = self.send(self.writebuff)
            self.writebuff = self.writebuff[sizes:]
    def writable(self):  # 是否可写
        return len(self.writebuff) > 0
    # rpcChannel CallMethod调用
    def send_data(self, data):  # 发送数据
        self.writebuff += data

# TcpServer
# 负责accpet, 建立一条TcpConnection通道
class TcpServer(asyncore.dispatcher):
    def __init__(self, ip, port, service_factory):
        asyncore.dispatcher.__init__(self)
        self.ip = ip
        self.port = port
        self.service_factory = service_factory
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind((self.ip, self.port))
        self.listen(50)
    def handle_accept(self):
        sock, addr = self.accept()  # accept连接
        conn = TcpConnection(sock, addr)
        self.handle_new_connection(conn)
    def handle_new_connection(self, conn):
        service = self.service_factory()
        rpc_channel = RpcChannel(service, conn)
        conn.attach_rpc_channel(rpc_channel)

# TcpClient 用户客户端连接服务端
class TcpClient(TcpConnection):
    def __init__(self, ip, port, stub_factory, service_factory):
    pass
    def sync_connect(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(self.peername)
        sock.setblocking(0)
        self.set_socket(sock)
        self.setsockopt()
        self.handle_connect()
    def handle_connect(self):
        self.status = self.ST_ESTABLISHED
        # service 是被动接收方 stub是主动发送方
        self.service = self.service_factory()
        self.channel = RpcChannel(self.service, self)
        self.stub = self.stub_factory(self.channel)
        self.attach_rpc_channel(self.channel)
    def writable(self):
        return TcpConnection.writable(self)

4. RpcChannel实现

需要继承google.protobuf.RpcChannel,实现CallMethodinput_data接口:

  • CallMethod:用于调用远端rpc方法,序列化+网络传输

  • input_data:接收rpc调用,反序列化,找到对应的method执行

    # rpc_channel.py
    from google.protobuf import service
    class RpcChannel(service.RpcChannel):
     def __init__(self, rpc_service, conn):
      super(RpcChannel, self).__init__()
      self.rpc_service = rpc_service
      self.conn = conn
            
      # 所有对远端的rpc调用都会走CallMethod,protobuf rpc的框架只是RpcChannel中定义了空的CallMethod,因此需要自己实现
      # 其会从method_descriptor获取对应的index,然后对数据进行序列化,发送至远端
     def CallMethod(self, method_descriptor, rpc_controller, request, response_class, done):
      index = method_descriptor.index
      data = request.SerializeToString()   # 序列化
      self.conn.send_data(''.join([struct.pack('!ih', total_len, index), data]))

      # 所有接收远端rpc数据都走input_data,同样需要自己实现
      # 无论是调用端还是被调用端,一个method_descriptor在其Service的index一致
      # 根据index找到对应的method,然后rpc_sevice.CallMethod找到对应的rpc方法并执行
     def input_data(self, data):
      total_len, index = struct.unpack('!ih', data[0:6])
      rpc_service = self.rpc_service
      s_descriptor = rpc_service.GetDescriptor()
      method = s_descriptor.methods[index]
      request = rpc_service.GetRequestClass(method)()
      request.ParseFromString(data)  # 反序列化
      rpc_service.CallMethod(method, self.rpc_controller, request, None)
      return True
python_rpc_flow

四. Protobuf + C++ Boost.Asio实现Rpc框架

Boost.Asio主要用于网络编程,为网络编程提供了很多I/O objects,比如boost::asio::ip::tcp::socket

  • 同步:client对远端的server进行调用,同时自己原地等待,等待rpc返回后,再进行之后的操作
  • 异步:rpc调用后,callmethod结束,继续执行后续动作,等rpc返回之后,会调用事先注册的回调函数执行

1. 同步实现方案(单向)

// test_Server
class MyEchoService : public echo::EchoService {
    virtual void Echo(::google::protobuf::RpcController*,
                     const ::echo::EchoRequest* request,
                     ::echo::EchoResponse* response,
                     ::google::protobuf::Closure* done)
 
{
  std::cout << request->msg() << std::endl;
         response->set_msg(std::string("I have recv '") + request->msg() + std::string("'"));
        done->Run();
    }
};

int main() {
    MyServer my_server;
    MyEchoService echo_service;
    my_server.add(&echo_service);  // 将echo_service添加到server的_services中
    my_server.start("127.0.0.1"8888)
}

// MyServer
class MyServer {
public:
    void add(::google::protobuf::Service* sevice) {
        _services[service_info.sd->name()] = service_info;
    }
    void start(const std::string& ip, const int port);
private:
    void dispatch_msg(...);  // 
    void on_resp_msg_filled(...);
    void pack_message(...);  // 序列化
}

// 创建socket,并accept
void MyServer::start(const std::string& ip, const int port) {
    boost::asio::io_service io;
    boost::asio::ip::tcp::acceptor acceptor(
        io, 
        boost::asio::ip::tcp::endpoint(boost::asio:ip::address::from_string(ip), port)
    )
;
    while (true) {
        // 处理连接,接收数据,发送数据
        auto sock = boost::make_shared<boost::asio::ip::tcp::socket>(io);
        acceptor.accept(*sock);  // 阻塞
        sock->receive(boost::asio::buffer(meta_size));  // 接收数据
        // 反序列化
        dispatch_msg(...);  // 分发数据,处理数据
        } 
    }
}

void MyServer::dipatch_msg(){
    // 根据index获取对应method
    auto service = _services[service_name].service;
    // 回调函数
    DoneParams params = {recv_msg, resp_msg, sock};
    auto done = ::google::protobuf::NewCallback(this, &MyServer::on_resp_msg_filled, params);
    // 调用test_Server实现的Echo方法(rpc方法)并执行
    service->CallMethod(md, &controller, recv_msg, resp_msg, done);
}

// test_Server/Echo/done->Run驱动,填充数据后调用
void MyServer::on_resp_msg_filled(DoneParams params) {
    // 序列化
    pack_message(params.resp_msg, &resp_str);
    // 发送至客户端
    params.sock->send(boost::asio::buffer(resp_str));
}

// test_client
int main() {
    MyChannel channel;
    channel.init("127.0.0.1"8888);  // 创建sock 连接服务端
    echo::EchoRequest request;
    echo::EchoResponse response;
    echo::EchoService_Stub stub(&channel);
    MyController cntl;
    stub.Echo(&cntl, &request, &respnose, NULL);
    std::cout << "resp: " << response.msg() << std::endl;
    return 0;
}

// Channel
// 一个纯虚函数 需要继承并实现
class MyChannel : public ::google::protobuf::RpcChannel {
public:
    void init(const std::string& ip, const int port) {
        _io = boost::make_shared<boost::asio::io_service>();
        _sock = boost::make_shared<boost::asio::ip::tcp::socket>(*_io);
        boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address::from_string(ip), port);
        _sock->connect(ep);
    }
    virtual void CallMethod(const ::google::protobuf::MethodDecriptor* method,
                           ::google::protobuf::RpcController*,
                           const ::google::protobuf::Message* request,
                           ::google::protobuf::Message* response,
                           ::google::protobuf::Closure*)
 
{
        // 序列化
        // 发送数据
        _sock->send(boost::asio::buffer(serialized_str));
        // 接收数据
        _sock->recieve(boost::asio::buffer(resp_data_size));  // 先收大小
        _sock->recieve(boost::asio::buffer(resp_data));  // 再收数据
        response->ParseFromString(std::string(&resp_data[0], resp_data.size())); // 反序列化到response
    }
};

2. 异步实现方案

ASIO异步的核心就是一个boost::asio::io_service类,(I/O Service 被 I/O context概念取代)

I/O service代表一个操作系统接口,在Windows中,boost::asio::io_service基于IOCP,在Linux中,其基于epoll

// TcpServer
class TcpServer
{

public:
    TcpServer(boost::asio::io_service & io);
    void sendMessageToAllClient(std::string str);
    void echo(std::string str);
private:
    boost::asio::ip::tcp::acceptor acceptor;
    std::vector<TcpConnection *> m_cons;  // 管理客户端连接
    TcpConnection * m_waitConn;
    boost::asio::io_service * m_ios;
    void _start();
    // 客户端连接时,回调
    void accept_handler(const boost::system::error_code & ec);
};

void TcpServer::_start(){
 m_waitCon = new TcpConnection(*m_ios);
 m_waitCon->addService(new EchoBackImplService(m_waitCon));//con有service的句柄。
 //目前只能接受一次连接 async_accept即为异步接口
 acceptor.async_accept(*m_waitCon->getSocket(), boost::bind(&TcpServer::accept_hander,this,boost::asio::placeholders::error));
}

// TcpClient
class TcpClient
{

public:
 TcpClient(io_service & io);
 TcpConnection * getConnection();
private:
 TcpConnection * m_con;
 ip::tcp::endpoint ep;
 void conn_hanlder(const boost::system::error_code & ec,TcpConnection * con);
};

// TcpConnection
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/asio/io_service.hpp>
#include <vector>
#include <google/protobuf/service.h>


using namespace boost::asio;
typedef boost::asio::ip::tcp::socket* sock_pt;
class TcpConnection:
 public google::protobuf::RpcChannel
{
public:
 TcpConnection(boost::asio::io_service & io);
 ~TcpConnection();
 //发送数据
 void sendMessage(std::string str);
 //发送数据回调
 void write_handler(const boost::system::error_code &);
 //接收到数据
 void read_handler(const boost::system::error_code& ec,boost::shared_ptr<std::vector<char>> str);
 //rpc server service
 void addService(google::protobuf::Service *serv);
 sock_pt getSocket();
    void CallMethod(const google::protobuf::MethodDescriptor* method,
                              google::protobuf::RpcController* controller,
                              const google::protobuf::Message* request,
                              google::protobuf::Message* response,
                              google::protobuf::Closure* done)
;
private:
 sock_pt _sock;
 std::vector<google::protobuf::Service*> rpcServices;
 //解析rpc string
 void deal_rpc_data(boost::shared_ptr<std::vector<char>> str);
};


void TcpConnection::sendMessage(std::string str){
 std::cout<<"发送:"<<str<<std::endl;
    // async_write_some 异步写
 _sock->async_write_some(
        boost::asio::buffer(str.c_str(), strlen(str.c_str())),
        boost::bind(&TcpConnection::write_handler, this,boost::asio::placeholders::error)
    );
}
void TcpConnection::read_handler(const boost::system::error_code& ec,boost::shared_ptr<std::vector<char>> str){
 deal_rpc_data(str);
 boost::shared_ptr<std::vector<char>> str2(new std::vector<char>(100,0));
    // async_read_some 异步读
 _sock->async_read_some(
        boost::asio::buffer(*str2),
        boost::bind(&TcpConnection::read_handler,this,boost::asio::placeholders::error,str2));
}

总结

主要介绍常用RPC框架主流序列化协议(为啥选取Protobuf),如何实现一个RPC框架,并提供了Python实现版本,以及C++同步和异步的两个版本实现。

参考

Python_RPC:https://github.com/yingshin/Tiny-Tools/tree/master/protobuf-rpc-demo

C++版同步RPC实现: https://izualzhy.cn/demo-protobuf-rpc

C++版异步RPC实现: https://github.com/shinepengwei/miniRPC


以上是关于C++编程中使用框架protobuf的主要内容,如果未能解决你的问题,请参考以下文章

未检查/强制执行 Protobuf C++ 必填字段

带有谷歌时间戳的 Protobuf C++ 消息导致段错误 [重复]

macbook pro 能装C++ JAVA JAVA eclipse这些编程软件吗?

从二进制文件中删除 protobuf c++ 编译的路径字符串

在Qt编程中,如何调用C++的STL?

如何将向量添加到重复字段protobuf c ++