Rpc到底是个啥

Posted James的黑板报

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rpc到底是个啥相关的知识,希望对你有一定的参考价值。

什么是Rpc服务

Rpc全称为Remote Procedure Call,中文名为远程过程调用,Rpc是一种计算机通信协议,他允许像调用本地服务一样调用远程服务,且随着互联网应用的量级越来越大,单台计算机的计算能力有限,需要借助集群的算力来实现,在分布式系统中,各个系统中的服务可以通过Rpc来进行互相调用,我们现在学习的许多分布式组件(hdfs,kafka)等底层都是通过Rpc来进行机器之间的互相通信的

Rpc的原理

在Rpc框架中主要有三个角色的存在: Registry,Provider,Consumer,分别是服务注册中心,服务提供方,服务消费方

Rpc的调用过程

Rpc到底是个啥

在上图示例中,我们可以看到整个Rpc服务的调用过程,调用过程如下

下面我们简单实现一个Rpc例子。

服务间远程调用实现整数求和

首先看看,实现这个逻辑,我们需要规划好哪些呢?

1.各个服务之间的职责2.服务之间通信与传输模型如何实现

现在已经明确了整个过程中需要三个服务:Registry,Provider,Consumer,我们先去实现Provider的逻辑

作为服务提供者,我们需要提供一个整数相加的方法暴露给其他服务,在这里我们定义一个接口,新建一个java工程,在src下新建Provider/Methods包,新建一个接口MethodInterface.java

package Provider.Methods;/** * 服务提供者接口 */public interface MethodInterface { int GetSum(int a,int b);}

接下来去实现这个方法,同级目录下新建一个接口实现类MethodInterfaceImpl.java

package Provider.Methods;/** * 服务提供者接口实现方法 **/public class MethodInterfaceImpl implements MethodInterface { @Override public int GetSum(int a, int b) { return a + b; }}
package Registry;import java.io.IOException;/** * 服务注册中心接口 */public interface ServerRegistry { //启动注册中心 public void start() throws IOException;  //停止注册中心 public void stop(); //注册一个方法类 public void register(Class serviceInterface,Class Impl);}

接下来去实现这个注册中心的方法,以下高能预警

package Registry;import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.Method;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.net.Socket;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * 服务注册中心实现类 **/public class ServerRegistryImpl implements ServerRegistry {// private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static ExecutorService executor = Executors.newCachedThreadPool(); private static final HashMap<String,Class> serviceRegistry = new HashMap<String,Class>(); private static int port; public ServerRegistryImpl(int port) { this.port = port; } @Override public void stop() { executor.shutdown(); } @Override public void start() throws IOException { ServerSocket server = new ServerSocket(); server.bind(new InetSocketAddress(port)); System.out.println("当前已经注册的服务"); for (Map.Entry<String, Class> entry : serviceRegistry.entrySet()) { String key = entry.getKey(); String value = entry.getValue().getName(); System.out.println("key=" + key + " value=" + value); } try { while (true) { // 循环监听客户端的tcp连接,接到tcp连接后直接封装为task,由线程池去执行 executor.execute(new ServiceTask(server.accept())); } } finally { server.close(); executor.shutdown(); } } @Override public void register(Class serviceInterface, Class Impl) { // 服务列表注册到注册中心的hash表里 serviceRegistry.put(serviceInterface.getName(),Impl); } // 循环执行服务任务 public static class ServiceTask implements Runnable { Socket client = null; public ServiceTask(Socket client) { this.client = client; } @Override public void run() { // todo 这两个类的方法作用 ObjectInputStream input = null; ObjectOutputStream output = null; try { // 将客户端发送的码流反序列化为对象,反射调用服务实现者,获取执行结果 input = new ObjectInputStream(client.getInputStream()); String serviceName = input.readUTF(); String methodName = input.readUTF(); Class<?>[] parameterTypes = (Class<?>[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); // 获取注册中心的实体类 Class serviceClass = serviceRegistry.get(serviceName); if (serviceClass == null) { throw new ClassNotFoundException(serviceName + " not found"); } Method method = serviceClass.getMethod(methodName,parameterTypes); Object result = method.invoke(serviceClass.newInstance(),arguments); // 将执行结果反序列化,通过socket发送给客户端 output = new ObjectOutputStream(client.getOutputStream()); output.writeObject(result); } catch (Exception e) { e.printStackTrace(); } finally { if (output != null) { try { output.close(); }catch (IOException e) { e.printStackTrace(); } } if (input != null) { try { input.close(); }catch (IOException e) { e.printStackTrace(); } } if (client != null) { try { client.close(); }catch (IOException e) { e.printStackTrace(); } } } } }}

让我们把目光再回到服务提供者,现在注册中心已经搭建好,可以去注册我们的整数相加的方法了,Provider下新建一个ProviderServer.java文件

package Provider;import Provider.Methods.MethodInterface;import Provider.Methods.MethodInterfaceImpl;import Registry.ServerRegistryImpl;import java.io.IOException;/** * 服务提供者注册方法 **/public class ProviderServer { public static void main(String[] args) throws IOException { ServerRegistryImpl serverRegistry = new ServerRegistryImpl(8088); // 注册一个接口方法,key是接口名,value是接口实现类 serverRegistry.register(MethodInterface.class, MethodInterfaceImpl.class); serverRegistry.start(); }}

现在我们可以启动这个server,可以看到提供者的方法已经注册上去了

Rpc到底是个啥

现在我们去考虑服务消费者端的设计,这里我们要使用一种基于动态代理的设计策略去实现,在src下新建一个Consumer包,在其下新建一个RPCServer.java文件

package Consumer;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.InetSocketAddress;import java.net.Socket;/** * 客户端远程代理对象 **/public class RPCServer<T> { public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) { // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用 return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectOutputStream output = null; ObjectInputStream input = null; try { // 2.创建Socket客户端,根据指定地址连接远程服务提供者 socket = new Socket(); socket.connect(addr); // 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者 output = new ObjectOutputStream(socket.getOutputStream()); output.writeUTF(serviceInterface.getName()); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(args); // 4.同步阻塞等待服务器返回应答,获取应答后返回 input = new ObjectInputStream(socket.getInputStream()); return input.readObject(); } finally { if (socket != null) { socket.close(); } if (output != null) { output.close(); } if (input != null) { input.close(); } } } }); }}

接下来去实现我们的服务消费者服务端,同样在Consumer下新建一个ConsumerServer.java文件

package Consumer;import Provider.Methods.MethodInterface;import java.net.InetSocketAddress;/** * 服务消费者 **/public class ConsumeServer { public static void main(String[] args) { // 远程调用接口方法 MethodInterface service = RPCServer.getRemoteProxyObj(MethodInterface.class,new InetSocketAddress("localhost",8088)); System.out.println(service.GetSum(1,2)); }}

现在我们执行这个服务消费者的main方法,可以看到,通过Rpc代理简单的封装,我们可以像调用本地方法一样调用远程服务提供者的服务

这就是本文的所有内容,以上,欢迎大家在后台留言讨论哟



以上是关于Rpc到底是个啥的主要内容,如果未能解决你的问题,请参考以下文章

一文带你搞懂RPC到底是个啥

REST到底是个啥

RPC是个啥

吵疯了,Pull Request到底是个啥?

话说Spring 5里的WebFlux到底是个啥?

linux指令 2>&1 到底是个啥