如何从包含枚举的案例类创建 Spark 数据集或数据框
Posted
技术标签:
【中文标题】如何从包含枚举的案例类创建 Spark 数据集或数据框【英文标题】:How to create Spark Dataset or Dataframe from case classes that contains Enums 【发布时间】:2016-09-23 12:44:51 【问题描述】:我一直在尝试使用包含枚举的案例类创建 Spark 数据集,但我做不到。我使用的是 Spark 1.6.0 版。例外是抱怨没有为我的枚举找到编码器。这在 Spark 中不可能在数据中包含枚举吗?
代码:
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf, SparkContext
object MyEnum extends Enumeration
type MyEnum = Value
val Hello, World = Value
case class MyData(field: String, other: MyEnum.Value)
object EnumTest
def main(args: Array[String]): Unit =
val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val sqlCtx = new SQLContext(sc)
import sqlCtx.implicits._
val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS()
println(s"df: $df.collect().mkString(",")")
错误:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.company.MyEnum.Value
- field (class: "scala.Enumeration.Value", name: "other")
- root class: "com.company.MyData"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
at com.company.EnumTest$.main(EnumTest.scala:22)
at com.company.EnumTest.main(EnumTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
【问题讨论】:
【参考方案1】:您可以创建自己的编码器:
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf, SparkContext
object MyEnum extends Enumeration
type MyEnum = Value
val Hello, World = Value
case class MyData(field: String, other: MyEnum.Value)
object MyDataEncoders
implicit def myDataEncoder: org.apache.spark.sql.Encoder[MyData] =
org.apache.spark.sql.Encoders.kryo[MyData]
object EnumTest
import MyDataEncoders._
def main(args: Array[String]): Unit =
val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val sqlCtx = new SQLContext(sc)
import sqlCtx.implicits._
val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS()
println(s"df: $df.collect().mkString(",")")
【讨论】:
谢谢!如果我想做 toDF() 而不是 toDS() 怎么办?然后我收到以下错误:线程“main”中的异常 java.lang.UnsupportedOperationException:不支持类型 com.nordea.gpdw.dq.MyEnum.Value 的架构 您使用的代码与我在回答中使用的代码完全相同吗?我尝试将toDS
更改为toDF
,它似乎有效。
是的,你确定它会在标准输出上打印出 df: MyData(hello,World) 吗?因为有很多日志输出。
我可以看到对象的字节数,但恐怕Encoder
功能仅适用于新的Dataset
类型。
好的,所以你真的不能让它在 Spark 1.6.0 中正常工作吗?它在 Spark 2 中有效吗?以上是关于如何从包含枚举的案例类创建 Spark 数据集或数据框的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark Scala 中的 Schema RDD [从案例类中创建] 中查找重复项以及相应的重复计数?
Spark 案例类 - 十进制类型编码器错误“无法从十进制向上转换”