RPC基本原理以及如何用Netty来实现RPC

Posted 清幽之地的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RPC基本原理以及如何用Netty来实现RPC相关的知识,希望对你有一定的参考价值。

前言

在微服务大行其道的今天,分布式系统越来越重要,实现服务化首先就要考虑服务之间的通信问题。这里面涉及序列化、反序列化、寻址、连接等等问题。。不过,有了RPC框架,我们就无需苦恼。

一、什么是RPC?

RPC(Remote Procedure Call)— 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。

值得注意是,两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样。 

RPC框架有很多,比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift。当然了,还有Spring Cloud,不过对于Spring Cloud来说,RPC只是它的一个功能模块。

复杂的先不讲,如果要实现一个基本功能、简单的RPC,要涉及哪些东西呢?

  • 动态代理

  • 反射

  • 序列化、反序列化

  • 网络通信

  • 编解码

  • 服务发现和注册

  • 心跳与链路检测

  • ......

下面我们一起通过代码来分析,怎么把这些技术点串到一起,实现我们自己的RPC。

二、环境准备

在开始之前,笔者先介绍一下所用到的软件环境。

SpringBoot、Netty、zookeeper、zkclient、fastjson

  • SpringBoot 项目的基础框架,方便打成JAR包,便于测试。

  • Netty 通信服务器

  • zookeeper 服务的发现与注册

  • zkclient zookeeper客户端

  • fastjson 序列化、反序列化

三、RPC生产者

1、服务接口API

整个RPC,我们分为生产者和消费者。首先它们有一个共同的服务接口API。在这里,我们搞一个操作用户信息的service接口。

 
   
   
 
  1. public interface InfoUserService {

  2.    List<InfoUser> insertInfoUser(InfoUser infoUser);

  3.    InfoUser getInfoUserById(String id);

  4.    void deleteInfoUserById(String id);

  5.    String getNameById(String id);

  6.    Map<String,InfoUser> getAllUser();

  7. }

2、服务类实现

作为生产者,它当然要有实现类,我们创建InfoUserServiceImpl实现类,并用注解把它标注为RPC的服务,然后注册到Spring的Bean容器中。在这里,我们把infoUserMap当做数据库,存储用户信息。

 
   
   
 
  1. package com.viewscenes.netsupervisor.service.impl;


  2. @RpcService

  3. public class InfoUserServiceImpl implements InfoUserService {


  4.    Logger logger = LoggerFactory.getLogger(this.getClass());

  5.    //当做数据库,存储用户信息

  6.    Map<String,InfoUser> infoUserMap = new HashMap<>();


  7.    public List<InfoUser> insertInfoUser(InfoUser infoUser) {

  8.        logger.info("新增用户信息:{}", JSONObject.toJSONString(infoUser));

  9.        infoUserMap.put(infoUser.getId(),infoUser);

  10.        return getInfoUserList();

  11.    }

  12.    public InfoUser getInfoUserById(String id) {

  13.        InfoUser infoUser = infoUserMap.get(id);

  14.        logger.info("查询用户ID:{}",id);

  15.        return infoUser;

  16.    }


  17.    public List<InfoUser> getInfoUserList() {

  18.        List<InfoUser> userList = new ArrayList<>();

  19.        Iterator<Map.Entry<String, InfoUser>> iterator = infoUserMap.entrySet().iterator();

  20.        while (iterator.hasNext()){

  21.            Map.Entry<String, InfoUser> next = iterator.next();

  22.            userList.add(next.getValue());

  23.        }

  24.        logger.info("返回用户信息记录数:{}",userList.size());

  25.        return userList;

  26.    }

  27.    public void deleteInfoUserById(String id) {

  28.        logger.info("删除用户信息:{}",JSONObject.toJSONString(infoUserMap.remove(id)));

  29.    }

  30.    public String getNameById(String id){

  31.        logger.info("根据ID查询用户名称:{}",id);

  32.        return infoUserMap.get(id).getName();

  33.    }

  34.    public Map<String,InfoUser> getAllUser(){

  35.        logger.info("查询所有用户信息{}",infoUserMap.keySet().size());

  36.        return infoUserMap;

  37.    }

  38. }

