如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?

Posted

技术标签:

【中文标题】如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?【英文标题】:How to convert a dataframe to dataset in Apache Spark in Scala? 【发布时间】:2017-06-13 08:51:39 【问题描述】:

我需要将我的数据框转换为数据集,我使用了以下代码:

    val final_df = Dataframe.withColumn(
      "features",
      toVec4(
        // casting into Timestamp to parse the string, and then into Int
        $"time_stamp_0".cast(TimestampType).cast(IntegerType),
        $"count",
        $"sender_ip_1",
        $"receiver_ip_2"
      )
    ).withColumn("label", (Dataframe("count"))).select("features", "label")

    final_df.show()

    val trainingTest = final_df.randomSplit(Array(0.3, 0.7))
    val TrainingDF = trainingTest(0)
    val TestingDF=trainingTest(1)
    TrainingDF.show()
    TestingDF.show()

    ///lets create our liner regression
    val lir= new LinearRegression()
    .setRegParam(0.3)
    .setElasticNetParam(0.8)
    .setMaxIter(100)
    .setTol(1E-6)

    case class df_ds(features:Vector, label:Integer)
    org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

    val Training_ds = TrainingDF.as[df_ds]

我的问题是,我收到以下错误:

Error:(96, 36) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    val Training_ds = TrainingDF.as[df_ds]

似乎数据框中的值的数量与我班级中的值的数量不同。但是我在我的 TrainingDF 数据帧上使用case class df_ds(features:Vector, label:Integer),因为它有一个特征向量和一个整数标签。这是 TrainingDF 数据框:

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,10...|   10|
+--------------------+-----+

这也是我原来的 final_df 数据框:

+------------+-----------+-------------+-----+
|time_stamp_0|sender_ip_1|receiver_ip_2|count|
+------------+-----------+-------------+-----+
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.3|     10.0.0.2|   10|
+------------+-----------+-------------+-----+

但是我得到了提到的错误!有谁能够帮助我? 提前致谢。

【问题讨论】:

【参考方案1】:

您正在阅读的错误消息是一个很好的指针。

当您将 DataFrame 转换为 Dataset 时,您必须为 DataFrame 行中存储的任何内容提供正确的 Encoder

类似原始类型(Ints、Strings 等)和 case classes 的编码器只需导入 SparkSession 的隐式即可提供,如下所示:

case class MyData(intField: Int, boolField: Boolean) // e.g.

val spark: SparkSession = ???
val df: DataFrame = ???

import spark.implicits._

val ds: Dataset[MyData] = df.as[MyData]

如果这也不起作用,是因为您尝试将DataFrame 转换到的类型不受支持。在这种情况下,您将不得不编写自己的 Encoder:您可以找到有关它的更多信息 here 并查看示例(Encoder 代表 java.time.LocalDateTime)here。

【讨论】:

【参考方案2】:

Spark 1.6.0

case class MyCase(id: Int, name: String)

val encoder = org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[MyCase]

val dataframe = …

val dataset = dataframe.as(encoder)

Spark 2.0 或更高版本

case class MyCase(id: Int, name: String)

val encoder = org.apache.spark.sql.Encoders.product[MyCase]

val dataframe = …

val dataset = dataframe.as(encoder)

【讨论】:

以上是关于如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?的主要内容,如果未能解决你的问题,请参考以下文章

在scala中将Spark Dataframe转换为RDD

如何在火花中将rdd对象转换为数据框

如何将火花数据帧数组转换为元组

使用 scala 将 JavapairRDD 转换为数据帧

如何在 pyspark 中将 DenseMatrix 转换为 spark DataFrame?

如何在火花中将数据帧转换为csv [重复]