分布式思想和rpc解决方案介绍

Posted codetree

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式思想和rpc解决方案介绍相关的知识,希望对你有一定的参考价值。

1.RPC的诞生

RPC(Remote Procedure Call)远程过程调用,通过这个rpc协议,调用远程计算机上的服务,就像调用本地的服务一样。

技术分享图片

不同的服务部署在不同的机器上面,并且在启动后在注册中心进行注册,如果要调用,可以通过rpc调用对应的服务。
如图,在不同的Controller中可以从注册中心(可以使用eureka,zookeeper实现,本文例子使用简单的hash
map作为实现)获取可以调用的服务,然后通过rpc进行调用。

2.java远程的远程调用-RMI(Remote method Invoke)

java提供了远程的对于远程服务调用的支持:RMI(Remote method Invoke)。

3.手写一个RPC框架

3.1 实现的技术方案

设计技术点:Socket通讯、动态代理与反射、Java序列化

RPC本质是使用动态代理,通过网络通信技术进行增强。

技术分享图片

3.2代码实现

3.2.1 客户端代码

技术分享图片

    package main.java.rpc;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    
    //rpc框架的客户端代理部分
    public class RpcClientFrame {
           /*动态代理类,实现了对远程服务的访问*/
        private static class DynProxy implements InvocationHandler{
            //远程调用的服务
            private Class serviceClass;
            //远程调用地址
            private final InetSocketAddress addr;
            public DynProxy(Class serviceClass,InetSocketAddress addr) {
                this.serviceClass = serviceClass;
                this.addr = addr;
            }
    
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                ObjectInputStream inputStream = null;
                ObjectOutputStream outputStream = null;
                Socket socket = null;
                try {
                    socket = new Socket();
                    socket.connect(addr);
                    //类名 方法名 方法类型列表  方法入参列表
                    outputStream = new ObjectOutputStream(socket.getOutputStream());
                    outputStream.writeUTF(serviceClass.getSimpleName());
                    outputStream.writeUTF(method.getName());
                    outputStream.writeObject(method.getParameterTypes());
                    outputStream.writeObject(args);
                    outputStream.flush();
                    
                    inputStream = new ObjectInputStream(socket.getInputStream());
                     //我们要把调用的细节打印出来
                    System.out.println("远程调用成功!" + serviceClass.getName());
                    //最后要网络的请求返回给返回
                    return inputStream.readObject();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    socket.close();
                    inputStream.close();
                    outputStream.close();
                }
                return null;
            }
            
        }
        //定义客户端要定义的服务
        package enjoyedu.service;

        /**
         * 享学课堂
         *类说明:服务员接口
         */
        public interface TechInterface {
            //洗脚服务
            String XJ(String name);
        }
    package main.java;

    import main.java.rpc.RpcClientFrame;
    import main.java.service.TechInterface;
    
    /**
     * rpc的客户端调用远程服务
     * @author hasee
     *
     */
    public class Client {
        public static void main(String[] args) {
            //动态代理获取我们的对象
            TechInterface techInterface = (TechInterface) RpcClientFrame.getProxyObject(TechInterface.class);
            //进远程调用我们的对象
            System.out.println(techInterface.XJ("luke"));
        }
    }

3.2.2服务端和注册中心代码

技术分享图片

1.//服务端定义要调用的服务接口
package service;
public interface TechInterface {
    //洗脚服务
    String XJ(String name);
}

2.//服务端定义要调用的服务的接口实现类
package service.impl;
import service.TechInterface;
public class TechImpl implements TechInterface {
      public String XJ(String name) {

            return "您好,13号技师为你服务:"+name;
        }
}

package server;
import java.io.IOException;
import javax.imageio.spi.RegisterableService;
import register.RegisterCenter;
import service.TechInterface;
import service.impl.TechImpl;

/**
 * rpc的服务端,提供服务
 * @author hasee
 *
 */
public class Server {
    public static void main(String[] args) throws IOException {
        RegisterCenter registerCenter = new RegisterCenter(8888);
        //注册技师对象至注册中心
        registerCenter.register(TechInterface.class, TechImpl.class);
        registerCenter.start();
    }
}

package register;
/**
 * 注册中心,这个例子使用一个hashmap作为实现
 * @author hasee
 *
 */

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RegisterCenter {
    //线程池
    private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    //定义注册中心的静态对象
    private static Map<String, Class> serviceRegistry = new HashMap<String, Class>();
    //服务端口
    private static int port = 8888;
    
    /**
     *  注册服务
     * @param serviceInterface 接口名字
     * @param impl 实现类的class对象
     */
    public void register(Class serviceInterface, Class impl) {
        //服务的注册:socket通讯+反射
        serviceRegistry.put(serviceInterface.getSimpleName(), impl);
    }
    
    public RegisterCenter(int port) {
        this.port = port;
    }
    
     
    /**
     * 启动服务端
     * @throws IOException
     */
    public static void start() throws IOException {
        // 创建ServerSocket实例监听端口
        ServerSocket serverSocket = new ServerSocket(port);
        System.out.println("start server");
         // 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行,并且同时将socket送入(server.accept()=socket)
        try {
            while (true) {
                //serverSocket.accept()会阻塞直到服务端接受到客户端的请求。
                executorService.execute(new ServiceTask(serverSocket.accept()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 将客户端的每一个请求都封装成一个线程ServiceTask,投放到线程池里面进行执行。
     * @author hasee
     *
     */
    private static class ServiceTask implements Runnable {
        private Socket client;
        public ServiceTask(Socket client) {
            this.client = client;
        }
        public void run() {
            //读取socket中的流数据
            ObjectInputStream inputStream = null;
            ObjectOutputStream outputStream = null;
            try {
                // 类名、方法名、参数类型、参数值
                inputStream = new ObjectInputStream(client.getInputStream());
                //获取调用服务名称
                String serviceName = inputStream.readUTF();
                //获取调用方法的名称
                String methodName = inputStream.readUTF();
                //获取参数类型列表
                Class<?>[] requiresTypes = (Class<?>[]) inputStream.readObject();
                //获取参数列表
                Object[] args = (Object[]) inputStream.readObject();
                Class serviceClass = serviceRegistry.get(serviceName);
                //反射调用方法
                Method method = serviceClass.getMethod(methodName, requiresTypes);
                Object result = method.invoke(serviceClass.newInstance(), args);
                //把结果反馈到客户端
                outputStream = new ObjectOutputStream(client.getOutputStream());
                outputStream.writeObject(result);
                outputStream.flush();
                //关闭io资源
                inputStream.close();
                client.close();
                
                
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }
}

3.2.3 测试结果

  • 先启动服务端
  • 其次启动客户端

输出结果:您好,13号技师为你服务:luke



以上是关于分布式思想和rpc解决方案介绍的主要内容,如果未能解决你的问题,请参考以下文章

串烧RPC

DUBBO分布式RPC远程调用思想

Dubbo系列_RPC介绍

RPC简单介绍

带你手写基于 Spring 的可插拔式 RPC 框架介绍

Spark原理图解:Rpc通信