如何从 Java 8 Streams 中获取 Inputstream?

Posted

技术标签:

【中文标题】如何从 Java 8 Streams 中获取 Inputstream?【英文标题】:How to obtain Inputstream from Java 8 Streams? 【发布时间】:2019-09-06 15:51:27 【问题描述】:

我有一些从不同文件流式传输的数据。格式如下:

Stream<String> linesModifiedAndAppendedFromVariousFiles=getLines();

但是,我需要将其输入到接受 InputStream 或 Reader 作为参数的库方法中。

如何将此 Java 8 流输入 InputStream 或某种类型的 Reader?

P.S:这不是将 java.util.streams.Stream 包裹在 InputStream 周围。我正在寻找的是相反的方式。

【问题讨论】:

A StreamInputStream 无关。这两个完全是两个不同的包。 How can I use Java 8 Streams with an InputStream?的可能重复 @MuratKaragoz 我知道!没有任何问题声称它是。应该有(我希望)一种方法来创建一个接受 Java 8 流作为源的 Inputsream。 @MuratKaragöz 不,那是关于将 java.util.stream.Stream 包装在 InputStream 周围。我需要的是相反的方式。 【参考方案1】:

读取字节流:

    PipedInputStream inputStream = new PipedInputStream();
    Thread infinitInputStreamSupplier = infinitInputStreamSupplierThread(inputStream);
    infinitInputStreamSupplier.start();
    //consume input stream here...
    infinitInputStreamSupplier.interrupt();

这里是生成输入流的方法

private Thread infinitInputStreamSupplierThread(final PipedInputStream inputStream) 
        return new Thread(() -> 
            try (PipedOutputStream outputStream = new PipedOutputStream(inputStream)) 
                Stream<byte[]> infiniteStream = Stream.generate(randomByteSupplier());
                infiniteStream.forEachOrdered(bytes -> 
                    try 
                        outputStream.write(bytes);
                     catch (IOException e) 
                        e.printStackTrace();
                    
                );
             catch (IOException e) 
                throw new RuntimeException(e);
            
        );
    

    private Supplier<byte[]> randomByteSupplier() 
        return () -> 
            byte[] bytes = new byte[100];
            new Random().nextBytes(bytes);

            return bytes;
        ;
    

【讨论】:

【参考方案2】:

您可以使用PipedReader 和PipedWriter 来做到这一点。

PipedReader reader = new PipedReader();

Runnable feeder = new Runnable() 
    @Override
    public void run() 
        try (PipedWriter writer = new PipedWriter(reader)) 
            linesModifiedAndAppendedFromVariousFiles.forEachOrdered(line -> 
                try 
                    writer.write(line);
                    writer.write('\n');
                 catch (IOException e) 
                    throw new UncheckedIOException(e);
                
            );
         catch (IOException e) 
            throw new RuntimeException(e);
        
    
;
new Thread(feeder).start();

someLibraryMethod.consumeReader(reader);

【讨论】:

感谢您的回复。问题是,这里的 forEachOrdered 是一个终端操作,因此它终止了流。但我希望它是一个不断填充的无限流。鉴于此信息,我怎样才能实现上述相同的目标? forEachOrdered 将继续,直到 Stream 关​​闭。 forEachOrdered 本身不会关闭或终止 Stream。【参考方案3】:

java.util.Stream 在概念上是一个潜在的无穷无尽的、不可逆的(例如,一旦你移过一个条目就不能回到它)序列,这可能允许你并行处理它。至关重要的是,序列中的“东西”可以是ANYTHING。例如,您可以拥有一个 Color 对象流。

java.io.InputStream 在概念上是一个潜在的无限、不可逆、不可并行的字节序列。

这两件事是不一样的。

但是,如果您有专门的字节流,则可以将其转换为输入流。您只需选择不使用 Stream 中固有的并行化选项,然后这两件事就开始归结为同一件事。但是,如果您有任何不是字节的流,则必须提出“映射”。

假设您有一个字符串对象流。假设这是英文前 4 个数字的流(所以:Arrays.asList("one", "two", "three", "four").stream())。

您想如何将此字符串流映射到字节流?一种策略是使用 UTF-8 编码将字符串呈现为字节,并使用 0 字符作为分隔符。换句话说,您想要与此假设相同的结果:new ByteArrayInputStream(new String("one\0two\0three\0four").getBytes(StandardCharsets.UTF_8))

可以想象一个函数,它接收Stream&lt;Byte&gt; 并将其转换为 InputStream。但是,Stream&lt;Byte&gt; 将是一个非常低效的概念。还可以想象一个函数,它采用Stream&lt;T&gt; 以及将T 映射到byte[] 的映射函数,以及产生分隔符的分隔符常量(或生成值的函数)。对于上面的示例,类似于:

toInputStream(oneTwoThreeFour, str -> str.getBytes(StandardCharsets.UTF_8), "\0");

据我所知,这在核心库中不存在,也不存在于像番石榴这样的地方。但是写它应该是微不足道的。也许是半页的代码。

【讨论】:

以上是关于如何从 Java 8 Streams 中获取 Inputstream?的主要内容,如果未能解决你的问题,请参考以下文章

Hashmap with Streams in Java 8 Streams 收集 Map 的值

如何使用 Java Streams 从 HashMap 的 ArrayList 中获取字符串

如果使用我的服务使用Java 8 lambda / streams列表不为空,如何从列表中删除每个元素

Java 8 Streams 可以对集合中的项目进行操作,然后将其删除吗?

Java9函数式编程 Functional Programming with Streams in Java 9

Java 8 Streams - 分组为单个值[重复]