结构化流式传输:具有流式传输源的查询必须使用 writeStream.start() 执行
Posted
技术标签:
【中文标题】结构化流式传输:具有流式传输源的查询必须使用 writeStream.start() 执行【英文标题】:Structured streaming: Queries with streaming sources must be executed with writeStream.start() 【发布时间】:2020-03-30 21:15:06 【问题描述】:我正在尝试使用结构化流从文件中读取一些数据,最后将其写入 Cassandra。但是我收到以下错误(在 cassandra 写作之前)
"org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;"
这是我正在使用的代码 sn-p
val ip15M = spark.readStream.schema(NewsSchema).parquet(INPUT_DIRECTORY)
val dataframeToRowColFunction = new RowToColumn(table) // This seems to work fine
val pairs = ip15M.toJavaRDD.flatMapToPair(dataframeToRowColFunction.FlatMapData) // This fails
// ... Other code
这是 RowToColumn 类的样子
class RowToColumn (var table: Table) extends java.io.Serializable
val decomposer = new EventDecomposer(table)
val FlatMapData: PairFlatMapFunction[Row, AggregateKey, AggregateValue] = new PairFlatMapFunction[Row, AggregateKey, AggregateValue]()
//val FlatMapData: PairFlatMapFunction[Row, String, AggregateValue] = new PairFlatMapFunction[Row, String, AggregateValue]()
println(" in FlatMapData") // This is printed
override def call(x: Row) =
println(" in Call method") // This is not printed
// Other code ...
此作业无需流式传输即可正常工作。另外,我查看了其他link1 和link2 ,但没有解决问题
【问题讨论】:
【参考方案1】:您可以通过以下方式处理写入部分,因为我不知道 Cassandra 是否具有用于 Spark 中结构化流的流到流连接器:
ip15M
.writeStream
.foreachBatch (df, batchId) =>
// here apply all of your logic on dataframe
.start()
请记住,在 foreach
循环中,您处理的是 dataframe
,而不是流,您很可能可以直接将它们保存在 Cassandra 中。
【讨论】:
谢谢。这似乎有效。我还没有写信给 Cassandra,但前面的步骤正在工作以上是关于结构化流式传输:具有流式传输源的查询必须使用 writeStream.start() 执行的主要内容,如果未能解决你的问题,请参考以下文章
发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询