一个轻量级分布式RPC框架--NettyRpc

Posted 阿凡卢

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个轻量级分布式RPC框架--NettyRpc相关的知识,希望对你有一定的参考价值。

1、背景

最近在搜索Netty和Zookeeper方面的文章时,看到了这篇文章《轻量级分布式 RPC 框架》,作者用Zookeeper、Netty和Spring写了一个轻量级的分布式RPC框架。花了一些时间看了下他的代码,写的干净简单,写的RPC框架可以算是一个简易版的dubbo。这个RPC框架虽小,但是麻雀虽小,五脏俱全,有兴趣的可以学习一下。

本人在这个简易版的RPC上添加了如下特性:

  • 异步调用,支持Future机制,支持回调函数callback
  • 客户端使用TCP长连接(在多次调用共享连接)
  • TCP心跳连接检测
  • 服务端异步多线程处理RPC请求
  • 支持不同的load balance策略
  • 支持不同的序列化/反序列化

项目地址:https://github.com/luxiaoxun/NettyRpc

2、简介

RPC,即 Remote Procedure Call(远程过程调用),调用远程计算机上的服务,就像调用本地服务一样。RPC可以很好的解耦系统,如WebService就是一种基于Http协议的RPC。

这个RPC整体框架如下:

这个RPC框架使用的一些技术所解决的问题:

服务发布与订阅:服务端使用Zookeeper注册服务地址,客户端从Zookeeper获取可用的服务地址。

通信:使用Netty作为通信框架。

Spring:使用Spring配置服务,加载Bean,扫描注解。

动态代理:客户端使用代理模式透明化服务调用。

消息编解码:使用Protostuff序列化和反序列化消息。

3、服务端发布服务

使用注解标注要发布的服务

服务注解

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
    Class<?> value();
}

一个服务接口:

public interface HelloService {
    String hello(String name);
    String hello(Person person);
}

一个服务实现:使用注解标注

@RpcService(HelloService.class)
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String name) {
        return "Hello! " + name;
    }

    @Override
    public String hello(Person person) {
        return "Hello! " + person.getFirstName() + " " + person.getLastName();
    }
}

服务在启动的时候扫描得到所有的服务接口及其实现:

@Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
        if (MapUtils.isNotEmpty(serviceBeanMap)) {
            for (Object serviceBean : serviceBeanMap.values()) {
                String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
                handlerMap.put(interfaceName, serviceBean);
            }
        }
    }

在Zookeeper集群上注册服务地址:

    private void createNode(ZooKeeper zk, String data) {
        try {
            byte[] bytes = data.getBytes();
            //Must be a EPHEMERAL node
            String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            logger.debug("Create zookeeper node ({} => {})", path, data);
            logger.info("Registry new service: " + data);
        } catch (KeeperException e) {
            logger.error(e.toString());
        } catch (InterruptedException ex) {
            logger.error(ex.toString());
        }
    }

这里在原文的基础上加了AddRootNode()判断服务父节点是否存在,如果不存在则添加一个PERSISTENT的服务父节点,这样虽然启动服务时多了点判断,但是不需要手动命令添加服务父节点了。

关于Zookeeper的使用原理,可以看这里《ZooKeeper基本原理》。

4、客户端调用服务

使用代理模式调用服务:

    @SuppressWarnings("unchecked")
    public static <T> T createService(Class<T> interfaceClass) {
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new ObjectProxy<T>(interfaceClass)
        );
    }

    public static <T> RpcService createAsyncService(Class<T> interfaceClass) {
        return new ObjectProxy<T>(interfaceClass);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (Object.class == method.getDeclaringClass()) {
            String name = method.getName();
            if ("equals".equals(name)) {
                return proxy == args[0];
            } else if ("hashCode".equals(name)) {
                return System.identityHashCode(proxy);
            } else if ("toString".equals(name)) {
                return proxy.getClass().getName() + "@" +
                        Integer.toHexString(System.identityHashCode(proxy)) +
                        ", with InvocationHandler " + this;
            } else {
                throw new IllegalStateException(String.valueOf(method));
            }
        }

        RpcRequest request = new RpcRequest();
        request.setRequestId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParameters(args);

        RpcClientHandler handler = ConnectManage.getInstance().chooseHandler();
        RpcFuture rpcFuture = handler.sendRequest(request);
        return rpcFuture.get();
    }

