如何从包含枚举的案例类创建 Spark 数据集或数据框

Posted

技术标签:

【中文标题】如何从包含枚举的案例类创建 Spark 数据集或数据框【英文标题】:How to create Spark Dataset or Dataframe from case classes that contains Enums 【发布时间】:2016-09-23 12:44:51 【问题描述】:

我一直在尝试使用包含枚举的案例类创建 Spark 数据集,但我做不到。我使用的是 Spark 1.6.0 版。例外是抱怨没有为我的枚举找到编码器。这在 Spark 中不可能在数据中包含枚举吗?

代码:

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf, SparkContext

object MyEnum extends Enumeration 
  type MyEnum = Value
  val Hello, World = Value


case class MyData(field: String, other: MyEnum.Value)

object EnumTest 

  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlCtx = new SQLContext(sc)

    import sqlCtx.implicits._

    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS()

    println(s"df: $df.collect().mkString(",")")
  


错误:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.company.MyEnum.Value
- field (class: "scala.Enumeration.Value", name: "other")
- root class: "com.company.MyData"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
at com.company.EnumTest$.main(EnumTest.scala:22)
at com.company.EnumTest.main(EnumTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

【问题讨论】:

【参考方案1】:

您可以创建自己的编码器:

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf, SparkContext

object MyEnum extends Enumeration 
  type MyEnum = Value
  val Hello, World = Value


case class MyData(field: String, other: MyEnum.Value)

object MyDataEncoders 
  implicit def myDataEncoder: org.apache.spark.sql.Encoder[MyData] =
    org.apache.spark.sql.Encoders.kryo[MyData]
  

object EnumTest 
  import MyDataEncoders._

  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlCtx = new SQLContext(sc)

    import sqlCtx.implicits._

    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS()

    println(s"df: $df.collect().mkString(",")")
  

【讨论】:

谢谢!如果我想做 toDF() 而不是 toDS() 怎么办?然后我收到以下错误:线程“main”中的异常 java.lang.UnsupportedOperationException:不支持类型 com.nordea.gpdw.dq.MyEnum.Value 的架构 您使用的代码与我在回答中使用的代码完全相同吗?我尝试将toDS 更改为toDF,它似乎有效。 是的,你确定它会在标准输出上打印出 df: MyData(hello,World) 吗?因为有很多日志输出。 我可以看到对象的字节数,但恐怕Encoder 功能仅适用于新的Dataset 类型。 好的,所以你真的不能让它在 Spark 1.6.0 中正常工作吗?它在 Spark 2 中有效吗?

以上是关于如何从包含枚举的案例类创建 Spark 数据集或数据框的主要内容,如果未能解决你的问题,请参考以下文章

用于聚合的 Spark 数据集或数据框

如何在 Spark Scala 中的 Schema RDD [从案例类中创建] 中查找重复项以及相应的重复计数?

Spark 案例类 - 十进制类型编码器错误“无法从十进制向上转换”

获取Apache Spark Java中的整个数据集或仅列的摘要

Spark StructType VS 案例类

如何从 spark-shell/spark-submit 运行交互式 Spark 应用程序