WrappedArray 的 Spark Dataframe 到 Dataframe[Vector]

Posted

技术标签:

【中文标题】WrappedArray 的 Spark Dataframe 到 Dataframe[Vector]【英文标题】:Spark Dataframe of WrappedArray to Dataframe[Vector] 【发布时间】:2017-10-18 11:46:51 【问题描述】:

我有一个 spark Dataframe df 具有以下架构:

root
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = false)

我想创建一个新的数据框,其中每一行都是Doubles 的向量,并希望获得以下架构:

root
     |-- features: vector (nullable = true)

到目前为止,我有以下代码(受这篇文章的影响:Converting Spark Dataframe(with WrappedArray) to RDD[labelPoint] in scala),但我担心它有问题,因为即使计算合理数量的行也需要很长时间。 此外,如果行太多,应用程序将因堆空间异常而崩溃。

val clustSet = df.rdd.map(r => 
          val arr = r.getAs[mutable.WrappedArray[Double]]("features")
          val features: Vector = Vectors.dense(arr.toArray)
          features
          ).map(Tuple1(_)).toDF()

我怀疑指令 arr.toArray 在这种情况下不是一个好的 Spark 实践。任何澄清都会非常有帮助。

谢谢!

【问题讨论】:

【参考方案1】:

这是因为.rdd 必须从内部内存格式中反序列化对象,这非常耗时。

可以使用.toArray - 您在行级别上操作,而不是将所有内容收集到驱动程序节点。

您可以使用 UDF 轻松做到这一点:

import org.apache.spark.ml.linalg._
val convertUDF = udf((array : Seq[Double]) => 
  Vectors.dense(array.toArray)
)
val withVector = dataset
  .withColumn("features", convertUDF('features))

代码来自这个答案:Convert ArrayType(FloatType,false) to VectorUTD

但是问题的作者没有询问差异

【讨论】:

非常感谢,这帮助很大,并将其标记为答案。我现在可以运行更多行,而且时间上令人满意。我仍然遇到一个异常:org.apache.spark.SparkException:Kryo 序列化失败:缓冲区溢出。当我尝试 200,000 行时,可用:0,必需:1。你对此有什么见解吗?再次感谢。 @user159941 请查看***.com/questions/31947335/… 我在我的代码中设置了以下内容:val conf = new SparkConf() .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set( "spark.kryoserializer.buffer.max.mb","256") 并且成功了!谢谢。

以上是关于WrappedArray 的 Spark Dataframe 到 Dataframe[Vector]的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark Scala 中将 WrappedArray 转换为 List?

spark提示Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot b

将 Any 转换为数组 [重复]

Spark数据框:从数组中删除元素

总和 MADlib UDF Spark SQL

WrapedArray 的 WrappedArray 到 java 数组