线程间通讯-管道输入输出流(源码分析)
Posted 神奇的鸭鸭
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程间通讯-管道输入输出流(源码分析)相关的知识,希望对你有一定的参考价值。
管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要
用于线程之间的数据传输,而传输的媒介为内存。
管道输入/输出流主要包括了如下4种具体实现:PipedOutputStream、PipedInputStream、PipedReader和PipedWriter,前两种面向字节,而后两种面向字符。
1public class Piped {
2 public static void main(String[] args) throws IOException {
3 PipedWriter out = new PipedWriter();
4 PipedReader in = new PipedReader();
5 // 将输出流和输入流进行连接,否则在使用时会抛出IOException
6 out.connect(in);
7 Thread printThread = new Thread(new Print(in), "PrintThread");
8 printThread.start();
9 int receive = 0;
10 try {
11 while ((receive = System.in.read()) != -1) {
12 out.write(receive);
13 }
14 } finally {
15 out.close();
16 }
17 }
18
19 static class Print implements Runnable {
20 private PipedReader in;
21 public Print(PipedReader in) {
22 this.in = in;
23 }
24 @Override
25 public void run() {
26 int receive = 0;
27 try {
28 while ((receive = in.read()) != -1) {
29 System.out.println((char) receive);
30 }
31 } catch (IOException e) {
32 }
33 }
34 }
35}
一般我们都是先定义一个管道输入流对象和管道输出流对象,然后将它们关联起来,建立一条“通道”:
1// 使用connect()方法
2PipedInputStream pipedInputStream = new PipedInputStream();
3PipedOutputStream pipedOutputStream = new PipedOutputStream();
4try {
5 pipedInputStream.connect(pipedOutputStream);
6} catch (IOException e) {
7 e.printStackTrace();
8}
9// 不使用connect()方法
10PipedInputStream pipedInputStream = new PipedInputStream();
11PipedOutputStream pipedOutputStream = null;
12try {
13 pipedOutputStream = new PipedOutputStream(pipedInputStream);
14} catch (IOException e) {
15 e.printStackTrace();
16}
注意:只能选择其中一个流而不能两个流都调用connect,否则会抛出java.io.IOException: Already connected
:
1PipedInputStream pipedInputStream = new PipedInputStream();
2PipedOutputStream pipedOutputStream = new PipedOutputStream();
3try {
4 pipedInputStream.connect(pipedOutputStream);
5 pipedOutputStream.connect(pipedInputStream);
6} catch (IOException e) {
7 e.printStackTrace();
8}
下面我们分析建立“管道”的过程中都做了什么。下面截取PipedInputStream和PipedOutPutStream的代码片段:
1// PipedInputStream ...
2private static final int DEFAULT_PIPE_SIZE = 1024;
3
4protected byte buffer[]; // 将传入数据放置到的循环缓冲区。
5
6public PipedInputStream() {
7 initPipe(DEFAULT_PIPE_SIZE);
8}
9
10private void initPipe(int pipeSize) {
11 if (pipeSize <= 0) {
12 throw new IllegalArgumentException("Pipe Size <= 0");
13 }
14 /*
15 * new一个PipedInputStream对象后,初始化了一个1024字节的数组,
16 * 用于保存管道输出流写入的数据,并且向管道输入流提供数据
17 */
18 buffer = new byte[pipeSize];
19}
20
21// PipedOutputStream 的无参构造方法什么都没做
22public PipedOutputStream() {
23}
下面看connect方法,可以看到PipedInputStream中的connect方法是通过PipedOutputStream的connect方法实现的:
1// PipedInputStream ...
2
3/*
4 * 代表当从连接的管道输出流接收时,数据的下一个字节将被存储在循环缓冲区中的位置的索引
5 * 当in<0时意味着缓冲数组是空的,当in==out说明缓冲数组已经满了
6 */
7protected int in = -1;
8/* 代表该管道输入流下一个要读取的字节在循环缓冲数组中的位置 */
9protected int out = 0;
10/* 表示该管道输入流是否与管道输出流建立了链接,true为已连接 */
11boolean connected = false;
12
13public void connect(PipedOutputStream src) throws IOException {
14 src.connect(this);
15}
16
17// PipedOutputStream ...
18
19/* 代表与该管道输出流建立了连接的管道输入流PipedInputStream对象 */
20private PipedInputStream sink;
21public synchronized void connect(PipedInputStream snk) throws IOException {
22 if (snk == null) {
23 throw new NullPointerException();
24 } else if (sink != null || snk.connected) {
25 throw new IOException("Already connected");
26 }
27 sink = snk;
28 snk.in = -1;
29 snk.out = 0;
30 snk.connected = true;
31}
PipedOutputStream的connect方法是一个同步方法,需要事先获取PipedOutputStream对象锁,从代码可以看出:
不能既调用PipedInputStream对象的connect方法,又调用PipedOutputStream的connect方法。
一个管道输入流只能对应一个管道输出流。
死锁问题
Java在它的jdk文档中提到不要在一个线程中同时使用PipeInpuStream和PipeOutputStream,这会造成死锁:
1public static void main(String[] args) {
2 PipedInputStream pipedInputStream = new PipedInputStream();
3 PipedOutputStream pipedOutputStream = new PipedOutputStream();
4 try {
5 pipedInputStream.connect(pipedOutputStream); // 默认一次最多只能写入1024字节
6 byte[] data = new byte[1000];
7 byte[] store = new byte[20];
8 Arrays.fill(data, (byte)1);
9 System.out.println("first writing data"); // 每次写1000字节数据
10 pipedOutputStream.write(data, 0, data.length);
11 System.out.println("finish first writing");
12 int count = 1;
13 while (count < 100) {
14 System.out.println(count + "times read data");
15 pipedInputStream.read(store, 0, store.length); // 每次读20字节
16 System.out.println(count + "times finish reading data");
17 System.out.println((count + 1) + "times write data");
18 pipedOutputStream.write(data); // 每次写1000字节数据
19 System.out.println((count + 1) + "finish writing data");
20 count ++;
21 }
22 } catch (IOException e) {
23 e.printStackTrace();
24 }
25}
输出结果:
1first writing data
2finish first writing
31times read data
41times finish reading data
52times write data
第二次尝试通过管道输出流PipedOutputStream写数据的时候阻塞,同时也无法从管道输入流PipedInputStream读取数据,我们通过源码看下这是怎么发生的:
首先我们看第一次往“管道”写入1000字节数据:
1public void write(byte b[], int off, int len) throws IOException {
2 /* 如果管道处于毁坏或未连接状态,或者发生 I/O 错误,抛出异常 */
3 if (sink == null) {
4 throw new IOException("Pipe not connected");
5 } else if (b == null) {
6 throw new NullPointerException();
7 } else if ((off < 0) || (off > b.length) || (len < 0) ||
8 ((off + len) > b.length) || ((off + len) < 0)) {
9 throw new IndexOutOfBoundsException();
10 } else if (len == 0) {
11 return;
12 }
13 /* 将数据写入到缓冲区 */
14 sink.receive(b, off, len);
15}
write方法进行了一些校验,可以发现管道输入流是通过与其建立了连接的管道输入流PipedInputStream对象来写入数据的,我们接着看输入流的receive方法:
1synchronized void receive(byte b[], int off, int len) throws IOException {
2 checkStateForReceive(); // 检查PipedInputStream状态,如果不正常,则抛出异常
3 writeSide = Thread.currentThread(); // 获取将数据写入管道的线程
4 int bytesToTransfer = len; // 需要接收的数据量
5 while (bytesToTransfer > 0) {
6 if (in == out) // 满,即缓冲区数据已读取完,进行等待
7 awaitSpace();
8 int nextTransferAmount = 0; // 下次要传输的字节数
9 if (out < in) {
10 nextTransferAmount = buffer.length - in; // 下次要传输的字节数赋值(值为缓冲区还剩余的空间)
11 } else if (in < out) {
12 if (in == -1) { // 首次进行初始化
13 in = out = 0;
14 nextTransferAmount = buffer.length - in;
15 } else { // 此时只有out到in的区间位置可用
16 nextTransferAmount = out - in;
17 }
18 }
19 // 如果下次传输的字节数大于需要接收的字节数,这时只需要bytesToTransfer大小的空间即可
20 if (nextTransferAmount > bytesToTransfer)
21 nextTransferAmount = bytesToTransfer;
22 assert(nextTransferAmount > 0);
23 // 将nextTransferAmount个字节从b中复制到缓冲区数组中
24 System.arraycopy(b, off, buffer, in, nextTransferAmount);
25 // 重新计算需要传输的字节数,已经传输了nextTransferAmount字节,所以减去nextTransferAmount
26 bytesToTransfer -= nextTransferAmount;
27 off += nextTransferAmount; // 偏移量向后移动nextTransferAmount位
28 in += nextTransferAmount; // 将in索引后移nextTransferAmount位
29 if (in >= buffer.length) { // 如果in已经超出缓冲区大小,将in置0,从头开始写入
30 in = 0;
31 }
32 }
33}
receive是一个同步方法,也就是说当在一个PipedInputStream对象上调用其receive方法时,该对象所在的线程必须先获取这个PipedInputStream对象的锁,才能进入该方法。,因为我们只有一个线程且第一次调用所以获得了PipedInputStream对象的对象锁。receive方法首先调用checkStateForReceive方法:
1private void checkStateForReceive() throws IOException {
2 if (!connected) { // 没有连接输出流,抛出异常
3 throw new IOException("Pipe not connected");
4 } else if (closedByWriter || closedByReader) { // 如果输入流或输出流关闭,抛出异常
5 throw new IOException("Pipe closed");
6 } else if (readSide != null && !readSide.isAlive()) { // 如果管道输入流所在的线程不为空,且线程已死,抛出异常
7 throw new IOException("Read end dead");
8 }
9}
checkStateForReceive做了一些事前检查,注意这里的readSide != null && !readSide.isAlive()如果管道输入流所在的线程不为空,且线程已死,则抛出异常,由于我们还没有读,所以readSide==null。
检查完成后,将当前线程保存至成员变量writeSide,即管道输出流所在的线程,接下来进行写入:
参数 | 写入前 | 写入后 |
---|---|---|
buffer | 1024 | 1024 |
in | -1 | 1000 |
out | 0 | 0 |
接下来进入1times read data
,通过管道输入流读取数据,我们代码中读取20个字节数据:
1// pipedInputStream.read(store, 0, store.length);
2public synchronized int read(byte b[], int off, int len) throws IOException {
3 /* 参数合法性校验 */
4 if (b == null) {
5 throw new NullPointerException();
6 } else if (off < 0 || len < 0 || len > b.length - off) {
7 throw new IndexOutOfBoundsException();
8 } else if (len == 0) {
9 return 0;
10 }
11
12 /* possibly wait on the first character */
13 int c = read(); // 读取第一个字节
14 if (c < 0) { // 如果到达缓冲区末位,返回-1
15 return -1;
16 }
17 b[off] = (byte) c; // 将读到的数据存入b中
18 int rlen = 1; // 记录读到的字节数
19 while ((in >= 0) && (len > 1)) {
20 // 记录可读的字节数
21 int available;
22
23 if (in > out) { // 计算可读字节数
24 available = Math.min((buffer.length - out), (in - out));
25 } else { // 如果缓冲区已满,计算可读字节数
26 available = buffer.length - out;
27 }
28
29 // A byte is read beforehand outside the loop
30 if (available > (len - 1)) {
31 // 如果可读字节大于len-1,那么读取len个字节
32 available = len - 1;
33 }
34 // 将available个字节从缓冲区中读取b中
35 System.arraycopy(buffer, out, b, off + rlen, available);
36 out += available; // 将偏移量后移available位
37 rlen += available; // 记录读到的字节数
38 len -= available; // ?
39
40 if (out >= buffer.length) { // 如果out已经超出缓冲区范围,将out置为0,从头开始读
41 out = 0;
42 }
43 if (in == out) { // 如果已经读完了,即读到了缓冲区末尾,in赋值-1
44 /* now empty */
45 in = -1;
46 }
47 }
48 return rlen; // 返回实际读到的字节数
49}
管道输入流的read方法也是一个同步方法,因此PipedInputStream对象所在的线程也要先获得PipedInputStream对象的对象锁才能进入,可以看到在读取一个数组大小的数据时,read方法先读取了一个字节的数据:
1public synchronized int read() throws IOException {
2 /* 检查PipedInputStream状态,如果不可用,抛出异常 */
3 if (!connected) {
4 throw new IOException("Pipe not connected");
5 } else if (closedByReader) {
6 throw new IOException("Pipe closed");
7 } else if (writeSide != null && !writeSide.isAlive()
8 && !closedByWriter && (in < 0)) {
9 throw new IOException("Write end dead");
10 }
11
12 readSide = Thread.currentThread(); // 获取从管道读取数据的线程
13 int trials = 2; // 蛇皮参数
14 while (in < 0) { // 如果写入缓冲区中的数据已经读完
15 if (closedByWriter) {
16 /* closed by writer, return EOF */
17 return -1;
18 }
19 // 如果写入数据的线程不为null且不活跃且trials<=0,说明管道损坏,抛出异常
20 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
21 throw new IOException("Pipe broken");
22 }
23 /* might be a writer waiting */
24 notifyAll(); // 通知可能在等待的管道输出流
25 try {
26 wait(1000); // 进入等待
27 } catch (InterruptedException ex) {
28 throw new java.io.InterruptedIOException();
29 }
30 }
31 int ret = buffer[out++] & 0xFF; // 读取下一个字节
32 if (out >= buffer.length) { // 如果out已经超出缓冲区范围,将out置为0
33 out = 0;
34 }
35 if (in == out) { // 如果写入缓冲区中的数据已经读完,in赋值-1
36 /* now empty */
37 in = -1;
38 }
39
40 return ret;
41}
先读一个字节是为了当缓冲区没有数据的时候,输入流所在的线程能够阻塞,并通知可能在等待的管道输出流。当第一次读20字节的数据后,buffer分布如下:
参数 | 读取前 | 读取后 |
---|---|---|
buffer | 1024 | 1024 |
in | 1000 | 1000 |
out | 0 | 20 |
再次往里面写入1000字节数据,来到receive方法,由于不能一次性将1000个字节写入循环缓冲数组中,因此第一次循环先写入了24字节数据,buffer分布如下:
参数 | 读取前 | 读取后 |
---|---|---|
buffer | 1024 | 1024 |
in | 1000 | 1024 |
out | 20 | 20 |
由于数组前面还有20个字节空间,同时in!=out,进入第二次循环写入20字节数据,buffer分布如下:
参数 | 读取前 | 读取后 |
---|---|---|
buffer | 1024 | 1024 |
in | 1024 | 20 |
out | 20 | 20 |
再次进入循环,由于in==out,进入awaitSpace():
1private void awaitSpace() throws IOException {
2 while (in == out) {
3 checkStateForReceive();
4
5 /* full: kick any waiting readers */
6 notifyAll();
7 try {
8 wait(1000);
9 } catch (InterruptedException ex) {
10 throw new java.io.InterruptedIOException();
11 }
12 }
13}
问题就出在这里,由于当前缓存数组已经写满,所以awaitSpace会唤醒可能在等待的读线程,然后让当前线程wait。但由于只有单个线程在执行,所以不管是否notifyAll或wait后,当前线程继续顺序执行,程序陷入不断的循环中。从而产生死锁问题。
参考资料:
《Java并发编程的艺术》
Java里的管道输入流 PipedInputStream与管道输出流 PipedOutputStream
以上是关于线程间通讯-管道输入输出流(源码分析)的主要内容,如果未能解决你的问题,请参考以下文章
JDK源码阅读之PipedInoutStream与PipedOutputStream
Java IO PipedInputStream 和 PipedOutputStream