如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?
Posted
技术标签:
【中文标题】如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?【英文标题】:How to convert a dataframe to dataset in Apache Spark in Scala? 【发布时间】:2017-06-13 08:51:39 【问题描述】:我需要将我的数据框转换为数据集,我使用了以下代码:
val final_df = Dataframe.withColumn(
"features",
toVec4(
// casting into Timestamp to parse the string, and then into Int
$"time_stamp_0".cast(TimestampType).cast(IntegerType),
$"count",
$"sender_ip_1",
$"receiver_ip_2"
)
).withColumn("label", (Dataframe("count"))).select("features", "label")
final_df.show()
val trainingTest = final_df.randomSplit(Array(0.3, 0.7))
val TrainingDF = trainingTest(0)
val TestingDF=trainingTest(1)
TrainingDF.show()
TestingDF.show()
///lets create our liner regression
val lir= new LinearRegression()
.setRegParam(0.3)
.setElasticNetParam(0.8)
.setMaxIter(100)
.setTol(1E-6)
case class df_ds(features:Vector, label:Integer)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
val Training_ds = TrainingDF.as[df_ds]
我的问题是,我收到以下错误:
Error:(96, 36) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val Training_ds = TrainingDF.as[df_ds]
似乎数据框中的值的数量与我班级中的值的数量不同。但是我在我的 TrainingDF 数据帧上使用case class df_ds(features:Vector, label:Integer)
,因为它有一个特征向量和一个整数标签。这是 TrainingDF 数据框:
+--------------------+-----+
| features|label|
+--------------------+-----+
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,10...| 10|
+--------------------+-----+
这也是我原来的 final_df 数据框:
+------------+-----------+-------------+-----+
|time_stamp_0|sender_ip_1|receiver_ip_2|count|
+------------+-----------+-------------+-----+
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.3| 10.0.0.2| 10|
+------------+-----------+-------------+-----+
但是我得到了提到的错误!有谁能够帮助我? 提前致谢。
【问题讨论】:
【参考方案1】:您正在阅读的错误消息是一个很好的指针。
当您将 DataFrame
转换为 Dataset
时,您必须为 DataFrame
行中存储的任何内容提供正确的 Encoder
。
类似原始类型(Int
s、String
s 等)和 case classes
的编码器只需导入 SparkSession
的隐式即可提供,如下所示:
case class MyData(intField: Int, boolField: Boolean) // e.g.
val spark: SparkSession = ???
val df: DataFrame = ???
import spark.implicits._
val ds: Dataset[MyData] = df.as[MyData]
如果这也不起作用,是因为您尝试将DataFrame
转换到的类型不受支持。在这种情况下,您将不得不编写自己的 Encoder
:您可以找到有关它的更多信息 here 并查看示例(Encoder
代表 java.time.LocalDateTime
)here。
【讨论】:
【参考方案2】:Spark 1.6.0
case class MyCase(id: Int, name: String)
val encoder = org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[MyCase]
val dataframe = …
val dataset = dataframe.as(encoder)
Spark 2.0 或更高版本
case class MyCase(id: Int, name: String)
val encoder = org.apache.spark.sql.Encoders.product[MyCase]
val dataframe = …
val dataset = dataframe.as(encoder)
【讨论】:
以上是关于如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?的主要内容,如果未能解决你的问题,请参考以下文章