元注解定义如下:

 
   
   
 
  1. package com.viewscenes.netsupervisor.annotation;


  2. @Target({ElementType.TYPE})

  3. @Retention(RetentionPolicy.RUNTIME)

  4. @Component

  5. public @interface RpcService {}

3、请求信息和返回信息

所有的请求信息和返回信息,我们用两个JavaBean来表示。其中的重点是,返回信息要带有请求信息的ID。

 
   
   
 
  1. package com.viewscenes.netsupervisor.entity;

  2. public class Request {

  3.    private String id;

  4.    private String className;// 类名

  5.    private String methodName;// 函数名称

  6.    private Class<?>[] parameterTypes;// 参数类型

  7.    private Object[] parameters;// 参数列表

  8.    get/set ...

  9. }

 
   
   
 
  1. package com.viewscenes.netsupervisor.entity;

  2. public class Response {

  3.    private String requestId;

  4.    private int code;

  5.    private String error_msg;

  6.    private Object data;

  7.    get/set ...

  8. }

4、Netty服务端

 
   
   
 
  1. TOMCAT端口

  2. server.port=8001

  3. registry.address=192.168.245.131:2181,192.168.245.131:2182,192.168.245.131:2183

  4. rpc.server.address=192.168.197.1:18868

为了方便管理,我们把它也注册成Bean,同时实现ApplicationContextAware接口,把上面@RpcService注解的服务类捞出来,缓存起来,供消费者调用。同时,作为服务器,还要对客户端的链路进行心跳检测,超过60秒未读写数据,关闭此连接。

 
   
   
 
  1. package com.viewscenes.netsupervisor.netty.server;

  2. @Component

  3. public class NettyServer implements ApplicationContextAware,InitializingBean{


  4.    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

  5.    private static final EventLoopGroup bossGroup = new NioEventLoopGroup(1);

  6.    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(4);


  7.    private Map<String, Object> serviceMap = new HashMap<>();


  8.    @Value("${rpc.server.address}")

  9.    private String serverAddress;


  10.    @Autowired

  11.    ServiceRegistry registry;


  12.    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

  13.        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcService.class);

  14.        for(Object serviceBean:beans.values()){

  15.            Class<?> clazz = serviceBean.getClass();

  16.            Class<?>[] interfaces = clazz.getInterfaces();

  17.            for (Class<?> inter : interfaces){

  18.                String interfaceName = inter.getName();

  19.                logger.info("加载服务类: {}", interfaceName);

  20.                serviceMap.put(interfaceName, serviceBean);

  21.            }

  22.        }

  23.        logger.info("已加载全部服务接口:{}", serviceMap);

  24.    }

  25.    public void afterPropertiesSet() throws Exception {

  26.        start();

  27.    }

  28.    public void start(){

  29.        final NettyServerHandler handler = new NettyServerHandler(serviceMap);

  30.        new Thread(() -> {

  31.            try {

  32.                ServerBootstrap bootstrap = new ServerBootstrap();

  33.                bootstrap.group(bossGroup,workerGroup).

  34.                        channel(NioserverSocketChannel.class).

  35.                        option(ChannelOption.SO_BACKLOG,1024).

  36.                        childOption(ChannelOption.SO_KEEPALIVE,true).

  37.                        childOption(ChannelOption.TCP_NODELAY,true).

  38.                        childHandler(new ChannelInitializer<SocketChannel>() {

  39.                            //创建NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件

  40.                            protected void initChannel(SocketChannel channel) throws Exception {

  41.                                ChannelPipeline pipeline = channel.pipeline();

  42.                                pipeline.addLast(new IdleStateHandler(0, 0, 60));

  43.                                pipeline.addLast(new JSONEncoder());

  44.                                pipeline.addLast(new JSONDecoder());

  45.                                pipeline.addLast(handler);

  46.                            }

  47.                        });

  48.                String[] array = serverAddress.split(":");

  49.                String host = array[0];

  50.                int port = Integer.parseInt(array[1]);

  51.                ChannelFuture cf = bootstrap.bind(host,port).sync();

  52.                logger.info("RPC 服务器启动.监听端口:"+port);

  53.                registry.register(serverAddress);

  54.                //等待服务端监听端口关闭

  55.                cf.channel().closeFuture().sync();

  56.            } catch (Exception e) {

  57.                e.printStackTrace();

  58.                bossGroup.shutdownGracefully();

  59.                workerGroup.shutdownGracefully();

  60.            }

  61.        }).start();

  62.    }

  63. }

