JDK源码阅读之BufferedInputStream

Posted lls101

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK源码阅读之BufferedInputStream相关的知识,希望对你有一定的参考价值。

前言:

  BufferedInputStream 的作用是为另一个输入流添加一些功能,例如,提供“缓冲功能”以及支持“mark()标记”和“reset()重置方法”

BufferedInputStream

  现在看源码分析

技术图片
  1 public
  2 class BufferedInputStream extends FilterInputStream 
  3 
  4     //默认的缓存数组大小
  5     private static int DEFAULT_BUFFER_SIZE = 8192;
  6 
  7     /*
  8                      要分配的最大数组大小。
  9        有些VM会在数组中保留一些头信息。
 10        尝试分配更大的数组可能会导致
 11        OutOfMemoryError:请求的数组大小超过VM限制
 12      */
 13     private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
 14 
 15     //缓存数组,变化对线程可见
 16     protected volatile byte buf[];
 17 
 18     /*  原子更新操作
 19                        与buf数组的volatile关键字共同组成了buf数组的原子更新功能实现
 20      */
 21     private static final
 22         AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
 23         AtomicReferenceFieldUpdater.newUpdater
 24         (BufferedInputStream.class,  byte[].class, "buf");
 25 
 26    //当前缓冲区的有效字节数
 27    //比缓冲区中最后一个有效字节的索引大 1 的索引。此值始终处于 0 到 buf.length 的范围内;
 28         //从 buf[0] 到 buf[count-1] 的元素包含从底层输入流中获取的缓冲输入数据。
 29     protected int count;
 30 
 31    /*
 32     *   缓冲区中的当前位置。这是将从 buf 数组中读取的下一个字符的索引。
 33          此值始终处于 0 到 count 的范围内。如果此值小于 count,则 buf[pos] 将作为下一个输入字节;
 34          如果此值等于 count,则下一次 read 或 skip 操作需要从包含的输入流中读取更多的字节。
 35     */
 36     protected int pos;
 37 
 38     /*
 39      * 最后一次调用 mark 方法时 pos 字段的值。
 40        此值始终处于 -1 到 pos 的范围内。如果输入流中没有被标记的位置,则此字段为 -1。
 41        如果输入流中有被标记的位置,则 buf[markpos] 将用作 reset 操作后的第一个输入字节。
 42        如果 markpos 不是 -1,则从位置 buf[markpos] 到 buf[pos-1] 之间的所有字节都必须保留在缓冲区数组中(尽管对 count、pos 和 markpos 的值进行适当调整后,
 43        这些字节可能移动到缓冲区数组中的其他位置);除非 pos 与 markpos 的差超过 marklimit,否则不能将其丢弃。
 44      */
 45     protected int markpos = -1;
 46 
 47 /*
 48  * 调用 mark 方法后,在后续调用 reset 方法失败之前所允许的最大提前读取量。
 49  * 只要 pos 与 markpos 之差超过 marklimit,就可以通过将 markpos 设置为 -1 来删除该标记。
 50  */
 51     protected int marklimit;
 52 
 53    //获取输入流
 54     private InputStream getInIfOpen() throws IOException 
 55         InputStream input = in;
 56         if (input == null)
 57             throw new IOException("Stream closed");
 58         return input;
 59     
 60 
 61     //获取缓存数组
 62     private byte[] getBufIfOpen() throws IOException 
 63         byte[] buffer = buf;
 64         if (buffer == null)
 65             throw new IOException("Stream closed");
 66         return buffer;
 67     
 68 
 69   //创建默认大小的缓存输入流,包装指定的输入流
 70     //关于装饰器模式即装饰器模式在Java I/O里的应用,请看下一篇博文
 71     public BufferedInputStream(InputStream in) 
 72         this(in, DEFAULT_BUFFER_SIZE);
 73     
 74 
 75   //创建指定大小的缓存输入流,包装指定的输入流
 76     public BufferedInputStream(InputStream in, int size) 
 77         super(in);
 78         if (size <= 0) 
 79             throw new IllegalArgumentException("Buffer size <= 0");
 80         
 81         buf = new byte[size];
 82     
 83 
 84  /*
 85   * fill()函数是整个 BufferedInputStream类的核心所在,我们重点来分析这个函数
 86   */
 87     private void fill() throws IOException 
 88         byte[] buffer = getBufIfOpen();
 89         
 90         if (markpos < 0)
 91             pos = 0;            /* no mark: throw away the buffer */
 92         else if (pos >= buffer.length)
 93             /* no room left in buffer */
 94                 if (markpos > 0)   /* can throw away early part of the buffer */
 95                 int sz = pos - markpos;
 96                 System.arraycopy(buffer, markpos, buffer, 0, sz);
 97                 pos = sz;
 98                 markpos = 0;
 99                 
