AnalysisException: Queries with streaming sources must be executed with writeStream.start()

Posted: 2018-07-06 00:04:23

Question:

I'm getting an exception saying that I need to start the stream before using it. However, the stream is being started. What is wrong with this setup?

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("subscribe", "inputTopic")
  .option("startingOffsets", "earliest")
  .load
  .selectExpr(deserializeKeyExpression, deserializeValueExpression)
  .select("value.*")
  .withColumn("date", to_timestamp(from_unixtime(col("date"))))
  .transform(model.transform)
  .select(col("id") as "key", func(col(config.probabilityCol)) as "value.prediction")
  .selectExpr(serializeKeyExpression, serializeValueExpression)
  .writeStream
  .outputMode("update")
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("checkpointLocation", "checkpoint")
  .option("topic", "outputTopic")
  .start

Here is the exception:

Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    ...
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3249)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2491)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:2498)
    at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
    at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:306)
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:306)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
    at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:306)
    at com.company.project.Stream$$anonfun$transform$1.apply(NewsRateJob.scala:65)
    at com.company.project.Stream$$anonfun$transform$1.apply(NewsRateJob.scala:65)
    at org.apache.spark.sql.Dataset.transform(Dataset.scala:2513)
    at com.company.project.Stream.transform(NewsRateJob.scala:65)
    at com.company.project.Stream.setupStream(NewsRateJob.scala:47)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:366)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:311)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:134)
    ... 18 common frames omitted

I'm familiar with the Spark 2.2 issue with VectorAssembler, but I'm using Spark 2.3.1.

Comments:

Try to narrow down where the problem occurs. Does it work as expected if you remove the transform(model.transform) part?

@Shaido Yes, the exception happens at model.transform. It does not work as expected when that part is removed, because model.transform converts the dataset to a different type.

Answer 1:

This exception occurs because the model tries to access data from the stream before the stream has started. In this case, VectorAssembler calls first on the dataset to determine the width of the vector.

Spark 2.3 does not automatically fix the issue between VectorAssembler and Structured Streaming; it simply provides a class (specifically VectorSizeHint) that can be used alongside VectorAssembler in a Structured Streaming job. Adding it to the pipeline stages resolves the problem:

stages += new VectorSizeHint()
  .setInputCol(column)
  .setSize(size)

Here is some documentation explaining how to use it: https://docs.databricks.com/spark/latest/mllib/mllib-pipelines-and-stuctured-streaming.html
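
To make this concrete, here is a minimal sketch of where the hint sits relative to the VectorAssembler in a pipeline. The column names and the size are placeholders for whatever your schema actually uses; every vector-typed input column whose length Spark cannot infer from the streaming schema needs its own hint.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}

// Placeholder column names and size -- adjust to your schema.
val sizeHint = new VectorSizeHint()
  .setInputCol("rawFeatures")   // vector column whose length the streaming planner cannot infer
  .setSize(3)                   // the fixed length this column is known to have
  .setHandleInvalid("error")    // fail fast on rows that do not match the declared size

val assembler = new VectorAssembler()
  .setInputCols(Array("rawFeatures", "someNumericCol"))
  .setOutputCol("features")

// The hint runs before the assembler, so the assembler reads the size from
// column metadata instead of calling first() on the streaming Dataset.
val pipeline = new Pipeline().setStages(Array(sizeHint, assembler))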

Note: this is not needed for OneHotEncoderEstimator.

We also ran into a similar stack trace for a couple of other reasons. One was because we were using OneHotEstimator in the model (it needed to be updated to OneHotEncoderEstimator), and another was because we were caching the pipeline (we removed the caching step).
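
As a rough illustration of the first fix, the Spark 2.3 replacement looks something like this (the column names are hypothetical):

import org.apache.spark.ml.feature.OneHotEncoderEstimator

// OneHotEncoderEstimator takes arrays of input/output columns instead of a single pair.
val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))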


Answer 2:

The exception occurs because you are trying to use an ML Transformer with a streaming Dataset. As described in Spark Structured Streaming and Spark-Ml Regression, as of today Spark does not support machine learning on Structured Streaming.

You would have to rewrite your code to transform the data manually, without relying on RDDs or the ML library.

Comments:

I'm not sure what the Jira ticket is referring to; here is some Databricks documentation showing how to use MLlib pipelines with Structured Streaming: docs.databricks.com/spark/latest/mllib/…
