如何为 scala Iterable、spark 数据集制作编码器

Posted

技术标签:

【中文标题】如何为 scala Iterable、spark 数据集制作编码器【英文标题】:How to make an Encoder for scala Iterable, spark dataset 【发布时间】:2018-02-16 11:21:04 【问题描述】:

我正在尝试从 RDD y 创建一个数据集

Pattern: y: RDD[(MyObj1, scala.Iterable[MyObj2])]

所以我明确地创建了编码器

implicit def tuple2[A1, A2](
                              implicit e1: Encoder[A1],
                              e2: Encoder[A2]
                            ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2) 
//Create Dataset
val z = spark.createDataset(y)(tuple2[MyObj1, Iterable[MyObj2]]) 

当我编译这段代码时,我没有错误,但是当我尝试运行它时,我得到了这个错误:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for scala.Iterable[org.bean.input.MyObj2]
- field (class: "scala.collection.Iterable", name: "_2")
- root class: "scala.Tuple2"
        at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:625)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:619)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:607)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:344)
        at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:607)
        at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:438)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
        at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
        at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
        at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)

对我的对象(MyObj1 和 MyObj2)的一些解释- MyObj1:

case class MyObj1(
                      id:String,
                      type:String
                  ) 

- MyObj2:

trait MyObj2 
  val o_state:Option[String]

  val n_state:Option[String]

  val ch_inf: MyObj1

  val state_updated:MyObj3

请帮忙

【问题讨论】:

您是否导入了spark.implicits._(其中spark 是您的SparkSession 的名称)?我认为这也可以让您访问元组的编码器。您可能还需要为MyObjMyObj2 提供编码器。 @hoyland 可能是这样,否则我猜会是并发症错误。 是的,我导入了`spark.implicits._ 【参考方案1】:

Spark 不为Iterables 提供Encoder,因此除非您想使用Encoder.kryoEncoder.java,否则这是行不通的。

Spark 为其提供EncodersIterable 最接近的子类是Seq,所以这可能是您应该在这里使用的子类。否则参考How to store custom objects in Dataset?

【讨论】:

是的,我尝试了 kryo 编码器:implicit def chargingStatEncoder [A] (implicit cs: ClassTag [A]) = org.apache.spark.sql.Encoders.kryo[A](cs)。但我的数据集是:Pattern: z: Dataset[(MyObj1, scala.Iterable[MyObj2])]。我该如何实现它?【参考方案2】:

尝试将声明更改为:val y: RDD[(MyObj1, Seq[MyObj2])],它会起作用。我检查了我的课程:

case class Key(key: String) 
case class Value(value: Int) 

为:

val y: RDD[(Key, Seq[Value])] = sc.parallelize(Map(
  Key("A") -> List(Value(1), Value(2)),
  Key("B") -> List(Value(3), Value(4), Value(5))
).toSeq)

val z = sparkSession.createDataset(y)
z.show()

我明白了:

+---+---------------+
| _1|             _2|
+---+---------------+
|[A]|     [[1], [2]]|
|[B]|[[3], [4], [5]]|
+---+---------------+

如果我更改为 Iterable,我会遇到你的异常。

【讨论】:

使用 seq 时出现另一个错误,因为我在 MyObj2 MyObj1 & MyObj3 中有 another Objects

以上是关于如何为 scala Iterable、spark 数据集制作编码器的主要内容,如果未能解决你的问题,请参考以下文章

Spark Scala - 如何为每个组创建新列,然后在 spark 数据框中分解列值

在 Scala Spark 中,当源列为 NULL 时如何为派生列添加默认值?

scala.collection.immutable.Iterable[org.apache.spark.sql.Row] 到 DataFrame ?错误:使用替代方法重载了方法值 createDat

如何为scala中的空数据框现有列添加赋值?

如何为 Spark SQL 中的posexplode 列提供别名?

Spark基础-scala学习(集合)