Android源码解析RPC系列(一)---Binder原理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android源码解析RPC系列(一)---Binder原理相关的知识,希望对你有一定的参考价值。

参考技术A

看了几天的Binder,决定有必要写一篇博客,记录一下学习成果,Binder是android中比较综合的一块知识了,目前的理解只限于JAVA层。首先Binder是干嘛用的?不用说,跨进程通信全靠它,操作系统的不同进程之间,数据不共享,对于每个进程来说,它都天真地以为自己独享了整个系统,完全不知道其他进程的存在,进程之间需要通信需要某种系统机制才能完成,在Android整个系统架构中,采用了大量的C/S架构的思想,所以Binder的作用就显得非常重要了,但是这种机制为什么是Binder呢?在Linux中的RPC方式有管道,消息队列,共享内存等,消息队列和管道采用存储-转发方式,即数据先从发送方缓存区拷贝到内核开辟的缓存区中,然后再从内核缓存区拷贝到接收方缓存区,这样就有两次拷贝过程。共享内存不需要拷贝,但控制复杂,难以使用。Binder是个折中的方案,只需要拷贝一次就行了。其次Binder的安全性比较好,好在哪里,在下还不是很清楚,基于安全性和传输的效率考虑,选择了Binder。Binder的英文意思是粘结剂,Binder对象是一个可以跨进程引用的对象,它的实体位于一个进程中,这个进程一般是Server端,该对象提供了一套方法用以实现对服务的请求,而它的引用却遍布于系统的各个进程(Client端)之中,这样Client通过Binder的引用访问Server,所以说,Binder就像胶水一样,把系统各个进程粘结在一起了,废话确实有点多。

为了从而保障了系统的安全和稳定,整个系统被划分成内核空间和用户空间
内核空间:独立于普通的应用程序,可以访问受保护的内存空间,有访问底层硬件设备的所有权限。
用户空间:相对与内核空间,上层运用程序所运行的空间就是用户空间,用户空间访问内核空间的唯一方式就是系统调用。一个4G的虚拟地址空间,其中3G是用户空间,剩余的1G是内核空间。如果一个用户空间想与另外一个用户空间进行通信,就需要内核模块支持,这个运行在内核空间的,负责各个用户进程通过Binder通信的内核模块叫做Binder驱动,虽然叫做Binder驱动,但是和硬件并没有什么关系,只是实现方式和设备驱动程序是一样的,提供了一些标准文件操作。

在写AIDL的时候,一般情况下,我们有两个进程,一个作为Server端提供某种服务,然后另外一个进程作为Client端,连接Server端之后,就 可以使用Server里面定义的服务。这种思想是一种典型的C/S的思想。值得注意的是Android系统中的Binder自身也是C/S的架构,也有Server端与Client端。一个大的C/S架构中,也有一个小的C/S架构。

先笼统的说一下,在整个Binder框架中,由系列组件组成,分别是Client、Server、ServiceManager和Binder驱动程序,其中Client、Server和ServiceManager运行在用户空间,Binder驱动程序运行内核空间。运行在用户空间中的Client、Server和ServiceManager,是在三个不同进程中的,Server进程中中定义了服务提供给Client进程使用,并且Server中有一个Binder实体,但是Server中定义的服务并不能直接被Client使用,它需要向ServiceManager注册,然后Client要用服务的时候,直接向ServiceManager要,ServiceManager返回一个Binder的替身(引用)给Client,这样Client就可以调用Server中的服务了。

场景 :进程A要调用进程B里面的一个draw方法处理图片。

