Okio源码解析

Posted zhuliyuan丶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Okio源码解析相关的知识,希望对你有一定的参考价值。

Okio是对java原生io的封装,旨在简化api同时优化io操作性能。接下来我会从下面几个方面介绍

  1. Okio特性概述
  2. 读写流程源码查看
  3. Buffer精华操作
  4. Timeout超时处理

1. Okio特性概述

java已经提供了很多io实现类供我们在不同场景使用,而Okio并不是一种新的io,而是对原生io的一次封装,为的是解决原生io的一些缺陷,下面我们介绍Okio的特性

1.1 api简化

我们知道java的io相关的类非常多,有针对字节和字符的输入输出接口,有实现缓存的Bufferedxxx,以及各种子类实现比如文件的(FileInputStream和FileOutputStream),数据的(DataInputStream和DataOutputStream),对象的(ObjectInputStream和ObjectOutputStream)等等,针对不同的场景我们需要使用不同的类,是非常复杂的。

而Okio简化了这一流程,统一通过Okio这个工厂类初始化,内部是通过重载方法根据传入的参数不同初始化不同的流。

举个板栗

        File file = new File(Environment.getExternalStorageState(), "test");
        Okio.buffer(Okio.sink(file)).writeString("aaaa", Util.UTF_8).close();

由于传入的参数是File,内部是通过FileOutputStream进行io写操作,并且支持链式操作。

1.2 缓存优化

原生的io缓存比较粗暴,Okio在这方面做了很多优化。

以BufferedOutputStream为例,是利用一个长度为8192的byte数组缓存,当要写入数据长度大于缓存最大容量时跳过缓存直接进行io写操作,当写入数据长度大于缓存数组剩余容量的时候先把缓存写入输出流,再将数据写入缓存,其他情况直接写入缓存。

然后原生的输入输出流之间的buffer是没有办法直接建立联系的,输入流中的数据转移到输出流中是:输入buf -> 临时byte数组 -> 输出buf,经历两次拷贝

        //原生
        File file = new File(Environment.getExternalStorageState(), "test");
        try 
            BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
            BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file));
            int count = -1;
            byte[] array = new byte[1024];//临时byte数组
            while ((count = bis.read(array)) != -1) 
                bos.write(array, 0, count);
            
            bis.close();
            bos.close();
         catch (FileNotFoundException e) 
            e.printStackTrace();
         catch (IOException e) 
            e.printStackTrace();
        

okio的话则是用一个Buffer对象去负责缓存,内部维护了一个Segment双向链表,而这个Segment则是真正的数据缓存载体,内部利用一个长度为8192的byte数组去缓存,当要写入的数据超过8192的时候则会创建多个Segment对象在链表中有序存储。当Segment使用完毕的时候又会利用SegmentPool进行回收,减少Segment对象创建。

对于输入流流到输出流有专门的优化,直接将输入流buffer中segment数据转移到输出流的buffer中只有一次数据的操作,相比原生粗暴的buffer做了很多优化。(而实际操作是,如果是整段的segment数据转移则是直接修改指针指到输出流的buffer上,如果只转移输入流Segment部分数据则根据输出Segment能不能装得下,装得下的话则进行数据拷贝,否则拆分输入流Segment为两个然后将要转移数据的Segment指针指向输出流Buffer,总之Okio在这块做了很多优化,这个后面会细说)

1.3 超时操作

原生进行io操作的时候是阻塞式的,没有超时的处理,除非发生异常才会停止,okio增加了超时处理,推出了Timeout机制,提供了同步超时和异步超时处理。

同步超时Timeout:是在每次进行读写前检测是否超时,如果超时则抛出异常,那如果检测完之后操作阻塞了很久是没法停止的,只有等到下一次操作的时候才会抛出异常停止操作。

异步超时AsyncTimeout:是在每次要进行读写操作的时候创建一个AsymcTimeout对象,然后通过一个链表存储,根据即将超时时间排序,快要超时的排在链表头部,然后启动一个Watchdog线程进行检测,优先从链表头部取出AsyncTimeout判断是否超时,超时了的话则调用AsyncTimeout#timeout()方法。okio给用socket创建流提供了默认实现,timeout的时候直接关闭Socket。

1.4 相关类介绍

  • Source:Okio对输入流的抽象接口,提供了read方法将数据读到buffer中
  • Sink:Okio对输出流的抽象接口,提供了write方法将数据写到buffer中
  • BufferedSource和BufferedSink:是Okio对Buffer的接口抽象,分别继承Source和Sink
  • RealBufferedSource:BufferedSource实现类,读操作都是由该类来完成,可以通过Okio工厂获取
  • RealBufferedSink:BufferedSink实现类,写操作都是由该类来完成,可以通过Okio工厂获取
  • Okio:okio的工厂来用来获取buffer实现类和流的实现类
  • Segment:Buffer中存储数据的载体,内部通过一个8K的byte数组存储数据
  • SegmentPool:管理Segment创建和回收,最多存储64K的数据也就是8个Segment,当池中存在Segment的时候会复用减少对象的创建

