线程间通讯-管道输入输出流(源码分析)

Posted 神奇的鸭鸭

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程间通讯-管道输入输出流(源码分析)相关的知识,希望对你有一定的参考价值。

管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要
用于线程之间的数据传输,而传输的媒介为内存。

管道输入/输出流主要包括了如下4种具体实现:PipedOutputStream、PipedInputStream、PipedReader和PipedWriter,前两种面向字节,而后两种面向字符。

 1public class Piped {
2    public static void main(String[] args) throws IOException {
3        PipedWriter out = new PipedWriter();
4        PipedReader in = new PipedReader();
5        // 将输出流和输入流进行连接,否则在使用时会抛出IOException
6        out.connect(in);
7        Thread printThread = new Thread(new Print(in), "PrintThread");
8        printThread.start();
9        int receive = 0;
10        try {
11            while ((receive = System.in.read()) != -1) {
12                out.write(receive);
13            }
14        } finally {
15            out.close();
16        }
17    }
18
19    static class Print implements Runnable {
20        private PipedReader in;
21        public Print(PipedReader in) {
22            this.in = in;
23        }
24        @Override
25        public void run() {
26            int receive = 0;
27            try {
28                while ((receive = in.read()) != -1) {
29                    System.out.println((char) receive);
30                }
31            } catch (IOException e) {
32            }
33        }
34    }
35}

一般我们都是先定义一个管道输入流对象和管道输出流对象,然后将它们关联起来,建立一条“通道”:

 1// 使用connect()方法
2PipedInputStream pipedInputStream = new PipedInputStream();
3PipedOutputStream pipedOutputStream = new PipedOutputStream();
4try {
5    pipedInputStream.connect(pipedOutputStream);
6} catch (IOException e) {
7    e.printStackTrace();
8}
9// 不使用connect()方法
10PipedInputStream pipedInputStream = new PipedInputStream();
11PipedOutputStream pipedOutputStream = null;
12try {
13    pipedOutputStream = new PipedOutputStream(pipedInputStream);
14} catch (IOException e) {
15    e.printStackTrace();
16}

注意:只能选择其中一个流而不能两个流都调用connect,否则会抛出java.io.IOException: Already connected

1PipedInputStream pipedInputStream = new PipedInputStream();
2PipedOutputStream pipedOutputStream = new PipedOutputStream();
3try {
4    pipedInputStream.connect(pipedOutputStream);
5    pipedOutputStream.connect(pipedInputStream);
6} catch (IOException e) {
7    e.printStackTrace();
8}

下面我们分析建立“管道”的过程中都做了什么。下面截取PipedInputStream和PipedOutPutStream的代码片段:

 1// PipedInputStream ...
2private static final int DEFAULT_PIPE_SIZE = 1024;
3
4protected byte buffer[]; // 将传入数据放置到的循环缓冲区。
5
6public PipedInputStream() {
7    initPipe(DEFAULT_PIPE_SIZE);
8}
9
10private void initPipe(int pipeSize) {
11    if (pipeSize <= 0) {
12        throw new IllegalArgumentException("Pipe Size <= 0");
13    }
14    /*
15     * new一个PipedInputStream对象后,初始化了一个1024字节的数组,
16     * 用于保存管道输出流写入的数据,并且向管道输入流提供数据
17     */

18    buffer = new byte[pipeSize];
19}
20
21// PipedOutputStream 的无参构造方法什么都没做
22public PipedOutputStream() {
23}

下面看connect方法,可以看到PipedInputStream中的connect方法是通过PipedOutputStream的connect方法实现的:

 1// PipedInputStream ...
2
3/*
4 * 代表当从连接的管道输出流接收时,数据的下一个字节将被存储在循环缓冲区中的位置的索引
5 * 当in<0时意味着缓冲数组是空的,当in==out说明缓冲数组已经满了
6 */