分析 :在这种场景下,进程A作为Client端,进程B做为Server端,但是A/B不在同一个进程中,怎么来调用B进程的draw方法呢,首先进程B作为Server端创建了Binder实体,为其取一个字符形式,可读易记的名字,并将这个Binder连同名字以数据包的形式通过Binder驱动发送给ServiceManager,也就是向ServiceManager注册的过程,告诉ServiceManager,我是进程B,拥有图像处理的功能,ServiceManager从数据包中取出名字和引用以一个注册表的形式保留了Server进程的注册信息。为什么是以数据包的形式呢,因为这是两个进程,直接传递对象是不行滴,只能是一些描述信息。现在Client端进程A联系ServiceManager,说现在我需要进程B中图像处理的功能,ServiceManager从注册表中查到了这个Binder实体,但是呢,它并不是直接把这个Binder实体直接给Client,而是给了一个Binder实体的代理,或者说是引用,Client通过Binder的引用访问Server。分析到现在,有个关键的问题需要说一下,ServiceManager是一个进程,Server是另一个进程,Server向ServiceManager注册Binder必然会涉及进程间通信。当前实现的是进程间通信却又要用到进程间通信,这就好象蛋可以孵出鸡前提却是要找只鸡来孵蛋,确实是这样的,ServiceManager中预先有了一个自己的Binder对象(实体),就是那只鸡,然后Server有个Binder对象的引用,就是那个蛋,Server需要通过这个Binder的引用来实现Binder的注册。鸡就一只,蛋有很多,ServiceManager进程的Binder对象(实体)仅有一个,其他进程所拥有的全部都是它的代理。同样一个Server端Binder实体也应该只有一个,对应所有Client端全部都是它的代理。

我们再次理解一下Binder是什么?在Binder通信模型的四个角色里面;他们的代表都是“Binder”,一个Binder对象就代表了所有,包括了Server,Client,ServiceManager,这样,对于Binder通信的使用者而言,不用关心实现的细节。对Server来说,Binder指的是Binder实体,或者说是本地对象,对于Client来说,Binder指的是Binder代理对象,也就是Binder的引用。对于Binder驱动而言,在Binder对象进行跨进程传递的时候,Binder驱动会自动完成这两种类型的转换。

简单的总结一下,通过上面一大段的分析,一个Server在使用的时候需要经历三个阶段

1、定义一个AIDL文件
Game.aidl

GameManager .aidl

2、定义远端服务Service
在远程服务中的onBind方法,实现AIDL接口的具体方法,并且返回Binder对象

3、本地创建连接对象

以上就是一个远端服务的一般套路,如果是在两个进程中,就可以进程通信了,现在我们分析一下,这个通信的流程。重点是GameManager这个编译生成的类。

从类的关系来看,首先接口GameManager 继承 IInterface ,IInterface是一个接口,在GameManager内部有一个内部类Stub,Stub继承了Binder,(Binder实现了IBinder),并且实现了GameManager接口,在Stub中还有一个内部类Proxy,Proxy也实现了GameManager接口,一个整体的结构是这样的

现在的问题是,Stub是什么?Proxy又是什么?在上面说了在Binder通信模型的四个角色里面;他们的代表都是“Binder”,一个Binder对象就代表了所有,包括了Server,Clinet,ServiceManager,为了两个进程的通信,系统给予的内核支持是Binder,在抽象一点的说,Binder是系统开辟的一块内存空间,两个进程往这块空间里面读写数据就行了,Stub从Binder中读数据,Proxy向Binder中写数据,达到进程间通信的目的。首先我们分析Stub。

Stub 类继承了Binder ,说明了Stub有了跨进程传输的能力,实现了GameManager接口,说明它有了根据游戏ID查询一个游戏的能力。我们在bind一个Service之后,在onServiceConnecttion的回调里面,就是通过asInterface方法拿到一个远程的service的。

asInterface调用queryLocalInterface。

mDescriptor,mOwner其实是Binder的成员变量,Stub继承了Binder,在构造函数的时候,对着两个变量赋的值。

如果客户端和服务端是在一个进程中,那么其实queryLocalInterface获取的就是Stub对象,如果不在一个进程queryLocalInterface查询的对象肯定为null,因为不同进程有不同虚拟机,肯定查不到mOwner对象的,所以这时候其实是返回的Proxy对象了。拿到Stub对象后,通常在onServiceConnected中,就把这个对象转换成我们多定义AIDL接口。

比如我们这里会转换成GameManager,有了GameManager对象,就可以调用后querryGameById方法了。如果是一个进程,那直接调用的是自己的querryGameById方法,如果不是一个进程,那调用了就是代理的querryGameById方法了。