上面的代码就把Netty服务器启动了,在处理器中的构造函数中,我们先把服务Bean的Map传进来,所有的处理要基于这个Map才能找到对应的实现类。在channelRead中,获取请求方法的信息,然后通过反射调用方法获取返回值。

 
   
   
 
  1. package com.viewscenes.netsupervisor.netty.server;

  2. @ChannelHandler.Sharable

  3. public class NettyServerHandler extends ChannelInboundHandlerAdapter {


  4.    private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

  5.    private final Map<String, Object> serviceMap;


  6.    public NettyServerHandler(Map<String, Object> serviceMap) {

  7.        this.serviceMap = serviceMap;

  8.    }

  9.    public void channelActive(ChannelHandlerContext ctx)   {

  10.        logger.info("客户端连接成功!"+ctx.channel().remoteAddress());

  11.    }

  12.    public void channelInactive(ChannelHandlerContext ctx)   {

  13.        logger.info("客户端断开连接!{}",ctx.channel().remoteAddress());

  14.        ctx.channel().close();

  15.    }

  16.    public void channelRead(ChannelHandlerContext ctx, Object msg)   {

  17.        Request request = JSON.parseObject(msg.toString(),Request.class);


  18.        if ("heartBeat".equals(request.getMethodName())) {

  19.            logger.info("客户端心跳信息..."+ctx.channel().remoteAddress());

  20.        }else{

  21.            logger.info("RPC客户端请求接口:"+request.getClassName()+"   方法名:"+request.getMethodName());

  22.            Response response = new Response();

  23.            response.setRequestId(request.getId());

  24.            try {

  25.                Object result = this.handler(request);

  26.                response.setData(result);

  27.            } catch (Throwable e) {

  28.                e.printStackTrace();

  29.                response.setCode(1);

  30.                response.setError_msg(e.toString());

  31.                logger.error("RPC Server handle request error",e);

  32.            }

  33.            ctx.writeAndFlush(response);

  34.        }

  35.    }

  36.    /**

  37.     * 通过反射,执行本地方法

  38.     * @param request

  39.     * @return

  40.     * @throws Throwable

  41.     */

  42.    private Object handler(Request request) throws Throwable{

  43.        String className = request.getClassName();

  44.        Object serviceBean = serviceMap.get(className);


  45.        if (serviceBean!=null){

  46.            Class<?> serviceClass = serviceBean.getClass();

  47.            String methodName = request.getMethodName();

  48.            Class<?>[] parameterTypes = request.getParameterTypes();

  49.            Object[] parameters = request.getParameters();


  50.            Method method = serviceClass.getMethod(methodName, parameterTypes);

  51.            method.setAccessible(true);

  52.            return method.invoke(serviceBean, getParameters(parameterTypes,parameters));

  53.        }else{

  54.            throw new Exception("未找到服务接口,请检查配置!:"+className+"#"+request.getMethodName());

  55.        }

  56.    }

  57.    /**

  58.     * 获取参数列表

  59.     * @param parameterTypes

  60.     * @param parameters

  61.     * @return

  62.     */

  63.    private Object[] getParameters(Class<?>[] parameterTypes,Object[] parameters){

  64.        if (parameters==null || parameters.length==0){

  65.            return parameters;

  66.        }else{

  67.            Object[] new_parameters = new Object[parameters.length];

  68.            for(int i=0;i<parameters.length;i++){

  69.                new_parameters[i] = JSON.parseObject(parameters[i].toString(),parameterTypes[i]);

  70.            }

  71.            return new_parameters;

  72.        }

  73.    }

  74.    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {

  75.        if (evt instanceof IdleStateEvent){

  76.            IdleStateEvent event = (IdleStateEvent)evt;

  77.            if (event.state()== IdleState.ALL_IDLE){

  78.                logger.info("客户端已超过60秒未读写数据,关闭连接.{}",ctx.channel().remoteAddress());

  79.                ctx.channel().close();

  80.            }

  81.        }else{

  82.            super.userEventTriggered(ctx,evt);

  83.        }

  84.    }

  85.    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)   {

  86.        logger.info(cause.getMessage());

  87.        ctx.close();

  88.    }

  89. }

