实战RPC是什么?如何实现?

Posted ITSK

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实战RPC是什么?如何实现?相关的知识,希望对你有一定的参考价值。

越努力越幸运!努力是奇迹的别名!哈哈【实战】RPC是什么?如何实现?【实战】RPC是什么?如何实现?

【实战】RPC是什么?如何实现?

纯音乐-钢琴版.mp3 From ITSK 04:05

本篇文章主要分享一下RPC相关知识,包括:RPC是什么?调用过程具体是什么样的以及如何实现?


RPC是什么?


RPC核心组件
完整的RPC框架主要包含4个核心的组件:Client、Server、Client Stub以及Server Stub,具体如下图所示:

【实战】RPC是什么?如何实现?

  • 客户端(Client):服务调用方;
  • 服务端(Server):服务提供者;
  • 客户端存根(Client Stub):存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方;
  • 服务端存根(Server Stub):接收客户端发送过来的消息,将消息解包,并调用本地的方法。

RPC的调用过程?
RPC调用过程如下图:

【实战】RPC是什么?如何实现?


【实战】RPC是什么?如何实现?

  1. 客户端(Client)以本地调用方式调用服务(依赖服务接口,以接口的方式调用)。
  2. 客户端存根(Client Stub)接收到调用请求后,负责将方法、参数等组装成能够进行网络传输的消息体(将消息体对象序列化为二进制)。
  3. 客户端通过Socket将消息发送到服务端。
  4. 服务端存根(Server Stub)收到消息后对消息进行解码(将消息对象反序列化)。
  5. 服务端存根(Server Stub)根据解码结果调用本地的服务(利用反射原理)。
  6. 本地服务执行并将结果返回给服务端存根(Server Stub)。
  7. 服务端存根(Server Stub)将返回结果打包成消息(将结果消息对象序列化)。
  8. 服务端(Server)通过Socket将消息发送到客户端。
  9. 客户端存根(Client Stub)接收到结果消息,并进行解码(将结果消息反序列化)。
  10. 客户端(Client)得到最终结果。


RPC框架调用分类

RPC调用主要分为两种:同步调用和异步调用

1、同步调用:

① 同步调用:客户端调用服务端方法,等待直到服务端返回结果或者超时,再继续自己的操作。

 RPC的同步调用确保请求送达对方并收到对方响应,若没有收到响应,则框架抛出Timeout超时异常。这种情况下,调用方是无法确定调用是成功还是失败的,需要根据业务场景(是否可重入,幂等)选择重试和补偿策略。
②  同步调用在分布式微服务架构中,一个业务的调用会跨N(N≥2)个服务进程,整个调用链路上的同步调用等待的瓶颈会由最慢(或脆弱)的服务决定。比如A-B-C这样一个调用链路,A同步调用B并等待返回结果,B同步调用C并等待返回结果,以此类推,就像一组齿轮链,级级传动,这很容易产生雪崩效应。若C服务挂了,则会导致前面的服务全部因为等待超时而占用大量不必要的线程资源。
③ 对于雪崩效应,常用解决方法有:使用超时策略和熔断器机制
(1)超时策略:在一个服务调用链中,某个服务的故障可能会导致级联故障。调用服务的操作可以配置为执行超时,如果服务未能在这个时间内响应,就回复一个失败消息。
这种策略可能会导致许多并发请求到同一个操作被阻塞,直到超时期限届满。这些阻塞的请求可能会存储关键的系统资源,如内存、线程、数据库连接等。因此,这些资源可能会枯竭,导致需要使用相同的资源系统出现故障。设置较短的超时可能有助于解决这个问题,但是一个请求从发出到收到成功或者失败的消息需要的时间是不确定的。在分布式微服务架构下,我们需要根据成功调用一个服务调用链的平均时间来合理配置服务接口超时时间。
(2)熔断器机制:熔断器的模式使用断路器来检测故障是否已得到解决,防止请求反复尝试执行一个可能会失败的操作,从而减少等待纠正故障的时间,相对于超时策略更加灵活。
2、异步调用:
异步调用:客户端调用服务端方法,不再等待服务端返回,直接继续自己的操作。
RPC的异步调用意味着RPC框架不阻塞调用方线程,调用方不需要立刻拿到返回结果,甚至调用方根本就不关心返回结果。RPC的异步交互场景举例如下图所示:

【实战】RPC是什么?如何实现?

调用方通过服务网关发起调用并等待结果,随后网关派发调用请求给后续服务,其主调用链路为A-B-C,其内部为异步调用,链路上不等待,最后由C返回结果给服务网关。其中,B又依赖两个子服务:B1和B2,B需要B1和B2的返回结果才能发起C调用,因此在支线上B针对B1和B2调用就需要是同步的。
② 使用场景:
在微服务架构下,大部分的服务调用都是同步调用,异步调用的使用场景:上游服务不会实时关注下游服务的调用结果,比如通过异步调用记录日志。通过异步调用在一定程度上可以提升服务性能,但不可滥用,否则会产生不可预料的结果。


基于Java的原生实现

我们基于Java 实现一个简单的RPC调用例子:基本工作流程如下:

① 客户端发起调用请求;

② 将调用的内容序列化后通过网络发给服务端;

③ 服务端接收到调用请求,执行具体服务并获得结果;

