Java -- 如何从 inputStream (socket/socketServer) 中读取未知数量的字节?

Posted

技术标签:

【中文标题】Java -- 如何从 inputStream (socket/socketServer) 中读取未知数量的字节?【英文标题】:Java -- How to read an unknown number of bytes from an inputStream (socket/socketServer)? 【发布时间】:2011-08-07 03:37:53 【问题描述】:

希望使用 inputStream 通过套接字读取一些字节。服务器发送的字节数可能是可变的,客户端事先并不知道字节数组的长度。这如何实现?


byte b[]; 
sock.getInputStream().read(b);

这会导致来自 Net BzEAnSZ 的“可能未初始化错误”。帮助。

【问题讨论】:

你的问题具体和输入流无关,只是你的数组引用初始化失败导致的编译错误b.答案是初始化。 -1 标题不佳的问题。 【参考方案1】:

简单的答案是:

byte b[] = byte[BIG_ENOUGH];
int nosRead = sock.getInputStream().read(b);

BIG_ENOUGH 足够大。


但总的来说,这是一个很大的问题。单个read 调用不能保证返回另一端写入的所有内容。

如果nosRead 的值为BIG_ENOUGH,您的应用程序无法确定是否还有更多字节要到来;另一端可能正好发送了BIG_ENOUGH 字节......或超过BIG_ENOUGH 字节。在前一种情况下,如果您尝试阅读,您的应用程序将(永远)阻塞。在后一种情况下,您的应用程序必须(至少)执行另一个 read 才能获取其余数据。

如果nosRead 的值小于BIG_ENOUGH,您的应用程序仍然不知道。它可能已经收到所有内容,部分数据可能已延迟(由于网络数据包碎片,网络数据包丢失,网络分区等),或者另一端可能在发送数据的过程中被阻塞或崩溃。

最好的答案是,要么您的应用程序需要事先知道需要多少字节,或者应用程序协议需要以某种方式告诉应用程序需要多少字节或何时发送所有字节。

可能的方法是:

应用程序协议使用固定消息大小(不适用于您的示例) 应用协议消息大小在消息头中指定 应用程序协议使用消息结束标记 应用协议不是基于消息的,另一端关闭连接表示结束

如果没有这些策略中的任何一种,您的应用程序就会被猜测,并且可能偶尔会出错。

然后您使用多个读取调用和(可能)多个缓冲区。

【讨论】:

字。在这个以最低 2G RAM 出货的非常便宜的机器的时代,WTF 是否带有 1K 缓冲区?人们需要走出 80 年代,继续他们的生活。手动内存管理的需求已成为过去。 @Joe - 这根本不是存储管理问题。我可以使缓冲区任意大,但仍然有问题。 同意。大多数 JVM 会在 1.3g 或 1.4g 上呕吐,而不管机器上可用的 RAM 是多少,因为它们依赖于地址空间可用的 RAM。然而,现代操作系统的 getmain 例程就是它们,即使要求一小块也可能会得到一大块。所以我总是默认“预先要求一个非常大的块”并保存细节。 @Joe - 你错过了我的意思。即使 JVM 可以请求并获得 无限大 缓冲区,也会有问题。请阅读我的完整答案。【参考方案2】:

这个问题已经 7 年了,但我有一个类似的问题,同时制作了一个 NIO 和 OIO 兼容的系统(客户端和服务器可能是他们想要的任何东西,OIO 或 NIO)。

由于阻塞 InputStreams,因此退出了挑战。

我找到了一种方法,使之成为可能,我想发布它,以帮助有类似问题的人。

在此处使用 DataInputStream 读取动态 sice 的字节数组,可以简单地将其包裹在 socketInputStream 周围。另外,我不想介绍特定的通信协议(比如首先发送字节大小,然后再发送),因为我想让它尽可能简单。首先,我有一个简单的实用程序 Buffer 类,如下所示:

import java.util.ArrayList;
import java.util.List;

public class Buffer 

    private byte[] core;
    private int capacity;

    public Buffer(int size)
        this.capacity = size;
        clear();
    

    public List<Byte> list() 
        final List<Byte> result = new ArrayList<>();
        for(byte b : core) 
            result.add(b);
        

        return result;
    

    public void reallocate(int capacity) 
        this.capacity = capacity;
    

    public void teardown() 
        this.core = null;
    

    public void clear() 
        core = new byte[capacity];
    

    public byte[] array() 
        return core;
    

这个类只存在,因为愚蠢的方式,字节 Java 中的字节自动装箱与这个列表一起工作。在这个例子中,这根本不需要,但我不想在这个解释中遗漏一些东西。

接下来,两个简单的核心方法。其中, StringBuilder 用作“回调”。它将填充已读取的结果并返回读取的字节数。当然,这可能会有所不同。