4、服务注册

 
   
   
 
  1. package com.viewscenes.netsupervisor.registry;

  2. @Component

  3. public class ServiceRegistry {

  4.    Logger logger = LoggerFactory.getLogger(this.getClass());

  5.    @Value("${registry.address}")

  6.    private String registryAddress;

  7.    private static final String ZK_REGISTRY_PATH = "/rpc";


  8.    public void register(String data) {

  9.        if (data != null) {

  10.            ZkClient client = connectServer();

  11.            if (client != null) {

  12.                AddRootNode(client);

  13.                createNode(client, data);

  14.            }

  15.        }

  16.    }

  17.    //连接zookeeper

  18.    private ZkClient connectServer() {

  19.        ZkClient client = new ZkClient(registryAddress,20000,20000);

  20.        return client;

  21.    }

  22.    //创建根目录/rpc

  23.    private void AddRootNode(ZkClient client){

  24.        boolean exists = client.exists(ZK_REGISTRY_PATH);

  25.        if (!exists){

  26.            client.createPersistent(ZK_REGISTRY_PATH);

  27.            logger.info("创建zookeeper主节点 {}",ZK_REGISTRY_PATH);

  28.        }

  29.    }

  30.    //在/rpc根目录下,创建临时顺序子节点

  31.    private void createNode(ZkClient client, String data) {

  32.        String path = client.create(ZK_REGISTRY_PATH + "/provider", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

  33.        logger.info("创建zookeeper数据节点 ({} => {})", path, data);

  34.    }

  35. }

有一点需要注意,子节点必须是临时节点。这样,生产者端停掉之后,才能通知到消费者,把此服务从服务列表中剔除。到此为止,生产者端已经完成。我们看一下它的启动日志:

 
   
   
 
  1. 加载服务类: com.viewscenes.netsupervisor.service.InfoUserService

  2. 已加载全部服务接口:{com.viewscenes.netsupervisor.service.InfoUserService=com.viewscenes.netsupervisor.service.impl.InfoUserServiceImpl@46cc127b}

  3. Initializing ExecutorService 'applicationTaskExecutor'

  4. Tomcat started on port(s): 8001 (http) with context path ''

  5. Started RpcProviderApplication in 2.003 seconds (JVM running for 3.1)

  6. RPC 服务器启动.监听端口:18868

  7. Starting ZkClient event thread.

  8. Socket connection established to 192.168.245.131/192.168.245.131:2183, initiating session

  9. Session establishment complete on server 192.168.245.131/192.168.245.131:2183, sessionid = 0x367835b48970010, negotiated timeout = 4000

  10. zookeeper state changed (SyncConnected)

  11. 创建zookeeper主节点 /rpc

  12. 创建zookeeper数据节点 (/rpc/provider0000000000 => 192.168.197.1:28868)

四、RPC消费者

首先,我们需要把生产者端的服务接口API,即InfoUserService。以相同的目录放到消费者端。路径不同,调用会找不到的哦。

1、代理

RPC的目标其中有一条,《程序员无需额外地为这个交互作用编程。》所以,我们在调用的时候,就像调用本地方法一样。就像下面这样:

 
   
   
 
  1. @Controller

  2. public class IndexController {    

  3.    @Autowired

  4.    InfoUserService userService;


  5.    @RequestMapping("getById")

  6.    @ResponseBody

  7.    public InfoUser getById(String id){

  8.        logger.info("根据ID查询用户信息:{}",id);

  9.        return userService.getInfoUserById(id);

  10.    }

  11. }

那么,问题来了。消费者端并没有此接口的实现,怎么调用到的呢?这里,首先就是代理。笔者这里用的是Spring的工厂Bean机制创建的代理对象,涉及的代码较多,就不在文章中体现了,如果有不懂的同学,请想象一下,MyBatis中的Mapper接口怎么被调用的。可以参考笔者文章:Mybatis源码分析(四)mapper接口方法是怎样被调用到的

总之,在调用userService方法的时候,会调用到代理对象的invoke方法。在这里,封装请求信息,然后调用Netty的客户端方法发送消息。然后根据方法返回值类型,转成相应的对象返回。

 
   
   
 
  1. package com.viewscenes.netsupervisor.configurer.rpc;


  2. @Component

  3. public class RpcFactory<T> implements InvocationHandler {


  4.    @Autowired

  5.    NettyClient client;


  6.    Logger logger = LoggerFactory.getLogger(this.getClass());

  7.    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

  8.        Request request = new Request();

  9.        request.setClassName(method.getDeclaringClass().getName());

  10.        request.setMethodName(method.getName());

  11.        request.setParameters(args);

  12.        request.setParameterTypes(method.getParameterTypes());

  13.        request.setId(IdUtil.getId());


  14.        Object result = client.send(request);

  15.        Class<?> returnType = method.getReturnType();


  16.        Response response = JSON.parseObject(result.toString(), Response.class);

  17.        if (response.getCode()==1){

  18.            throw new Exception(response.getError_msg());

  19.        }

  20.        if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)){

  21.            return response.getData();

  22.        }else if (Collection.class.isAssignableFrom(returnType)){

  23.            return JSONArray.parseArray(response.getData().toString(),Object.class);

  24.        }else if(Map.class.isAssignableFrom(returnType)){

  25.            return JSON.parseObject(response.getData().toString(),Map.class);

  26.        }else{

  27.            Object data = response.getData();

  28.            return JSONObject.parseObject(data.toString(), returnType);

  29.        }

  30.    }

  31. }