这里每次使用代理远程调用服务,从Zookeeper上获取可用的服务地址,通过RpcClient send一个Request,等待该Request的Response返回。

这里原文有个比较严重的bug,在原文给出的简单的Test中是很难测出来的,原文使用了obj的wait和notifyAll来等待Response返回,会出现“假死等待”的情况:一个Request发送出去后,在obj.wait()调用之前可能Response就返回了,这时候在channelRead0里已经拿到了Response并且obj.notifyAll()已经在obj.wait()之前调用了,这时候send后再obj.wait()就出现了假死等待,客户端就一直等待在这里。使用CountDownLatch可以解决这个问题。

注意:这里每次调用的send时候才去和服务端建立连接,使用的是短连接,这种短连接在高并发时会有连接数问题,也会影响性能,优化后使用TCP长连接进行通信。

从Zookeeper上获取服务地址:

    private void watchNode(final ZooKeeper zk) {
        try {
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk);
                    }
                }
            });
            List<String> dataList = new ArrayList<>();
            for (String node : nodeList) {
                byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
                dataList.add(new String(bytes));
            }
            logger.debug("Node data: {}", dataList);
            this.dataList = dataList;

            logger.debug("Service discovery triggered updating connected server node.");
            //Update the service info based on the latest data
            UpdateConnectedServer();

        } catch (KeeperException | InterruptedException e) {
            logger.error("", e);
        }
    }

每次服务地址节点发生变化,都需要再次watchNode,获取新的服务地址列表。

5、消息编码

请求消息:RpcRequest

响应消息:RpcResponse

消息序列化和反序列化接口:Serializer

由于处理的是TCP消息,本人加了TCP的粘包处理Handler

channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,0))

消息编解码时开始4个字节表示消息的长度,也就是消息编码的时候,先写消息的长度,再写消息。

6、性能改进

1)服务端请求异步处理

Netty本身就是一个高性能的网络框架,从网络IO方面来说并没有太大的问题。

从这个RPC框架本身来说,在原文的基础上把Server端处理请求的过程改成了多线程异步:

    public void channelRead0(final ChannelHandlerContext ctx, final RpcRequest request) {
        // filter beat ping
        if (Beat.BEAT_ID.equalsIgnoreCase(request.getRequestId())) {
            logger.info("Server read beat-ping.");
            return;
        }

        serverHandlerPool.execute(new Runnable() {
            @Override
            public void run() {
                logger.info("Receive request " + request.getRequestId());
                RpcResponse response = new RpcResponse();
                response.setRequestId(request.getRequestId());
                try {
                    Object result = handle(request);
                    response.setResult(result);
                } catch (Throwable t) {
                    response.setError(t.toString());
                    logger.error("RPC Server handle request error", t);
                }
                ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        logger.info("Send response for request " + request.getRequestId());
                    }
                });
            }
        });
    }

Netty 4中的Handler处理在IO线程中,如果Handler处理中有耗时的操作(如数据库相关),会让IO线程等待,影响性能。

2)客户端长连接的管理

客户端保持和服务进行长连接,不需要每次调用服务的时候进行连接,长连接的管理(通过Zookeeper获取有效的地址)。

通过监听Zookeeper服务节点值的变化,动态更新客户端和服务端保持的长连接。

使用负载均衡(load balance)策略,选取一个服务地址做请求通信。

3)客户端请求异步处理

客户端请求异步处理的支持,不需要同步等待:发送一个异步请求,返回Feature,通过Feature的callback机制获取结果。

RpcService client = rpcClient.createAsyncService(HelloService.class);
RpcFuture helloFuture = client.call("hello", Integer.toString(i));
String result = (String) helloFuture.get(3000, TimeUnit.MILLISECONDS);

项目持续更新中。

项目地址:https://github.com/luxiaoxun/NettyRpc

 

参考:

轻量级分布式 RPC 框架:http://my.oschina.net/huangyong/blog/361751

你应该知道的RPC原理:http://www.cnblogs.com/LBSer/p/4853234.html

 

以上是关于一个轻量级分布式RPC框架--NettyRpc的主要内容,如果未能解决你的问题,请参考以下文章

C++实现轻量级RPC分布式网络通信框架

轻量级分布式 RPC 框架

轻量级分布式 RPC 框架

轻量级分布式 RPC 框架

轻量级分布式 RPC 框架(转)

轻量级分布式 RPC 框架