迭代 RDD 迭代器并应用限制时,Spark 似乎没有调用 hasNext
Posted
技术标签:
【中文标题】迭代 RDD 迭代器并应用限制时,Spark 似乎没有调用 hasNext【英文标题】:Spark doesn't seem to call hasNext when iterating through RDD iterators and a limit is applied 【发布时间】:2018-07-12 17:36:46 【问题描述】:我在 Spark(使用 Scala)中有一个自定义 RDD
,它负责缓冲来自数据库连接的流。
当RDD
的计算函数被调用时,我建立一个到数据库的连接,并在我返回的流上返回一个迭代器。
问题是我不确定在哪里关闭与数据库的连接,因为 Spark 似乎没有机制告诉我何时使用迭代器完成。
最初我将清理代码放在我返回的迭代器的“hasNext”中,当返回迭代器的末尾时进行清理。
这样做的问题是,当我执行有限制的查询时,Spark 不会迭代到迭代器的末尾,因此永远不会在正确的时间调用 hasNext
。
我可以使用来自SparkPlan::getByteArrayRdd
的以下 sn-p 来确认这一点(这是在应用限制时读取我的 RDD
的路径):
while (iter.hasNext && (n < 0 || count < n))
val row = iter.next().asInstanceOf[UnsafeRow]
out.writeInt(row.getSizeInBytes)
row.writeToStream(out, buffer)
count += 1
Spark 为自定义 RDD 提供哪些机制来清理其资源?
【问题讨论】:
我怀疑你的所作所为超出了 Spark 的用途。见***.com/questions/31072893/… 【参考方案1】:传递给compute
函数的TaskContext
有一个addTaskCompletionListener
方法,其documentation 表示:
添加一个(Java 友好的)侦听器以在任务完成时执行。 这将在所有情况下调用 - 成功、失败或 消除。将侦听器添加到已完成的任务将 导致立即调用该侦听器。
HadoopRDD 的一个示例用途是注册回调以关闭输入流。
监听器抛出的异常会导致任务失败。
在我看来,这正是您应该关闭数据库连接的地方!
【讨论】:
感谢这项工作,就像警告其他任何偶然发现此问题的人一样:您不能真正从 TaskCompleteListener 调用中抛出异常,因为来自 spark 的正常异常处理不适用,而是您的异常将是吸收到 TaskCompleteListenerException 中,这是一个私有类型,并且没有干净的方法可以让您的异常恢复。以上是关于迭代 RDD 迭代器并应用限制时,Spark 似乎没有调用 hasNext的主要内容,如果未能解决你的问题,请参考以下文章
apache spark - 迭代地跳过并从 RDD 中获取