看到其中关键的一行是

mRemote就是一个IBinder对象,相对于Stub,Proxy 是组合关系(HAS-A),内部有一个IBinder对象mRemote,Stub是继承关系(IS-A),直接实现了IBinder接口。

transact是个native方法,最终还会回掉JAVA层的onTransact方法。

onTransact根据调用号(每个AIDL函数都有一个编号,在跨进程的时候,不会传递函数,而是传递编号指明调用哪个函数)调用相关函数;在这个例子里面,调用了Binder本地对象的querryGameById方法;这个方法将结果返回给驱动,驱动唤醒挂起的Client进程里面的线程并将结果返回。于是一次跨进程调用就完成了。

***Please accept mybest wishes for your happiness and success ! ***

Hbase源码系列之scan源码解析及调优

一,hbasescan基本使用问题介绍

HbaseScan方法是基于Rowkey进行数据扫描的,过程中client会将我们的请求,转化为向服务端的RPC请求。那么这个时候我们可以考虑的优化,那么主要有一下三点:

A,减少带宽(通过过滤器减少无用数据的 传输);

B,减少RPC请求的次数;

C,加缓存。

具体的转化为scan相关的操作如下:

1scan可以设置过滤器

过滤器可以减少数据网络传输的数据量。

过滤器可以用来扫描ROWkey不连续的数据。

2scan可以设置每批次的扫描行数

Scan.setCaching(20);设置一个批次应该请求几行数据。几个Result

3scan可以设置每批次扫描的列数。

Scan.setBatch(1);设置每一行请求几列的数据。一个Result几个cell

通过23可以减少RPC请求的次数,这样可以提升扫描性能,但是也会带来GC的风险。

重要的计算公式:

Rpc次数=(行数×每行的列数)/Min(每行的列数,批量大小)/扫描器缓存

合理设置2,3可以降低RPC请求次数,提升性能。

4,对于一次扫描,频繁使用的数据呢可以设置缓存。

Scan.setCacheBlocks(false);不建议使用。

5scan占用内存

Scan的并发数*cache*单个Result大小。合理使用2,3.

往往,为了不产生热点问题,会将Rowkey离散化,然后造成扫描不连续,这个时候每次扫描就会转化为n多个scan并发进行。此时就要考虑内存压力,防止GC,导致不正常的程序退出。

 

二,源码相关的重要类

1HTable

重点是Hbase类初始化的时候提交的请求的任务的线程池的初始化

HConnectionImplementationgetThreadPool方法。

getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
    conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);

具体内容如下:

private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
    BlockingQueue<Runnable> passedWorkQueue) {
  // shared HTable thread executor not yet initialized
  if (maxThreads == 0) {
    maxThreads = Runtime.getRuntime().availableProcessors() * 8;
  }
  if (coreThreads == 0) {
    coreThreads = Runtime.getRuntime().availableProcessors() * 8;
  }
  long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
  //设置阻塞任务队列
  BlockingQueue<Runnable> workQueue = passedWorkQueue;
  if (workQueue == null) {
    workQueue =
      new LinkedBlockingQueue<Runnable>(maxThreads *
          conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
              HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
  }
  ThreadPoolExecutor tpe = new ThreadPoolExecutor(
      coreThreads, //核心线程数
      maxThreads,//最大线程数
      keepAliveTime,//线程池中超过corePoolSize数目的空闲线程最大存活时间;
      TimeUnit.SECONDS,//keepAliveTime时间单位
      workQueue,//阻塞任务队列
      Threads.newDaemonThreadFactory(toString() + nameHint));//新建线程工厂
  tpe.allowCoreThreadTimeOut(true);//allowCoreThreadTimeOut为true该值为true,则线程池数量最后销毁到0个。
  //allowCoreThreadTimeOut为false销毁机制:超过核心线程数时,而且(超过最大值或者timeout过),就会销毁。
  return tpe;
}

2ClientSimpleScanner

继承关系如下:

class ClientSimpleScanner extends ClientScanner

abstract class ClientScanner extends AbstractClientScanner

abstract class AbstractClientScanner implements ResultScanner

主要是实现了scan的动作

3HRegionServer

为客户端提供HRegions,并向master注册自身,一个集群中会有很多个HRegionServer.

在此篇文章中主要关注点

createClusterConnection方法中,注册了,客户端RPC的时候服务端的实现类对应关系。

ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
  serverName, rpcServices, rpcServices);