100          else if (buffer.length >= marklimit) 
101                 markpos = -1;   /* buffer got too big, invalidate mark */
102                 pos = 0;        /* drop buffer contents */
103                
104          else if (buffer.length >= MAX_BUFFER_SIZE) 
105                 throw new OutOfMemoryError("Required array size too large");
106                
107        
108          else             /* grow buffer */
109                 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
110                         pos * 2 : MAX_BUFFER_SIZE;
111                                 if (nsz > marklimit)
112                     nsz = marklimit;
113                 byte nbuf[] = new byte[nsz];
114                 System.arraycopy(buffer, 0, nbuf, 0, pos);
115                 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) 
116                  
117                     // Can‘t replace buf if there was an async close.
118                     // Note: This would need to be changed if fill()
119                     // is ever made accessible to multiple threads.
120                     // But for now, the only way CAS can fail is via close.
121                     // assert buf == null;
122                     throw new IOException("Stream closed");
123                 
124                 buffer = nbuf;
125             
126         
127         
128         count = pos;
129         int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
130         if (n > 0)
131             count = n + pos;
132     
133 
134    
135     public synchronized int read() throws IOException 
136         // 若已经读完缓冲区中的数据,则调用fill()从输入流读取下一部分数据来填充缓冲区
137         if (pos >= count) 
138             fill();
139             //如果填充操作执行完后pos>=count,这时证明已经无数据向数组中填充
140             if (pos >= count)
141                 return -1;
142         
143         return getBufIfOpen()[pos++] & 0xff;//保持二进制数据一致性
144     
145 
146     private int read1(byte[] b, int off, int len) throws IOException 
147        
148         int avail = count - pos;
149         if (avail <= 0) 
150             /* 如果请求的长度至少与缓冲区一样大,并且
151                 如果没有标记/重置活动,那么就不要费心去复制
152                 字节进入本地缓冲区。 而是直接从流中读取,这其实是一种加快读取的机制*/
153             if (len >= getBufIfOpen().length && markpos < 0) 
154                 return getInIfOpen().read(b, off, len);
155             
156             // 若已经读完缓冲区中的数据,则调用fill()从输入流读取下一部分数据来填充缓冲区
157             fill();
158             avail = count - pos;
159             //流中已经没有数可读
160             if (avail <= 0) return -1;
161         
162         int cnt = (avail < len) ? avail : len;
163         System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
164         pos += cnt;
165         return cnt;
166     
167 
168     // 将缓冲区中的数据写入到字节数组b中。off是字节数组b的起始位置,len是写入长度
169     public synchronized int read(byte b[], int off, int len)
170         throws IOException
171     
172         getBufIfOpen(); // Check for closed stream
173         if ((off | len | (off + len) | (b.length - (off + len))) < 0) 
174             throw new IndexOutOfBoundsException();
175          else if (len == 0) 
176             return 0;
177         
178 
179         int n = 0;
180         for (;;) //不断循环,直到满足读取的长度再跳出
181             int nread = read1(b, off + n, len - n);
182             if (nread <= 0)
183                 return (n == 0) ? nread : n;
184             n += nread;
185             if (n >= len)
186                 return n;
187             // 如果真的没有可以读取的数据了,此时也会返回
188             InputStream input = in;
189             if (input != null && input.available() <= 0)
190                 return n;
191         
192     
193 
194    
195     public synchronized long skip(long n) throws IOException 
196         getBufIfOpen(); // Check for closed stream
197         if (n <= 0) 
198             return 0;
199         
200         long avail = count - pos;
201 
202         if (avail <= 0) 
203             // If no mark position set then don‘t keep in buffer
204             if (markpos <0)
205                 return getInIfOpen().skip(n);
206 
207             // Fill in buffer to save bytes for reset
208             fill();
209             avail = count - pos;
210             if (avail <= 0)
211                 return 0;
212         
213 
214         long skipped = (avail < n) ? avail : n;
215         pos += skipped;
216         return skipped;
217     
218 
219    
220     public synchronized int available() throws IOException 
221         int n = count - pos;
222         int avail = getInIfOpen().available();
223         return n > (Integer.MAX_VALUE - avail)
224                     ? Integer.MAX_VALUE
225                     : n + avail;
226     
227 
228  
229     public synchronized void mark(int readlimit) 
230         marklimit = readlimit;
231         markpos = pos;
232     
233 
234     public synchronized void reset() throws IOException 
235         getBufIfOpen(); // Cause exception if closed
236         if (markpos < 0)
237             throw new IOException("Resetting to invalid mark");
238         pos = markpos;
239     
240 
241   
242     public boolean markSupported() 
243         return true;
244     
245 
246     
247     public void close() throws IOException 
248         byte[] buffer;
249         while ( (buffer = buf) != null) 
250             if (bufUpdater.compareAndSet(this, buffer, null)) 
251                 InputStream input = in;
252                 in = null;
253                 if (input != null)
254                     input.close();
255                 return;
256             
257             // Else retry in case a new buf was CASed in fill()
258         
259     
260 
View Code

