Spark: How to create a streaming Dataset with RowEncoder?

Posted 2018-05-07 11:38:15

Question:

I have a streaming DataFrame, created with Spark Structured Streaming, like this:

val dataStream =
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServer)
      .option("subscribe", topic)
      .load()
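
For reference, the Kafka source always yields the same seven columns (key, value, topic, partition, offset, timestamp, timestampType) regardless of the topic contents, and the schema is resolved before the query starts, so it can be inspected up front:

// The schema of a streaming DataFrame is known without starting the query,
// so printSchema can be used to confirm what RowEncoder will capture.
dataStream.printSchema()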

Now, when I try to create a Dataset from dataStream with an additional column named newKey, it gives me the following error:

[error] (run-main-0) java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
[error] - field (class: "org.apache.spark.sql.Row", name: "_2")
[error] - root class: "scala.Tuple2"
java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- field (class: "org.apache.spark.sql.Row", name: "_2")
- root class: "scala.Tuple2"
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:642)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:444)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:820)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:444)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:636)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:624)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:624)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:444)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:820)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:444)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:433)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)

The code I am using is as follows:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.functions._
import spark.implicits._

implicit val rowEncoder: ExpressionEncoder[Row] = RowEncoder(dataStream.schema)

val dsStream =
    dataStream
      .select(lit("a").as("newKey"), col("*"))
      .as[(String, Row)]
      .writeStream
      .format("console")
      .start()

Can anyone help me fix this?

Answer 1:

Either of the following will work. The implicit tuple encoder that spark.implicits._ derives by reflection cannot handle a Row field (your implicit rowEncoder is never consulted when Spark needs an Encoder[(String, Row)]), so you have to build the encoder explicitly and pass it:

map:

import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq((1L, "a", 4.0)).toDF("x", "y", "z")

val encoder = Encoders.tuple(Encoders.STRING, RowEncoder(df.schema))

df.map(row => ("a", row))(encoder)

select with struct and as:

df.select(lit("a"), struct(df.columns map col: _*)).as[(String, Row)](encoder)
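
Applied to the streaming dataStream from the question, a minimal sketch of the second variant could look like this (streamEncoder and query are illustrative names, not part of the original answer):

import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.{col, lit, struct}

// Build an explicit Encoder[(String, Row)] from the stream's own schema.
val streamEncoder = Encoders.tuple(Encoders.STRING, RowEncoder(dataStream.schema))

// Wrap all Kafka columns in a single struct so the projection has exactly
// two top-level fields, matching the two fields of the tuple.
val query = dataStream
  .select(lit("a"), struct(dataStream.columns map col: _*))
  .as[(String, Row)](streamEncoder)
  .writeStream
  .format("console")
  .start()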

Discussion:

Thanks for the reply, but this doesn't work for me. I get the following exception: org.apache.spark.sql.AnalysisException: Try to map struct&lt;newKey:string,key:binary,value:binary,topic:string,partition:int,offset:bigint,timestamp:timestamp,timestampType:int&gt; to Tuple2, but failed as the number of fields does not line up.;

The second one works for me, not the first. Thanks!
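
The field-count error quoted above is most likely about the projection shape rather than streaming itself: select(lit("a").as("newKey"), col("*")) produces eight top-level columns (newKey plus the seven Kafka columns), which cannot line up with the two fields of Tuple2, while struct(...) collapses everything after the key into a single column. Comparing the two schemas (reusing the imports from the answer) makes the difference visible:

// Eight top-level columns -> does not line up with Tuple2's two fields.
dataStream.select(lit("a").as("newKey"), col("*")).printSchema()

// Two top-level columns (a string and one struct) -> matches (String, Row).
dataStream.select(lit("a"), struct(dataStream.columns map col: _*)).printSchema()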
