RPC深入分析

Posted JudyGirl

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RPC深入分析相关的知识,希望对你有一定的参考价值。

本文章内容是从奈学教育授课所学~

王雪芬


目录

1.RPC介绍与工作原理

2.RPC调用实现

3.RPC核心功能设计

基础架构图

开发框架:
网关,业务逻辑层,数据访问层
平台 :
服务管理平台:用于管理服务的注册与发现.
请求跟踪平台:用于跟踪服务之间的调用情况,当A服务调用B服务,也要可能B服务会调用A服务,当业务负责的时候无论是定位问题还是看服务之间的关系我们都是非常不方便,所以有请求跟踪平台,会帮我们很好的解决这些问题
日志查询平台:在查日志的时候便捷,例如阿里云的日志平台
配置中心:用于管理的我们的配置,常见的有config,nacos
任务调度平台JOB:调度任务多的时候,我们可以通过任务调度平台进行管理,例如xxl-job
消息中心平台: 在申请资源,异步处理数据的时候非常方便
基础组件
分布式事物中间件:保证我们事物的一致性
数据库中间件:例如分库分表,mycat
存储层
固话KV存储mongdb,关系存储mysql, NewSQL数据ITDB:他集合了kv存储在查询上有满足了关系型数据库, 缓存系统:Redis
MIS支撑
权限管理系统Auth:用于权限管理
用户认证系统SSO:单点登录
整个图

RPC深入分析

当大家看到这张图的时候,有没有思考过RPC在那个地方调用? 那紧接着引出的什么是RPC,RPC启动什么作用?

RPC是什么?
RPC全称是Remote Procedure Call 远程过程调用,首先它是一种协议,是计算机通信协议.
它的好处在于调用远程服务可以像调用本地方法一样简单, 由于调用不是在同一个内存空间,你可以理解为不在同一台机器上,所以在信息沟通必须通过网络才可以,那么设计到网络连接,代理,信息封装,信息解析,信息发送都需要有规范.
当模块没有单独拆出来的时候是这个样子的

RPC深入分析


当某一个某一个模块流量访问大或者业务逻辑复杂的时候,我们需要把某一个模块 拆出来,在画圆圈的地方就是RPC帮我们做的事情.


RPC深入分析


通过图我们来看RPC帮我们做了什么?

1RPC包含组包解包

2RPC包含数据发送和接受

3RPC帮助开发人员提高效率


RPC核心组成是什么?

1. 远程方法对象代理

当你看到核心组成的时候有没有相关为什么会有它,它的作用是什么?需要对象代理帮我们去调用远端的方法

2. 连接管理:当多个线程调用去调用的远程方法,如何管理连接,可以与数据库连接联想一下

3. 序列化和反序列化: 当经过网络层到传输层传输的时候,我们需要把数据转换而二进制,或者二进制转换为对象信息

4. 寻址与负载均衡,当节点是集群部署的时候我们需要找到机器进行通信


RPC调用方式

1 同步调用

RPC深入分析

2 异步调用


RPC深入分析


从图中可以看出同步调用和异步调用的区别

RPC远程调用过程?

远程代理
序列化
网络传输
反序列化

RPC深入分析



假如没有RPC调用实现,应如何实现远程调用?
非常巧的一件事情就是我以前就在没有RPC方式下进行了远程调用,我非常能够理解复杂性,对于拆包和解包还是ip地址等问题, 双方需要规定好协议,然后根据协议进行拆包和解包,如果对包再加一层加密算法,更加麻烦,我简单的介绍我当时的项目是如何来处理的.

1首先应该定义消息的结构,根据消息的结构在序列化和反序列化的时候根据字节在的位置不同来进行表示信息.

RPC深入分析


RPC深入分析


RPC深入分析

消息的连接和发送解包等我是通过netty实现的,所以相对来说还是减少了一点工作量的


