4_分布式通信框架RMI

Posted binwenhome

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了4_分布式通信框架RMI相关的知识,希望对你有一定的参考价值。

简介

  1. 什么是RPC
    • RPC(Remote Procedure Call, 远程过程调用), 一般用来实现部署在不同机器上的系统之间的方法调用. 使程序能像访问本地资源一样, 通过网络传输去访问远端系统资源.
    • 对客户端来说, 传输层用什么协议, 序列化, 反序列化过程都是透明的, 不用管.
  2. Java RMI
    • RMI(Remote Method Invocation, 远程方法调用). 这是一种用于远程过程调用的应用程序编程接口, 纯Java的网络分布式应用系统解决方案.
    • 目前RMI使用Java远程消息交换协议JRMP通信(专为Java对象定制)
    • RMI框架开发的应用系统能部署在任何支持JRE的平台上, 但对于非Java语言开发的应用系统的支持不足, 无法与非Java语言书写的对象进行通信.

RMI代码实践

  • 创建远程接口, 并且继承java.rmi.Remote接口
    public interface iHelloService extends Remote {
    
        String sayHello(String msg) throws RemoteException;
    }
  • 远程对象必须实现UnicastRemoteObject, 这样才能保证客户端访问获得远程对象时, 该远程对象会把自身的一个拷贝以Socket的形式传输给客户端
    public class HelloServiceImpl extends UnicastRemoteObject implements iHelloService {
    
    
        protected HelloServiceImpl() throws RemoteException {
            super();
        }
    
        @Override
        public String sayHello(String msg) throws RemoteException {
            return "Hello" + msg;
        }
    }
  • 此时服务端本身已存在的远程对象称为"skeleton", 而skeleton是服务端的一个代理, 用于接收客户端的请求后调用远程方法来响应客户端请求.
    public class Server {
    
        public static void main(String[] args) {
            try {
                iHelloService helloService = new HelloServiceImpl();

    //创建服务器程序:注册 LocateRegistry.createRegistry(
    1099); Naming.rebind("rmi://127.0.0.1/Hello", helloService); System.out.println("服务启动成功"); } catch (RemoteException e) { e.printStackTrace(); } catch (MalformedURLException e) { e.printStackTrace(); } } }
  • 而客户端获得的拷贝称为"stub", 它是客户端的一个代理.
    public class ClientDemo  {
    
        public static void main(String[] args) throws RemoteException, NotBoundException, MalformedURLException {
    //创建客户端程序 iHelloService helloService
    = (iHelloService) Naming.lookup("rmi://127.0.0.1/Hello"); System.out.println(helloService.sayHello("MR. White")); } }
  • 这里说一下RMI的底层实现
    • 技术图片

源码解读

  • 类图
    • 远程对象的发布
      • 技术图片
    • 远程引用层
      • 技术图片
  •  发布远程对象

    1. 从上面类图可知, 这里会发布两个远程对象, 一个RegistryImpl, 另一个是自己写的RMI实现类对象.
    2. 从HelloServiceImpl构造器看起, 调用了父类UnicastRemoteObject的给构造方法.
      protected UnicastRemoteObject(int port) throws RemoteException
          {
              this.port = port;
              exportObject((Remote) this, port);
          }
    3. 追溯到了私有方法exportObject()
      • 这里做了个判断, 判断服务的实现类是不是UnicastRemoteObject的子类, 如果是, 则直接把传入的UnicastServerRef对象赋值给其ref(RemoteRef)对象.
      • 反之, 则调用UnicastServerRef的exportObject()方法
        public static Remote exportObject(Remote obj, int port)
                throws RemoteException
            {
                //创建UnicastServerRef对象, 对象内引用了LiveRef(tcp通信)
                return exportObject(obj, new UnicastServerRef(port));
            }
        private static Remote exportObject(Remote obj, UnicastServerRef sref) throws RemoteException { // if obj extends UnicastRemoteObject, set its ref. if (obj instanceof UnicastRemoteObject) { ((UnicastRemoteObject) obj).ref = sref; } return sref.exportObject(obj, null, false); }
        public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException { Class var4 = var1.getClass(); //创建远程代理类, 该代理是对OperationImpl对象的代理, getClientRef提供的InvocationHandler中提供了TCP连接. Remote var5; try { var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse); } catch (IllegalArgumentException var7) { throw new ExportException("remote object implements illegal remote interface", var7); } if (var5 instanceof RemoteStub) { this.setSkeleton(var1); } //包装实际对象, 并将其暴露在TCP端口上, 等待客户端调用 Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3); this.ref.exportObject(var6); this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4); return var5; }
      • 因为HelloServiceImpl继承了UnicastRemoteObject, 所以服务启动的时候, 会通过UnicastRemoteObject的构造方法把对象进行发布.
  • 服务端启动Registry
    • LocateRegistry.createRegistry(1099);
    1. 从上面这行代码入手, 开始往下看. 可以发现服务端创建 了一个RegistryImpl对象, 这里做了一个判断,
      • 如果服务端指定的端口是1099并且系统开启了安全管理器, 那么就可以在限定的权限集内绕过系统的安全校验, 这样纯粹是为了提高效率.
        public RegistryImpl(final int var1) throws RemoteException {
                this.bindings = new Hashtable(101);
                if (var1 == 1099 && System.getSecurityManager() != null) {
                    try {
                        AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
                            public Void run() throws RemoteException {
                                LiveRef var1x = new LiveRef(RegistryImpl.id, var1);
                                RegistryImpl.this.setup(new UnicastServerRef(var1x, (var0) -> {
                                    return RegistryImpl.registryFilter(var0);
                                }));
                                return null;
                            }
                        }, (AccessControlContext)null, new SocketPermission("localhost:" + var1, "listen,accept"));
                    } catch (PrivilegedActionException var3) {
                        throw (RemoteException)var3.getException();
                    }
                } else {
                    LiveRef var2 = new LiveRef(id, var1);
                    this.setup(new UnicastServerRef(var2, RegistryImpl::registryFilter));
                }
        
            }
      • 如果没有, 将执行this.setup(), setup方法将传入的UnicastServerRef对象赋值给正在初始化的RegistryImpl对象的远程引用ref. 这里涉及到了向上转型, 然后继续执行UnicastServerRef的exportObject方法
        private void setup(UnicastServerRef var1) throws RemoteException {
                this.ref = var1;
                var1.exportObject(this, (Object)null, true);
            }
    2. 进入到UnicastServerRef的exportObject()方法,
      • 这里首先为传入的RegistryImpl创建一个代理, 这个代理我们可以推断出就是后面服务于客户端的RegistryImpl的Stub (RegistryImpl_Stub)对象. 
      • 然后将UnicastServerRef 的skel(skeleton)对象设置为当前RegistryImpl对象. 
      • 最后用 skeleton, stub, UnicastServerRef 对象, id和一个boolean值构造了一个Target对象, 也就是这个Target对象基本包含了全部的信息, 等待TCP的调用.
        public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException {
                Class var4 = var1.getClass();
        
                Remote var5;
                try {
                    var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse);
                } catch (IllegalArgumentException var7) {
                    throw new ExportException("remote object implements illegal remote interface", var7);
                }
        
                if (var5 instanceof RemoteStub) {
                    this.setSkeleton(var1);
                }
        
                Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3);
                this.ref.exportObject(var6);
                this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4);
                return var5;
            }
    3. 到上面为止, 我们看到的都是一些变量的赋值和创建工作, 还没有到连接层, 这些引用对象将会被Stub和Skeleton对象使用. 接下来就是连接层上的了.
      • 追溯LiveRef的exportObject()方法, 很容易找到了TCPTransport的exportObject(), 这个方法做的事情就是将上面构造的Target对象暴露出去.
        //TCPEndpoint类中
            public void exportObject(Target var1) throws RemoteException {
                this.transport.exportObject(var1);
            }
        
            //TCPTransport类中
            public void exportObject(Target var1) throws RemoteException {
                synchronized(this) {
                    this.listen();
                    ++this.exportCount;
                }
                boolean var2 = false;
                boolean var12 = false;
                try {
                    var12 = true;
                    super.exportObject(var1);
                    var2 = true;
                    var12 = false;
                } finally {
                    if (var12) {
                        if (!var2) {
                            synchronized(this) {
                                this.decrementExportCount();
                            }
                        }
                    }
                }
                if (!var2) {
                    synchronized(this) {
                        this.decrementExportCount();
                    }
                }
            }
      • 调用TCPTransport的listen()方法, listen()方法创建了一个ServerSocket, 并且启动了一条线程等待客户端的请求.
        private void listen() throws RemoteException {
                assert Thread.holdsLock(this);
                TCPEndpoint var1 = this.getEndpoint();
                int var2 = var1.getPort();
                if (this.server == null) {
                    if (tcpLog.isLoggable(Log.BRIEF)) {
                        tcpLog.log(Log.BRIEF, "(port " + var2 + ") create server socket");
                    }
        
                    try {
                        this.server = var1.newServerSocket();
                        Thread var3 = (Thread)AccessController.doPrivileged(new NewThreadAction(new TCPTransport.AcceptLoop(this.server), "TCP Accept-" + var2, true));
                        var3.start();
                    } catch (BindException var4) {
                        throw new ExportException("Port already in use: " + var2, var4);
                    } catch (IOException var5) {
                        throw new ExportException("Listen failed on port: " + var2, var5);
                    }
                } else {
                    SecurityManager var6 = System.getSecurityManager();
                    if (var6 != null) {
                        var6.checkListen(var2);
                    }
                }
            }
      • 接着调用父类Transport的exportObject()将Target对象存放进ObjectTable中.
    4. 此时, 我们已经将RegistryImpl对象创建并起了服务等待客户端的请求.
  • 客户端获取服务端Registry代理
    • iHelloService helloService = (iHelloService) Naming.lookup("rmi://127.0.0.1/Hello");
    1. 从上述代码看起 (CleintDemo中), 很容易追溯到LocateRegistry的getRegistry()方法, 该方法通过传入的host和port构造RemoteRef对象, 并创建了一个本地代理, 这个代理对象其实是RegistryImpl_Stub对象. 这样客户端便有了服务端的RegistryImpl的代理(取决于ignoreStubClasses变量). 
      private static Registry getRegistry(ParsedNamingURL parsed)
              throws RemoteException
          {
              return LocateRegistry.getRegistry(parsed.host, parsed.port);
          }
      
      //========================================
      
          public static Registry getRegistry(String host, int port,
                                             RMIClientSocketFactory csf)
              throws RemoteException
          {
              Registry registry = null;
      
      //获取仓库地址
      if (port <= 0) port = Registry.REGISTRY_PORT; if (host == null || host.length() == 0) { // If host is blank (as returned by "file:" URL in 1.0.2 used in // java.rmi.Naming), try to convert to real local host name so // that the RegistryImpl‘s checkAccess will not fail. try { host = java.net.InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { // If that failed, at least try "" (localhost) anyway... host = ""; } } LiveRef liveRef = new LiveRef(new ObjID(ObjID.REGISTRY_ID), new TCPEndpoint(host, port, csf, null), false); RemoteRef ref = (csf == null) ? new UnicastRef(liveRef) : new UnicastRef2(liveRef);
      //创建远程代理类, 引用liveRef, 好让动态代理时, 能进行tcp通信
      return (Registry) Util.createProxy(RegistryImpl.class, ref, false); }
    2. 但注意此时这个代理其实还没有和服务端的RegistryImpl对象关联, 毕竟是两个VM上面的对象, 这里我们也可以猜测, 代理和远程的Registry对象之间是通过socket消息完成的.
    3. 调用RegistryImpl_Stub的ref(RemoteRef)对象的newCall()方法, 将RegistryImpl_Stub对象传了进去, 不要忘了构造它的时候我们将服务器的主机端口等信息传了进去, 也就是我们把服务器相关的信息也传进了newCall()方法.
      • newCall()方法做的事情简单来看就是建立了跟远程RegistryImpl的Skeleton对象的连接(上面我们说到过服务端通过TCPTransport的exportObject()方法等待着客户端的请求)
        public RemoteCall newCall(RemoteObject var1, Operation[] var2, int var3, long var4) throws RemoteException {
                clientRefLog.log(Log.BRIEF, "get connection");
                Connection var6 = this.ref.getChannel().newConnection();
        
                try {
                    clientRefLog.log(Log.VERBOSE, "create call context");
                    if (clientCallLog.isLoggable(Log.VERBOSE)) {
                        this.logClientCall(var1, var2[var3]);
                    }
        
                    StreamRemoteCall var7 = new StreamRemoteCall(var6, this.ref.getObjID(), var3, var4);
        
                    try {
                        this.marshalCustomCallData(var7.getOutputStream());
                    } catch (IOException var9) {
                        throw new MarshalException("error marshaling custom call data");
                    }
        
                    return var7;
                } catch (RemoteException var10) {
                    this.ref.getChannel().free(var6, false);
                    throw var10;
                }
            }
    4. 连接建立之后就是发送请求了, 我们知道客户端终究只是拥有Registry对象的代理, 而不是真正的位于服务端Registry对象本身, 他们位于不同的虚拟机实例之中, 无法直接调用, 必然是通过消息进行交互的.
      • 看看super.ref.invoke()做了什么
      • 技术图片
      • 容易追溯到StreamRemoteCall的executeCall()方法. 看似本地调用, 但其实很容易从代码中看出来是通过tcp连接发送消息到服务端, 由服务端解析并且处理调用.
        public void invoke(RemoteCall var1) throws Exception {
                try {
                    clientRefLog.log(Log.VERBOSE, "execute call");
                    var1.executeCall();
                } catch (RemoteException var3) {
                    clientRefLog.log(Log.BRIEF, "exception: ", var3);
                    this.free(var1, false);
                    throw var3;
                } catch (Error var4) {
                    clientRefLog.log(Log.BRIEF, "error: ", var4);
                    this.free(var1, false);
                    throw var4;
                } catch (RuntimeException var5) {
                    clientRefLog.log(Log.BRIEF, "exception: ", var5);
                    this.free(var1, false);
                    throw var5;
                } catch (Exception var6) {
                    clientRefLog.log(Log.BRIEF, "exception: ", var6);
                    this.free(var1, true);
                    throw var6;
                }
            }
    5. 至此, 客户端的查询服务请求发出了.
  • 服务器接收客户端的服务查询请求并返回给客户端结果
    • ######
  • 客户端获取通过lookUp()查询获得的客户端HelloServiceImpl的Stub对象.
    1. 客户端通过Lookup查询获得的是客户端HelloServiceImpl的Stub对象(这一块我们看不到, 因为Skeleton为我们屏蔽了),
    2. 然后后续的处理仍然是通过HelloServiceImpl_Stub代理对象通过socket网络请求到服务端,
    3. 通过服务端的HelloServiceImpl_Stub(Skeleton)进行代理, 将请求通过Dispatcher转发到对应的服务端方法获得结果以后再次通过socket把结果返回到客户端.

RPC的基本原理

  1.  技术图片 

  2. 要说明, 在RMI Client实施正式的RMI调用前, 它必须通过LocateRegistry或者Naming方式到RMI注册表寻找要调用的RMI注册信息. 找到RMI事务注册信息后, Client会从RMI注册表获取这个RMI Remote Service的Stub信息. 这个过程成功后, RMI Client才能开始正式的调用过程.
  3. 另外要说明的是RMI Client正式调用过程, 也不是由RMI Client直接访问Remote Service, 而是由客户端获取的Stub作为RMI Client的代理访问Remote Service的代理Skeleton, 如上图所示的顺序. 也就是说真实的请求调用是在Stub-Skeleton之间进行的. Registry并不参与具体的Stub-Skeleton的调用过程, 只负责记录“哪个服务名”使用哪一个Stub, 并在 Remote Client询问它时将这个Stub拿给Client.

以上是关于4_分布式通信框架RMI的主要内容,如果未能解决你的问题,请参考以下文章

ZooKeeper伪分布集群安装及使用 RMI+ZooKeeper实现远程调用框架

分布式架构总汇

Spring框架实现——远程方法调用RMI代码演示

01_JMS概述

分布式架构网络通信

分布式通信的几种方式