2、服务发现

 
   
   
 
  1. package com.viewscenes.netsupervisor.connection;


  2. @Component

  3. public class ServiceDiscovery {


  4.    @Value("${registry.address}")

  5.    private String registryAddress;

  6.    @Autowired

  7.    ConnectManage connectManage;


  8.    private volatile List<String> addressList = new ArrayList<>();

  9.    private static final String ZK_REGISTRY_PATH = "/rpc";

  10.    private ZkClient client;


  11.    Logger logger = LoggerFactory.getLogger(this.getClass());


  12.    @PostConstruct

  13.    public void init(){

  14.        client = connectServer();

  15.        if (client != null) {

  16.            watchNode(client);

  17.        }

  18.    }


  19.    //连接zookeeper

  20.    private ZkClient connectServer() {

  21.        ZkClient client = new ZkClient(registryAddress,30000,30000);

  22.        return client;

  23.    }

  24.    //监听子节点数据变化

  25.    private void watchNode(final ZkClient client) {

  26.        List<String> nodeList = client.subscribeChildChanges(ZK_REGISTRY_PATH, (s, nodes) -> {

  27.            logger.info("监听到子节点数据变化{}",JSONObject.toJSONString(nodes));

  28.            addressList.clear();

  29.            getNodeData(nodes);

  30.            updateConnectedServer();

  31.        });

  32.        getNodeData(nodeList);

  33.        logger.info("已发现服务列表...{}", JSONObject.toJSONString(addressList));

  34.        updateConnectedServer();

  35.    }

  36.    //连接生产者端服务

  37.    private void updateConnectedServer(){

  38.        connectManage.updateConnectServer(addressList);

  39.    }


  40.    private void getNodeData(List<String> nodes){

  41.        logger.info("/rpc子节点数据为:{}", JSONObject.toJSONString(nodes));

  42.        for(String node:nodes){

  43.            String address = client.readData(ZK_REGISTRY_PATH+"/"+node);

  44.            addressList.add(address);

  45.        }

  46.    }

  47. }

3、Netty客户端

