使用 spark spark mapPartition 时出错 [重复]

Posted

技术标签:

【中文标题】使用 spark spark mapPartition 时出错 [重复]【英文标题】:Error while using spark spark mapPartition [duplicate] 【发布时间】:2017-10-03 23:27:04 【问题描述】:

所以我有这个代码

val expanededDf = io.readInputs().mapPartitions 
(iter:Iterator[Row]) => 
    iter.map
        (item:Row) => 
            val myNewColumn = getUdf($"someColumnOriginal")
            Row.fromSeq(item.toSeq :+(myNewColumn))
            
     
 
 

我遇到一个异常:“无法找到存储在数据集中的类型的编码器。导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。_支持序列化其他类型将在未来的版本中添加。” 我的进口是:

import spark.implicits._
import org.apache.spark.sql._

我必须使用 UDF,因为进行一些 REST 调用的函数非常复杂。基本上,代码尝试使用特定列值将新列添加到行中,然后返回数据框。我曾尝试使用 withColumn,但由于我在这里处理的是 PB 的数据,所以速度非常慢。我是 spark 和 scala 的新手,因此如果我的问题非常蹩脚,我提前道歉。

【问题讨论】:

【参考方案1】:

首先,withColumn 是要走的路,如果它很慢,可能是因为你的工作需要调整,我认为切换到 RDD 不会让它变得更快。

但无论如何...您不应该在 RDD 的每一行上调用的函数中引用 DataFrame。

为了更好地理解发生了什么,当运行一个 spark 程序时,有一个 Driver,它是 master,还有一个 Executor,它是 slave。 从属不知道 DataFrame,只有驱动知道。

还有一点很重要,当您编写在执行程序中运行的代码时,在引用驱动程序范围内的变量时必须小心。如果你这样做了,Spark 将尝试序列化它们并将它们发送给 Executors。如果它是你想要的,如果这些对象很小,如果 Spark 知道如何序列化它们,那没关系。

在这种情况下,Spark 正在尝试序列化 $"someColumnOriginal",它是类 Column 的对象,但它不知道如何并且它失败了。 在这种情况下,要让它工作,你必须知道你想要的字段在什么位置,假设它在位置 2,你会写

 Row.fromSeq(item.toSeq :+ item.get(2))

如果模式可用(item.schema、rdd.schema),您可以通过查看模式来获得位置,并且由于它是一个 int,它可以在循环之外完成,Spark 将能够对其进行序列化。 你可以阅读这篇文章http://www.cakesolutions.net/teamblogs/demystifying-spark-serialisation-error 了解更多关于序列化的信息。

【讨论】:

以上是关于使用 spark spark mapPartition 时出错 [重复]的主要内容,如果未能解决你的问题,请参考以下文章

想知道为啥空的内部迭代器会导致 mapPartitionsWithIndex 出现不可序列化的异常

科普Spark,Spark是什么,如何使用Spark

科普Spark,Spark是啥,如何使用Spark

科普Spark,Spark是什么,如何使用Spark

手把手带你玩转Spark机器学习-使用Spark进行文本处理

Spark系列