对于fill()函数

技术图片
 1 private void fill() throws IOException 
 2         byte[] buffer = getBufIfOpen();
 3         
 4         if (markpos < 0)
 5             pos = 0;            /* no mark: throw away the buffer */
 6         else if (pos >= buffer.length)
 7             /* no room left in buffer */
 8                 if (markpos > 0)   /* can throw away early part of the buffer */
 9                 int sz = pos - markpos;
10                 System.arraycopy(buffer, markpos, buffer, 0, sz);
11                 pos = sz;
12                 markpos = 0;
13                 
14          else if (buffer.length >= marklimit) 
15                 markpos = -1;   /* buffer got too big, invalidate mark */
16                 pos = 0;        /* drop buffer contents */
17                
18          else if (buffer.length >= MAX_BUFFER_SIZE) 
19                 throw new OutOfMemoryError("Required array size too large");
20                
21          else             /* grow buffer */
22                 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
23                         pos * 2 : MAX_BUFFER_SIZE;
24                 if (nsz > marklimit)
25                     nsz = marklimit;
26                 byte nbuf[] = new byte[nsz];
27                 System.arraycopy(buffer, 0, nbuf, 0, pos);
28                 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) 
29                  
30                     // Can‘t replace buf if there was an async close.
31                     // Note: This would need to be changed if fill()
32                     // is ever made accessible to multiple threads.
33                     // But for now, the only way CAS can fail is via close.
34                     // assert buf == null;
35                     throw new IOException("Stream closed");
36                 
37                 buffer = nbuf;
38             
39         
40         
41         count = pos;
42         int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
43         if (n > 0)
44             count = n + pos;
45 
View Code

   现在我们先整体观察一下fill方法,这个方法主要是对缓存数组进行操作,可以看到这个方法存在5条判断分支,并且除了抛出异常外,都会执行最后一段代码,所以这里我们可以将fill方法分解成5个小方法分别进行分析!暂时以fill+序号的方式命名这五个函数。

 一

