netty编解码之使用protobuf
Posted 叶长风
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty编解码之使用protobuf相关的知识,希望对你有一定的参考价值。
netty编解码之使用protobuf
protobuf这个序列化框架在我们公司使用了,我负责的模块中使用protobuf生成了一些model,然后使用了protostuff对缓存在redis中的数据进行序列化和反序列化,速度非常快,解决了一些当时的序列化和反序列化太慢的问题,这节来讲下netty中使用protobuf进行序列化的用法。
这里可以下载protobuf工具然后使用命令根据proto生成java类文件,也可以通过使用相应的protobuf插件来生成,这里提供一下protobuf的maven插件和相应依赖,编写的时候就不必再去网上寻找和进行调试了,如下:
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.2.0</version>
</dependency>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<!--
The version of protoc must match protobuf-java. If you don't depend on
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:3.0.0-beta-2:exe:$os.detected.classifier</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:0.13.2:exe:$os.detected.classifier</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
基本以上这些就能满足需求。
proto文件
使用protobuf需要另外编写proto文件,我编写的请求类与响应类如下:
SubscribeReq.proto
package netty;
option java_package = "cn.com.netty.codec.protobuf";
option java_outer_classname = "SubscribeReqProto";
message SubscribeReq
required int32 subReqID = 1;
required string userName = 2;
required string productName = 3;
required string address = 4;
SubscribeResp.proto
package netty;
option java_package = "cn.com.netty.codec.protobuf";
option java_outer_classname = "SubscribeRespProto";
message SubscribeResp
required int32 subReqID = 1;
required int32 respCode = 2;
required string desc= 3;
这两个文件放在与src同级下的proto文件夹下,在控制台执行命令mvn install后可以在生成的target文件夹中发现生成的源码,如下图:
server程序
serverHandler类
package cn.com.protobuf;
import cn.com.netty.codec.protobuf.SubscribeReqProto;
import cn.com.netty.codec.protobuf.SubscribeRespProto;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Created by xiaxuan on 17/11/27.
*/
@ChannelHandler.Sharable
public class SubReqServerHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
if ("xiaxuan".equalsIgnoreCase(req.getUserName()))
System.out.println("Service accept client subscribe req : [" + req.toString() + "]");
ctx.writeAndFlush(resp(req.getSubReqID()));
private SubscribeRespProto.SubscribeResp resp(int subReqID)
SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
builder.setSubReqID(subReqID);
builder.setRespCode(0);
builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
return builder.build();
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
cause.printStackTrace();
ctx.close();
这里的业务逻辑和之前没有太多区别,只是在构建resp返回的时候使用了protobuf生成的类。
server类
package cn.com.protobuf;
....
/**
* Created by xiaxuan on 17/11/27.
*/
public class SubReqServer
public void bind(int port)
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioserverSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel ch) throws Exception
ch.pipeline().addLast(
new ProtobufVarint32FrameDecoder()
);
ch.pipeline().addLast(
new ProtobufDecoder(
SubscribeReqProto.SubscribeReq.getDefaultInstance()
)
);
ch.pipeline().addLast(
new ProtobufVarint32LengthFieldPrepender()
);
ch.pipeline().addLast(new ProtobufEncoder());
ch.pipeline().addLast(new SubReqServerHandler());
);
//绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
//等待服务端监听端口关闭
f.channel().closeFuture().sync();
catch (InterruptedException e)
e.printStackTrace();
finally
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
public static void main(String[] args)
int port = 8080;
new SubReqServer().bind(port);
server和之前编写的程序其实也是没有明显区别的,就是加入的编解码的框架不同了,这里使用的是:
ch.pipeline().addLast(
new ProtobufDecoder(
SubscribeReqProto.SubscribeReq.getDefaultInstance()
)
);
ch.pipeline().addLast(new ProtobufEncoder());
这里使用了protobuf提供的ProtobufDecoder和ProtobufEncoder进行编解码,因此构造的resp响应中无需再进行手动编解码,在pipline中会自动进行编解码处理。
client程序
ClientHandler类
package cn.com.protobuf;
import cn.com.netty.codec.protobuf.SubscribeReqProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Created by xiaxuan on 17/11/27.
*/
public class SubReqClientHandler extends ChannelInboundHandlerAdapter
public SubReqClientHandler()
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
for (int i = 0; i < 10; i++)
ctx.write(subReq(i));
ctx.flush();
private SubscribeReqProto.SubscribeReq subReq(int i)
SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
builder.setSubReqID(i);
builder.setUserName("xiaxuan");
builder.setProductName("netty book for protobuf");
builder.setAddress("beijin");
return builder.build();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
System.out.println("Receive server response : [" + msg + "]");
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
ctx.flush();
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
cause.printStackTrace();
ctx.close();
client的业务逻辑处理类也是比较简单的,构建十个客户端请求然后一次性发出,最后打出服务端响应结果。
client类
package cn.com.protobuf;
import cn.com.netty.codec.protobuf.SubscribeRespProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
* Created by xiaxuan on 17/11/28.
*/
public class SubReqClient
public void connect(int port, String host)
EventLoopGroup group = new NioEventLoopGroup();
try
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel ch) throws Exception
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(
new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance())
);
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
ch.pipeline().addLast(new SubReqClientHandler());
);
//发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
//等待客户端链路关闭
f.channel().closeFuture().sync();
catch (InterruptedException e)
e.printStackTrace();
finally
group.shutdownGracefully();
public static void main(String[] args)
new SubReqClient().connect(8080, "127.0.0.1");
在pipline中添加的处理器和server基本类似,但是都有一个没有提的就是ProtobufVarint32LengthFieldPrepender,进入这个类的源码中可以很清晰的看到如下注释:
/**
* An encoder that prepends the the Google Protocol Buffers
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html#varints">Base
* 128 Varints</a> integer length field. For example:
* <pre>
* BEFORE ENCODE (300 bytes) AFTER ENCODE (302 bytes)
* +---------------+ +--------+---------------+
* | Protobuf Data |-------------->| Length | Protobuf Data |
* | (300 bytes) | | 0xAC02 | (300 bytes) |
* +---------------+ +--------+---------------+
* </pre> *
*
* @see @link CodedOutputStream or (@link CodedOutputByteBufferNano)
*/
作用就是在在传输的数据上加了一个消息头,这个就是用来进行一种对半包的处理,如果注释掉这行代码,在运行过程中可能会报错,ProtobufVarint32FrameDecoder在接收到半包消息的时候便无法处理,便会报错,因此一般要加上这个处理器。
运行结果
分别启动server和client,结果如下:
server:
client:
运行成功,在使用上还是比较简单。
以上是关于netty编解码之使用protobuf的主要内容,如果未能解决你的问题,请参考以下文章