以读写文件为例,读是先文件内容读到buffer中,然后再从buffer中获取数据,写是先写数据到buffer中,然后将将buffer中的数据写入文件,而buffe中数据是通过一个个Segment存储的,Segment则是通过SegmentPool创建和回收,每个类各司其职。

2. 读写流程源码

2.1 读流程

		File file = new File(Environment.getExternalStorageState(), "test");
    try 
         BufferedSource source = Okio.buffer(Okio.source(file));
         byte[] array = new byte[1024];
         source.read(array);
         source.close();
      catch (Exception e) 
         e.printStackTrace();
     

根据上面的例子我们分几段查看源码

  1. Okio.source()
  2. Okio.buffer()
  3. BufferedSource.read()

2.1.1 Okio.source()

  //Okio#source
	public static Source source(File file) throws FileNotFoundException 
    if (file == null) throw new IllegalArgumentException("file == null");
    return source(new FileInputStream(file));
  

可以看到由于我们传入的是file所以内部实际是通过FileInputStream读文件

  public static Source source(InputStream in) 
    return source(in, new Timeout());
  

  private static Source source(final InputStream in, final Timeout timeout) 
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() 
      @Override public long read(Buffer sink, long byteCount) throws IOException 
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try 
          timeout.throwIfReached();//判断是否超时
          Segment tail = sink.writableSegment(1);//从buffer中拿到一个可写的Segment
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);//byteCount是要读取的数据总量,Segment.SIZE - tail.limit是Segment可以装的数据量,取最小值
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);//然后通过FileInputStream将文件数据读到segment的data中
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
         catch (AssertionError e) 
          if (isandroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        
      

      @Override public void close() throws IOException 
        in.close();
      

      @Override public Timeout timeout() 
        return timeout;
      

      @Override public String toString() 
        return "source(" + in + ")";
      
    ;
  

上面这段就是创建了一个Source实现类,read的话则是先获取buffer中可写的segment,然后用FileInputStream将文件的数据读到segment中。对于Segment和Buffer在下面会说到

2.1.2 Okio.Buffer()

  //Okio#buffer
	public static BufferedSource buffer(Source source) 
    return new RealBufferedSource(source);
  

由于我们传入的是Source所以会创建一个RealBufferedSource实例,Okio也都是通过它来做读操作,我们先来看下这个类

final class RealBufferedSource implements BufferedSource 
  public final Buffer buffer = new Buffer();//buffer对象
  public final Source source;//Source实现类

  RealBufferedSource(Source source) 
    if (source == null) throw new NullPointerException("source == null");
    this.source = source;
  
  
  @Override 
  public int readInt() throws IOException 
    require(4);//最终通过source#read()将数据读到Buffer
    return buffer.readInt();//再从Buffer读取数据
  
  
  @Override 
  public int read(byte[] sink, int offset, int byteCount) throws IOException 
    checkOffsetAndCount(sink.length, offset, byteCount);

    if (buffer.size == 0) 
      long read = source.read(buffer, Segment.SIZE);//先将数据读到Buffer
      if (read == -1) return -1;
    

    int toRead = (int) Math.min(byteCount, buffer.size);
    return buffer.read(sink, offset, toRead);//再从Buffer读取数据
  
  
  @Override 
  public String readString(Charset charset) throws IOException 
    if (charset == null) throw new IllegalArgumentException("charset == null");

    buffer.writeAll(source);//最终通过source#read()将数据读到Buffer
    return buffer.readString(charset);//再从Buffer读取数据
  
  
  //后面省略了很多个read方法

可以看到它先将Okio.buffer(Okio.source(file))传入的Source实现类通过source成员变量存储,然后创建了一个Buffer对象用作缓冲区,而read方法都是一个流程通过source#read()方法将数据写入缓冲区,然后再从缓冲区中读取数据。接下来我们在看下缓冲区Buffer对象

public final class Buffer implements BufferedSource, BufferedSink, Cloneable, ByteChannel 
  @Nullable Segment head;//缓存数据的载体
  long size;//当前缓冲区的数据量单位字节
  public Buffer() 
  
  //下面省略很多读写缓冲区的操作

Buffer是负责管理缓冲区的对象,内部则是通过一个双向环状链表Segment存储数据,然后我们再来看下Segment是如何存储数据的呢

final class Segment 
  static final int SIZE = 8192;//最大存储8K数据
  final byte[] data;//存储数据数组
  int pos;//当前segment数据已经读取到哪了,接下来从pos位置开始读
  int limit;//当前segment数据写到哪了,接下来从limit开始写
  boolean owner;//当前segment是否允许追加数据,也就是能不能多次写入
  Segment next;
  Segment prev;

  Segment() 
    this.data = new byte[SIZE];
    this.owner = true;//默认可以多次写入
    this.shared = false;
  

可以看到segment内部有一个8k的byte数组来存储数据,pos记录的是segment可读数据位置,(pos-1)到0是已经读过的数据,limit是segment可写数据的位置,limit到Segment.SIZE是剩余可写数据量,pos到limit是还未读取的数据。

ok那我们回到本例中的source.read(array)

  @Override public int read(byte[] sink) throws IOException 
    return read(sink, 0, sink.length);
  

  @Override public int read(byte[] sink, int offset, int byteCount) throws IOException 
    checkOffsetAndCount(sink.length, offset, byteCount);//数组相关检查

    if (buffer.size == 0) //如果缓冲区是空的
      long read = source.read(buffer, Segment.SIZE);//将source中的数据读取到buffer中
      if (read == -1) return -1;
    

    int toRead = (int) Math.min(byteCount, buffer.size);
    return buffer.read(sink, offset, toRead);//将数据写入sink也就是byte数组
  

对于source#read我们在2.1.1已经看过就是取出Buffer中的Segment,然后将数据写入。不过细节当时我们没说现在来看下

//source#read()
@Override public long read(Buffer sink, long byteCount) throws IOException 
        try 
          Segment tail = sink.writableSegment(1);//从buffer中拿到一个可写的Segment
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);//byteCount是要读取的数据总量,Segment.SIZE - tail.limit是Segment可写数据量,取最小值
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);//然后通过FileInputStream将文件数据读到segment的data数组中
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;//修改segment可写位置
          sink.size += bytesRead;//修改buffer中数据总量
          return bytesRead;
         catch (AssertionError e) 
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        
      