客户端使用的是BlockingStub,服务端使用的是RSRpcServices。共同的父类是ClientService.BlockingInterFace

4RSRpcServices

实现了regionserverRPC服务。

本文只关注scan的方法实现。

5RegionScannerImpl

负责合并多个storescanner。内部维护的有KeyValueHeap,维护了一个优先队列存放的是StoreScanner

6StoreScanner

可以扫描memstorestore中的数据。

重要的内部成员KeyValueHeap,内部的优先队列,维护的是MemStoreScannerStoreFileScanner

7KeyValueHeap

内部维护了一个PriorityQueue<KeyValueScanner>队列,存储的就是InternalScannerStoreScannerInternalScanner的子类。

三,scan的源码实现

Scan的源码实现过程,主要是帮助大家更好的阅读源码。

主要分成两个大节:

A,客户端scan的过程

B),服务端scan的过程

由于源码内容比较多,本文只会贴出讲解重点环节的源码。

1hbase scan过程客户端的实现

入口是HTablegetScan方法

t.getScanner(s)

实际上是构建了一个

ClientSimpleScanner

数据读的入口是ClientSimpleScanner的父类ClientScannernext方法。

//cache读取结束的时候直接进入rpc
if (cache.size() == 0) {
  loadCache();
}
//假如cache有的话直接读取,
if (cache.size() > 0) {
  return cache.poll();
}

loadCache方法里面实际上直接调用的是,自身的call方法获取结果

values = call(callable, caller, scannerTimeout, true);

经过一层封装,调用的是ScannerCallableWithReplicascall

提交并执行call

// submit call for the primary replica.
addCallsForCurrentReplica(cs, rl); //提交call

执行的callerScannerCallable,被封装成了QueueingFuture,扔到线程池里执行

public void submit(RetryingCallable<V> task, int callTimeout, int id) {
  QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout, id);
  executor.execute(Trace.wrap(newFuture));
  tasks[id] = newFuture;
}

进入ScannerCallablecall方法之前实际上是会先调用其prepare方法(QueueingFuture),主要做一些准备工作

RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
    id, getConnection(), getTableName(), getRow());
location = id < rl.size() ? rl.getRegionLocation(id) : null;
if (location == null || location.getServerName() == null) {
  // With this exception, there will be a retry. The location can be null for a replica
  //  when the table is created or after a split.
  throw new HBaseIOException("There is no location for replica id #" + id);
}
ServerName dest = location.getServerName();
setStub(super.getConnection().getClient(dest));
if (!instantiated || reload) {
  checkIfRegionServerIsRemote();
  instantiated = true;
}

然后正式进入call

ScanResponse response;
if (this.scannerId == -1L) {
  //第一次
  response = openScanner(); //
} else {
//第一次后
  response = next();//进行下一次rpc
}

openScannernext方法中,在构建不同的Request之后其它处理都是一样的

ScanResponse response = getStub().scan(controller, request);//blockingstub

然后就进入了server端并获取ScanResponse

 

2hbase scan服务端的实现

Hbase scan的客户端发送Rpc请求之后,进入服务端RSRpcServices对应的scan方法

ScanResponse scan(final RpcController controller, final ScanRequest request)

 

首先,会判断是否已经构建了scannerid也即是否是第一次请求

if (request.hasScannerId()) {

//不是第一次,直解获取
  rsh = getRegionScanner(request);
} else {

//第一次进行构建
  rsh = newRegionScanner(request, builder);
}

然后进入真正的scan

scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
  results, builder, lastBlock, context);

接着在scan方法中,也对行数是否超过请求行数做了限制

// Collect values to be returned here
moreRows = scanner.nextRaw(values, scannerContext);

checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
 builder);


此处的scannerRegionScannerImpl,然后进入其nextInternal方法,初始化或者清空处理函数,这个是记录状态,比如当前batch大小,结果的size等。

if (scannerContext.getKeepProgress()) {
  // Progress should be kept. Reset to initial values seen at start of method invocation.
  scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
    initialTimeProgress);
} else {
  scannerContext.clearProgress();
}

然后重点的方法是populateResult方法

populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);

具体重点内容

scannerContext.setKeepProgress(true);
heap.next(results, scannerContext);
scannerContext.setKeepProgress(tmpKeepProgress);

 

进入KeyValueHeapnext方法,

InternalScanner currentAsInternal = (InternalScanner)this.current;
boolean moreCells = currentAsInternal.next(result, scannerContext);

 

进入StoreScannernext方法

当前rowkey获取去的列数加1,然后判断取出的当前列簇的总列数是否超过设置的大小列。

this.countPerRow++;
if (storeLimit > -1 &&
    this.countPerRow > (storeLimit + storeOffset)) {

在正常范围内,将结果cell加入返回的列表

if (this.countPerRow > storeOffset) {
  outResult.add(cell);

更新已经获取的当前Rowkeybatch大小和结果的size

// Update the progress of the scanner context    scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
//            更新当前获取的batch
            scannerContext.incrementBatchProgress(1);

判断batchsize是否超限,超限的话切入下一个RowkeyBatch参数起作用的地方。

if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
  break LOOP;
}
if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
  break LOOP;
}

我们这个scan就讲解到这个地方。

其实,应该关注点比较多,贴源码比较累赘,我这是大致骨架都有了,大家可以根据这个骨架结合源码去看,节省时间。

四,总结

1,对Scanner嵌套关系的总结

ARegionScannerImpl包含了内部维护的有KeyValueHeap,维护了一个优先队列存放的是StoreScanner

BStoreScanner的重要的内部成员KeyValueHeap,内部的优先队列,维护的是MemStoreScannerStoreFileScanner

C,获取数据首先是从RegionScannerImpl的队列中取出,StoreScanner。然后从StoreScanner中取出一个MemStoreScannerStoreFileScanner,然后调用其next方法,将结果放入返回的list中。

2,对于filter的使用,请大家先参考hbase权威指南,后面浪尖再接个各个filter和源码讲解。

3,对于Rpc数据请求次数调节

Scan.setCaching(20);//控制一次rpc返回几行,即几个Result

Scan.setBatch(1);//控制一次rpc返回几列,即几个cell

Rpc次数=(行数×每行的列数)/Min(每行的列数,批量大小)/扫描器缓存

4,对于缓存。

Scan.setCacheBlocks(false);不建议使用。

5,对于客户端的内存占用

Scan的并发数*cache(Result)*单个Result大小。

注意,要结合35进行调节,既要避免频繁的RPC请求,又要避免客户端GC。要求是要了解每条数据的大致大小。

 

本文中设计到另一个重点就是RPCRPC是想在spark前夕讲解,这里希望读者假如不了解的话可以去网上先了解一下。重点要记住的几个点是客户端的方法和服务端的方法实现,及是如何对应的。

对于hbase 1.0.0,列举以下几种,方便大家自己去阅读相关源码。无论是管理,还是数据请求,最终的regionserver相关的RPC实现都是RSRpcServices类里面对应的具体方法(参数要一致哦)

Client方法

RSRpcServices方法

multi

multi

execRegionServerService

execRegionServerService

execService

execService

bulkLoadHFile

bulkLoadHFile

scan

scan

mutate

mutate

get

get

 


以上是关于Android源码解析RPC系列(一)---Binder原理的主要内容,如果未能解决你的问题,请参考以下文章

微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端

Android 常用开源框架源码解析 系列 Rxjava 异步框架

谈谈-Android-PickerView系列之源码解析

Android源码解析Window系列第(一)篇---Window的基本认识和Activity的加载流程

Android 常用开源框架源码解析 系列 picasso 图片框架

Hadoop RPC远程过程调用源码解析及实例