Scala Spark - Overloaded method error when calling createDataFrame

Posted: 2017-02-13 14:38:26

【Question】:

I am trying to create a DataFrame from a collection of Double arrays (an ArrayBuffer[Array[Double]]) as follows:

val points : ArrayBuffer[Array[Double]] = ArrayBuffer(
  Array(0.19238990024216676, 1.0, 0.0, 0.0),
  Array(0.2864319929878242, 0.0, 1.0, 0.0),
  Array(0.11160349352921925, 0.0, 2.0, 1.0),
  Array(0.3659220026496052, 2.0, 2.0, 0.0),
  Array(0.31809629470827383, 1.0, 1.0, 1.0))

val x = Array("__1", "__2", "__3", "__4")
val myschema = StructType(x.map(fieldName ⇒ StructField(fieldName, DoubleType, true)))

points.map(e => Row(e(0), e(1), e(2), e(3)))
val newDF = sqlContext.createDataFrame(points, myschema)

but I get this error:

<console>:113: error: overloaded method value createDataFrame with alternatives:
(data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
cannot be applied to (scala.collection.mutable.ArrayBuffer[Array[Double]], org.apache.spark.sql.types.StructType)
val newDF = sqlContext.createDataFrame(points, myschema)

I searched around online but could not find a way to solve this, so if anyone has any ideas, please help!

【Comments】:

【Answer 1】:

This works for me:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val points : ArrayBuffer[Array[Double]] = ArrayBuffer(
  Array(0.19238990024216676, 1.0, 0.0, 0.0),
  Array(0.2864319929878242, 0.0, 1.0, 0.0),
  Array(0.11160349352921925, 0.0, 2.0, 1.0),
  Array(0.3659220026496052, 2.0, 2.0, 0.0),
  Array(0.31809629470827383, 1.0, 1.0, 1.0))

val x = Array("__1", "__2", "__3", "__4")
val myschema = StructType(x.map(fieldName ⇒ StructField(fieldName, DoubleType, true)))

// turn each Array[Double] into a Row, then parallelize into an RDD[Row]
val rdd = sc.parallelize(points.map(e => Row(e(0), e(1), e(2), e(3))))
// RDD[Row] plus a StructType matches the createDataFrame(rowRDD, schema) overload
val newDF = sqlContext.createDataFrame(rdd, myschema)

newDF.show
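
If you are on Spark 2.x, the same pattern goes through SparkSession instead of SQLContext; a minimal sketch (not from the original answer), reusing the points and myschema values defined above:

import org.apache.spark.sql.{Row, SparkSession}

// getOrCreate() returns the active session if one already exists
val spark = SparkSession.builder().appName("points-example").getOrCreate()

// createDataFrame(RDD[Row], StructType) is the overload matched here
val rowRDD = spark.sparkContext.parallelize(points.map(e => Row(e(0), e(1), e(2), e(3))))
val df = spark.createDataFrame(rowRDD, myschema)
df.show()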

【Discussion】:

I would appreciate an explanation of why this was downvoted. Thanks.

【Answer 2】:

There is no overload of createDataFrame that accepts an ArrayBuffer[Array[Double]]. Also, your call to points.map is not assigned to anything: map returns a new collection instead of modifying points in place, so its result is discarded. Try:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._

val points: List[Seq[Double]] = List(
    Seq(0.19238990024216676, 1.0, 0.0, 0.0),
    Seq(0.2864319929878242, 0.0, 1.0, 0.0),
    Seq(0.11160349352921925, 0.0, 2.0, 1.0),
    Seq(0.3659220026496052, 2.0, 2.0, 0.0),
    Seq(0.31809629470827383, 1.0, 1.0, 1.0))

val x = Array("__1", "__2", "__3", "__4")
val myschema = StructType(x.map(fieldName => StructField(fieldName, DoubleType, true)))

// Row.fromSeq turns each Seq[Double] into a Row; .asJava converts the Scala List
// into the java.util.List[Row] that the (rows, schema) overload expects
val newDF = sqlContext.createDataFrame(points.map(Row.fromSeq).asJava, myschema)
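
An alternative sketch (not part of the original answer): skip building Rows by hand and map each point to a tuple, then use the toDF implicit; this assumes the same sc and sqlContext as above.

import sqlContext.implicits._

// each Seq of four doubles becomes a 4-tuple; RDDs of Products get toDF via the implicits
val tupleRDD = sc.parallelize(points.map { case Seq(a, b, c, d) => (a, b, c, d) })
val df2 = tupleRDD.toDF("__1", "__2", "__3", "__4")
df2.show()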

【Discussion】:
