Spark Streaming: schema mismatch using MicroBatchReader with column pruning

Posted: 2018-06-29 14:08:55

Question:

I'm writing a custom Spark streaming source and I want to support column pruning. I can't share the full code, but essentially I'm doing something like this:

class MyMicroBatchReader(...) extends MicroBatchReader with SupportsPushDownRequiredColumns {

  var schema: StructType = createSchema()

  def readSchema(): StructType = schema

  def pruneColumns(requiredSchema: StructType): Unit = {
    schema = requiredSchema
  }

  ...
}

I build the batch rows using this schema, and I've verified that the rows I return contain only the values of the requested columns.
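For illustration, a minimal sketch of what that row construction can look like, assuming the Spark 2.3.x DataSourceV2 API; the class name, the record iterator, and the field access are hypothetical, and the actual data retrieval is elided:

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.DataReader
import org.apache.spark.sql.types.StructType

// Hypothetical reader: emits one Row per record, keeping only the fields
// present in the (possibly pruned) schema, in schema order.
class MyDataReader(schema: StructType, records: Iterator[Map[String, Any]])
    extends DataReader[Row] {

  private var current: Map[String, Any] = _

  override def next(): Boolean =
    if (records.hasNext) { current = records.next(); true } else false

  // Project the record onto exactly the requested columns.
  override def get(): Row = Row.fromSeq(schema.fieldNames.map(current(_)))

  override def close(): Unit = ()
}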

However, if I run a streaming query that selects a subset of the columns, the job fails. For example, running

spark.readStream().format("mysource").load().select("Id").writeStream().format("console").start()

I get the following exception:

18/06/29 15:50:01 ERROR MicroBatchExecution: Query [id = 59c13195-9d63-42c9-8f92-eb9d67e8b26c, runId = 72124019-1ab3-48a9-9503-0cf1c7d26fb9] terminated with error
java.lang.AssertionError: assertion failed: Invalid batch: fieldA#0,fieldB#1,fieldC,Id#3,fieldD#4,fieldE#5 != Id#52
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2$$anonfun$applyOrElse$4.apply(MicroBatchExecution.scala:417)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2$$anonfun$applyOrElse$4.apply(MicroBatchExecution.scala:416)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2.applyOrElse(MicroBatchExecution.scala:416)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2.applyOrElse(MicroBatchExecution.scala:414)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:414)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Can you help me understand what is going wrong?

Thanks.

Comments:

What's your Spark version? Can you explain(true) the streaming Dataset, i.e. spark.readStream().format("mysource").load().select("Id").explain(true)?

Happens here too, for some reason only on the second micro-batch iteration. The explain command sparkSession.readStream.format(myV2Source).load().select("c1").explain(true) gives:

== Parsed Logical Plan ==
'Project [unresolvedalias('c1, None)]
+- AnalysisBarrier
      +- StreamingRelationV2 com.mycompany.v2.MyDefaultSource@11987483, com.mycompany.v2.MyDefaultSource, Map(), [c1#452, c2#453L, c3#454, ... 26 more fields]

== Analyzed Logical Plan ==
c1: string
Project [c1#452]
+- StreamingRelationV2 com.mycompany.v2.MyDefaultSource@11987483, com.mycompany.v2.MyDefaultSource, Map(), [c1#452, c2#453L, c3#454, ... 26 more fields]

== Optimized Logical Plan ==
Project [c1#452]
+- StreamingRelationV2 com.mycompany.v2.MyDefaultSource@11987483, com.mycompany.v2.MyDefaultSource, Map(), [c1#452, c2#453L, c3#454, ... 26 more fields]

== Physical Plan ==
*(1) Project [c1#452]
+- StreamingRelation com.mycompany.v2.MyDefaultSource, [c1#452, c2#453L, c3#454, ... 26 more fields]

Answer 1:

I fixed it by resetting the schema to the full schema after each micro-batch commit:

class MyMicroBatchReader(...) extends MicroBatchReader with SupportsPushDownRequiredColumns {

  var fullSchema: StructType = createSchema()
  var schema: StructType = fullSchema

  def readSchema(): StructType = schema

  def pruneColumns(requiredSchema: StructType): Unit = {
    schema = requiredSchema
  }

  def commit(end: OffsetV2): Unit = {
    ...
    schema = fullSchema
  }
}
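As a side note (not part of the original answer, just a sketch under the same Spark 2.3.x DataSourceV2 API assumptions; these interfaces were renamed in later releases): the pruned schema can also be captured into the reader factories at planning time, so that resetting schema in commit() cannot affect readers that are still producing rows for the current batch. The records parameter and recordsForThisBatch are hypothetical placeholders for however the source obtains its data.

import java.util.{List => JList}
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical factory: the schema seen at planning time travels with it,
// so later mutations of the reader's `schema` field do not change it.
class MyDataReaderFactory(prunedSchema: StructType, records: Seq[Map[String, Any]])
    extends DataReaderFactory[Row] {
  override def createDataReader(): DataReader[Row] =
    new MyDataReader(prunedSchema, records.iterator) // reader from the earlier sketch
}

// Inside MyMicroBatchReader:
// override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
//   Seq(new MyDataReaderFactory(schema, recordsForThisBatch)).asJava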
Comments:
