4_分布式通信框架RMI
Posted binwenhome
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了4_分布式通信框架RMI相关的知识,希望对你有一定的参考价值。
简介
- 什么是RPC
- RPC(Remote Procedure Call, 远程过程调用), 一般用来实现部署在不同机器上的系统之间的方法调用. 使程序能像访问本地资源一样, 通过网络传输去访问远端系统资源.
- 对客户端来说, 传输层用什么协议, 序列化, 反序列化过程都是透明的, 不用管.
- Java RMI
- RMI(Remote Method Invocation, 远程方法调用). 这是一种用于远程过程调用的应用程序编程接口, 纯Java的网络分布式应用系统解决方案.
- 目前RMI使用Java远程消息交换协议JRMP通信(专为Java对象定制)
- RMI框架开发的应用系统能部署在任何支持JRE的平台上, 但对于非Java语言开发的应用系统的支持不足, 无法与非Java语言书写的对象进行通信.
RMI代码实践
- 创建远程接口, 并且继承java.rmi.Remote接口
public interface iHelloService extends Remote { String sayHello(String msg) throws RemoteException; }
- 远程对象必须实现UnicastRemoteObject, 这样才能保证客户端访问获得远程对象时, 该远程对象会把自身的一个拷贝以Socket的形式传输给客户端
public class HelloServiceImpl extends UnicastRemoteObject implements iHelloService { protected HelloServiceImpl() throws RemoteException { super(); } @Override public String sayHello(String msg) throws RemoteException { return "Hello" + msg; } }
- 此时服务端本身已存在的远程对象称为"skeleton", 而skeleton是服务端的一个代理, 用于接收客户端的请求后调用远程方法来响应客户端请求.
public class Server { public static void main(String[] args) { try { iHelloService helloService = new HelloServiceImpl();
//创建服务器程序:注册 LocateRegistry.createRegistry(1099); Naming.rebind("rmi://127.0.0.1/Hello", helloService); System.out.println("服务启动成功"); } catch (RemoteException e) { e.printStackTrace(); } catch (MalformedURLException e) { e.printStackTrace(); } } } - 而客户端获得的拷贝称为"stub", 它是客户端的一个代理.
public class ClientDemo { public static void main(String[] args) throws RemoteException, NotBoundException, MalformedURLException {
//创建客户端程序 iHelloService helloService = (iHelloService) Naming.lookup("rmi://127.0.0.1/Hello"); System.out.println(helloService.sayHello("MR. White")); } } - 这里说一下RMI的底层实现
源码解读
- 类图
- 远程对象的发布
- 远程引用层
- 远程对象的发布
-
发布远程对象
- 从上面类图可知, 这里会发布两个远程对象, 一个RegistryImpl, 另一个是自己写的RMI实现类对象.
- 从HelloServiceImpl构造器看起, 调用了父类UnicastRemoteObject的给构造方法.
protected UnicastRemoteObject(int port) throws RemoteException { this.port = port; exportObject((Remote) this, port); }
- 追溯到了私有方法exportObject()
- 这里做了个判断, 判断服务的实现类是不是UnicastRemoteObject的子类, 如果是, 则直接把传入的UnicastServerRef对象赋值给其ref(RemoteRef)对象.
- 反之, 则调用UnicastServerRef的exportObject()方法
public static Remote exportObject(Remote obj, int port) throws RemoteException { //创建UnicastServerRef对象, 对象内引用了LiveRef(tcp通信) return exportObject(obj, new UnicastServerRef(port)); }
private static Remote exportObject(Remote obj, UnicastServerRef sref) throws RemoteException { // if obj extends UnicastRemoteObject, set its ref. if (obj instanceof UnicastRemoteObject) { ((UnicastRemoteObject) obj).ref = sref; } return sref.exportObject(obj, null, false); }
public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException { Class var4 = var1.getClass(); //创建远程代理类, 该代理是对OperationImpl对象的代理, getClientRef提供的InvocationHandler中提供了TCP连接. Remote var5; try { var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse); } catch (IllegalArgumentException var7) { throw new ExportException("remote object implements illegal remote interface", var7); } if (var5 instanceof RemoteStub) { this.setSkeleton(var1); } //包装实际对象, 并将其暴露在TCP端口上, 等待客户端调用 Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3); this.ref.exportObject(var6); this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4); return var5; } - 因为HelloServiceImpl继承了UnicastRemoteObject, 所以服务启动的时候, 会通过UnicastRemoteObject的构造方法把对象进行发布.
- 服务端启动Registry
- LocateRegistry.createRegistry(1099);
- 从上面这行代码入手, 开始往下看. 可以发现服务端创建 了一个RegistryImpl对象, 这里做了一个判断,
- 如果服务端指定的端口是1099并且系统开启了安全管理器, 那么就可以在限定的权限集内绕过系统的安全校验, 这样纯粹是为了提高效率.
public RegistryImpl(final int var1) throws RemoteException { this.bindings = new Hashtable(101); if (var1 == 1099 && System.getSecurityManager() != null) { try { AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() { public Void run() throws RemoteException { LiveRef var1x = new LiveRef(RegistryImpl.id, var1); RegistryImpl.this.setup(new UnicastServerRef(var1x, (var0) -> { return RegistryImpl.registryFilter(var0); })); return null; } }, (AccessControlContext)null, new SocketPermission("localhost:" + var1, "listen,accept")); } catch (PrivilegedActionException var3) { throw (RemoteException)var3.getException(); } } else { LiveRef var2 = new LiveRef(id, var1); this.setup(new UnicastServerRef(var2, RegistryImpl::registryFilter)); } }
- 如果没有, 将执行this.setup(), setup方法将传入的UnicastServerRef对象赋值给正在初始化的RegistryImpl对象的远程引用ref. 这里涉及到了向上转型, 然后继续执行UnicastServerRef的exportObject方法
private void setup(UnicastServerRef var1) throws RemoteException { this.ref = var1; var1.exportObject(this, (Object)null, true); }
- 如果服务端指定的端口是1099并且系统开启了安全管理器, 那么就可以在限定的权限集内绕过系统的安全校验, 这样纯粹是为了提高效率.
- 进入到UnicastServerRef的exportObject()方法,
- 这里首先为传入的RegistryImpl创建一个代理, 这个代理我们可以推断出就是后面服务于客户端的RegistryImpl的Stub (RegistryImpl_Stub)对象.
- 然后将UnicastServerRef 的skel(skeleton)对象设置为当前RegistryImpl对象.
- 最后用 skeleton, stub, UnicastServerRef 对象, id和一个boolean值构造了一个Target对象, 也就是这个Target对象基本包含了全部的信息, 等待TCP的调用.
public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException { Class var4 = var1.getClass(); Remote var5; try { var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse); } catch (IllegalArgumentException var7) { throw new ExportException("remote object implements illegal remote interface", var7); } if (var5 instanceof RemoteStub) { this.setSkeleton(var1); } Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3); this.ref.exportObject(var6); this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4); return var5; }
- 到上面为止, 我们看到的都是一些变量的赋值和创建工作, 还没有到连接层, 这些引用对象将会被Stub和Skeleton对象使用. 接下来就是连接层上的了.
- 追溯LiveRef的exportObject()方法, 很容易找到了TCPTransport的exportObject(), 这个方法做的事情就是将上面构造的Target对象暴露出去.
//TCPEndpoint类中 public void exportObject(Target var1) throws RemoteException { this.transport.exportObject(var1); } //TCPTransport类中 public void exportObject(Target var1) throws RemoteException { synchronized(this) { this.listen(); ++this.exportCount; } boolean var2 = false; boolean var12 = false; try { var12 = true; super.exportObject(var1); var2 = true; var12 = false; } finally { if (var12) { if (!var2) { synchronized(this) { this.decrementExportCount(); } } } } if (!var2) { synchronized(this) { this.decrementExportCount(); } } }
- 调用TCPTransport的listen()方法, listen()方法创建了一个ServerSocket, 并且启动了一条线程等待客户端的请求.
private void listen() throws RemoteException { assert Thread.holdsLock(this); TCPEndpoint var1 = this.getEndpoint(); int var2 = var1.getPort(); if (this.server == null) { if (tcpLog.isLoggable(Log.BRIEF)) { tcpLog.log(Log.BRIEF, "(port " + var2 + ") create server socket"); } try { this.server = var1.newServerSocket(); Thread var3 = (Thread)AccessController.doPrivileged(new NewThreadAction(new TCPTransport.AcceptLoop(this.server), "TCP Accept-" + var2, true)); var3.start(); } catch (BindException var4) { throw new ExportException("Port already in use: " + var2, var4); } catch (IOException var5) { throw new ExportException("Listen failed on port: " + var2, var5); } } else { SecurityManager var6 = System.getSecurityManager(); if (var6 != null) { var6.checkListen(var2); } } }
- 接着调用父类Transport的exportObject()将Target对象存放进ObjectTable中.
- 追溯LiveRef的exportObject()方法, 很容易找到了TCPTransport的exportObject(), 这个方法做的事情就是将上面构造的Target对象暴露出去.
- 此时, 我们已经将RegistryImpl对象创建并起了服务等待客户端的请求.
- 客户端获取服务端Registry代理
-
iHelloService helloService = (iHelloService) Naming.lookup("rmi://127.0.0.1/Hello");
- 从上述代码看起 (CleintDemo中), 很容易追溯到LocateRegistry的getRegistry()方法, 该方法通过传入的host和port构造RemoteRef对象, 并创建了一个本地代理, 这个代理对象其实是RegistryImpl_Stub对象. 这样客户端便有了服务端的RegistryImpl的代理(取决于ignoreStubClasses变量).
private static Registry getRegistry(ParsedNamingURL parsed) throws RemoteException { return LocateRegistry.getRegistry(parsed.host, parsed.port); } //======================================== public static Registry getRegistry(String host, int port, RMIClientSocketFactory csf) throws RemoteException { Registry registry = null;
//获取仓库地址 if (port <= 0) port = Registry.REGISTRY_PORT; if (host == null || host.length() == 0) { // If host is blank (as returned by "file:" URL in 1.0.2 used in // java.rmi.Naming), try to convert to real local host name so // that the RegistryImpl‘s checkAccess will not fail. try { host = java.net.InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { // If that failed, at least try "" (localhost) anyway... host = ""; } } LiveRef liveRef = new LiveRef(new ObjID(ObjID.REGISTRY_ID), new TCPEndpoint(host, port, csf, null), false); RemoteRef ref = (csf == null) ? new UnicastRef(liveRef) : new UnicastRef2(liveRef);
//创建远程代理类, 引用liveRef, 好让动态代理时, 能进行tcp通信 return (Registry) Util.createProxy(RegistryImpl.class, ref, false); } - 但注意此时这个代理其实还没有和服务端的RegistryImpl对象关联, 毕竟是两个VM上面的对象, 这里我们也可以猜测, 代理和远程的Registry对象之间是通过socket消息完成的.
- 调用RegistryImpl_Stub的ref(RemoteRef)对象的newCall()方法, 将RegistryImpl_Stub对象传了进去, 不要忘了构造它的时候我们将服务器的主机端口等信息传了进去, 也就是我们把服务器相关的信息也传进了newCall()方法.
- newCall()方法做的事情简单来看就是建立了跟远程RegistryImpl的Skeleton对象的连接(上面我们说到过服务端通过TCPTransport的exportObject()方法等待着客户端的请求)
public RemoteCall newCall(RemoteObject var1, Operation[] var2, int var3, long var4) throws RemoteException { clientRefLog.log(Log.BRIEF, "get connection"); Connection var6 = this.ref.getChannel().newConnection(); try { clientRefLog.log(Log.VERBOSE, "create call context"); if (clientCallLog.isLoggable(Log.VERBOSE)) { this.logClientCall(var1, var2[var3]); } StreamRemoteCall var7 = new StreamRemoteCall(var6, this.ref.getObjID(), var3, var4); try { this.marshalCustomCallData(var7.getOutputStream()); } catch (IOException var9) { throw new MarshalException("error marshaling custom call data"); } return var7; } catch (RemoteException var10) { this.ref.getChannel().free(var6, false); throw var10; } }
- newCall()方法做的事情简单来看就是建立了跟远程RegistryImpl的Skeleton对象的连接(上面我们说到过服务端通过TCPTransport的exportObject()方法等待着客户端的请求)
- 连接建立之后就是发送请求了, 我们知道客户端终究只是拥有Registry对象的代理, 而不是真正的位于服务端Registry对象本身, 他们位于不同的虚拟机实例之中, 无法直接调用, 必然是通过消息进行交互的.
- 看看super.ref.invoke()做了什么
- 容易追溯到StreamRemoteCall的executeCall()方法. 看似本地调用, 但其实很容易从代码中看出来是通过tcp连接发送消息到服务端, 由服务端解析并且处理调用.
public void invoke(RemoteCall var1) throws Exception { try { clientRefLog.log(Log.VERBOSE, "execute call"); var1.executeCall(); } catch (RemoteException var3) { clientRefLog.log(Log.BRIEF, "exception: ", var3); this.free(var1, false); throw var3; } catch (Error var4) { clientRefLog.log(Log.BRIEF, "error: ", var4); this.free(var1, false); throw var4; } catch (RuntimeException var5) { clientRefLog.log(Log.BRIEF, "exception: ", var5); this.free(var1, false); throw var5; } catch (Exception var6) { clientRefLog.log(Log.BRIEF, "exception: ", var6); this.free(var1, true); throw var6; } }
- 至此, 客户端的查询服务请求发出了.
-
- 服务器接收客户端的服务查询请求并返回给客户端结果
- ######
- 客户端获取通过lookUp()查询获得的客户端HelloServiceImpl的Stub对象.
- 客户端通过Lookup查询获得的是客户端HelloServiceImpl的Stub对象(这一块我们看不到, 因为Skeleton为我们屏蔽了),
- 然后后续的处理仍然是通过HelloServiceImpl_Stub代理对象通过socket网络请求到服务端,
- 通过服务端的HelloServiceImpl_Stub(Skeleton)进行代理, 将请求通过Dispatcher转发到对应的服务端方法获得结果以后再次通过socket把结果返回到客户端.
RPC的基本原理
-
- 要说明, 在RMI Client实施正式的RMI调用前, 它必须通过LocateRegistry或者Naming方式到RMI注册表寻找要调用的RMI注册信息. 找到RMI事务注册信息后, Client会从RMI注册表获取这个RMI Remote Service的Stub信息. 这个过程成功后, RMI Client才能开始正式的调用过程.
- 另外要说明的是RMI Client正式调用过程, 也不是由RMI Client直接访问Remote Service, 而是由客户端获取的Stub作为RMI Client的代理访问Remote Service的代理Skeleton, 如上图所示的顺序. 也就是说真实的请求调用是在Stub-Skeleton之间进行的. Registry并不参与具体的Stub-Skeleton的调用过程, 只负责记录“哪个服务名”使用哪一个Stub, 并在 Remote Client询问它时将这个Stub拿给Client.
以上是关于4_分布式通信框架RMI的主要内容,如果未能解决你的问题,请参考以下文章