Spark:如何使用 RowEncoder 创建流式数据集?
Posted
技术标签:
【中文标题】Spark:如何使用 RowEncoder 创建流式数据集?【英文标题】:Spark: How to create streaming Dataset with RowEncoder? 【发布时间】:2018-05-07 11:38:15 【问题描述】:我有一个流数据帧,使用 Spark 结构化流创建。像这样-
val dataStream =
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServer)
.option("subscribe", topic)
.load()
现在,当我尝试从 datastream
创建一个带有名为 newKey
的附加列的数据集时,它给了我以下错误-
[error] (run-main-0) java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
[error] - field (class: "org.apache.spark.sql.Row", name: "_2")
[error] - root class: "scala.Tuple2"
java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- field (class: "org.apache.spark.sql.Row", name: "_2")
- root class: "scala.Tuple2"
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:642)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:444)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:820)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:444)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:636)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:624)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:355)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:624)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:444)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:820)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:444)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:433)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
我使用的代码如下:
import spark.implicits._
implicit val rowEncoder: ExpressionEncoder[Row] = RowEncoder(dataStream.schema)
val dsStream =
dataStream
.select(lit("a").as("newKey"), col("*"))
.as[(String, Row)]
.writeStream
.format("console")
.start()
谁能帮我解决?
【问题讨论】:
【参考方案1】:以下任何一种都可以:
map
:
import org.apache.spark.sql.Encoder, Encoders, Row
import org.apache.spark.sql.functions._
val df = Seq((1L, "a", 4.0)).toDF("x", "y", "z")
val encoder = Encoders.tuple(Encoders.STRING, RowEncoder(df.schema))
df.map(row => ("a", row))(encoder)
select
与 struct
和 as
:
df.select(lit("a"), struct(df.columns map col: _*)).as[(String, Row)](encoder)
【讨论】:
感谢您的回复。但这对我不起作用。我得到以下异常 -org.apache.spark.sql.AnalysisException: Try to map struct<newKey:string,key:binary,value:binary,topic:string,partition:int,offset:bigint,timestamp:timestamp,timestampType:int> to Tuple2, but failed as the number of fields does not line up.;
第二个对我有用。不是第一个。谢谢!以上是关于Spark:如何使用 RowEncoder 创建流式数据集?的主要内容,如果未能解决你的问题,请参考以下文章
Spark Direct Stream 不会为每个 kafka 分区创建并行流
Apache Spark:SparkStream创建Receiver来实现模拟无边界流操作
Apache Spark - 在java中返回空数据集的映射函数