Netty客户端有两个方法比较重要,一个是根据IP端口连接服务器,返回Channel,加入到连接管理器;一个是用Channel发送请求数据。同时,作为客户端,空闲的时候还要往服务端发送心跳信息。

 
   
   
 
  1. package com.viewscenes.netsupervisor.netty.client;


  2. @Component

  3. public class NettyClient {

  4.    Logger logger = LoggerFactory.getLogger(this.getClass());

  5.    private EventLoopGroup group = new NioEventLoopGroup(1);

  6.    private Bootstrap bootstrap = new Bootstrap();

  7.    @Autowired

  8.    NettyClientHandler clientHandler;

  9.    @Autowired

  10.    ConnectManage connectManage;


  11.    public Object send(Request request) throws InterruptedException{


  12.        Channel channel = connectManage.chooseChannel();

  13.        if (channel!=null && channel.isActive()) {

  14.            SynchronousQueue<Object> queue = clientHandler.sendRequest(request,channel);

  15.            Object result = queue.take();

  16.            return JSONArray.toJSONString(result);

  17.        }else{

  18.            Response res = new Response();

  19.            res.setCode(1);

  20.            res.setError_msg("未正确连接到服务器.请检查相关配置信息!");

  21.            return JSONArray.toJSONString(res);

  22.        }

  23.    }

  24.    public Channel doConnect(SocketAddress address) throws InterruptedException {

  25.        ChannelFuture future = bootstrap.connect(address);

  26.        Channel channel = future.sync().channel();

  27.        return channel;

  28.    }

  29.    ....其他方法略

  30. }

我们必须重点关注send方法,它是在代理对象invoke方法调用到的。首先从连接器中轮询选择一个Channel,然后发送数据。但是,Netty是异步操作,我们还要转为同步,就是说要等待生产者端返回数据才往下执行。笔者在这里用的是同步队列SynchronousQueue,它的take方法会阻塞在这里,直到里面有数据可读。然后在处理器中,拿到返回信息写到队列中,take方法返回。

 
   
   
 
  1. package com.viewscenes.netsupervisor.netty.client;

  2. @Component

  3. @ChannelHandler.Sharable

  4. public class NettyClientHandler extends ChannelInboundHandlerAdapter {


  5.    @Autowired

  6.    NettyClient client;

  7.    @Autowired

  8.    ConnectManage connectManage;

  9.    Logger logger = LoggerFactory.getLogger(this.getClass());

  10.    private ConcurrentHashMap<String,SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();


  11.    public void channelActive(ChannelHandlerContext ctx)   {

  12.        logger.info("已连接到RPC服务器.{}",ctx.channel().remoteAddress());

  13.    }

  14.    public void channelInactive(ChannelHandlerContext ctx)   {

  15.        InetSocketAddress address =(InetSocketAddress) ctx.channel().remoteAddress();

  16.        logger.info("与RPC服务器断开连接."+address);

  17.        ctx.channel().close();

  18.        connectManage.removeChannel(ctx.channel());

  19.    }

  20.    public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {

  21.        Response response = JSON.parseObject(msg.toString(),Response.class);

  22.        String requestId = response.getRequestId();

  23.        SynchronousQueue<Object> queue = queueMap.get(requestId);

  24.        queue.put(response);

  25.        queueMap.remove(requestId);

  26.    }

  27.    public SynchronousQueue<Object> sendRequest(Request request,Channel channel) {

  28.        SynchronousQueue<Object> queue = new SynchronousQueue<>();

  29.        queueMap.put(request.getId(), queue);

  30.        channel.writeAndFlush(request);

  31.        return queue;

  32.    }

  33.    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {

  34.        logger.info("已超过30秒未与RPC服务器进行读写操作!将发送心跳消息...");

  35.        if (evt instanceof IdleStateEvent){

  36.            IdleStateEvent event = (IdleStateEvent)evt;

  37.            if (event.state()== IdleState.ALL_IDLE){

  38.                Request request = new Request();

  39.                request.setMethodName("heartBeat");

  40.                ctx.channel().writeAndFlush(request);

  41.            }

  42.        }else{

  43.            super.userEventTriggered(ctx,evt);

  44.        }

  45.    }

  46.    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){

  47.        logger.info("RPC通信服务器发生异常.{}",cause);

  48.        ctx.channel().close();

  49.    }

  50. }

至此,消费者端也基本完成。同样的,我们先看一下启动日志:

 
   
   
 
  1. Waiting for keeper state SyncConnected

  2. Opening socket connection to server 192.168.139.129/192.168.139.129:2181. Will not attempt to authenticate using SASL (unknown error)

  3. Socket connection established to 192.168.139.129/192.168.139.129:2181, initiating session

  4. Session establishment complete on server 192.168.139.129/192.168.139.129:2181, sessionid = 0x100000273ba002c, negotiated timeout = 20000

  5. zookeeper state changed (SyncConnected)

  6. /rpc子节点数据为:["provider0000000015"]

  7. 已发现服务列表...["192.168.100.74:18868"]

  8. 加入Channel到连接管理器./192.168.100.74:18868

  9. 已连接到RPC服务器./192.168.100.74:18868

  10. Initializing ExecutorService 'applicationTaskExecutor'

  11. Tomcat started on port(s): 7002 (http) with context path ''

  12. Started RpcConsumerApplication in 4.218 seconds (JVM running for 5.569)

五、测试

我们以Controller里面的两个方法为例,先开启100个线程调用insertInfoUser方法,然后开启1000个线程调用查询方法getAllUser。

 
   
   
 
  1. public class IndexController {


  2.    Logger logger = LoggerFactory.getLogger(this.getClass());

  3.    @Autowired

  4.    InfoUserService userService;


  5.    @RequestMapping("insert")

  6.    @ResponseBody

  7.    public List<InfoUser> getUserList() throws InterruptedException {

  8.        long start = System.currentTimeMillis();

  9.        int thread_count = 100;

  10.        CountDownLatch countDownLatch = new CountDownLatch(thread_count);

  11.        for (int i=0;i<thread_count;i++){

  12.            new Thread(() -> {

  13.                InfoUser infoUser = new InfoUser(IdUtil.getId(),"Jeen","BeiJing");

  14.                List<InfoUser> users = userService.insertInfoUser(infoUser);

  15.                logger.info("返回用户信息记录:{}", JSON.toJSONString(users));

  16.                countDownLatch.countDown();

  17.            }).start();

  18.        }

  19.        countDownLatch.await();

  20.        long end = System.currentTimeMillis();

  21.        logger.info("线程数:{},执行时间:{}",thread_count,(end-start));

  22.        return null;

  23.    }

  24.    @RequestMapping("getAllUser")

  25.    @ResponseBody

  26.    public Map<String,InfoUser> getAllUser() throws InterruptedException {


  27.        long start = System.currentTimeMillis();

  28.        int thread_count = 1000;

  29.        CountDownLatch countDownLatch = new CountDownLatch(thread_count);

  30.        for (int i=0;i<thread_count;i++){

  31.            new Thread(() -> {

  32.                Map<String, InfoUser> allUser = userService.getAllUser();

  33.                logger.info("查询所有用户信息:{}",JSONObject.toJSONString(allUser));

  34.                countDownLatch.countDown();

  35.            }).start();

  36.        }

  37.        countDownLatch.await();

  38.        long end = System.currentTimeMillis();

  39.        logger.info("线程数:{},执行时间:{}",thread_count,(end-start));


  40.        return null;

  41.    }

  42. }

结果如下: 

六、总结

本文简单介绍了RPC的整个流程,如果你正在学习RPC的相关知识,可以根据文中的例子,自己实现一遍。相信写完之后,你会对RPC会有更深一些的认识。

生产者端流程:

  • 加载服务,并缓存

  • 启动通讯服务器(Netty)

  • 反射,本地调用

消费者端流程:

  • 代理服务接口

  • 远程调用(轮询生产者服务列表,发送消息)


以上是关于RPC基本原理以及如何用Netty来实现RPC的主要内容,如果未能解决你的问题,请参考以下文章

RPC-非阻塞通信下的同步API实现原理,以Dubbo为例

netty实现rpc框架

RPC核心原理

手写一个RPC框架

基于Netty手写RPC框架进阶版(下)——注册中心及服务的动态扩容

如何用Netty写一个高性能的分布式服务框架