protobuf源码解析与netty+rpc实战
Posted 有山先生
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了protobuf源码解析与netty+rpc实战相关的知识,希望对你有一定的参考价值。
1. 背景
grpc由protobuf+netty实现,为了研究grpc框架的设计思想,本文从protobuf生成的java源码出发,脱离grpc框架研究protobuf的框架。在此基础上,基于protobuf+netty手写一个rpc框架,熟悉rpc的设计思路。
2. 逐步深入protobuf
2.1 protobuf工作流程
protobuf全称是protocol buffers,它是一个序列化/反序列化框架。.proto文件定义消息结构,通过protoc命令编译.proto文件成为java类,实例化java对象,填入消息内容,调用序列化接口将java实例转化成为二进制数据,传输到对端,通过反序列化接口将二进制数据转化成为java实例。这就是protobuf工作全流程。
2.2 安装protobuf编译器
protobuf编译器负责将.proto文件编译成java代码。安装时,使用源码编译安装即可,没遇到坑。安装步骤如下:
#到github中下载protobuf源码包,地址为https://github.com/protocolbuffers/protobuf/releases/tag/v3.6.1
tar -xzf protobuf-3.6.1.tar.gz
cd protobuf-3.6.1
# 对即将安装的软件进行配置,检查当前的环境是否满足要安装软件的依赖关系
./configure --prefix=/usr/local/
# 编译 && 安装
make && make install
protoc --version
2.3 编写.proto文件
如下所示,定义一个最简单的hello.proto文件。文件中,定义了HelloRequest类型的消息,消息中只包含一个字符串类型的greeting字段:
syntax = "proto3";
option java_outer_classname="Hello";
package protobuf;
message HelloRequest
string greeting = 1;
编译后,会生成类似如下结构的java类:
public class Hello
class HelloRequest
private String greeting;
2.4 Idea引入protobuf依赖
如果不需要rpc功能,只需要引入protobuf-java包。protoc生成的java类会依赖这个包:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.9.0</version>
</dependency>
2.5 编译.proto文件
方法一:protoc命令编译
通过protoc --java_out=src/main/java/ src/main/proto/hello.proto
命令,编译src/main/proto/hello.proto文件,在src/main/java/目录下生成Hello文件。生成的java类结构如下,最终使用到的就是HelloRequest类。java生成类解析会在后面会展开:
方法二:配置plugin编译
如果本地不方便安装protoc命令,可通过配置pom插件的方式编译.protoc文件,注意,要使配置生效,一定要指定通过<protoSourceRoot>指定.proto文件所在的目录;通过<outputDirectory>指定编译的java文件保存的目录。
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.9.1:exe:$os.detected.classifier</protocArtifact>
<protoSourceRoot>$project.basedir/src/main/java/proto</protoSourceRoot>
<!-- 生成文件的目录 -->
<outputDirectory>$project.basedir/src/main/java/</outputDirectory>
<!-- 生成文件前是否把目标目录清空,这个最好设置为false,以免误删项目文件 -->
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
proto项目结构如下:
如下,插件会自动下载所需版本的protoc编译器:
注意,可以省略<outputDirectory>配置,这样.proto的生成类会放在generated-sources目录中:
调用maven package命令打包后,generated-sources也会出现在jar包中:
因此建议只写protoSourceRoot配置,省略outputDirectory配置。
2.6 编写java应用 & 运行
先写一个序列化/反序列化示例体验一下protobuf:
package protobuf;
import com.google.protobuf.InvalidProtocolBufferException;
public class HelloTest
public static void main(String[] args) throws InvalidProtocolBufferException
//创建builder
Hello.HelloRequest.Builder helloRequest = Hello.HelloRequest.newBuilder();
//通过builder构建对象
Hello.HelloRequest request = helloRequest.setGreeting("yes").build();
//打印
System.out.println(request);
//序列化
byte[] bytes = request.toByteArray();
//反序列化
Hello.HelloRequest deserRequest = Hello.HelloRequest.parseFrom(bytes);
//打印
System.out.println(deserRequest);
//查看是否是同一个对象,结果是false,说明是两个不同的对象
System.out.println(deserRequest == request);
//查看对象是否相同,结果为true,表示两个对象成员变量相同
System.out.println(deserRequest.equals(request));
打印结果如下:
可以看到,序列化前后,打印结果一致,证明了protobuf的正确性。
2.7 protobuf java生成类解析
上述hello.proto是最简单的.proto文件,其生成的java类如下所示:
descriptor解析
生成的Hello类包含三个FileDescriptor、Descriptor、FieldAccessorTable静态成员变量,它记录了hello.proto文件定义的信息:
//静态成员
private static com.google.protobuf.Descriptors.FileDescriptor descriptor;
private static final com.google.protobuf.Descriptors.Descriptor internal_static_protobuf_HelloRequest_descriptor;
private static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internal_static_protobuf_HelloRequest_fieldAccessorTable;
//getter方法
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor()
return descriptor;
//静态代码块,初始化静态变量
static
//生成的Java类已经包含了hello.proto文件内容
java.lang.String[] descriptorData =
"\\n\\032src/main/proto/hello.proto\\022\\010protobuf\\" " +
"\\n\\014HelloRequest\\022\\020\\n\\010greeting\\030\\001 \\001(\\tB\\007B\\005Hell" +
"ob\\006proto3"
;
descriptor = com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[]
);
internal_static_protobuf_HelloRequest_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_protobuf_HelloRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_protobuf_HelloRequest_descriptor,
new java.lang.String[] "Greeting", );
三种数据类型的含义如下所示:
- FileDescriptor:对一个proto文件的描述,它包含文件名、包名、选项(如java_package、java_outer_classname等)、文件中定义的所有message、文件中定义的所有enum、文件中定义的所有service、文件中所有定义的extension、文件中定义的所有依赖文件(import)等。在FileDescriptor中还存在一个DescriptorPool实例,它保存了所有的dependencies(依赖文件的FileDescriptor)、name到GenericDescriptor的映射、字段到FieldDescriptor的映射、枚举项到EnumValueDescriptor的映射,从而可以从该DescriptorPool中查找相关的信息,因而可以通过名字从FileDescriptor中查找Message、Enum、Service、Extensions等。
如下是FileDescriptor成员内容,其中FileDescriptorProto成员记录hello.proto文件内容:
messagesType成员变量记录message类型信息,它是数组类型,记录所有message类型:
而在messagesType成员变量中,记录了字段信息,例如字段名:
FileDescriptor还包含pool成员变量,记录所有Descriptor成员变量以及filed信息,直接通过pool可以遍历所有Descriptor:
- Descriptor:对一个message定义的描述,它包含该message定义的名字、所有字段、内嵌message、内嵌enum、关联的FileDescriptor等。可以使用字段名或字段号查找FieldDescriptor。
Descriptor记录message信息,可以通过FileDescriptor找到:
- FieldAccessorTable:对一个字段或扩展字段定义的描述,它包含字段名、字段号、字段类型、字段定义(required/optional/repeated/packed)、默认值、是否是扩展字段以及和它关联的Descriptor/FileDescriptor等。
FieldAccessorTable记录字段信息:
HelloRequestOrBuilder解析
HelloRequestOrBuilder是一个接口,定义get方法获取HelloRequest成员变量的值:
HelloRequest类解析
HelloRequest文件中,包含如下信息。其中,parseFrom用于反序列化:
其中:
-
Builder用于构建HelloRequest对象:
- HelloRequest的父类GeneratedMessageV3中包含unknownFields字段,该字段用于处理proto版本不一致的情况。比如服务端proto增加一个字段,但是客户端没有增加,可以选择在客户端unknownFields中增加对应字段。不过这种情况建议同步更新客户端的proto文件,重新编译整个项目。
2.8 protobuf中的service类型
首先看下service类型的定义和使用:
通过在hello.proto文件中配置java_generic_services=true
可以让protoc编译器生成HelloService代码:
syntax = "proto3";
option java_outer_classname="Hello";
package protobuf;
option java_generic_services = true;
service HelloService
rpc SayHello (HelloRequest) returns (HelloResponse);
message HelloRequest
string greeting = 1;
message HelloResponse
string reply = 1;
hello.proto定义了HelloService类型,HelloService是service类型。它定义了输入参数是HelloRequest,输出参数是HelloResponse的SayHello方法。在SayHello名称前面可以看到rpc关键字,就可以知道这个方法是用于进行远程调用的。加入service概念也非常合理,序列化/反序列框架一般的场景就是进行网络传输,而网络传输过程中,通过rpc的方式请求服务端最方便。protobuf不是rpc框架,在protobuf中定义service后,会生成rpc抽象类,具体通信过程,需要用户自定义实现,或者引入grpc框架生成通信过程的实现。
编译后hello.proto文件,Hello增加了HelloResponse这个message对应的descriptor。生成了三个重要的子类:HelloRequest、HelloResponse、HelloServer:
HelloService简析:
HelloService有两个接口及其子类:BlockingInterface/BlockingStub和Interface/Stub,其中,BlockingStub是BlockingInterface的实现类;Stub是Interface的实现类。如下所示:
其它就是HelloService相关的构建方法以及存根对象的构建方法,不再解析:
Stub存根
Stub的中文解释是存根,存根的意思是票据、证件等开出后所留的底子。RPC中出现的stub,表达的意思差不多;服务调用过程中,真正的方法逻辑存在于服务端中,那么客户端保存就是服务端真实方法的一个存根(也可以认为是服务端的代理,存放服务端的地址等信息);即当客户端需要远程访问服务端方法的时候,就可以凭借服务端在客户端中的存根来组装发起远程调用所需要的信息;类似于我在银行存了一笔钱,下回来存钱的时候,就可以凭借存根,知道我上回存款信息。在RPC的Stub中,实现了网络通信框架。
RPC过程如下:
- 客户端调用客户端stub(client stub)。这个调用是在本地,并将调用参数push到栈(stack)中。
- 客户端stub(client stub)将这些参数包装,并通过系统调用发送到服务端机器。打包的过程叫 marshalling。(常见方式:XML、JSON、二进制编码)
- 客户端本地操作系统发送信息至服务器。(可通过自定义TCP协议或HTTP传输)
- 服务器系统将信息传送至服务端stub(server stub)。
- 服务端stub(server stub)解析信息。该过程叫 unmarshalling。
- 服务端stub(server stub)调用程序,处理客户端请求,返回结果给客户端,此过程无需调用客户端的rpc方法。
如图所示:
marshalling跟serialization的本质都是序列化,二者的区别如下:
- Serialization:负责传输对象、对象持久化。serialize对象的时候,只会将该对象内部数据写进字节流。
- Marshalling:负责远程传输参数(RMI的时候)。serialize对象的时候,除了对象内部数据,还会包含一些codebase信息(比如实现该对象的代码位置信息等)。
同步和异步RPC
BlockingStub就是同步阻塞RPC,Stub就是异步RPC。同步RPC表示客户端远程调用服务端方法时,阻塞等待服务端结果。如下可以看到BlockingInterface的sayHello方法只有controller和request参数,它会阻塞调用RPC:
public interface BlockingInterface
public protobuf.Hello.HelloResponse sayHello(
com.google.protobuf.RpcController controller,
protobuf.Hello.HelloRequest request) throws com.google.protobuf.ServiceException;
异步RPC表示客户端远程调用服务端方法时,不阻塞等待服务端结果,直接处理下一条命令,但与此同时,客户端在调用方法时,必须传入回调方法,当服务端返回结果时,会自动执行回调方法。如下所示,Stub的接口Interface的sayHello方法就必须传入回调对象:
public interface Interface
public abstract void sayHello(
com.google.protobuf.RpcController controller,
protobuf.Hello.HelloRequest request,
com.google.protobuf.RpcCallback<protobuf.Hello.HelloResponse> done);
一般RPC框架会实现HelloService类,如果只使用序列化功能,没必要生成HelloService这个类。
3. 手写基于Java序列化的RPC Demo
在了解grpc之前,需要熟悉RPC实现思想,因此这里我手动实现RPC Demo,在这个过程中,逐步优化RPC代码。
3.1 项目代码框架
先将RPC底层的通信框架写出来,其中,通信框架就用netty实现:
maven netty依赖如下:
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.52.Final</version>
</dependency>
</dependencies>
3.2 netty服务端基础框架
服务端main方法就是通用代码,客户端请求的处理逻辑在ServerHandler类中:
public class Server
public static void main(String[] args)
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);
try
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioserverSocketChannel.class);
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel ch) throws Exception
ch.pipeline().addLast(new ServerHandler());
);
ChannelFuture future = serverBootstrap.bind(8088).sync();
future.channel().closeFuture().sync();
catch (Exception e)
e.printStackTrace();
finally
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
重写channelRead和exceptionCaught方法,channelRead用于接收客户端数据,并发送数据;exceptionCaught用于客户端连接异常,一定要重写,不然会打印异常堆栈:
public class ServerHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if(msg instanceof ByteBuf)
String res = ((ByteBuf) msg).toString(Charset.defaultCharset());
System.out.println(res);
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes("hello".getBytes(StandardCharsets.UTF_8));
ctx.channel().writeAndFlush(buffer);
super.channelRead(ctx, msg);
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
System.out.println(cause.getLocalizedMessage());
//super.exceptionCaught(ctx, cause);
3.3 netty客户端基础框架
客户端在connect方法与服务端建立连接后,发送请求数据
public class Client
public static void main(String[] args)
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
try
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel ch) throws Exception
ch.pipeline().addLast(new ClientHandler());
);
//与服务端建立连接
ChannelFuture channelFuture = bootstrap.connect("localhost", 8088).sync();
//申请空间,向服务端发送数据
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes("hello".getBytes(StandardCharsets.UTF_8));
ChannelFuture future = channelFuture.channel().writeAndFlush(buffer);
future.channel().closeFuture().sync();
catch (Exception e)
throw new RuntimeException(e);
finally
workerGroup.shutdownGracefully();
客户端handler用于打印服务端的响应:
public class ClientHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
System.out.println("received msg");
//打印
if(msg instanceof ByteBuf)
String res = ((ByteBuf) msg).toString(Charset.defaultCharset());
System.out.println(res);
super.channelRead(ctx, msg);
3.4 基础框架通信演练
依此启动服务端和客户端,发现客户端接收到了客户端请求,打印了客户端数据:
服务端后续向客户端响应,客户端打印数据:
上述通信结果符合预期。
3.5 扩展:netty优雅关闭
观察netty服务端/客户端不难发现,main方法大致框架如下:
public static void main(String[] args)
NioEventLoopGroup group = new NioEventLoopGroup();
try
//bootstrap启动逻辑,省略
future.channel().closeFuture().sync();
catch (Exception e)
e.printStackTrace();
finally
group.shutdownGracefully();
3.4.1 future.channel().closeFuture().sync()解析
以客户端为例,future.channel()方法就是NioServerChannel。NioServerChannel继承AbstractChannel,AbstractChannel有CloseFuture静态内部类,它继承自DefaultChannelPromise方法:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel
private final CloseFuture closeFuture = new CloseFuture(this);
public ChannelFuture closeFuture()
return closeFuture;
static final class CloseFuture extends DefaultChannelPromise
CloseFuture(AbstractChannel ch)
super(ch);
@Override
public ChannelPromise setSuccess()
throw new IllegalStateException();
@Override
public ChannelPromise setFailure(Throwable cause)
throw new IllegalStateException();
@Override
public boolean trySuccess()
throw new IllegalStateException();
@Override
public boolean tryFailure(Throwable cause)
throw new IllegalStateException();
//netty包外无法访问该方法,package-private范围
boolean setClosed()
return super.trySuccess();
DefaultChannelPromise定义了sync方法:
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint
public ChannelPromise sync() throws InterruptedException
super.sync();
return this;
DefaultChannelPromise调用了父类juc中的DefaultPromise的sync方法,而DefaultPromise#sync首先就调用了DefaultPromise#await方法。DefaultPromise表示异步操作结果:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V>
public Promise<V> sync() throws InterruptedException
await();
rethrowIfFailed();
return this;
DefaultPromise#await方法内部会通过isDone检查异步操作是否结束,如果结束就直接返回,否则,调用Object#wait阻塞,等待Object#notify唤醒:
public Promise<V> await() throws InterruptedException
if (isDone())
return this;
if (Thread.interrupted())
throw new InterruptedException(toString());
checkDeadLock();
synchronized (this)
while (!isDone())
incWaiters();
try
wait();
finally
decWaiters();
return this;
从上面可以了解到,future.channel().closeFuture().sync()
最终调用Object#wait()方法将main线程阻塞。那么netty应用程序可以直接唤醒main线程吗?答案是不行。因为唯一可能唤醒main线程的就是CloseFuture#setClosed()方法,但是该方法是package-private的:
boolean setClosed()
return super.trySuccess();
只有netty包内部才能调用setClosed方法,也就是说,netty包内部其他public 方法封装了CloseFuture#setClosed()方法,解除了main线程的阻塞状态。setClosed()方法最终调用DefaultPromise#setSuccess0()方法,它用于设置result成员的值:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V>
private boolean setSuccess0(V result)
return setValue0(result == null ? SUCCESS : result);
当设置了result的值后,调用isDone()方法时,就返回true了,表示异步操作直接结束:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V>
private static boolean isDone0(Object result)
return result != null && result != UNCANCELLABLE;
当然,还有一个问题,就是在哪里调用Object#notify唤醒wait()阻塞状态,这其实在setValue0中就解决了,那就是notifyListeners方法,Listener实现包含Object#notify逻辑,这部分在netty优雅关闭中详解:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V>
private boolean setValue0(Object objResult)
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult))
if (checkNotifyWaiters())
notifyListeners();
return true;
return false;
通过寻找调用CloseFuture#setClosed()方法的逻辑,发现以下条件会给DefaultPromise的result对象setValue:
- 当register注册失败时。
- 当flush数据失败时。
- 优雅关闭时。
这时触发Listeners逻辑,就会唤醒主线程,future.channel().closeFuture().sync()
结束执行。
3.4.2 netty优雅关闭
netty客户端/服务端退出前,最好先把剩下的task执行完,后面关闭socketchannel,再关闭selector,最后关闭NioEventLoop线程。netty的NioEventLoopGroup实现了shutdownGracefully实现上述优雅关闭流程。
在服务端和客户端代码可以看到,最后都会在finally代码块中执行group.shutdownGracefully()方法,用于实现优雅关闭。
netty优雅关闭流程如下:
- 执行NioEventLoopGroup的shutdownGracefully方法
- 转化为执行NioEventLoopGroup中的每个NioEventLoop的shutdownGracefully带参方法
- 修改NioEventLoop的状态为STSHUTTINGDOWN,唤醒主流程
- 执行closeAll操作: (1)取消NioTask的任务 (2)有请求正在响应,等待响应 (3)关闭channel的连接,触发pipeline inActive事件 (4)释放已经flush和未flush的消息对象 (5)取消channel在selector上的注册,触发pipeline deregister事件
- 执行confirmshutdown操作 (1)将所有的定时任务取消 (2)执行完成所有的task任务 (3)执行完所有的shutdownhook任务 (4)等待quiteperiod时间(默认2s),如果有新的任务提交,重新执行主流程操作,也就会重新从步骤4开始执行(4已经执行完成后,再次执行也是直接退出,其实就是重新执行步骤5)。如果在quiteperiod时间段内没有新的任务提交,则返回true确认关闭,否则等待到 shutdowntimeout超时(默认15s)。
- cleanupAndTerminate操作 修改NioEventLoop的状态,关闭selector。
- 各个NioEventLoop关闭完成,触发监听回调,为Goup的terminationFuture设值,同时关闭executor。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
if (timeout < quietPeriod)
throw new IllegalArgumentException(
"timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
ObjectUtil.checkNotNull(unit, "unit");
if (isShuttingDown())
return terminationFuture();
boolean inEventLoop = inEventLoop();
boolean wakeup;
int oldState;
for (;;)
if (isShuttingDown())
return terminationFuture();
int newState;
wakeup = true;
oldState = state;
if (inEventLoop)
//设置当前流程的状态为正在关闭状态
newState = ST_SHUTTING_DOWN;
else
switch (oldState)
case ST_NOT_STARTED:
case ST_STARTED:
newState = ST_SHUTTING_DOWN;
break;
default:
newState = oldState;
wakeup = false;
if (STATE_UPDATER.compareAndSet(this, oldState, newState))
break;
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);
if (ensureThreadStarted(oldState))
return terminationFuture;
if (wakeup)
//向EventLoop线程添加一个空作业,EventLoop会执行后续
taskQueue.offer(WAKEUP_TASK);
if (!addTaskWakesUp)
wakeup(inEventLoop);
return terminationFuture();
NioEventLoop执行完task后,执行closeAll方法关闭socketChannel和selector,最后执行confirmShutdown方法后退出NioEventLoop线程:
public final class NioEventLoop extends SingleThreadEventLoop
protected void run()
int selectCnt = 0;
for (;;)
//省略
// Always handle shutdown even if the loop processing threw an exception.
try
if (isShuttingDown())
closeAll();
//如果关闭成功了,就退出NioEventLoop线程
if (confirmShutdown())
return;
catch (Throwable t)
handleLoopException(t);
如下,closeAll方法用于关闭socketChannel和selector:
public final class NioEventLoop extends SingleThreadEventLoop
private void closeAll()
selectAgain();
Set<SelectionKey> keys = selector.keys();
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
for (SelectionKey k: keys)
Object a = k.attachment();
if (a instanceof AbstractNioChannel)
channels.add((AbstractNioChannel) a);
else
k.cancel();
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, k, null);
for (AbstractNioChannel ch: channels)
ch.unsafe().close(ch.unsafe().voidPromise());
3.4.3 客户端断开连接
如下,当客户端连上服务端,并发送请求数据后直接立刻断开连接:
此时服务端报错,表示连接意外断开:
如下,netty负责读取客户端数据,在读取过程中,客户端突然断开连接,服务端进入catch代码块的handleReadException逻辑,处理异常:
public abstract class AbstractNioByteChannel extends AbstractNioChannel
protected class NioByteUnsafe extends AbstractNioUnsafe
public final void read()
final ChannelConfig config = config();
if (shouldBreakReadReady(config))
clearReadPending();
return;
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try
do
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0)
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close)
// There is nothing left to read as we received an EOF.
readPending = false;
break;
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close)
closeOnRead(pipeline);
catch (Throwable t)
handleReadException(pipeline, byteBuf, t, close, allocHandle);
finally
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead())
removeReadOp();
此时进入handleReadException执行pipeline.fireExceptionCaught(cause)
处理异常:
public abstract class AbstractNioByteChannel extends AbstractNioChannel
protected class NioByteUnsafe extends AbstractNioUnsafe
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle)
if (byteBuf != null)
if (byteBuf.isReadable())
readPending = false;
pipeline.fireChannelRead(byteBuf);
else
byteBuf.release();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
//抛出异常
pipeline.fireExceptionCaught(cause);
// If oom will close the read event, release connection.
// See https://github.com/netty/netty/issues/10434
if (close || cause instanceof OutOfMemoryError || cause instanceof IOException)
closeOnRead(pipeline);
由于没有Handler实现exceptionCaught方法,最后运行TailContext的exceptionCaught方法:
public class DefaultChannelPipeline implements ChannelPipeline
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
onUnhandledInboundException(cause);
其处理exceptionCaught的方法就是打印:
public class DefaultChannelPipeline implements ChannelPipeline
protected void onUnhandledInboundException(Throwable cause)
try
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
finally
ReferenceCountUtil.release(cause);
为了避免服务端打印大段报错,直接在childHandler中重写exceptionCaught方法即可:
public class ServerHandler extends ChannelInboundHandlerAdapter
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
System.out.println(cause.getLocalizedMessage());
//super.exceptionCaught(ctx, cause);
解决方法:为了避免服务端还没有处理客户端请求,客户端就断开连接,客户端可以再退出前,阻塞一段时间,一般阻塞1s即可,但是如果网络较差,可以适当增大。如下所示:
此时服务端也没有报错了:
3.6 基于netty实现远程调用
3.6.1 Demo设计思路
客户端向服务端发送请求,需要记录服务端将处理的类和方法,输入参数类型和数据参数,服务端编写服务的类和接口,服务端反序列化调用服务类和方法,服务端返回执行的结果给客户端。
3.6.2 Demo实现
客户端构建请求,并序列化,发送请求:
public class Client
public static void main(String[] args)
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
try
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel ch) throws Exception
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2));
ch.pipeline().addLast(new ClientHandler());
ch.pipeline().addLast(new LengthFieldPrepender(2, false));
);
ChannelFuture channelFuture = bootstrap.connect("localhost", 8088).sync();
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
//指定服务端的接口,方法,输入参数类型和输入参数
MsgTemp temp = new MsgTemp("HelloService", "sayHello", new Class[]String.class, Integer.class, new Object[]"hello", 18);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
//序列化
objectOutputStream.writeObject(temp);
buffer.writeBytes(byteArrayOutputStream.toByteArray());
ChannelFuture future = channelFuture.channel().writeAndFlush(buffer);
future.channel().closeFuture().sync();
catch (Exception e)
System.out.println(e);
finally
workerGroup.shutdownGracefully();
服务端定义RPC接口及其实现类:
//接口
public interface HelloService
String sayHello(String name, Integer age);
//实现类
public class HelloServiceImpl implements HelloService
@Override
public String sayHello(String name, Integer age)
return "hello " + name + " " + age;
客户端请求的是接口,服务端增加一个类,用于记录接口名与实现类的映射:
public final class ClassRegister
private static Map<String, Class> registerMap;
public static void register(String className, Class classObject)
//如果还没有经过注册,就初始化注册表,并进行默认注册
if (registerMap == null)
registerMap = new HashMap<>();
preRegister();
registerMap.put(className, classObject);
private static void preRegister()
ClassRegister.register("HelloService", HelloServiceImpl.class);
public static Class get(String className)
//如果还没有经过注册,就初始化注册表,并进行默认注册
if (registerMap == null)
registerMap = new HashMap<>();
preRegister();
return registerMap.get(className);
有了上述接口名与实现类的对应关系,服务端可以直接反射调用方法,将结果返回给客户端:
public class ServerHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if(msg instanceof ByteBuf)
//反序列化
byte[] array = new byte[((ByteBuf)msg).capacity()];
((ByteBuf) msg).readBytes(array);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(array);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
MsgTemp request = (MsgTemp) objectInputStream.readObject();
//获取请求内容
String className = request.getClassName();
String methodName = request.getMethodName();
Object[] params = request.getParams();
Class[] parmType = request.getParmType();
//反射执行对应方法
Class clazz = ClassRegister.get(className);
Method method = clazz.getMethod(methodName, parmType);
//直接就返回String类型了,不判断了
//调用方法
String res = (String) method.invoke(clazz.newInstance(), params);
System.out.println("send msg: " + res);
//返回调用方法的值
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(res.getBytes(StandardCharsets.UTF_8));
ctx.channel().writeAndFlush(buffer);
super.channelRead(ctx, msg);
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
//System.out.println(cause.getLocalizedMessage());
super.exceptionCaught(ctx, cause);
客户端解析并打印服务端的返回结果:
public class ClientHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
//打印
if(msg instanceof ByteBuf)
String res = ((ByteBuf) msg).toString(Charset.defaultCharset());
System.out.println("received msg: " + res);
super.channelRead(ctx, msg);
3.6.3 拆包问题
TCP是一个流协议,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区 的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成 一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。如下,发现本来要读取1024个字节的数据包,结果只有373个字节可以处理,这就是拆包问题:
为了解决这个问题,在pipeline中增加LengthFieldBasedFrameDecoder解码器和LengthFieldPrepender编码器:
public class Server
public static void main(String[] args)
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);
try
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel ch) throws Exception
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2));
ch.pipeline().addLast(new ServerHandler());
ch.pipeline().addLast(new LengthFieldPrepender(2, false));
);
ChannelFuture future = serverBootstrap.bind(8088).sync();
future.channel().closeFuture().sync();
catch (Exception e)
e.printStackTrace();
finally
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
客户端同样增加编码器和解码器:
public class Client
public static void main(String[] args)
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
try
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel ch) throws Exception
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2));
ch.pipeline().addLast(new ClientHandler());
ch.pipeline().addLast(new LengthFieldPrepender(2, false));
);
ChannelFuture channelFuture = bootstrap.connect("localhost", 8088).sync();
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
MsgTemp temp = new MsgTemp("HelloService", "sayHello", new Class[]String.class, Integer.class, new Object[]"hello", 18);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.writeObject(temp);
buffer.writeBytes(byteArrayOutputStream.toByteArray());
ChannelFuture future = channelFuture.channel().writeAndFlush(buffer);
future.channel().closeFuture().sync();
catch (Exception e)
System.out.println(e);
finally
workerGroup.shutdownGracefully();
LengthFieldPrepender编码器
编码器LengthFieldPrepender最终的API如下:
public LengthFieldPrepender(
ByteOrder byteOrder, int lengthFieldLength,
int lengthAdjustment, boolean lengthIncludesLengthFieldLength);
它会在要发送的数据包前,加上数据长度前缀,这样应用端根据数据长度,就知道取多少数据了。就不会发生拆包的问题了。具体参数如下:
- ByteOrder表示字节顺序是大端模式;
- lengthFieldLength表示长度字段的大小,上述例子中指定2Byte;
- lengthAdjustment表示长度调整值,一般为0,当实际传输的数据需要增加或者减少时,设置该参数。
- initialBytesToStrip表示需要跳过的字节数,比如开头2个字节的长度字段需要跳过。
LengthFieldBasedFrameDecoder解码器:
LengthFieldBasedFrameDecoder会解码TCP数据流,其API如下所示:
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip);
- maxFrameLength表示一个包最大长度;
- lengthFieldOffset表示长度域开始的地方;
- lengthFieldLength表示长度字段的大小,上述例子中指定2Byte;
- lengthAdjustment表示长度调整值,一般为0,当实际传输的数据需要增加或者减少时,设置该参数。
- initialBytesToStrip表示需要跳过的字节数,比如开头2个字节的长度字段需要跳过。
增加编码器和解码器后,上述远程调用运行正常:
服务端解析到用户请求,并成功调用实现类方法:
客户端收到服务端的响应:
3.6.5 客户端实现代理进行远程调用
对于客户端来说,程序员不仅需要提供要发送的数据,还要构建netty客户端代码。这非常麻烦,希望构建netty客户端代码的部分能够省略掉,让远程调用更简洁。这就需要使用到JDK动态代理了。如下,传入要代理的对象的类名,实现InvocationHandler类,最终调用代理类就是调用InvocationHandler实现:
public class ProxyFactory
public static <T> T getProxy(Class interfaceClass)
Object proxy = Proxy.newProxyInstance(ProxyFactory.class.getClassLoader(), new Class[]interfaceClass, new InvocationHandler()
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
try
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel ch) throws Exception
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2));
ch.pipeline().addLast(new ClientHandler());
ch.pipeline().addLast(new LengthFieldPrepender(2, false));
);
ChannelFuture channelFuture = bootstrap.connect("localhost", 8088).sync();
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
MsgTemp temp = new MsgTemp(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.writeObject(temp);
buffer.writeBytes(byteArrayOutputStream.toByteArray());
channelFuture.channel().writeAndFlush(buffer);
//阻塞主线程,等待服务端处理完,再优雅关闭客户端连接
Thread.sleep(1000);
catch (Exception e)
System.out.println(e);
finally
workerGroup.shutdownGracefully();
return null;
);
return (T)proxy;
客户端此时创建代理类,调用代理类方法sayHello,传入参数,最终会执行上述实现的InvovationHandler匿名内部类的invoke方法,即构建netty客户端并发送数据:
public class Client
public static void main(String[] args)
HelloService proxy = ProxyFactory.getProxy(HelloService.class);
proxy.sayHello("yuci", 18);
此时研究下动态代理过程:Proxy#newProxyInstance方法,它接受要代理的接口和InvocationHandler实现。在方法里面先生成代理类的字节码,再创建动态代理对象:
public static Object newProxyInstance(ClassLoader loader,
Class<?>[] interfaces,
InvocationHandler h)
throws IllegalArgumentException
Objects.requireNonNull(h);
final Class<?>[] intfs = interfaces.clone();
final SecurityManager sm = System.getSecurityManager();
if (sm != null)
checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
/*
* Look up or generate the designated proxy class.
*/
//生成动态代理类二进制代码
Class<?> cl = getProxyClass0(loader, intfs);
/*
* Invoke its constructor with the designated invocation handler.
*/
try
if (sm != null)
checkNewProxyPermission(Reflection.getCallerClass(), cl);
//创建动态代理对象的构造器
final Constructor<?> cons = cl.getConstructor(constructorParams);
final InvocationHandler ih = h;
if (!Modifier.isPublic(cl.getModifiers()))
AccessController.doPrivileged(new PrivilegedAction<Void>()
public Void run()
cons.setAccessible(true);
return null;
);
//通过构造器创建动态代理对象
return cons.newInstance(new Object[]h);
catch (IllegalAccessException|InstantiationException e)
throw new InternalError(e.toString(), e);
catch (InvocationTargetException e)
Throwable t = e.getCause();
if (t instanceof RuntimeException)
throw (RuntimeException) t;
else
throw new InternalError(t.toString(), t);
catch (NoSuchMethodException e)
throw new InternalError(e.toString(), e);
通过设置sun.misc.ProxyGenerator.saveGeneratedFiles
属性为true保存动态代理生成的代码:
public static void main(String[] args)
System.getProperties().put("sun.misc.ProxyGenerator.saveGeneratedFiles","true");
HelloService e = ProxyFactory.getProxy(HelloService.class);
e.sayHello("h",1);
从上面看到,在创建代理对象时,将实现的InvocationHandler传到了动态代理类中。这样,当程序结束时,会保存生成的类com.sun.proxy.$Proxy0。代理类已经继承了Proxy,无法继承其他类,只能实现接口,所以JDK动态代理只能代理接口,不能代理类。调用sayHello方法,实际就是调用实现的InvocationHandler方法。生成的动态代理类如下所示:
public final class $Proxy0 extends Proxy implements HelloService
private static Method m1;
private static Method m2;
private static Method m3;
private static Method m0;
public $Proxy0(InvocationHandler var1) throws
super(var1);
public final String sayHello(String var1, Integer var2) throws
try
return (String)super.h.invoke(this, m3, new Object[]var1, var2);
catch (RuntimeException | Error var4)
throw var4;
catch (Throwable var5)
throw new UndeclaredThrowableException(var5);
static
try
m1 = Class.forName("java.lang.Object").getMethod("equals", Class.forName("java.lang.Object"));
m2 = Class.forName("java.lang.Object").getMethod("toString");
m3 = Class.forName("HelloService").getMethod("sayHello", Class.forName("java.lang.String"), Class.forName("java.lang.Integer"));
m0 = Class.forName("java.lang.Object").getMethod("hashCode");
catch (NoSuchMethodException var2)
throw new NoSuchMethodError(var2.getMessage());
catch (ClassNotFoundException var3)
throw new NoClassDefFoundError(var3.getMessage());
4. 基于Protobuf序列化的RPC框架
通过protobuf生成的源码可以看到,protobuf只生成了序列化/反序列化类,只需要将上述java序列化类MsgTemp转化为protobuf生成的类进行传输即可,具体的不展开了。GRPC就是基于protobuf+netty实现的RPC框架,后续研究GRPC时,可以看到细节。
5. 总结
可以发现,本文的rpc项目其实就是netty+序列化+反射,了解这几个重点是熟悉rpc的关键。
以上是关于protobuf源码解析与netty+rpc实战的主要内容,如果未能解决你的问题,请参考以下文章
《Elasticsearch 源码解析与优化实战》第15章:Transport模块分析
Spark 1.6 RPC内幕解密:运行机制源码详解Netty与Akka等(DT大数据梦工厂)