7protected int in = -1;
8/* 代表该管道输入流下一个要读取的字节在循环缓冲数组中的位置 */
9protected int out = 0;
10/* 表示该管道输入流是否与管道输出流建立了链接,true为已连接  */
11boolean connected = false;
12
13public void connect(PipedOutputStream src) throws IOException {
14    src.connect(this);
15}
16
17// PipedOutputStream ...
18
19/* 代表与该管道输出流建立了连接的管道输入流PipedInputStream对象 */
20private PipedInputStream sink;
21public synchronized void connect(PipedInputStream snk) throws IOException {
22    if (snk == null) {
23        throw new NullPointerException();
24    } else if (sink != null || snk.connected) {
25        throw new IOException("Already connected");
26    }
27    sink = snk;
28    snk.in = -1;
29    snk.out = 0;
30    snk.connected = true;
31}

PipedOutputStream的connect方法是一个同步方法,需要事先获取PipedOutputStream对象锁,从代码可以看出:

  • 不能既调用PipedInputStream对象的connect方法,又调用PipedOutputStream的connect方法。

  • 一个管道输入流只能对应一个管道输出流。

死锁问题

Java在它的jdk文档中提到不要在一个线程中同时使用PipeInpuStream和PipeOutputStream,这会造成死锁:

 1public static void main(String[] args) {
2    PipedInputStream pipedInputStream = new PipedInputStream();
3    PipedOutputStream pipedOutputStream = new PipedOutputStream();
4    try {
5        pipedInputStream.connect(pipedOutputStream); // 默认一次最多只能写入1024字节
6        byte[] data = new byte[1000];
7        byte[] store = new byte[20];
8        Arrays.fill(data, (byte)1);
9        System.out.println("first writing data"); // 每次写1000字节数据
10        pipedOutputStream.write(data, 0, data.length);
11        System.out.println("finish first writing");
12        int count = 1;
13        while (count < 100) {
14            System.out.println(count + "times read data");
15            pipedInputStream.read(store, 0, store.length); // 每次读20字节
16            System.out.println(count + "times finish reading data");
17            System.out.println((count + 1) + "times write data");
18            pipedOutputStream.write(data); // 每次写1000字节数据
19            System.out.println((count + 1) + "finish writing data");
20            count ++;
21        }
22    } catch (IOException e) {
23        e.printStackTrace();
24    }
25}

输出结果:

1first writing data
2finish first writing
31times read data
41times finish reading data
52times write data

第二次尝试通过管道输出流PipedOutputStream写数据的时候阻塞,同时也无法从管道输入流PipedInputStream读取数据,我们通过源码看下这是怎么发生的:

首先我们看第一次往“管道”写入1000字节数据:

 1public void write(byte b[], int off, int len) throws IOException {
2    /* 如果管道处于毁坏或未连接状态,或者发生 I/O 错误,抛出异常 */
3    if (sink == null) {
4        throw new IOException("Pipe not connected");
5    } else if (b == null) {
6        throw new NullPointerException();
7    } else if ((off < 0) || (off > b.length) || (len < 0) ||
8                ((off + len) > b.length) || ((off + len) < 0)) {
9        throw new IndexOutOfBoundsException();
10    } else if (len == 0) {
11        return;
12    }
13    /* 将数据写入到缓冲区 */
14    sink.receive(b, off, len);
15}