1 建立连接通过netty实现,由于我们是充当的服务端,所以只需要提供端口号就行
 /** * 启动UDP服务-王雪芬-2018年7月9日09:12:22 * @throws Exception */ public static void UdpRun() throws Exception { //使用log4j配置文件 LogConfig(); //启动NIO服务,使用NioEventLoopGroup接受处理连接 EventLoopGroup workerGroup = new NioEventLoopGroup(); //DefaultEventExecutorGroup使用它每次都会生成新的线程,所以解决方案制定线程并且是final类型的
final EventExecutorGroup group = new DefaultEventExecutorGroup(16); try { //udp不能使用ServerBootstrap Bootstrap b = new Bootstrap(); //把channel指定类型为NIo形式 b.group(workerGroup).channel(NioDatagramChannel.class)//设置UDP通道 //设置udp的管道工厂 .handler(new ChannelInitializer<NioDatagramChannel>(){ //NioDatagramChannel标志着是UDP格式的 @Override protected void initChannel(NioDatagramChannel ch) throws Exception { // TODO Auto-generated method stub //创建一个执行Handler的容器 ChannelPipeline pipeline = ch.pipeline(); //执行具体的处理器,调用专门用于处理业务的线程EventExecutorGroup pipeline.addLast(group,"handler",new UdpServerHandler());//消息处理器 }
})//初始化处理器 //true / false 多播模式(UDP适用),可以向多个主机发送消息 .option(ChannelOption.SO_BROADCAST, true)// 支持广播 .option(ChannelOption.SO_RCVBUF, 2048 * 1024)// 设置UDP读缓冲区为2M .option(ChannelOption.SO_SNDBUF, 1024 * 1024);// 设置UDP写缓冲区为1M
// 绑定端口,开始接收进来的连接 ,绑定的端口9999 //执行异步操作的时候使用,sync会造成当前线程的阻塞,直到绑定完为止 ChannelFuture f = b.bind(Constants.PORT).sync(); //获取channel通道 channel=f.channel(); log.info("UDP Server 启动!"); // 等待服务器 socket 关闭 。 f.channel().closeFuture().sync(); }finally { // 优雅退出 释放线程池资源 group.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
2 定义协议,简单列出当时使用的属性
 public class GPSbaseinfo { public static final Logger log = Logger.getLogger(GPSbaseinfo.class); ByteBuffer byteBuf; public GPSbaseinfo() { this.byteBuf = ByteBuffer.allocate(1024); } private short Call; //报警标志 private String isnoSpeedcall; //速度报警 private Boolean isnotemperaturecall; //温度报警 private Boolean isnoBatterycall; //电池电量异常报警 private short Status; //状态 private String isnoSport; //设备是静止还是运动 private String isnoBattery; //设备是否充电 private String isnoGps; //gps定位是否正常 private String isnoworktime; // 设备是否在工作时段内 }
3 接收数据
在netty中有channelRead0()用来接收数据,当读取到数据之后做饭反序列化操作转换为bytebufe,然后再去做处理,这是列举的一些代码
 ByteBuf buf = (ByteBuf) packet.copy().content();//读取客户端的消息

byte[] readbytes = new byte[readbyte.length - 2]; //去除0x7e减一,因为是从0开始的,所以再减一System.arraycopy(readbyte, 1, readbytes, 0, readbyte.length - 2);byte[] checkbyte = readbytes;byte[] clientcheck = CheckUtil.CheckClient(readbyte); //取出校验位byte[] sendcheck = new byte[readbytes.length - 1]; //减去头部ox7e,然后最后一个减一System.arraycopy(readbytes, 0, sendcheck, 0, readbytes.length - 1);
byte servercheck = Tast.getXor(sendcheck); //消息头和消息尾进行校验byte[] servercheckbyte = Utilities.getBooleanArray(servercheck);    //得到本身的校验

在回复客户端的时候需要把信息进行序列化操作,转换为字节,然后通过netty的writeAndFlush()方法发送数据
 /**     * 封装数据包 * @return */ public byte[] toData(short plaReplyNO,byte result) { b.clear(); //应答流水号 b.putShort(plaReplyNO); //时间 String date=new SimpleDateFormat("yyyyMMddHHmmss").format(Calendar.getInstance().getTime()); ResponseSerTime = new byte[7]; this.ResponseSerTime= Utilities.str2Bcd(date); b.put(this.ResponseSerTime); //回复结果 b.put(result); return b.array(); }

RPC核心功能剖析?
东老师说的好,仅仅是有上面的功能是不足够的, 从产品的角度出发还有很长的距离
客户端
1 连接管理
2 负载均衡
3 请求路由
4 超时处理
5 健康检查

服务端
1 队列
2 超时丢弃
3 优雅关闭
4 过载保护

1 连接管理

RPC深入分析

在连接管理这方法我们可以在初始化时机和连接数维护,心跳做处理
在初始化连接我们可以在初始化的时候把所以模块都建立连接,也可以在调用的时候进行连接,这两种方式都可以,不同情况使用不同,如果你是网关模块下面有很多服务,如果在初始化的时候就建立连接,会浪费资源,这个时候就适合用在访问的时候建立连接

2 在模块与模块之间访问的时候采用的是同步方式
我们通过数据库连接池举例子,当数据库连接访问多的时候这个时候系统会会抛出连接不够用,这是因为连接池里的连接已经用完, 通过这一点就可以说明线程之间的连接的独立的, 如果不独立,使用一个线程那么数据会混乱,可以会收到对方的数据
那对于RPC来说是否是独立的. 大概我们都没有收到过服务端的连接不够用, 最多是连接超时,也许使我们的ip地址不对,端口没有开放等问题, 我们可以看见图中有阻塞, 这个阻塞不是指的网络阻塞,指的是work线程的阻塞, 如果是同步调用方式, 那么网络的连接是如何管理的呢? 一会再跟大家分析

RPC深入分析


客户端的线程模型采用的是多路复用的方式,客户端把数据放到队列里面,然后多路复用userservice调用,返回数据去唤醒对应等待的线程

RPC深入分析


2负载均衡


.............未完待续,明天继续补充,今天有点累了.....

                                                     


以上是关于RPC深入分析的主要内容,如果未能解决你的问题,请参考以下文章

深入浅出 RPC - 浅出篇

RPC 专题深入理解 RPC 之服务注册与发现篇

稳定性 耗时 监控原因分析-- dubbo rpc 框架 的线程池,io 连接模型. 客户端,服务端

Hbase源码分析:RPC概况

自定义RPC的完整实现---深入理解rpc内部原理

Spark RPC 框架源码分析运行时序