④ 将结果序列化后通过网络返回给客户端。

具体代码如下:

服务端的接口:

public interface IUserService { String getUserName(String name);}
public class UserServiceImpl implements IUserService { @Override public String getUserName(String name) { return name; }}

服务端的服务类:

public class RpcServer { //定义一个服务接口注册表:key:接口名 value:对应的接口实现类 private static final HashMap<String, Class<?>> serviceRegistry = new HashMap<String, Class<?>>(); //服务监听的端口 private int port;
public RpcServer(int port) { this.port = port; }
/** * 注册服务 * * @param service 接口的class对象 * @param serviceImpl 接口实现类的class对象 * @return */ public RpcServer registryServices(Class service, Class serviceImpl) { serviceRegistry.put(service.getName(), serviceImpl); return this; }
/** * 处理请求、返回结果 * * @throws IOException */ public void run() throws IOException { //创建并启动监听port端口的服务 ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(port)); System.out.println("start server"); Socket socket = null; ObjectInputStream ois = null; ObjectOutputStream oos = null; try { while (true) { //监听请求该服务端的客户端(阻塞等待) socket = serverSocket.accept(); //读取客户端请求内容:接口名称、方法名、请求参数 ois = new ObjectInputStream(socket.getInputStream()); String serviceName = ois.readUTF(); String methodName = ois.readUTF(); Class<?>[] parameterTypes = (Class<?>[]) ois.readObject(); Object[] arguments = (Object[]) ois.readObject(); //在接口服务注册表中通过接口名获取对应的接口实现类 Class<?> serviceClazz = serviceRegistry.get(serviceName); if (null == serviceClazz) { throw new ClassNotFoundException("接口 serviceName 不存在。。。"); } //通过反射获取接口实现类的方法对象 Method method = serviceClazz.getMethod(methodName, parameterTypes); //通过反射调用服务 Object result = method.invoke(serviceClazz.newInstance(), arguments); //将结果返回给客户端 oos = new ObjectOutputStream(socket.getOutputStream()); oos.writeObject(result); } } catch (Exception e) { e.printStackTrace(); } finally { if (null != socket) { socket.close(); } if (null != oos) { oos.close(); } if (null != ois) { ois.close(); } } }
public static void main(String[] args) throws IOException { new RpcServer(8888).registryServices(IUserService.class, UserServiceImpl.class).run(); }}
服务端的实(使用反射)现步骤如下:
① 服务端写一个接口和一个接口的实现。
② 服务端维护一个map,key为接口的类名,value为接口的实现类。
③ 服务端通过 ServerSocket 接收客户端发送过来的数据(接口的类名,方法名,请求参数)
④ 服务端根据接口的类名和方法名得到对应实现类的方法,通过反射调用服务
⑤ 把处理之后的结果通过 ServerSocket  返回给客户端

客户端的实现代码如下:
public class RpcClientProxy<T> implements InvocationHandler {
//要请求的接口 private Class<T> serviceInterface; //请求的接口的地址:ip+端口 private SocketAddress address;
public RpcClientProxy(Class<T> serviceInterface, String ip, int port) { this.serviceInterface = serviceInterface; address = new InetSocketAddress(ip, port); }
/** * 获取要调用的接口的代理类 * * @return */ public T getClientIntance() { return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, this); }
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectOutputStream oos = null; ObjectInputStream ois = null;
try { socket = new Socket(); //连接到指定服务端 socket.connect(address); oos = new ObjectOutputStream(socket.getOutputStream()); //将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者 oos.writeUTF(serviceInterface.getName()); oos.writeUTF(method.getName()); oos.writeObject(method.getParameterTypes()); oos.writeObject(args);
//同步阻塞等待服务器返回结果,获取返回结果后直接返回 ois = new ObjectInputStream(socket.getInputStream()); return ois.readObject(); } catch (Exception e) { e.printStackTrace(); } finally { if (null != socket) { socket.close(); } if (null != oos) { oos.close(); } if (null != ois) { ois.close(); } } return null; }
public static void main(String[] args) { RpcClientProxy<IUserService> rpcClientProxy = new RpcClientProxy<IUserService>(IUserService.class, "127.0.0.1", 8888); IUserService userServiceProxy = rpcClientProxy.getClientIntance(); System.out.println(userServiceProxy.getUserName("张三")); }}
客户端(动态代理)实现步骤如下:
① 把服务端的接口copy过来(只copy接口,不copy实现)
② 写一个代理类,调用服务端方法的时候,通过代理类的 invoke 方法把服务需要的 接口类名,方法名,请求参数 通过 Socket 发送给服务端;接收服务端处理的结果。

本文总结到这里,想要交流的小伙伴欢迎评论区留言哦【实战】RPC是什么?如何实现?【实战】RPC是什么?如何实现?【实战】RPC是什么?如何实现?


ITSK

以上是关于实战RPC是什么?如何实现?的主要内容,如果未能解决你的问题,请参考以下文章

服务化实战之 dubbodubboxmotanthriftgrpc等RPC框架选型

Netty实战高性能分布式RPC(Dubbo分布式底层实现)

外观模式Facade-实战dubbo rpc接口

外观模式Facade-实战dubbo rpc接口

什么是RPC以及RPC的简单实现

RabbitMQ之RPC实现