在 Stream API 中使用 AutoCloseable 接口 [重复]

Posted

技术标签:

【中文标题】在 Stream API 中使用 AutoCloseable 接口 [重复]【英文标题】:Using AutoClosable interfaces inside Stream API [duplicate] 【发布时间】:2017-09-22 08:57:04 【问题描述】:

今天我尝试重构这段代码,它从目录中的文件中读取 id,

Set<Long> ids = new HashSet<>();
for (String fileName : fileSystem.list("my-directory")) 
    InputStream stream = fileSystem.openInputStream(fileName);
    BufferedReader br = new BufferedReader(new InputStreamReader(stream));
    String line;
    while ((line = br.readLine()) != null) 
        ids.add(Long.valueOf(line.trim()));
    
    br.close();

使用流 api

Set<Long> ids = fileSystem.list("my-directory").stream()
    .map(fileName -> fileSystem::openInputStream)
    .map(is -> new BufferedReader(new InputStreamReader(is)))
    .flatMap(BufferedReader::lines)
    .map(String::trim)
    .map(Long::valueOf)
    .collect(Collectors.toSet());

然后我发现IO流不会被关闭,我也没有看到简单的方法来关闭它们,因为它们是在管道内部创建的。

有什么想法吗?

upd:例子中的文件系统是HDFS,Files#lines等类似方法不能用。

【问题讨论】:

另外,如果您正在访问实际的文件系统,请不要忘记您需要关闭由 fileSystem.list("my-directory").stream() 创建的 Stream。这些是自动关闭的,因此可以使用 try-with-resources 块来完成。 @GeraldMücke 我检查了一下,这两种解决方案都无济于事,try-with-resources 的问题与 Luan Nico 的回答完全相同 @JarrodRoberson 既然您有时间结束这个问题,也许您会详细说明该答案中的包装解决方案或 try-with-resources 将如何提供帮助? 【参考方案1】:

一旦流的所有元素都被消耗完,就可以挂钩到流中以“关闭”资源。因此可以在读取所有行后通过以下修改关闭阅读器:

.flatMap(reader -> reader.lines().onClose(() -> close(reader)))

close(AutoClosable) 在哪里处理 IOException。

作为概念证明,以下代码和输出已经过测试:

import java.util.stream.Stream;

class Test 
    public static void main(String[] args) 
        Stream.of(1, 2, 3).flatMap(i ->
                Stream.of(i, i * 2).onClose(() ->
                        System.out.println("Closed!")
                )
        ).forEach(System.out::println);
    


1
2
Closed!
2
4
Closed!
3
6
Closed!

【讨论】:

哦,也许这就是我要找的东西 很好,它有效 @Kiskae 是的,肯定加一。这正是 onClose 存在的原因 - 如果您不能关闭资源,请关闭。 @VGR 如果您有一个返回 Stream 对象的组件,那么了解此功能可能会很好,因此它会在使用后进行清理。 使用UncheckedCloseable 的this answer,您可以简单地使用.flatMap(br -&gt; br.lines().onClose(UncheckedCloseable.wrap(br)))【参考方案2】:

我还没有测试过实际的代码,但也许是这些方面的东西?

Set<Long> ids = fileSystem.list("my-directory").stream()
.map(fileName -> fileSystem::openInputStream)
.flatMap(is -> 
    try (BufferedReader br = new BufferedReader(new InputStreamReader(is))) 
      return is.lines().map(String::trim).map(Long::valueOf);
    
 )
.collect(Collectors.toSet());

当然不如你的漂亮,但我相信它是最接近你可以关闭它的。

【讨论】:

在实际返回行之前不会在 flatMap 中关闭 IO 流吗?需要测试一下 我不确定,@AdamSkywalker,这是一个很好的观点。你能在你的代码中测试它吗?我不确定这个文件系统变量是什么。 是的,我在本地文件系统上测试并得到 java.io.UncheckedIOException: java.io.IOException: Stream closed,看来我的第一条评论是正确的【参考方案3】:

为什么不简单一点,通过Files.lines

try (Stream<String> s = Files.lines(Paths.get("yourpath" + fileName))) 
    s.map(String::trim)
      .map(Long::valueOf)
      .collect(Collectors.toSet());

【讨论】:

因为我的文件系统是 hdfs :) 它在其他抽象上运行,我不能使用 Files.lines 方法,因为我没有 java.nio.file.Path 和其他东西 @Eugene Paths(因此Files.lines)默认情况下仅适用于某些标准文件系统。问题中的fileSystem 可能不是最好的例子,因为它似乎是自定义的或来自某些框架。 @Kapep 谢谢,有道理。 是的,我更新了问题以澄清。我正在寻找 java-stream 级别的解决方案,而不是文件系统解决方法

以上是关于在 Stream API 中使用 AutoCloseable 接口 [重复]的主要内容,如果未能解决你的问题,请参考以下文章

使用 Stream API 简化集合操作

为啥不在 Stream API 中使用 reduce 来合并多个映射?

学习Java 8 Stream Api - Stream 周边及其他

工作中使用过与stream流相关的API操作

工作中使用过与stream流相关的API操作

stream操作常用API 示例详解