使用 spark spark mapPartition 时出错 [重复]
Posted
技术标签:
【中文标题】使用 spark spark mapPartition 时出错 [重复]【英文标题】:Error while using spark spark mapPartition [duplicate] 【发布时间】:2017-10-03 23:27:04 【问题描述】:所以我有这个代码
val expanededDf = io.readInputs().mapPartitions
(iter:Iterator[Row]) =>
iter.map
(item:Row) =>
val myNewColumn = getUdf($"someColumnOriginal")
Row.fromSeq(item.toSeq :+(myNewColumn))
我遇到一个异常:“无法找到存储在数据集中的类型的编码器。导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。_支持序列化其他类型将在未来的版本中添加。” 我的进口是:
import spark.implicits._
import org.apache.spark.sql._
我必须使用 UDF,因为进行一些 REST 调用的函数非常复杂。基本上,代码尝试使用特定列值将新列添加到行中,然后返回数据框。我曾尝试使用 withColumn,但由于我在这里处理的是 PB 的数据,所以速度非常慢。我是 spark 和 scala 的新手,因此如果我的问题非常蹩脚,我提前道歉。
【问题讨论】:
【参考方案1】:首先,withColumn
是要走的路,如果它很慢,可能是因为你的工作需要调整,我认为切换到 RDD 不会让它变得更快。
但无论如何...您不应该在 RDD 的每一行上调用的函数中引用 DataFrame。
为了更好地理解发生了什么,当运行一个 spark 程序时,有一个 Driver,它是 master,还有一个 Executor,它是 slave。 从属不知道 DataFrame,只有驱动知道。
还有一点很重要,当您编写在执行程序中运行的代码时,在引用驱动程序范围内的变量时必须小心。如果你这样做了,Spark 将尝试序列化它们并将它们发送给 Executors。如果它是你想要的,如果这些对象很小,如果 Spark 知道如何序列化它们,那没关系。
在这种情况下,Spark 正在尝试序列化 $"someColumnOriginal"
,它是类 Column
的对象,但它不知道如何并且它失败了。
在这种情况下,要让它工作,你必须知道你想要的字段在什么位置,假设它在位置 2,你会写
Row.fromSeq(item.toSeq :+ item.get(2))
如果模式可用(item.schema、rdd.schema),您可以通过查看模式来获得位置,并且由于它是一个 int,它可以在循环之外完成,Spark 将能够对其进行序列化。 你可以阅读这篇文章http://www.cakesolutions.net/teamblogs/demystifying-spark-serialisation-error 了解更多关于序列化的信息。
【讨论】:
以上是关于使用 spark spark mapPartition 时出错 [重复]的主要内容,如果未能解决你的问题,请参考以下文章
想知道为啥空的内部迭代器会导致 mapPartitionsWithIndex 出现不可序列化的异常