Netty + Protocol Buffers Java <-> C 通信问题

Posted

技术标签:

【中文标题】Netty + Protocol Buffers Java <-> C 通信问题【英文标题】:Netty + Protocol Buffers Java <-> C communication issue 【发布时间】:2017-09-04 16:27:51 【问题描述】:

我正在尝试使用 Netty 和协议缓冲区(以及加密,但这不会影响此问题)。服务器使用 Netty 用 Ja​​va 编写,客户端应该用 C 和 Java 编写。这是Java服务器端代码。

应用类:

@SpringBootApplication
public class Application 

public static void main(String[] args) throws InterruptedException 
    ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
    TcpServer tcpServer = context.getBean(TcpServer.class);
    tcpServer.start();


@Autowired
private SomethingChannelInitializer somethingChannelInitializer;

@SuppressWarnings( "unchecked", "rawtypes" )
@Bean
public ServerBootstrap bootstrap() 
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup(), workerGroup()).channel(NioserverSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(somethingChannelInitializer);
    Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
    Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
    for (ChannelOption option : keySet) 
        b.option(option, tcpChannelOptions.get(option));
    
    return b;


@Bean(name = "tcpChannelOptions")
public Map<ChannelOption<?>, Object> tcpChannelOptions() 
    Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
    options.put(ChannelOption.SO_KEEPALIVE, true);
    options.put(ChannelOption.SO_BACKLOG, 3);
    return options;


@Bean(destroyMethod = "shutdownGracefully")
public NioEventLoopGroup bossGroup() 
    return new NioEventLoopGroup(2);


@Bean(destroyMethod = "shutdownGracefully")
public NioEventLoopGroup workerGroup() 
    return new NioEventLoopGroup(2);


@Bean
public InetSocketAddress tcpPort() 
    return new InetSocketAddress(12888);


SomethingChannelInitializer 类:

@Component
public class SomethingChannelInitializer extends ChannelInitializer<SocketChannel> 

@Autowired
private ChannelInboundHandlerAdapter somethingServerHandler;

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception 
    ChannelPipeline pipeline = socketChannel.pipeline();

    // SSL stuff

    pipeline.addLast(new ProtobufVarint32FrameDecoder());
    pipeline.addLast(new ProtobufDecoder(ProtocolMessage.OneRequest.getDefaultInstance()));

    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
    pipeline.addLast(new ProtobufEncoder());

    pipeline.addLast(somethingServerHandler);



SomethingServerHandler 类:

@Component
@Sharable
public class SomethingServerHandler extends ChannelInboundHandlerAdapter 

private static Logger logger = LoggerFactory.getLogger(SomethingServerHandler.class);

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception 



@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
    logger.debug("read ...");
    ProtocolMessage.OneRequest req = (ProtocolMessage.OneRequest) msg;
    switch (req.getType()) 
    case LOGIN:
        logger.debug(", ", req.getLoginRequest().getLogin(), req.getLoginRequest().getPassword());
        break;
    case REGISTER:
        logger.debug(", ", req.getRegistrationRequest().getEmail(), req.getRegistrationRequest().getPassword());
        break;
    default:
        break;
    

    ProtocolMessage.RegistrationResponse registrationResponse = ProtocolMessage.RegistrationResponse.newBuilder().setStatus("got it").build();
    ProtocolMessage.OneResponse rsp = ProtocolMessage.OneResponse.newBuilder().setRegistrationResponse(registrationResponse).build();
    ctx.writeAndFlush(rsp);


@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
    logger.error(cause.getMessage(), cause);
    //ctx.close();


@Override
public void channelInactive(ChannelHandlerContext ctx)



ProtocolMessage.proto:

package com.company.model;

message LoginRequest 
  required string login = 1;
  required string password = 2;


message RegistrationRequest 
  required string login = 1;
  required string email = 2;
  required string password = 3;


message RegistrationResponse 
  required string status = 1;


message OneRequest 
  enum Type  LOGIN = 1; REGISTER = 2; 

  required Type type = 1;
  oneof request 
    LoginRequest loginRequest = 2;
    RegistrationRequest registrationRequest = 3;
  


message OneResponse 
  oneof response 
    RegistrationResponse registrationResponse = 1;
  

C 客户端:

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include <openssl/ssl.h>
#include <openssl/err.h>

#include "ProtocolMessage.pb-c.h"

#define HERR(source) (fprintf(stderr,"%s(%d) at  %s:%d\n",source,h_errno,__FILE__,__LINE__),\
         exit(EXIT_FAILURE))

