基于分布式思想下的rpc解决方案

Posted zqlovesym

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于分布式思想下的rpc解决方案相关的知识,希望对你有一定的参考价值。

手写RPC:

1.客户端代码

接口:

/**
 * 
 *类说明:服务员接口
 */
public interface TechInterface {
    //洗脚服务
    String XJ(String name);
}

 

package enjoyedu.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * 
 *类说明:rpc框架的客户端代理部分
 */
public class RpcClientFrame {

    /*远程服务的代理对象,参数为客户端要调用的的服务*/
    public static <T> T getRemoteProxyObj(final Class<?> serviceInterface)
            throws Exception {
        // 默认端口8888
        InetSocketAddress serviceAddr = new InetSocketAddress("127.0.0.1",8888);
        // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
        //进行实际的服务调用(动态代理)
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},new DynProxy(serviceInterface,serviceAddr));
    }

    /*动态代理类,实现了对远程服务的访问*/
    private static class DynProxy implements InvocationHandler {
        //接口
        private final Class<?> serviceInterface;
        //远程调用地址
        private final InetSocketAddress addr;

        //构造函数
        public DynProxy(Class<?> serviceInterface, InetSocketAddress addr) {
            this.serviceInterface = serviceInterface;
            this.addr = addr;
        }

        /*动态代理类,增强:实现了对远程服务的访问*/
        public Object invoke(Object proxy, Method method, Object[] args)
                throws Throwable {
            /* 网络增强部分*/
            Socket socket = null;
            //因为传递的大部分是 方法、参数,所以我们使用Object流对象
            ObjectInputStream objectInputStream = null;
            ObjectOutputStream objectOutputStream = null;
            try {
                //新建一个Socket
                socket = new Socket();
                //连接到远程的地址和端口
                socket.connect(addr);
                //往远端 发送数据,按照顺序发送数据:类名、方法名、参数类型、参数值
                //拿到输出的流
                objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                //发送 调用方法的 类名,使用UTF-8避免乱码
                objectOutputStream.writeUTF(serviceInterface.getName());
                //发送 方法名
                objectOutputStream.writeUTF(method.getName());
                //发送 参数类型,使用Object
                objectOutputStream.writeObject(method.getParameterTypes());
                //发送 参数的值,使用UTF-8避免乱码
                objectOutputStream.writeObject(args);
                //刷新缓冲区,使得数据立马发送
                objectOutputStream.flush();
                //立马拿到远程执行的结果
                objectInputStream = new ObjectInputStream(socket.getInputStream());
                //我们要把调用的细节打印出来
                System.out.println("远程调用成功!" + serviceInterface.getName());
                //最后要网络的请求返回给返回
                return objectInputStream.readObject();
            } finally {

                //最后记得关闭
                socket.close();
                objectOutputStream.close();
                objectInputStream.close();

            }
        }
    }
}

 

package enjoyedu;


import enjoyedu.rpc.RpcClientFrame;
import enjoyedu.service.TechInterface;

/**
 * 
 *类说明:rpc的客户端,调用远端服务
 */
public class Client {
    public static void main(String[] args) throws Exception {
        //动态代理获取我们的对象
        TechInterface techInterface = RpcClientFrame.getRemoteProxyObj(TechInterface.class);
        //进远程调用我们的对象
        System.out.println(techInterface.XJ("king"));

    }
}

2.服务端代码:

/**
 * 
 *类说明:服务接口
 */
public interface TechInterface {
    //洗脚服务
    String XJ(String name);
}
package enjoyedu.service.impl;


import enjoyedu.service.TechInterface;

/**
 *
 * 类说明:服务实现类
 */
public class TechImpl implements TechInterface {
    @Override
    public String XJ(String name) {

        return "您好,13号技师为你服务:"+name;
    }
}
package enjoyedu.register;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 *
 *类说明:服务注册中心
 */
public class RegisterCenter {
    //线程池
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    //定义注册中心的静态对象
    private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();

    private static boolean isRunning = false;

    private static int port;

    public RegisterCenter(int port) {

        this.port = port;
    }

    //服务注册中心启动
    public void start() throws IOException {
        //服务器监听
        ServerSocket server = new ServerSocket();
        //监听绑定端口
        server.bind(new InetSocketAddress(port));
        System.out.println("start server");
        try {
            while (true) {
                // 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行,并且同时将socket送入(server.accept()=socket)
                executor.execute(new ServiceTask(server.accept()));
            }
        } finally {
            server.close();
        }
    }
    //服务的注册:socket通讯+反射
    public void register(Class serviceInterface, Class impl) {

        serviceRegistry.put(serviceInterface.getName(), impl);
    }

    //服务的获取运行
    private static class ServiceTask implements Runnable {
        //客户端socket
        Socket clent = null;

        public ServiceTask(Socket client) {
            this.clent = client;
        }
        //远程请求达到服务端,我们需要执行请求结果,并且把请求结果反馈至客户端,使用Socket通讯
        public void run() {
            //反射
            //同样适用object流
            ObjectInputStream inputStream = null;
            ObjectOutputStream outputStream = null;
            try {
                //1.客户端发送的object对象拿到,2.在采用反射的机制进行调用,3.最后给返回结果
                inputStream = new ObjectInputStream(clent.getInputStream());
                //顺序发送数据:类名、方法名、参数类型、参数值
                //拿到接口名
                String  serviceName = inputStream.readUTF();
                //拿到方法名
                String  methodName = inputStream.readUTF();
                //拿到参数类型
                Class<?>[] paramTypes = ( Class<?>[])inputStream.readObject();
                //拿到参数值
                Object[] arguments = (Object[])inputStream.readObject();
                //要到注册中心根据 接口名,获取实现类
                Class serviceClass =serviceRegistry.get(serviceName);
                //使用反射的机制进行调用
                Method method = serviceClass.getMethod(methodName,paramTypes);
                //反射调用方法,把结果拿到
                Object result = method.invoke(serviceClass.newInstance(),arguments);
                //通过执行socket返回给客户端
                outputStream = new ObjectOutputStream(clent.getOutputStream());
                // /把结果返回给客户端
                outputStream.writeObject(result);
                //记得关闭
                outputStream.close();
                inputStream.close();
                clent.close();

            }catch (Exception e){
                e.printStackTrace();
            }

        }

    }
}
package enjoyedu;


import enjoyedu.register.RegisterCenter;
import enjoyedu.service.TechInterface;
import enjoyedu.service.impl.TechImpl;

/**
 * 
 *类说明:rpc的服务端,提供服务
 */
public class Server {
    public static void main(String[] args) throws  Exception{
        new Thread(new Runnable() {
            public void run() {
                try {
                    //起一个服务中心
                    RegisterCenter serviceServer = new RegisterCenter(8888);
                    //注册技师对象至注册中心
                    serviceServer.register(TechInterface.class, TechImpl.class);
                    //运行我们的服务
                    serviceServer.start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

 

以上是关于基于分布式思想下的rpc解决方案的主要内容,如果未能解决你的问题,请参考以下文章

分布式集群下的RPC限流方案

分布式集群下的RPC限流方案

分布式集群下的RPC限流方案

分布式架构Dubbo

Rpc基于开源Dubbo分布式RPC服务框架的部署整合

DUBBO分布式RPC远程调用思想