private int readNext(StringBuilder stringBuilder, Buffer buffer) throws IOException 
    // Attempt to read up to the buffers size
    int read = in.read(buffer.array());
    // If EOF is reached (-1 read)
    // we disconnect, because the
    // other end disconnected.
    if(read == -1) 
        disconnect();
        return -1;
    
    // Add the read byte[] as
    // a String to the stringBuilder.
    stringBuilder.append(new String(buffer.array()).trim());
    buffer.clear();

    return read;


private Optional<String> readBlocking() throws IOException 
    final Buffer buffer = new Buffer(256);
    final StringBuilder stringBuilder = new StringBuilder();
    // This call blocks. Therefor
    // if we continue past this point
    // we WILL have some sort of
    // result. This might be -1, which
    // means, EOF (disconnect.)
    if(readNext(stringBuilder, buffer) == -1) 
        return Optional.empty();
    
    while(in.available() > 0) 
        buffer.reallocate(in.available());
        if(readNext(stringBuilder, buffer) == -1) 
            return Optional.empty();
        
    

    buffer.teardown();

    return Optional.of(stringBuilder.toString());

第一个方法readNext 将使用来自DataInputStream 的byte[] 填充缓冲区并返回以这种方式读取的字节数。

在第二种方法readBlocking中,我利用了阻塞特性,不用担心consumer-producer-problems。只需readBlocking 将阻塞,直到收到新的字节数组。在我们调用这个阻塞方法之前,我们分配一个 Buffer-size。请注意,我在第一次读取后(在 while 循环内)调用了重新分配。这不是必需的。您可以安全地删除此行,代码仍然可以工作。我这样做了,因为我的问题的独特性。

我没有详细解释的两件事是: 1. 在(DataInputStream 和这里唯一的短变量,抱歉) 2. 断开连接(您的断开连接例程)

总而言之,您现在可以这样使用它了:

// The in has to be an attribute, or an parameter to the readBlocking method
DataInputStream in = new DataInputStream(socket.getInputStream());
final Optional<String> rawDataOptional = readBlocking();
rawDataOptional.ifPresent(string -> threadPool.execute(() -> handle(string)));

这将为您提供一种通过套接字(或任何 InputStream)读取任何形状或形式的字节数组的方法。希望这会有所帮助!

【讨论】:

【参考方案3】:

这既是迟到的答案,也是自我宣传,但任何查看此问题的人都可能想看看这里: https://github.com/GregoryConrad/SmartSocket

【讨论】:

【参考方案4】:

无需重新发明***,使用 Apache Commons:

IOUtils.toByteArray(inputStream);

例如,带有错误处理的完整代码:

    public static byte[] readInputStreamToByteArray(InputStream inputStream) 
    if (inputStream == null) 
        // normally, the caller should check for null after getting the InputStream object from a resource
        throw new FileProcessingException("Cannot read from InputStream that is NULL. The resource requested by the caller may not exist or was not looked up correctly.");
    
    try 
        return IOUtils.toByteArray(inputStream);
     catch (IOException e) 
        throw new FileProcessingException("Error reading input stream.", e);
     finally 
        closeStream(inputStream);
    


private static void closeStream(Closeable closeable) 
    try 
        if (closeable != null) 
            closeable.close();
        
     catch (Exception e) 
        throw new FileProcessingException("IO Error closing a stream.", e);
    

FileProcessingException 是您的应用特定的有意义的 RT 异常,它将不间断地传输到您的适当处理程序,而不会污染其间的代码。

【讨论】:

【参考方案5】:

这是一个使用 ByteArrayOutputStream 的简单示例...

        socketInputStream = socket.getInputStream();
        int expectedDataLength = 128; //todo - set accordingly/experiment. Does not have to be precise value.
        ByteArrayOutputStream baos = new ByteArrayOutputStream(expectedDataLength);
        byte[] chunk = new byte[expectedDataLength];
        int numBytesJustRead;
        while((numBytesJustRead = socketInputStream.read(chunk)) != -1) 
            baos.write(chunk, 0, numBytesJustRead);
        
        return baos.toString("UTF-8");

但是,如果服务器没有返回 -1,您将需要以其他方式检测数据的结尾 - 例如,返回的内容可能总是以某个标记(例如,“”)结尾,或者您可以使用 socket.setSoTimeout() 解决。 (提及这一点似乎是一个普遍的问题。)

【讨论】:

expectedDataLength = 128。他说不知道长度。【参考方案6】:

假设发送者在数据结束时关闭流:

ByteArrayOutputStream baos = new ByteArrayOutputStream();

byte[] buf = new byte[4096];
while(true) 
  int n = is.read(buf);
  if( n < 0 ) break;
  baos.write(buf,0,n);


byte data[] = baos.toByteArray();

【讨论】:

假设发送方没有关闭流,如果没有更多字节可用,该方法将阻塞。但答案让我朝着正确的方向前进。 如果发送方发送超过 buf 的容量(即 >4096 字节),这也会导致数组越界异常。【参考方案7】:

将所有输入数据流式传输到输出流。这是工作示例:

    InputStream inputStream = null;
    byte[] tempStorage = new byte[1024];//try to read 1Kb at time
    int bLength;
    try

        ByteArrayOutputStream outputByteArrayStream =  new ByteArrayOutputStream();     
        if (fileName.startsWith("http"))
            inputStream = new URL(fileName).openStream();
        else
            inputStream = new FileInputStream(fileName);            

        while ((bLength = inputStream.read(tempStorage)) != -1) 
                outputByteArrayStream.write(tempStorage, 0, bLength);
        
        outputByteArrayStream.flush();
        //Here is the byte array at the end
        byte[] finalByteArray = outputByteArrayStream.toByteArray();
        outputByteArrayStream.close();
        inputStream.close();
    catch(Exception e)
        e.printStackTrace();
        if (inputStream != null) inputStream.close();
    

【讨论】:

【参考方案8】:

使用BufferedInputStream,并使用available() 方法返回可供读取的字节大小,然后构造一个具有该大小的byte[]。问题解决了。 :)

BufferedInputStream buf = new BufferedInputStream(is);  
int size = buf.available();

【讨论】:

.available() 只给出一个估计值——我不会用这个估计值来构造 byte[]【参考方案9】:

您需要根据需要扩展缓冲区,通过读取字节块,一次读取 1024 个字节,就像我前段时间编写的示例代码一样

    byte[] resultBuff = new byte[0];
    byte[] buff = new byte[1024];
    int k = -1;
    while((k = sock.getInputStream().read(buff, 0, buff.length)) > -1) 
        byte[] tbuff = new byte[resultBuff.length + k]; // temp buffer size = bytes already read + bytes last read
        System.arraycopy(resultBuff, 0, tbuff, 0, resultBuff.length); // copy previous bytes
        System.arraycopy(buff, 0, tbuff, resultBuff.length, k);  // copy current lot
        resultBuff = tbuff; // call the temp buffer as your result buff
    
    System.out.println(resultBuff.length + " bytes read.");
    return resultBuff;

【讨论】:

如果网络饱和(即 k 很小),重新分配可能无效。实际上,即使在每个周期重新分配 k=1024 也是昂贵的。我会预先分配一个更大的块(通常建议的当前大小的两倍)并在其中保留当前位置的偏移量。 我解决的问题是缓冲区分配和扩展策略。决不能解决任何特定于案例的性能问题。我的代码是用来读取小文件的,大多数文件不到 1kb,因此这个数字对我来说是有意义的。通常一个有效的选择是在服务器端会话特定代码中传输的“通常/平均”字节,如果此类缓冲区异常大,并发会话可能会阻塞可用内存。在其他情况下,双倍于那个想法也可能很有效 - 所以这完全取决于情况。 嘿,放轻松,不要把它当作私人的! :) 这确实是一个可靠的代码。 我喜欢你的版本,但有一些变化:` int available = resIn.available();字节[] buff = 新字节[可用]; ByteArrayOutputStream bao = new ByteArrayOutputStream(available); int bytesRead = -1; while ((bytesRead = resIn.read(buff, 0, buff.length)) > -1) bao.write(buff, 0, bytesRead); `【参考方案10】:

读取一个 int,它是正在接收的下一段数据的大小。创建一个具有该大小的缓冲区,或使用一个宽敞的预先存在的缓冲区。读入缓冲区,确保它被限制在前面的大小。冲洗并重复:)

如果您真的如您所说的那样事先不知道大小,请阅读其他答案提到的扩展 ByteArrayOutputStream。但是,size 方法确实是最可靠的。

【讨论】:

请记住,从远程端传递的大小必须经过验证。恶意用户或远程软件中的错误可能会导致您分配 1G 的 RAM 作为缓冲区,然后立即进入 OOM。 我想如果你验证然后结果是无效的,唯一的选择就是关闭流,让对方重新初始化一个新的流。【参考方案11】:

要么:

    让发送方在传输完字节后关闭套接字。然后在接收端继续阅读直到 EOS。

    按照 Chris 的建议让发件人在一个长度字前面加上一个前缀,然后读取那么多字节。

    使用自描述协议,例如 XML、序列化等……

【讨论】:

从帖子的上下文来看,@farm ostrich 已经假设流将被关闭。他的问题是缓冲区分配。 @road to yamburg:他没有这么说,而且实际上根本不需要大小完全正确的字节数组:只需流式传输数据,如另一个答案所示,或者根据您的使用 ByteArrayOutputStream 。这不是一个大问题。

以上是关于Java -- 如何从 inputStream (socket/socketServer) 中读取未知数量的字节?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Java 8 Streams 中获取 Inputstream?

如何从 Java 中的 InputStream 打开 MS Access 文件?

Java -- 如何从 inputStream (socket/socketServer) 中读取未知数量的字节?

如何在Java中使用InputStream从目录中获取文件名[关闭]

如何从字符串创建 InputStream? [复制]

如何从String创建InputStream? [重复]