技术图片
1   private void fill0() throws IOException 
2         byte[] buffer = getBufIfOpen();
3         
4         if (markpos < 0)
5             pos = 0;    
6          count = pos;
7         int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
8         if (n > 0)
9             count = n + pos;
View Code

 这种情况指的是我们每次将输入流中的数据读取一部分到缓存数组中,当缓存数组中的数据被读完后,此时也没有进行任何标记,那么我们继续将输入流中的数据读取一部分到缓存数组中,结合read方法,我们通过pos >= count来判断缓存数组中的数据是否已经读完,通过markpos<0来判断是否存在标记!getInIfOpen().read用来向缓存数组中放入数据,最后通过count变量记录放入的数据个数。

技术图片
 1 private void fill1() throws IOException 
 2 if (pos >= buffer.length)
 3             /* no room left in buffer */
 4                 if (markpos > 0)   /* can throw away early part of the buffer */
 5                 int sz = pos - markpos;
 6                 System.arraycopy(buffer, markpos, buffer, 0, sz);
 7                 pos = sz;
 8                 markpos = 0;
 9                
10 count = pos;
11         int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
12         if (n > 0)
13             count = n + pos;
14 
View Code

这种情况指的是我们读完了缓存数组中的数据(通过pos >=count判断),数组中也没有多余的空间(通过pos >= buffer.length判断),而缓存中存在标记,我们需要将标记位置到数组结尾的数据保存下来已供读取,如下图:数组下标从0到7.

-1

0  data

1  data

2  data

markpos

3 data

4 data

5  data

6 data

7  data

pos

标记及其后面的 数据都需要保留

-1

0 data

1data

2 data

3 data

4 data

5 data

pos

 

 

getInIfOpen().read方法; 从输入流中读取出“buffer.length - pos”的数据,然后填充到buffer中

 

 

技术图片
 1 private void fill2() throws IOException 
 2 byte[] buffer = getBufIfOpen();
 3 if (buffer.length >= marklimit) 
 4                 markpos = -1;   /* buffer got too big, invalidate mark */
 5                 pos = 0;        /* drop buffer contents */
 6                
 7   count = pos;
 8         int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
 9         if (n > 0)
10             count = n + pos;
11     
View Code

 

这种情况指的是读取完buffer中的数据,buffer被标记位置=0,buffer中没有多余的空间,并且buffer.length>=marklimit,这时缓存区太大了,标记无效,重置markpos和pos即可!

技术图片
 1 private void fill3() throws IOException 
 2         byte[] buffer = getBufIfOpen();
 3 
 4 if (markpos >= 0 && pos >= buffer.length)
 5             /* grow buffer */
 6                 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
 7                         pos * 2 : MAX_BUFFER_SIZE;
 8                 if (nsz > marklimit)
 9                     nsz = marklimit;
10                 byte nbuf[] = new byte[nsz];
11                 System.arraycopy(buffer, 0, nbuf, 0, pos);
12                 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) 
13                  
14                  
15                     throw new IOException("Stream closed");
16                 
17                 buffer = nbuf;
18             
View Code

这种情况指的是读取完buffer中的数据,buffer被标记位置=0,buffer中没有多余的空间,并且buffer.length<marklimit,这时我们需要扩充数组,对于

int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?

                        pos * 2 : MAX_BUFFER_SIZE;

这条语句,为方便观察,我们改一下

int nsz=(pos<=MAX_BUFFER_SIZE/2) ?pos*2:MAX_BUFFER_SIZE;

很明显,我们扩充的数组不能超过MAX_BUFFER_SIZE我们现在来考虑一种情况

,如果再执行fill()方法的过程中一直要执行fill3()方法,那么缓存数组就会越来越大,需要的内存越来越多,所以我们需要一个变量来控制数组的增长,而这个变量就是marklimit, 当buffer>=marklimit时,就不再保存markpos的值了

 

以上是关于JDK源码阅读之BufferedInputStream的主要内容,如果未能解决你的问题,请参考以下文章

JDK1.8源码分析02之阅读源码顺序

1.4JDK源码阅读之AbstractStringBuilder

JDK源码阅读之Collection

JDK源码阅读之BufferedInputStream

源码阅读系列JDK 8 ConcurrentHashMap 源码分析之 由transfer引发的bug

1.2JDK源码阅读之Object