C ++中套接字上的协议缓冲区
Posted
技术标签:
【中文标题】C ++中套接字上的协议缓冲区【英文标题】:Protocol Buffer over socket in C++ 【发布时间】:2012-02-29 08:55:29 【问题描述】:我正在尝试探索 Linux 平台中的协议缓冲区 (PB),我的编码语言是 C++。我在协议缓冲区在线文档中找到了示例,但没有特定于套接字发送和接收的示例(或者我完全错过了它:))。所以我决定在实际消息之前添加消息长度并通过套接字发送。如果有人能提出比我计划做的更好的解决方案,我将不胜感激,并且在 PB 中是否有任何现成的用于创建此类数据包的东西。
但我仍然在服务器端遇到问题,我必须解码数据包。假设客户端发送一个 10 字节的数据包,其中前 4 个字节是数据包的长度;但是在解码数据包之前不可能知道长度。因此,即使我读取了前 4 个字节,我如何使用协议缓冲区推断出一半读取数据包的值。
【问题讨论】:
【参考方案1】:我终于可以让它工作了。我在这里发布代码,以便人们可以查看和评论它,以及如果有人想用 C++ 实现它,这段代码可以提供帮助。这是一个破旧的代码,我的意图是让 Protobuf 以长度前缀的方式工作。我从某个我不记得的站点获取了客户端服务器的代码,并对其进行了修改以适应 protobuf。在这里,服务器首先查看套接字并获取总数据包的长度,然后进行实际的套接字读取以读取整个数据包。可以有无数种方法来做到这一点,但为了快速解决问题,我以这种方式做到了。但我需要找到一种更好的方法来避免每个数据包 2 个接收,但在我的情况下,所有消息的大小都不同,所以这是我猜的唯一方法。
原始文件
message log_packet
required fixed64 log_time =1;
required fixed32 log_micro_sec =2;
required fixed32 sequence_no =3;
required fixed32 shm_app_id =4;
required string packet_id =5;
required string log_level=6;
required string log_msg=7;
协议缓冲区客户端代码
#include <unistd.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
using namespace google::protobuf::io;
using namespace std;
int main(int argv, char** argc)
/* Coded output stram */
log_packet payload ;
payload.set_log_time(10);
payload.set_log_micro_sec(10);
payload.set_sequence_no(1);
payload.set_shm_app_id(101);
payload.set_packet_id("TST");
payload.set_log_level("DEBUG");
payload.set_log_msg("What shall we say then");
cout<<"size after serilizing is "<<payload.ByteSize()<<endl;
int siz = payload.ByteSize()+4;
char *pkt = new char [siz];
google::protobuf::io::ArrayOutputStream aos(pkt,siz);
CodedOutputStream *coded_output = new CodedOutputStream(&aos);
coded_output->WriteVarint32(payload.ByteSize());
payload.SerializeToCodedStream(coded_output);
int host_port= 1101;
char* host_name="127.0.0.1";
struct sockaddr_in my_addr;
char buffer[1024];
int bytecount;
int buffer_len=0;
int hsock;
int * p_int;
int err;
hsock = socket(AF_INET, SOCK_STREAM, 0);
if(hsock == -1)
printf("Error initializing socket %d\n",errno);
goto FINISH;
p_int = (int*)malloc(sizeof(int));
*p_int = 1;
if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
(setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) )
printf("Error setting options %d\n",errno);
free(p_int);
goto FINISH;
free(p_int);
my_addr.sin_family = AF_INET ;
my_addr.sin_port = htons(host_port);
memset(&(my_addr.sin_zero), 0, 8);
my_addr.sin_addr.s_addr = inet_addr(host_name);
if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 )
if((err = errno) != EINPROGRESS)
fprintf(stderr, "Error connecting socket %d\n", errno);
goto FINISH;
for (int i =0;i<10000;i++)
for (int j = 0 ;j<10;j++)
if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 )
fprintf(stderr, "Error sending data %d\n", errno);
goto FINISH;
printf("Sent bytes %d\n", bytecount);
usleep(1);
delete pkt;
FINISH:
close(hsock);
协议缓冲区服务器代码
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <netinet/in.h>
#include <resolv.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
using namespace std;
using namespace google::protobuf::io;
void* SocketHandler(void*);
int main(int argv, char** argc)
int host_port= 1101;
struct sockaddr_in my_addr;
int hsock;
int * p_int ;
int err;
socklen_t addr_size = 0;
int* csock;
sockaddr_in sadr;
pthread_t thread_id=0;
hsock = socket(AF_INET, SOCK_STREAM, 0);
if(hsock == -1)
printf("Error initializing socket %d\n", errno);
goto FINISH;
p_int = (int*)malloc(sizeof(int));
*p_int = 1;
if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
(setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) )
printf("Error setting options %d\n", errno);
free(p_int);
goto FINISH;
free(p_int);
my_addr.sin_family = AF_INET ;
my_addr.sin_port = htons(host_port);
memset(&(my_addr.sin_zero), 0, 8);
my_addr.sin_addr.s_addr = INADDR_ANY ;
if( bind( hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1 )
fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno);
goto FINISH;
if(listen( hsock, 10) == -1 )
fprintf(stderr, "Error listening %d\n",errno);
goto FINISH;
//Now lets do the server stuff
addr_size = sizeof(sockaddr_in);
while(true)
printf("waiting for a connection\n");
csock = (int*)malloc(sizeof(int));
if((*csock = accept( hsock, (sockaddr*)&sadr, &addr_size))!= -1)
printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr));
pthread_create(&thread_id,0,&SocketHandler, (void*)csock );
pthread_detach(thread_id);
else
fprintf(stderr, "Error accepting %d\n", errno);
FINISH:
;//oops
google::protobuf::uint32 readHdr(char *buf)
google::protobuf::uint32 size;
google::protobuf::io::ArrayInputStream ais(buf,4);
CodedInputStream coded_input(&ais);
coded_input.ReadVarint32(&size);//Decode the HDR and get the size
cout<<"size of payload is "<<size<<endl;
return size;
void readBody(int csock,google::protobuf::uint32 siz)
int bytecount;
log_packet payload;
char buffer [siz+4];//size of the payload and hdr
//Read the entire buffer including the hdr
if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1)
fprintf(stderr, "Error receiving data %d\n", errno);
cout<<"Second read byte count is "<<bytecount<<endl;
//Assign ArrayInputStream with enough memory
google::protobuf::io::ArrayInputStream ais(buffer,siz+4);
CodedInputStream coded_input(&ais);
//Read an unsigned integer with Varint encoding, truncating to 32 bits.
coded_input.ReadVarint32(&siz);
//After the message's length is read, PushLimit() is used to prevent the CodedInputStream
//from reading beyond that length.Limits are used when parsing length-delimited
//embedded messages
google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz);
//De-Serialize
payload.ParseFromCodedStream(&coded_input);
//Once the embedded message has been parsed, PopLimit() is called to undo the limit
coded_input.PopLimit(msgLimit);
//Print the message
cout<<"Message is "<<payload.DebugString();
void* SocketHandler(void* lp)
int *csock = (int*)lp;
char buffer[4];
int bytecount=0;
string output,pl;
log_packet logp;
memset(buffer, '\0', 4);
while (1)
//Peek into the socket and get the packet size
if((bytecount = recv(*csock,
buffer,
4, MSG_PEEK))== -1)
fprintf(stderr, "Error receiving data %d\n", errno);
else if (bytecount == 0)
break;
cout<<"First read byte count is "<<bytecount<<endl;
readBody(*csock,readHdr(buffer));
FINISH:
free(csock);
return 0;
【讨论】:
谢谢!这对我遇到的问题非常有帮助 这真的很有帮助。 protobuf 在线路、套接字代码和 zerocopystreams 上的良好示例用法。 在我看来,也许尝试 SOCK_DGRAM 而不是流模型会在这里省去很多麻烦。只需将 protobuf 消息打包成数据报消息并设置它。因此,到达接收方的每个包裹都是完整的包裹,无需担心先达到最大尺寸等问题。 VarInt32 的实际大小通常只有 1 个字节,因此应该从标头中传入 coded_input.CurrentPosition(),而不是在正文中硬编码 +4 缓冲区大小 - 这通常是 1跨度> 【参考方案2】:不幸的是,protobuf 没有提供“打包”(分隔)您的消息的方法:
如果你想将多条消息写入单个文件或流,它 由您来跟踪一条消息的结束位置和下一条消息的结束位置 开始。协议缓冲区线格式不是自定界的,所以 协议缓冲区解析器无法确定消息的结束位置 自己的。解决这个问题最简单的方法是写大小 在您编写消息本身之前的每条消息。
(来自他们的documentation)
所以,他们基本上推荐了与您相同的解决方案。
【讨论】:
以上是关于C ++中套接字上的协议缓冲区的主要内容,如果未能解决你的问题,请参考以下文章