write方法进行了一些校验,可以发现管道输入流是通过与其建立了连接的管道输入流PipedInputStream对象来写入数据的,我们接着看输入流的receive方法:

 1synchronized void receive(byte b[], int off, int len)  throws IOException {
2    checkStateForReceive(); // 检查PipedInputStream状态,如果不正常,则抛出异常
3    writeSide = Thread.currentThread(); // 获取将数据写入管道的线程
4    int bytesToTransfer = len; // 需要接收的数据量
5    while (bytesToTransfer > 0) {
6        if (in == out) // 满,即缓冲区数据已读取完,进行等待
7            awaitSpace();
8        int nextTransferAmount = 0; // 下次要传输的字节数
9        if (out < in) {
10            nextTransferAmount = buffer.length - in; // 下次要传输的字节数赋值(值为缓冲区还剩余的空间)
11        } else if (in < out) {
12            if (in == -1) { // 首次进行初始化
13                in = out = 0;
14                nextTransferAmount = buffer.length - in;
15            } else { // 此时只有out到in的区间位置可用
16                nextTransferAmount = out - in;
17            }
18        }
19        // 如果下次传输的字节数大于需要接收的字节数,这时只需要bytesToTransfer大小的空间即可
20        if (nextTransferAmount > bytesToTransfer)
21            nextTransferAmount = bytesToTransfer;
22        assert(nextTransferAmount > 0);
23        // 将nextTransferAmount个字节从b中复制到缓冲区数组中
24        System.arraycopy(b, off, buffer, in, nextTransferAmount);
25        // 重新计算需要传输的字节数,已经传输了nextTransferAmount字节,所以减去nextTransferAmount
26        bytesToTransfer -= nextTransferAmount;
27        off += nextTransferAmount; // 偏移量向后移动nextTransferAmount位
28        in += nextTransferAmount; // 将in索引后移nextTransferAmount位
29        if (in >= buffer.length) { // 如果in已经超出缓冲区大小,将in置0,从头开始写入
30            in = 0;
31        }
32    }
33}

receive是一个同步方法,也就是说当在一个PipedInputStream对象上调用其receive方法时,该对象所在的线程必须先获取这个PipedInputStream对象的锁,才能进入该方法。,因为我们只有一个线程且第一次调用所以获得了PipedInputStream对象的对象锁。receive方法首先调用checkStateForReceive方法:

1private void checkStateForReceive() throws IOException {
2    if (!connected) { // 没有连接输出流,抛出异常
3        throw new IOException("Pipe not connected");
4    } else if (closedByWriter || closedByReader) { // 如果输入流或输出流关闭,抛出异常
5        throw new IOException("Pipe closed");
6    } else if (readSide != null && !readSide.isAlive()) { // 如果管道输入流所在的线程不为空,且线程已死,抛出异常
7        throw new IOException("Read end dead");
8    }
9}

checkStateForReceive做了一些事前检查,注意这里的readSide != null && !readSide.isAlive()如果管道输入流所在的线程不为空,且线程已死,则抛出异常,由于我们还没有读,所以readSide==null。

检查完成后,将当前线程保存至成员变量writeSide,即管道输出流所在的线程,接下来进行写入:

参数 写入前 写入后
buffer 1024 1024
in -1 1000
out 0 0

接下来进入1times read data,通过管道输入流读取数据,我们代码中读取20个字节数据:

 1// pipedInputStream.read(store, 0, store.length);
2public synchronized int read(byte b[], int off, int len)  throws IOException {
3    /* 参数合法性校验 */
4    if (b == null) {
5        throw new NullPointerException();
6    } else if (off < 0 || len < 0 || len > b.length - off) {
7        throw new IndexOutOfBoundsException();
8    } else if (len == 0) {
9        return 0;
10    }
11
12    /* possibly wait on the first character */
13    int c = read(); // 读取第一个字节
14    if (c < 0) { // 如果到达缓冲区末位,返回-1
15        return -1;
16    }
17    b[off] = (byte) c; // 将读到的数据存入b中
18    int rlen = 1; // 记录读到的字节数
19    while ((in >= 0) && (len > 1)) {
20        // 记录可读的字节数
21        int available;
22
23        if (in > out) { // 计算可读字节数
24            available = Math.min((buffer.length - out), (in - out));
25        } else { // 如果缓冲区已满,计算可读字节数
26            available = buffer.length - out;
27        }
28
29        // A byte is read beforehand outside the loop
30        if (available > (len - 1)) {
31            // 如果可读字节大于len-1,那么读取len个字节
32            available = len - 1;
33        }
34        // 将available个字节从缓冲区中读取b中
35        System.arraycopy(buffer, out, b, off + rlen, available);
36        out += available; // 将偏移量后移available位
37        rlen += available; // 记录读到的字节数
38        len -= available; // ?
39
40        if (out >= buffer.length) { // 如果out已经超出缓冲区范围,将out置为0,从头开始读
41            out = 0;
42        }
43        if (in == out) { // 如果已经读完了,即读到了缓冲区末尾,in赋值-1
44            /* now empty */
45            in = -1;
46        }
47    }
48    return rlen; // 返回实际读到的字节数
49}

