结构化流式传输:具有流式传输源的查询必须使用 writeStream.start() 执行

Posted

技术标签:

【中文标题】结构化流式传输:具有流式传输源的查询必须使用 writeStream.start() 执行【英文标题】:Structured streaming: Queries with streaming sources must be executed with writeStream.start() 【发布时间】:2020-03-30 21:15:06 【问题描述】:

我正在尝试使用结构化流从文件中读取一些数据,最后将其写入 Cassandra。但是我收到以下错误(在 cassandra 写作之前)

"org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;"

这是我正在使用的代码 sn-p

val ip15M = spark.readStream.schema(NewsSchema).parquet(INPUT_DIRECTORY)
val dataframeToRowColFunction = new RowToColumn(table) // This seems to work fine

val pairs = ip15M.toJavaRDD.flatMapToPair(dataframeToRowColFunction.FlatMapData) // This fails
// ... Other code

这是 RowToColumn 类的样子

class RowToColumn (var table: Table) extends java.io.Serializable
  val decomposer = new EventDecomposer(table)

  val FlatMapData: PairFlatMapFunction[Row, AggregateKey, AggregateValue] = new PairFlatMapFunction[Row, AggregateKey, AggregateValue]() 
  //val FlatMapData: PairFlatMapFunction[Row, String, AggregateValue] = new PairFlatMapFunction[Row, String, AggregateValue]() 
    println(" in FlatMapData") // This is printed
    override def call(x: Row) = 
      println(" in Call method") // This is not printed
      // Other code ...
  


此作业无需流式传输即可正常工作。另外,我查看了其他link1 和link2 ,但没有解决问题

【问题讨论】:

【参考方案1】:

您可以通过以下方式处理写入部分,因为我不知道 Cassandra 是否具有用于 Spark 中结构化流的流到流连接器:

ip15M
      .writeStream
      .foreachBatch  (df, batchId) => 
        // here apply all of your logic on dataframe
      
      
      .start()

请记住,在 foreach 循环中,您处理的是 dataframe,而不是流,您很可能可以直接将它们保存在 Cassandra 中。

【讨论】:

谢谢。这似乎有效。我还没有写信给 Cassandra,但前面的步骤正在工作

以上是关于结构化流式传输:具有流式传输源的查询必须使用 writeStream.start() 执行的主要内容,如果未能解决你的问题,请参考以下文章

使用java每秒流式传输大量数据

发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询

具有不同长度的音频文件的 HTTP 实时流式传输

Mongoose 使用异步迭代器流式传输聚合查询

在 SDK 8 中使用 Android MediaPlayer 进行流式传输

Amazon DynamoDB 是不是支持查询结果的流式传输