如何使用Netty打造工业级RPC框架
Posted tangtong1
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Netty打造工业级RPC框架相关的知识,希望对你有一定的参考价值。
如何使用Netty打造工业级RPC框架
郑重声明:此框架旨在理解rpc框架的本质,切勿直接在生产环境使用,否则后果自负。
rpc框架的几点疑问
如何通信?
两个服务之间的通信方式有很多,比如http、tcp、websocket、http2.0(stream),每种方式都有优秀和弊端的地方,比如http是短连接,高并发的时候不合适。
这里我们使用Netty来作为我们的传输层,Netty是对底层io的封装,本身就支持多种协议,扩展性好,性能棒,我们直接使用它的nio传输方式即可。
如何像调用本地服务一样无感?
首先,客户端与服务端需要有共同的接口,服务端实现业务逻辑,客户端需要有一个接口的实例,这个实例我们怎么构造呢?其实很简单,通过动态代理即可,动态代理的实现方式有很多,比如jdk proxy,javasist,cglib等,其中jdk proxy方便学习,所以我们这里直接使用jdk proxy作为我们的动态代理部分。
我们通过jdk proxy生成目标接口的一个实例,这实例内部封装接口调用的参数、类型等信息,通过传输层发送到远程服务端,远程服务端通过反射拿到具体的实现类,通过反射调用其对应的方法,拿到结果,再把结果封装通过传输层再传回客户端,客户端拿到结果展示即可。
客户端如何动态感知服务端?
通过zookeeper的监听机制,我们可以动态感知一个服务的上线下线,一个客户端可同时监听多个服务端,一个服务端下线了,重试其它服务端,我们今天的框架中先不写注册中心相关的东西。
整体流程
通过上图我们可以看到一个rpc框架的全貌,基本包括代理层、协议层、传输层等几个部分,其中协议层又包括编解码、序列化反序列化等部分,代理层又包括客户端的代理和服务端的代理(Invoker),下面我们一一来介绍并编写相关部分的代码。
支持层
这一层实际并不存在,是不在上图中的其它的一些对象的统称,你可以在看主体流程的过程中,回来查看相应的数据结构,这部分对象包括:
- 实际传输的对象,RpcRequest、RpcResponse
RpcRequest.java
package com.coolcoding.rpc.model;
import com.alibaba.fastjson.JSON;
public class RpcRequest
// 唯一标识一次请求
private Long id;
// 服务名称
private String serviceName;
// 方法名称
private String methodName;
// 参数
private Object[] params;
// 参数类型
private Class<?>[] paramTypes;
public Long getId()
return id;
public void setId(Long id)
this.id = id;
public String getServiceName()
return serviceName;
public void setServiceName(String serviceName)
this.serviceName = serviceName;
public String getMethodName()
return methodName;
public void setMethodName(String methodName)
this.methodName = methodName;
public Class<?>[] getParamTypes()
return paramTypes;
public void setParamTypes(Class<?>[] paramTypes)
this.paramTypes = paramTypes;
public Object[] getParams()
return params;
public void setParams(Object[] params)
this.params = params;
@Override
public String toString()
return JSON.toJSONString(this);
RpcResponse.java
package com.coolcoding.rpc.model;
import com.alibaba.fastjson.JSON;
public class RpcResponse
// 唯一标识一个请求,与RpcRequest中对应
private Long id;
// 方法调用的结果
private Object response;
// 异常信息
private Throwable cause;
public Long getId()
return id;
public void setId(Long id)
this.id = id;
public Object getResponse()
return response;
public void setResponse(Object response)
this.response = response;
public Throwable getCause()
return cause;
public void setCause(Throwable cause)
this.cause = cause;
@Override
public String toString()
return JSON.toJSONString(this);
- 参数类型的转换,ConvertUtils
ConvertUtils.java
package com.coolcoding.rpc.convert;
/**
* 转换类型工具类
* 因为json序列化是不带类型的,生产中建议使用Protobuf等序列化框架,double会被转成BigDecimal
*/
public class ConvertUtils
public static Object convert(Object value, Class<?> clz)
// 暂时只支持Double,且不支持小写
if (clz == Double.class)
return new Double(value.toString());
throw new UnsupportedTypeException();
- 支持异步调用,RpcResponseFuture
RpcResponseFuture.java
package com.coolcoding.rpc.future;
import com.coolcoding.rpc.convert.ConvertUtils;
import com.coolcoding.rpc.model.RpcResponse;
import java.util.concurrent.BlockingQueue;
/**
* 用于支持异步获取调用结果
*/
public class RpcResponseFuture
// 使用ThreadLocal保存当前线程的RpcResponseFuture
private static final ThreadLocal<RpcResponseFuture> LOCAL = new ThreadLocal<>();
// 一个队列
private BlockingQueue<RpcResponse> queue;
// 方法调用的返回类型
private Class<?> returyType;
public void setQueue(BlockingQueue<RpcResponse> queue)
this.queue = queue;
public void setReturyType(Class<?> returyType)
this.returyType = returyType;
public static RpcResponseFuture getRpcResponseFuture()
return LOCAL.get();
// 创建一个RpcResponseFuture并把它设置到ThreadLocal中
public static RpcResponseFuture newRpcResponseFuture()
RpcResponseFuture rpcResponseFuture = new RpcResponseFuture();
LOCAL.set(rpcResponseFuture);
return rpcResponseFuture;
// 获取异步调用的结果
public Object get()
if (queue != null)
try
RpcResponse rpcResponse = queue.take();
if (rpcResponse.getCause() != null)
throw rpcResponse.getCause();
return ConvertUtils.convert(rpcResponse.getResponse(), returyType);
catch (Throwable e)
e.printStackTrace();
return null;
- 保存服务
ServiceHolder.java
package com.coolcoding.rpc.invoker;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ServiceHolder
private static Map<String, Object> map = new ConcurrentHashMap<>();
public static void publishService(Class<?> serviceClz, Object service)
map.put(serviceClz.getName(), service);
public static Object getService(String serviceName)
return map.get(serviceName);
传输层
传输层即客户端与服务端之间的直接通信,使用Netty的话很方便,直接上代码。
- 客户端代码
代码比较简单,都在注释里面了,这里主要是要把后面发送数据需要用到的channel保存下来。
NettyClient.java
package com.coolcoding.rpc.client;
import com.coolcoding.rpc.codec.RpcDecoder;
import com.coolcoding.rpc.codec.RpcEncoder;
import com.coolcoding.rpc.model.RpcRequest;
import com.coolcoding.rpc.model.RpcResponse;
import com.coolcoding.rpc.serialize.SerializerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NiosocketChannel;
public class NettyClient
private Channel channel;
// 发送rpc request
public void sendRpcRequest(RpcRequest rpcRequest) throws Exception
try
// 使用channel直接写数据
this.channel.writeAndFlush(rpcRequest).sync();
catch (Exception e)
throw e;
// 启动客户端,连接服务端
public void start(String host, int port)
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try
// 配置参数
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>()
@Override
public void initChannel(SocketChannel channel) throws Exception
channel.pipeline()
.addLast(new RpcEncoder(SerializerFactory.getSerializer()))
.addLast(new RpcDecoder(SerializerFactory.getSerializer(), RpcResponse.class))
.addLast(new NettyClientHandler());
).option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, true);
// 客户端连接到服务端
this.channel = bootstrap.connect(host, port).sync().channel();
System.out.println("conn to server success...");
catch (Exception e)
e.printStackTrace();
finally
// 这里不能把group关了,关了EventLoop就关了,后面发送请求发不出去了。
NettyClientHandle.java
package com.coolcoding.rpc.client;
import com.coolcoding.rpc.model.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 客户端收到响应的处理器
*/
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse>
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception
// 取得发送数据时的绑定的队列,并把返回值放入进去
Waiter.getQueue(msg.getId()).offer(msg);
Waiter.removeQueue(msg.getId());
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
cause.printStackTrace();
ctx.close();
- 服务端代码
服务端主要是监听端口,设置收到消息的处理器NettyServerHandler。
package com.coolcoding.rpc.server;
import com.coolcoding.rpc.codec.RpcDecoder;
import com.coolcoding.rpc.codec.RpcEncoder;
import com.coolcoding.rpc.invoker.InvokerFactory;
import com.coolcoding.rpc.model.RpcRequest;
import com.coolcoding.rpc.serialize.SerializerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer
private final int port;
public NettyServer(int port)
this.port = port;
// 启动服务端
public void start()
// 接受连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 处理业务
EventLoopGroup workerGroup = new NioEventLoopGroup();
try
// 配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>()
@Override
public void initChannel(SocketChannel channel) throws Exception
channel.pipeline()
.addLast(new RpcDecoder(SerializerFactory.getSerializer(), RpcRequest.class))
.addLast(new RpcEncoder(SerializerFactory.getSerializer()))
.addLast(new NettyServerHandler(InvokerFactory.getInvoker()));
).option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,启动监听
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("server started...");
// 这里会阻塞
future.channel().closeFuture().sync();
catch (Exception e)
e.printStackTrace();
finally
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
NettyServerHandler.java
package com.coolcoding.rpc.server;
import com.coolcoding.rpc.model.RpcRequest;
import com.coolcoding.rpc.invoker.Invoker;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 服务端收到请求的处理器
*/
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest>
/**
* 调用器
*/
private Invoker invoker;
public NettyServerHandler(Invoker invoker)
this.invoker = invoker;
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception
// 通过调用器调用到具体的服务进行处理
ctx.writeAndFlush(invoker.invoke(msg));
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
cause.printStackTrace();
ctx.close();
协议层
协议层主要是对请求响应的编解码、序列化等操作,当然,像dubbo那种复杂的框架还包含对传输协议等的封装。
编解码器
RpcEncoder.java
package com.coolcoding.rpc.codec;
import com.coolcoding.rpc.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
以上是关于如何使用Netty打造工业级RPC框架的主要内容,如果未能解决你的问题,请参考以下文章
#yyds干货盘点# 基于Netty,20分钟手写一个RPC框架