管道输入流的read方法也是一个同步方法,因此PipedInputStream对象所在的线程也要先获得PipedInputStream对象的对象锁才能进入,可以看到在读取一个数组大小的数据时,read方法先读取了一个字节的数据:

 1public synchronized int read()  throws IOException {
2    /* 检查PipedInputStream状态,如果不可用,抛出异常 */
3    if (!connected) {
4        throw new IOException("Pipe not connected");
5    } else if (closedByReader) {
6        throw new IOException("Pipe closed");
7    } else if (writeSide != null && !writeSide.isAlive()
8                && !closedByWriter && (in < 0)) {
9        throw new IOException("Write end dead");
10    }
11
12    readSide = Thread.currentThread(); // 获取从管道读取数据的线程
13    int trials = 2; // 蛇皮参数
14    while (in < 0) { // 如果写入缓冲区中的数据已经读完
15        if (closedByWriter) {
16            /* closed by writer, return EOF */
17            return -1;
18        }
19        // 如果写入数据的线程不为null且不活跃且trials<=0,说明管道损坏,抛出异常
20        if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
21            throw new IOException("Pipe broken");
22        }
23        /* might be a writer waiting */
24        notifyAll();  // 通知可能在等待的管道输出流
25        try {
26            wait(1000); // 进入等待
27        } catch (InterruptedException ex) {
28            throw new java.io.InterruptedIOException();
29        }
30    }
31    int ret = buffer[out++] & 0xFF; // 读取下一个字节
32    if (out >= buffer.length) { // 如果out已经超出缓冲区范围,将out置为0
33        out = 0;
34    }
35    if (in == out) { // 如果写入缓冲区中的数据已经读完,in赋值-1
36        /* now empty */
37        in = -1;
38    }
39
40    return ret;
41}

先读一个字节是为了当缓冲区没有数据的时候,输入流所在的线程能够阻塞,并通知可能在等待的管道输出流。当第一次读20字节的数据后,buffer分布如下:

参数 读取前 读取后
buffer 1024 1024
in 1000 1000
out 0 20

再次往里面写入1000字节数据,来到receive方法,由于不能一次性将1000个字节写入循环缓冲数组中,因此第一次循环先写入了24字节数据,buffer分布如下:

参数 读取前 读取后
buffer 1024 1024
in 1000 1024
out 20 20

由于数组前面还有20个字节空间,同时in!=out,进入第二次循环写入20字节数据,buffer分布如下:

参数 读取前 读取后
buffer 1024 1024
in 1024 20
out 20 20

再次进入循环,由于in==out,进入awaitSpace():

 1private void awaitSpace() throws IOException {
2    while (in == out) {
3        checkStateForReceive();
4
5        /* full: kick any waiting readers */
6        notifyAll();
7        try {
8            wait(1000);
9        } catch (InterruptedException ex) {
10            throw new java.io.InterruptedIOException();
11        }
12    }
13}

问题就出在这里,由于当前缓存数组已经写满,所以awaitSpace会唤醒可能在等待的读线程,然后让当前线程wait。但由于只有单个线程在执行,所以不管是否notifyAll或wait后,当前线程继续顺序执行,程序陷入不断的循环中。从而产生死锁问题。

参考资料:

  • 《Java并发编程的艺术》

  •  Java里的管道输入流 PipedInputStream与管道输出流 PipedOutputStream


以上是关于线程间通讯-管道输入输出流(源码分析)的主要内容,如果未能解决你的问题,请参考以下文章

JDK源码阅读之PipedInoutStream与PipedOutputStream

Java多线程-管道流实现线程间通信

Java IO PipedInputStream 和 PipedOutputStream

字节输入流/输出流-----PipedInputStream/PipedOutputStream

Java使用PipedStream管道流通信

java多线程通过管道流实现不同线程之间的通信