Hand-Rolling an RPC Framework with Dynamic Proxies, Netty, ZooKeeper, and Protobuf
Posted by 恒哥~Bingo
What is RPC
RPC (Remote Procedure Call) is a technique for invoking procedures on another machine over the network: a client can call a method on a server as if it were a local method, without knowing anything about the server's underlying communication stack.
For example:
My wife goes to the supermarket herself to buy a bottle of soy sauce: that is a local call.
My wife sends me a WeChat message asking me to buy a bottle of soy sauce. Whether I drive, take a taxi, ride a bike, or take the subway to the supermarket, that is a remote procedure call.
What RPC is used for
Modern software systems keep growing in scale, and many adopt a microservice architecture: the system is split into independent services deployed on different servers. An e-commerce system, for example, typically has a product service, inventory service, order service, payment service, logistics service, coupon service, after-sales service, and so on.
These services need to communicate with each other. The order service, for instance, needs the product service to supply the product data for an order, and RPC is one way to carry out that network communication.
How RPC works
Of the two machines involved in an RPC call, the one providing the service is the service provider and the one calling it is the service consumer. The consumer needs the provider's address (IP and port) before it can make the call, so we need a server that stores provider addresses and hands them out to consumers. That server is the service registry.
It is like needing to phone a company whose number you do not know: you call directory assistance (114 in China), where companies have registered their numbers, and once 114 tells you the number you can reach them.
Implementing RPC with Dubbo
About Dubbo
Dubbo is a high-performance, lightweight open-source Java RPC framework from Alibaba.
It provides three core capabilities:
- Transparent remote method invocation: remote methods are called just like local methods, with only simple configuration and no API intrusion.
- Soft load balancing and fault tolerance: inside the intranet it can replace hardware load balancers such as F5, cutting cost and removing single points of failure.
- Automatic service registration and discovery: the registry looks up provider IP addresses by interface name, and providers can be added or removed smoothly.
Dubbo architecture
- Provider: the side that exposes a service.
- Consumer: the side that calls a remote service.
- Registry: the registry for service registration and discovery.
- Monitor: the monitoring center that counts service invocations and call times.
- Container: the container the service runs in.
A simple Dubbo example
The registry in this example is ZooKeeper, so install and start ZooKeeper first.
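Assuming a default standalone ZooKeeper installation, it can usually be started from the ZooKeeper home directory with:
bin/zkServer.sh start   (Linux/macOS)
bin\zkServer.cmd        (Windows)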
Create a parent project with three child modules:
- common_api: the shared interfaces
- consumer_service: the service consumer
- provider_service: the service provider
Parent project pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>zookeeper_dubbo_demo</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>common_api</module>
<module>consumer_service</module>
<module>provider_service</module>
</modules>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
</dependencies>
</project>
Add the shared interface to the common project
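The interface itself is not listed in the post. Judging from the provider implementation shown below (which imports com.blb.api.HelloService), a minimal version would look like this:

package com.blb.api;

public interface HelloService {
    String hello(String name);
}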
Both the consumer and the provider inherit from the parent project and depend on the common module:
<parent>
<groupId>org.example</groupId>
<artifactId>zookeeper_dubbo_demo</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<dependency>
<groupId>com.blb</groupId>
<artifactId>common_api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
Service provider configuration file (application.properties)
server.port=6606
spring.application.name=provider-service
# Dubbo application name
dubbo.application.name=provider-service
# protocol type of the Dubbo registry
dubbo.registry.protocol=zookeeper
# ZooKeeper address
dubbo.registry.address=zookeeper://192.168.223.223:2181
# Dubbo protocol configuration
dubbo.protocol.name=dubbo
dubbo.protocol.port=20880
# package that Dubbo scans for service implementations
dubbo.scan.base-packages=com.blb.provider_service.service
The service provider implements the interface
package com.blb.provider_service.service;

import com.blb.api.HelloService;
import org.apache.dubbo.config.annotation.Service;

/**
 * Interface implementation.
 * @Service here is Dubbo's annotation, not Spring's.
 */
@Service(version = "1.0.0", interfaceClass = HelloService.class)
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String name) {
        return "Hello, I am the service provider! " + name;
    }
}
Service consumer configuration file (application.properties)
server.port=7707
dubbo.application.name=consumer-service
# the consumer uses the same ZooKeeper registry
dubbo.registry.protocol=zookeeper
dubbo.registry.address=zookeeper://192.168.223.223:2181
The service consumer calls the service
@RestController
public class HelloController {

    // remote method invocation (RPC); @Reference is Dubbo's annotation
    @Reference(version = "1.0.0")
    private HelloService helloService;

    @RequestMapping("/hello")
    public String hello(String name) {
        // call the remote method
        return helloService.hello(name);
    }
}
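The consumer also needs a startup class, again with assumed names:

package com.blb.consumer_service;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerServiceApplication.class, args);
    }
}

With ZooKeeper and the provider running, requesting http://localhost:7707/hello?name=World should return the greeting produced by the provider.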
Hand-rolling an RPC framework
Case overview
The previous section used Dubbo for RPC. To understand more deeply how a framework like Dubbo works under the hood, you can build a simple RPC framework yourself.
The stack used in this case:
- Netty: an NIO-based network communication framework, which Dubbo also uses underneath
- ZooKeeper: service registration and discovery
- Protostuff (a Protobuf-based runtime serialization library): serializes the wire protocol
Basic flow
Execution steps:
1. The server implements the callable interfaces and, on startup, registers its address in ZooKeeper.
2. The client calls an interface method.
3. The following steps must be transparent to the caller, just like a local call, so a dynamic proxy is generated for the interface.
4. The interface name, method, and arguments are wrapped into a request protocol and serialized with Protostuff.
5. A Netty encoder encodes the request.
6. The server's IP and port are looked up in ZooKeeper.
7. The request is sent over the network through Netty.
8. The server receives the Netty request.
9. A Netty decoder decodes the request.
10. Protostuff deserialization reconstructs the request protocol.
11. The interface name, method, and arguments are read from the request, and the server-side implementation is invoked via reflection.
12. The client receives the returned result.

**The RPC framework wraps steps 3 through 11 so that they are transparent to the user, which removes most of the complexity of making network calls.**
Project structure
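The original post shows the structure as a diagram; reconstructed from the package names used in the code below, the layout is roughly:
com.xray.rpc
  common
    service    ISum, IDiff
    registry   Constant, ServiceRegistry, ServiceDiscovery
    codec      RPCRequest, RPCResponse, SerializationUtil, RPCEncoder, RPCDecoder
  server       RPCServer, RPCServerHandler, RPCServiceTest
    impl       SumImpl, DiffImpl
  client       RPCProxy, RPCClient, RPCClientHandler, RPCTest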
Dependency coordinates
<!-- SLF4J -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
<!-- protostuff -->
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.6.0</version>
</dependency>
<!-- ZooKeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<!-- Apache Commons Collections -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.0</version>
</dependency>
<!-- Objenesis -->
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<!-- CGLib -->
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib</artifactId>
<version>3.1</version>
</dependency>
Shared code
common/service/IDiff
package com.xray.rpc.common.service;

/**
 * Subtraction interface used for testing.
 */
public interface IDiff {
    double diff(double a, double b);
}
common/service/ISum
package com.xray.rpc.common.service;

/**
 * Addition interface used for testing.
 */
public interface ISum {
    int sum(int a, int b);
}
common/registry/Constant
package com.xray.rpc.common.registry;

/**
 * Constants shared by the registry code.
 */
public interface Constant {
    // ZooKeeper session timeout (ms)
    int ZK_SESSION_TIMEOUT = 10000;
    // ZooKeeper address
    String ZK_CONNECT = "127.0.0.1:2181";
    // root path for registrations
    String ZK_REGISTRY_PATH = "/registry";
    // node that stores the provider address
    String ZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";
    // separator between IP and port
    String ZK_IP_SPLIT = ":";
}
common/registry/ServiceDiscovery
package com.xray.rpc.common.registry;

import io.netty.util.internal.ThreadLocalRandom;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Service discovery: connects to ZooKeeper and watches the registry node.
 */
public class ServiceDiscovery {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);

    private final CountDownLatch latch = new CountDownLatch(1);

    private volatile List<String> dataList = new ArrayList<>();

    private final String registryAddress;

    public ServiceDiscovery(String registryAddress) {
        this.registryAddress = registryAddress;
        // connect to ZooKeeper
        ZooKeeper zk = connectServer();
        if (zk != null) {
            // watch the /registry node
            watchNode(zk);
        }
    }

    /**
     * Picks a provider address that was read from ZooKeeper.
     */
    public String discover() {
        String data = null;
        int size = dataList.size();
        if (size > 0) {
            if (size == 1) {
                data = dataList.get(0);
                LOGGER.debug("using only data: {}", data);
            } else {
                // simple random load balancing across providers
                data = dataList.get(ThreadLocalRandom.current().nextInt(size));
                LOGGER.debug("using random data: {}", data);
            }
        }
        return data;
    }

    /**
     * Connects to the ZooKeeper server.
     */
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            // block until the connection is established
            latch.await();
        } catch (IOException | InterruptedException e) {
            LOGGER.error("", e);
        }
        return zk;
    }

    /**
     * Watches the registry node for changes.
     */
    private void watchNode(final ZooKeeper zk) {
        try {
            // add a watcher to the registry node
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        // re-register the watch and reload addresses whenever the children change
                        watchNode(zk);
                    }
                }
            });
            // read every child node's data (provider addresses) into dataList
            List<String> dataList = new ArrayList<>();
            for (String node : nodeList) {
                byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
                dataList.add(new String(bytes));
            }
            LOGGER.debug("node data: {}", dataList);
            this.dataList = dataList;
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("", e);
        }
    }
}
common/registry/ServiceRegistry
package com.xray.rpc.common.registry;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * Connects to the ZooKeeper registry and creates the registration nodes.
 */
public class ServiceRegistry {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);

    private final CountDownLatch latch = new CountDownLatch(1);

    private ZooKeeper zk;

    public ServiceRegistry() {
    }

    /**
     * Registers the provider address in ZooKeeper.
     */
    public void register(String data) {
        if (data != null) {
            zk = connectServer();
            if (zk != null) {
                try {
                    Stat stat = zk.exists(Constant.ZK_REGISTRY_PATH, false);
                    if (stat == null) {
                        // the parent path must be persistent: ephemeral nodes cannot have children
                        zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    }
                    stat = zk.exists(Constant.ZK_DATA_PATH, false);
                    if (stat == null) {
                        // the data node is ephemeral so it disappears when the provider goes down
                        zk.create(Constant.ZK_DATA_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                    }
                    // store the provider address
                    zk.setData(Constant.ZK_DATA_PATH, data.getBytes(), -1);
                    LOGGER.info("create zookeeper node ({} => {})", Constant.ZK_DATA_PATH, data);
                } catch (InterruptedException | KeeperException e) {
                    LOGGER.error("Connect Zookeeper Error ", e);
                }
            }
        }
    }

    /**
     * Connects to ZooKeeper.
     */
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_CONNECT, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // count down once the connection to ZooKeeper is established
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            // block until connected
            latch.await();
        } catch (IOException | InterruptedException e) {
            LOGGER.error("Connect Zookeeper Error ", e);
        }
        return zk;
    }
}
common/codec/RPCRequest
package com.xray.rpc.common.codec;

/**
 * Request protocol.
 */
public class RPCRequest {

    // request id
    private String requestId;
    // interface (class) name
    private String className;
    // method name
    private String methodName;
    // parameter types
    private Class<?>[] parameterTypes;
    // argument values
    private Object[] parameters;

    public String getRequestId() {
        return requestId;
    }
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }
    public String getClassName() {
        return className;
    }
    public void setClassName(String className) {
        this.className = className;
    }
    public String getMethodName() {
        return methodName;
    }
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }
    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }
    public Object[] getParameters() {
        return parameters;
    }
    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }
}
common/codec/RPCResponse
package com.xray.rpc.common.codec;

/**
 * Response protocol.
 */
public class RPCResponse {

    // id of the request this response belongs to
    private String requestId;
    // error thrown on the server side, if any
    private Throwable error;
    // return value
    private Object result;

    public String getRequestId() {
        return requestId;
    }
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }
    public Throwable getError() {
        return error;
    }
    public void setError(Throwable error) {
        this.error = error;
    }
    public Object getResult() {
        return result;
    }
    public void setResult(Object result) {
        this.result = result;
    }
}
common/codec/SerializationUtil
package com.xray.rpc.common.codec;

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Serialization utility based on Protostuff (a Protobuf runtime library).
 */
public class SerializationUtil {

    private static final Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    private static final Objenesis objenesis = new ObjenesisStd(true);

    private SerializationUtil() {
    }

    /**
     * Returns (and caches) the runtime schema for a class.
     */
    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> cls) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(cls);
            cachedSchema.put(cls, schema);
        }
        return schema;
    }

    /**
     * Serializes an object to bytes.
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * Deserializes bytes back into an object of the given class.
     */
    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            // Objenesis creates the instance without calling a constructor
            T message = objenesis.newInstance(cls);
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}
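As a quick sanity check (not part of the original post), the utility can be exercised with a simple round trip of the request protocol, for example inside a main method:

RPCRequest request = new RPCRequest();
request.setRequestId("test-1");
request.setClassName("com.xray.rpc.common.service.ISum");
request.setMethodName("sum");

byte[] bytes = SerializationUtil.serialize(request);
RPCRequest copy = SerializationUtil.deserialize(bytes, RPCRequest.class);
System.out.println(copy.getMethodName());   // prints "sum"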
common/codec/RPCDecoder
package com.xray.rpc.common.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * Request decoder: turns a full frame of bytes back into a protocol object.
 */
public class RPCDecoder extends ByteToMessageDecoder {

    // type to deserialize into (RPCRequest on the server, RPCResponse on the client)
    private final Class<?> genericClass;

    public RPCDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // LengthFieldBasedFrameDecoder runs before this handler, so the buffer holds exactly one frame
        final int length = in.readableBytes();
        final byte[] bytes = new byte[length];
        // read the frame bytes
        in.readBytes(bytes, 0, length);
        // deserialize into the protocol object
        Object obj = SerializationUtil.deserialize(bytes, genericClass);
        out.add(obj);
    }
}
common/codec/RPCEncoder
package com.xray.rpc.common.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * Request encoder: serializes a protocol object into bytes.
 */
public class RPCEncoder extends MessageToByteEncoder<Object> {

    // type to serialize (RPCResponse on the server, RPCRequest on the client)
    private final Class<?> genericClass;

    public RPCEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            // serialize the protocol object
            byte[] data = SerializationUtil.serialize(in);
            // write the bytes; LengthFieldPrepender adds the length header afterwards
            out.writeBytes(data);
        }
    }
}
Server-side code
server/impl/DiffImpl
package com.xray.rpc.server.impl;

import com.xray.rpc.common.service.IDiff;

/**
 * Subtraction implementation.
 */
public class DiffImpl implements IDiff {
    @Override
    public double diff(double a, double b) {
        return a - b;
    }
}
server/impl/SumImpl
package com.xray.rpc.server.impl;

import com.xray.rpc.common.service.ISum;

/**
 * Addition implementation.
 */
public class SumImpl implements ISum {
    @Override
    public int sum(int a, int b) {
        return a + b;
    }
}
rpc/server/RPCServer
package com.xray.rpc.server;

import com.xray.rpc.common.codec.RPCDecoder;
import com.xray.rpc.common.codec.RPCEncoder;
import com.xray.rpc.common.codec.RPCRequest;
import com.xray.rpc.common.codec.RPCResponse;
import com.xray.rpc.common.registry.Constant;
import com.xray.rpc.common.registry.ServiceRegistry;
import com.xray.rpc.common.service.IDiff;
import com.xray.rpc.common.service.ISum;
import com.xray.rpc.server.impl.DiffImpl;
import com.xray.rpc.server.impl.SumImpl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

/**
 * RPC server.
 */
public class RPCServer {

    /**
     * Maps the callable interface names to their implementation objects.
     */
    private Map<String, Object> getServices() {
        Map<String, Object> services = new HashMap<>();
        services.put(ISum.class.getName(), new SumImpl());
        services.put(IDiff.class.getName(), new DiffImpl());
        return services;
    }

    /**
     * Binds to and listens on the given port.
     */
    private void bind(int port) {
        // create the event loop groups
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // create the bootstrap object
            ServerBootstrap b = new ServerBootstrap();
            // configure it
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
                                    // RPC decoder: incoming bytes -> RPCRequest
                                    .addLast(new RPCDecoder(RPCRequest.class))
                                    .addLast(new LengthFieldPrepender(2))
                                    // RPC encoder: RPCResponse -> outgoing bytes
                                    .addLast(new RPCEncoder(RPCResponse.class))
                                    // RPC handler that invokes the local implementation
                                    .addLast(new RPCServerHandler(getServices()));
                        }
                    });
            // bind the port and wait until the channel is closed
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Returns the local IP address.
     */
    public static String getAddress() {
        InetAddress host = null;
        try {
            host = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return host.getHostAddress();
    }

    /**
     * Registers the address in ZooKeeper and starts the server.
     */
    public void initService(int port) {
        ServiceRegistry serviceRegistry = new ServiceRegistry();
        String ip = getAddress();
        // register the provider address with ZooKeeper
        serviceRegistry.register(ip + Constant.ZK_IP_SPLIT + port);
        bind(port);
    }
}
rpc/server/RPCServerHandler
package com.xray.rpc.server;

import com.xray.rpc.common.codec.RPCRequest;
import com.xray.rpc.common.codec.RPCResponse;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import net.sf.cglib.reflect.FastClass;
import net.sf.cglib.reflect.FastMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * RPC handler: takes a decoded request, invokes the local implementation, and writes the response.
 */
public class RPCServerHandler extends ChannelHandlerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(RPCServerHandler.class);

    // map of interface name -> local implementation
    private final Map<String, Object> handlerMap;

    public RPCServerHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RPCResponse response = new RPCResponse();
        // the decoder has already turned the bytes into an RPCRequest
        RPCRequest request = (RPCRequest) msg;
        response.setRequestId(request.getRequestId());
        try {
            // invoke the local method
            Object result = handle(request);
            response.setResult(result);
        } catch (Throwable t) {
            response.setError(t);
        }
        // write the response back to the client
        ctx.writeAndFlush(response);
    }

    /**
     * Invokes the local method described by the request.
     */
    private Object handle(RPCRequest request) throws Throwable {
        // interface name
        String className = request.getClassName();
        // implementation object registered for that interface
        Object serviceBean = handlerMap.get(className);
        // implementation class, method name, parameter types, and argument values
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();
        // use CGLib FastClass/FastMethod instead of plain reflection
        FastClass serviceFastClass = FastClass.create(serviceClass);
        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
        // invoke the local method
        return serviceFastMethod.invoke(serviceBean, parameters);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        LOGGER.error("server caught exception", cause);
        ctx.close();
    }
}
rpc/server/RPCServiceTest
package com.xray.rpc.server;

public class RPCServiceTest {
    public static void main(String[] args) {
        // start the server for testing
        int port = 9090;
        new RPCServer().initService(port);
    }
}
Client-side code
rpc/client/RPCProxy
package com.xray.rpc.client;

import com.xray.rpc.common.codec.RPCRequest;
import com.xray.rpc.common.codec.RPCResponse;
import com.xray.rpc.common.registry.Constant;
import com.xray.rpc.common.registry.ServiceDiscovery;
import net.sf.cglib.proxy.InvocationHandler;
import net.sf.cglib.proxy.Proxy;

import java.lang.reflect.Method;
import java.util.UUID;

/**
 * RPC dynamic proxy: turns a plain interface call into a network request.
 */
public class RPCProxy {

    private String serverAddress;
    private ServiceDiscovery serviceDiscovery;

    public RPCProxy(ServiceDiscovery serviceDiscovery) {
        this.serviceDiscovery = serviceDiscovery;
    }

    /**
     * Creates a proxy object for the given interface.
     */
    @SuppressWarnings("unchecked")
    public <T> T create(Class<?> interfaceClass) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // build and fill the RPC request
                        RPCRequest request = new RPCRequest();
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setClassName(method.getDeclaringClass().getName());
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);
                        // look up the provider address in ZooKeeper
                        if (serviceDiscovery != null) {
                            serverAddress = serviceDiscovery.discover();
                        }
                        // split the address into IP and port, e.g. "123.23.213.23:9090"
                        String[] array = serverAddress.split(Constant.ZK_IP_SPLIT);
                        String host = array[0];
                        int port = Integer.parseInt(array[1]);
                        // create the RPC client
                        RPCClient client = new RPCClient(host, port);
                        // send the request over the network
                        RPCResponse response = client.send(request);
                        // return the result or rethrow the remote error
                        if (response.getError() != null) {
                            throw response.getError();
                        } else {
                            return response.getResult();
                        }
                    }
                });
    }
}
rpc/client/RPCClient
package com.xray.rpc.client;

import com.xray.rpc.common.codec.RPCDecoder;
import com.xray.rpc.common.codec.RPCEncoder;
import com.xray.rpc.common.codec.RPCRequest;
import com.xray.rpc.common.codec.RPCResponse;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

import java.util.concurrent.CountDownLatch;

/**
 * RPC client: opens a Netty connection, sends one request, and waits for the response.
 */
public class RPCClient {

    private final String host;
    private final int port;
    private final CountDownLatch latch;

    public RPCClient(String host, int port) {
        this.host = host;
        this.port = port;
        this.latch = new CountDownLatch(1);
    }

    /**
     * Sends an RPC request and returns the response.
     */
    public RPCResponse send(RPCRequest request) {
        // custom handler that writes the request and captures the response
        RPCClientHandler handler = new RPCClientHandler(request, latch);
        // create the Netty bootstrap
        EventLoopGroup group = new NioEventLoopGroup();
        RPCResponse response = null;
        try {
            Bootstrap b = new Bootstrap().group(group)
                    // configure the channel
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    // set up the pipeline
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
                                    // RPC decoder: incoming bytes -> RPCResponse
                                    .addLast(new RPCDecoder(RPCResponse.class))
                                    .addLast(new LengthFieldPrepender(2))
                                    // RPC encoder: RPCRequest -> outgoing bytes
                                    .addLast(new RPCEncoder(RPCRequest.class))
                                    // handler that sends the request and captures the response
                                    .addLast(handler);
                        }
                    });
            // connect to the server
            ChannelFuture f = b.connect(host, port).sync();
            // block until the handler has read the server's response
            latch.await();
            // fetch the response captured by the handler
            response = handler.getResponse();
            if (response != null) {
                f.channel().close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
        return response;
    }
}
rpc/client/RPCClientHandler
package com.xray.rpc.client;

import com.xray.rpc.common.codec.RPCRequest;
import com.xray.rpc.common.codec.RPCResponse;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.CountDownLatch;

/**
 * Client handler: sends the request once the channel is active and stores the response.
 */
public class RPCClientHandler extends ChannelHandlerAdapter {

    // the request to send
    private final RPCRequest request;
    // the response read from the server
    private RPCResponse response;
    // latch handed over by RPCClient
    private final CountDownLatch latch;

    public RPCClientHandler(RPCRequest request, CountDownLatch latch) {
        this.request = request;
        this.latch = latch;
    }

    public RPCResponse getResponse() {
        return response;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // send the request as soon as the connection is established
        ctx.writeAndFlush(request);
    }

    // read the server's message
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // the decoder has already turned the bytes into an RPCResponse
        response = (RPCResponse) msg;
        // let the thread blocked in RPCClient.send() continue
        latch.countDown();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
rpc/client/RPCTest
package com.xray.rpc.client;

import com.xray.rpc.common.registry.Constant;
import com.xray.rpc.common.registry.ServiceDiscovery;
import com.xray.rpc.common.service.IDiff;
import com.xray.rpc.common.service.ISum;

public class RPCTest {
    public static void main(String[] args) {
        // create the proxy factory backed by ZooKeeper service discovery
        RPCProxy rpcProxy = new RPCProxy(new ServiceDiscovery(Constant.ZK_CONNECT));
        // create a dynamic proxy for the interface
        IDiff diff = rpcProxy.create(IDiff.class);
        // call the remote method through the proxy
        double result = diff.diff(1321, 32.2);
        // same for the sum interface
        ISum sum = rpcProxy.create(ISum.class);
        int result2 = sum.sum(1000, 1000);
        System.out.println(result + ":" + result2);
    }
}
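With ZooKeeper and the server from RPCServiceTest running, this should print the result of 1321 - 32.2 (roughly 1288.8) followed by 2000.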
Summary
We have seen that RPC is a remote invocation technique that lets one server call methods on another as simply as calling local methods, which greatly reduces the complexity of communication between servers.
Its internals, however, are fairly involved and require Netty, ZooKeeper, Protostuff/Protobuf, dynamic proxies, and similar techniques; in short, it keeps the simple part for the user and the complex part for itself. The final example gives a rough picture of the overall logic inside an RPC framework, but it implements only basic RPC; making it configurable through annotations, the way Dubbo is, would have to come in a later version.