迭代 RDD 迭代器并应用限制时,Spark 似乎没有调用 hasNext

Posted

技术标签:

【中文标题】迭代 RDD 迭代器并应用限制时,Spark 似乎没有调用 hasNext【英文标题】:Spark doesn't seem to call hasNext when iterating through RDD iterators and a limit is applied 【发布时间】:2018-07-12 17:36:46 【问题描述】:

我在 Spark(使用 Scala)中有一个自定义 RDD,它负责缓冲来自数据库连接的流。

RDD 的计算函数被调用时,我建立一个到数据库的连接,并在我返回的流上返回一个迭代器。

问题是我不确定在哪里关闭与数据库的连接,因为 Spark 似乎没有机制告诉我何时使用迭代器完成。

最初我将清理代码放在我返回的迭代器的“hasNext”中,当返回迭代器的末尾时进行清理。

这样做的问题是,当我执行有限制的查询时,Spark 不会迭代到迭代器的末尾,因此永远不会在正确的时间调用 hasNext

我可以使用来自SparkPlan::getByteArrayRdd 的以下 sn-p 来确认这一点(这是在应用限制时读取我的 RDD 的路径):

while (iter.hasNext && (n < 0 || count < n)) 
  val row = iter.next().asInstanceOf[UnsafeRow]
  out.writeInt(row.getSizeInBytes)
  row.writeToStream(out, buffer)
  count += 1

Spark 为自定义 RDD 提供哪些机制来清理其资源?

【问题讨论】:

我怀疑你的所作所为超出了 Spark 的用途。见***.com/questions/31072893/… 【参考方案1】:

传递给compute 函数的TaskContext 有一个addTaskCompletionListener 方法,其documentation 表示:

添加一个(Java 友好的)侦听器以在任务完成时执行。 这将在所有情况下调用 - 成功、失败或 消除。将侦听器添加到已完成的任务将 导致立即调用该侦听器。

HadoopRDD 的一个示例用途是注册回调以关闭输入流。

监听器抛出的异常会导致任务失败。

在我看来,这正是您应该关闭数据库连接的地方!

【讨论】:

感谢这项工作,就像警告其他任何偶然发现此问题的人一样:您不能真正从 TaskCompleteListener 调用中抛出异常,因为来自 spark 的正常异常处理不适用,而是您的异常将是吸收到 TaskCompleteListenerException 中,这是一个私有类型,并且没有干净的方法可以让您的异常恢复。

以上是关于迭代 RDD 迭代器并应用限制时,Spark 似乎没有调用 hasNext的主要内容,如果未能解决你的问题,请参考以下文章

apache spark - 迭代地跳过并从 RDD 中获取

修改 Spark RDD foreach 中的集合

Java8函数式编程:类比Spark RDD算子的Stream流操作

对spark内存迭代计算框架的理解误区

迭代cogrouped RDD

spark系列-2Spark 核心数据结构:弹性分布式数据集 RDD