int main(int argc, char **argv) 
// SSL initialization, making new connection etc.

Com__Company__Model__LoginRequest login = COM__COMPANY__MODEL__LOGIN_REQUEST__INIT;
login.login="sample_login";
login.password="secret_password";
Com__Company__Model__OneRequest req = COM__COMPANY__MODEL__ONE_REQUEST__INIT;
req.loginrequest=&login;
req.type=COM__COMPANY__MODEL__ONE_REQUEST__TYPE__LOGIN;

unsigned len = com__company__model__one_request__get_packed_size(&req);
void *buf = malloc(len);
com__company__model__one_request__pack(&req, buf);

SSL_write(clientssl, buf, len);
printf("SSL server sent %d\n", len);
SSL_shutdown(clientssl);
close(clientsocketfd);
SSL_free(clientssl);
SSL_CTX_free(ssl_client_ctx);
return 0;

服务器日志:

2017-09-04 18:14:37.209 DEBUG 63166 --- [ntLoopGroup-3-1] io.netty.handler.ssl.SslHandler          : [id: 0x445a7c98, L:/127.0.0.1:12888 - R:/127.0.0.1:50688] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
2017-09-04 18:14:37.218 ERROR 63166 --- [ntLoopGroup-3-1] p.o.g.handlers.SomethingServerHandler    : com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).

io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:98) ~[netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:413) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1273) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1084) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) [netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]

Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
    at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:89) ~[protobuf-java-2.6.1.jar!/:na]
    at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:158) ~[protobuf-java-2.6.1.jar!/:na]
    at com.company.model.ProtocolMessage$OneRequest.<init>(ProtocolMessage.java:2037) ~[classes!/:0.0.1-SNAPSHOT]
    at com.company.model.ProtocolMessage$OneRequest.<init>(ProtocolMessage.java:2000) ~[classes!/:0.0.1-SNAPSHOT]
    at com.company.model.ProtocolMessage$OneRequest$1.parsePartialFrom(ProtocolMessage.java:2116) ~[classes!/:0.0.1-SNAPSHOT]
    at com.company.model.ProtocolMessage$OneRequest$1.parsePartialFrom(ProtocolMessage.java:2111) ~[classes!/:0.0.1-SNAPSHOT]
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:137) ~[protobuf-java-2.6.1.jar!/:na]
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:168) ~[protobuf-java-2.6.1.jar!/:na]
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:174) ~[protobuf-java-2.6.1.jar!/:na]
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) ~[protobuf-java-2.6.1.jar!/:na]
    at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:121) ~[netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:64) ~[netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88) ~[netty-all-4.1.13.Final.jar!/:4.1.13.Final]
    ... 30 common frames omitted

protobuf-java 2.6.1 protobuf-c 1.0.2 libprotoc 2.6.1

基本上这是一些从网上获取并稍作修改的示例代码。不要查看硬编码值或其他缺陷。这仅用于学习目的。我可以使用 Java 客户端与服务器通信,没有任何问题。但是,当我尝试从 C 客户端发送相同的消息时,服务器会立即引发异常。我读过一些关于定界的东西,但我不知道如何处理。我也尝试过首先将消息的长度作为 int 发送,然后是来自 C 客户端的实际消息,但这也无济于事。加密在这里不是问题。如果我禁用它,我会得到相同的结果。

我在这里缺少什么?这是否可能与 C 客户端进行通信?

【问题讨论】:

你分析网络流量了吗?使用wireshark 之类的工具?只是为了得到一个提示,在哪一边搜索问题。 【参考方案1】:

尝试使用 LengthFieldBasedFrameDecoder 代替 ProtobufVarint32FrameDecoder, 并先发送包裹长度:

void *buf = malloc(len);

// send package len
buf[0] = (len >> 24) & 0xFF;
buf[1] = (len >> 16) & 0xFF;
buf[2] = (len >> 8) & 0xFF;
buf[3] = len & 0xFF;
SSL_write(clientssl, buf, 4);

// send package
com__company__model__one_request__pack(&req, buf);
SSL_write(clientssl, buf, len); 

您需要根据需要指定字节顺序 BIG_ENDIAN 或 LITTLE_ENDIAN。

【讨论】:

以上是关于Netty + Protocol Buffers Java <-> C 通信问题的主要内容,如果未能解决你的问题,请参考以下文章

Protocol buffers--python 实践 protocol buffers vs json

Protocol Buffers教程

Protocol buffers编写风格指南

从Protocol Buffers 到 gRPC

Google Protocol Buffers浅析

Java使用Protocol Buffers入门四步骤