buffer#writableSegment(1)如何获取Segment的

  Segment writableSegment(int minimumCapacity) 
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

    if (head == null) //如果buffer中没有Segment
      head = SegmentPool.take(); //从SegmentPool中获取
      return head.next = head.prev = head;//维护Buffer内部的Segment双向环状链表
    

    Segment tail = head.prev;//拿到最后一个Segment
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) //如果最后一个Segment可写位置limit+需要的最小容量>Segment.SIZE || 该Segment不支持追加写入
      tail = tail.push(SegmentPool.take()); // 添加一个新的Segment到尾部
    
    return tail;
  

如果Buffer中没有Segment则直接从SegmentPool获取,如果有则获取链表尾部也就是最新的Segment,判断数据是否存的下,存不下的话从SegmentPool获取一个新的插到链表尾部

接下来看下SegmentPool#take如何创建Segment的

final class SegmentPool 
  static final long MAX_SIZE = 64 * 1024; // 64 KiB.最大容量64K
  static @Nullable Segment next;//通过一个单链表存储回收的Segment
  static long byteCount;//当前SegmentPool容量

  private SegmentPool() 
  

  static Segment take() 
    synchronized (SegmentPool.class) 
      if (next != null) //如果当前有回收的Segment
        Segment result = next;
        next = result.next;
        result.next = null;
        byteCount -= Segment.SIZE;
        return result;
      
    
    return new Segment(); //否则直接创建
  

  static void recycle(Segment segment) 
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    if (segment.shared) return; //当前Segment如果有共享数据不能回收(这个后面说)
    synchronized (SegmentPool.class) 
      if (byteCount + Segment.SIZE > MAX_SIZE) return; //当前SegmentPool满了的话则不能回收
      byteCount += Segment.SIZE;//容量增加
      segment.next = next;//加到链表中
      segment.pos = segment.limit = 0;//可写和可读位置都清零
      next = segment;
    
  


SegmentPool#take就是看当前的池子中有缓存的Segment的么,有直接使用,没有则创建一个。ok在回到最前面RealBufferedSource#read

@Override 
  public int read(byte[] sink, int offset, int byteCount) throws IOException 
    checkOffsetAndCount(sink.length, offset, byteCount);

    if (buffer.size == 0) 
      long read = source.read(buffer, Segment.SIZE);//先将数据读到Buffer
      if (read == -1) return -1;
    

    int toRead = (int) Math.min(byteCount, buffer.size);
    return buffer.read(sink, offset, toRead);//再从Buffer读取数据
  

第一块source#read(buffer, Segment.SIZE)已经梳理了一遍就是通过FileInputStream将数据读到Buffer的Segment中,然后再来buffer#read将数据读到byte数组中

  @Override public int read(byte[] sink, int offset, int byteCount) 
    checkOffsetAndCount(sink.length, offset, byteCount);

    Segment s = head;//拿到第一个Segment
    if (s == null) return -1;
    int toCopy = Math.min(byteCount, s.limit - s.pos);//判断要读取的数据量,取byteCount和当前segment可读的数据量s.limit - s.pos
    System.arraycopy(s.data, s.pos, sink, offset, toCopy);//将数据拷贝到数组中

    s.pos += toCopy;//移动Segment已经读过的数据指针pos
    size -= toCopy;//当前Buffer容量减去读过数据量

    if (s.pos == s.limit) //如果当前Segment已经读完
      head = s.pop()

以上是关于Okio源码解析的主要内容,如果未能解决你的问题,请参考以下文章

Okio源码分析

13 | Android 高级进阶(源码剖析篇) Square 高效易用的 IO 框架 okio

09 | Android 高级进阶(源码剖析篇) Square 高效易用的 IO 框架 okio

12 | Android 高级进阶(源码剖析篇) Square 高效易用的 IO 框架 okio

10 | Android 高级进阶(源码剖析篇) Square 高效易用的 IO 框架 okio

08 | Android 高级进阶(源码剖析篇) Square 高效易用的 IO 框架 okio