如何正确处理 with-open 而不会在消费前关闭流?

Posted

技术标签:

【中文标题】如何正确处理 with-open 而不会在消费前关闭流?【英文标题】:How to handle with-open properly without stream getting closed before being consumed? 【发布时间】:2018-02-10 17:10:04 【问题描述】:

我正在编写我的第一个 Clojure 程序。

我正在使用 clojure.data.csv 来处理 csv 文件。我的文件可能很大,所以我确实想利用懒惰。我的 MWE 代码展示了我的问题,如下所示。

当我执行加载数据函数时,我得到“IOException Stream closed”,所以我很清楚惰性流在消费点之前就被关闭了。

我查看了 data.csv (https://github.com/clojure/data.csv) 的文档,可以看到防止在消费之前关闭流的一种方法是将流打开移动到消费流的调用堆栈。据我了解,这就是我在下面所做的,因为(采取 5)在 with-open 的范围内。显然,我有一个概念上的差距。非常感谢任何帮助!

(ns data-load.core
  (:gen-class)
  (:require [clojure.data.csv :as csv]
            [clojure.java.io :as io]))

(defn load-data [from to]
   (with-open [reader (io/reader from)
               writer (io/writer to)]
              (->> (csv/read-csv reader)
              (take 5))))

【问题讨论】:

这里有类似的问题,***.com/q/19645160。 【参考方案1】:

正如你所说,你从load-data 返回的是一个惰性序列,当它被消耗时,你已经离开了with-open 的范围。您只需要在返回之前强制实现惰性序列即可。

据我了解,这是我在下面所做的,因为(take 5)with-open 的范围内。

它在范围内,但take 也返回一个惰性序列!它只在另一个序列中包装了一个惰性序列,直到with-open 范围之后才会实现。来自 clojure.data.csv 示例:

(defn sum-second-column [filename]
  (with-open [reader (io/reader filename)]
    (->> (read-column reader 1)
         (drop 1)
         (map #(Double/parseDouble %))
         (reduce + 0)))) ;; this is the only non-lazy operation

这里的重要观察是最终操作是reduce,它将消耗惰性序列。如果您将reduce 取出并尝试从函数外部使用生成的序列,您将得到相同的“流关闭”异常。

这样做的一种方法是将序列转换为带有vec 的向量,或者使用doall,这也将强制它实现:

(defn load-data [from]
  (with-open [reader (io/reader from)]
   (->> (csv/read-csv reader)
        (take 5)
        ;; other intermediate steps go here
        (doall))))

我的文件可能很大,所以我确实想利用懒惰。

您需要一种方法在流关闭之前完成所有工作,因此您可以为您的 load-data 函数提供一个函数以在 CSV 的每一行上执行:

(defn load-data [from f]
  (with-open [reader (io/reader from)]
    (doall (map f (csv/read-csv reader)))))

例如,将行值连接成字符串:

(load-data (io/resource "input.txt")
           (partial apply str))
=> ("abc" "efg")

【讨论】:

谢谢!强迫doall对我有用。但是,data.csv 文档中的以下示例代码也可以工作,但在我看来,它应该具有与不强制评估相同的问题。 (defn copy-csv [from to](with-open [reader (io/reader from)writer (io/writer to)](->> (csv/read-csv reader)(map #(rest (butlast %)))(csv/write-csv writer)))) 请忽略我之前评论的后半部分。这显然是错误的。 @ViswaV 我认为这是可行的,因为write-csv 将消耗整个输入序列来写入文件。【参考方案2】:

如果您想要一个惰性解决方案,请查看https://***.com/a/13312151/954570(所有功劳归原作者https://***.com/users/181772/andrew-cooke 和https://***.com/users/611752/johnj 提供)。

这个想法是手动管理阅读器打开/关闭并保持阅读器打开直到序列用完。它有自己的怪癖,但对我来说效果很好(我需要合并/处理来自多个不适合内存的大文件的数据)。

【讨论】:

以上是关于如何正确处理 with-open 而不会在消费前关闭流?的主要内容,如果未能解决你的问题,请参考以下文章

如何保证最少消费一次redis的list队列数据

Spark消费Kafka如何实现精准一次性消费?

2023春招面试:消息中间件面试题整理

2023春招面试:消息中间件面试题整理

c# 如何正确传递 HttpRequestMessage 并在函数外返回 HttpRequestMessage 而不会泄漏

第3月第14天 生产者-消费者