Spark结构化流多个WriteStream到同一个接收器

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark结构化流多个WriteStream到同一个接收器相关的知识,希望对你有一定的参考价值。

Spark Structured Streaming 2.2.1中没有按顺序发生两个Writestream到同一个数据库接收器。请建议如何按顺序执行它们。

val deleteSink = ds1.writestream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

val UpsertSink = ds2.writestream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

deleteSink.awaitTermination()
UpsertSink.awaitTermination()

使用上面的代码,deleteSinkUpsertSink之后执行。

答案

如果您想要并行运行两个流,则必须使用

sparkSession.streams.awaitAnyTermination()

代替

deleteSink.awaitTermination()
UpsertSink.awaitTermination()

在你的情况下,除非删除deleteSink或抛出异常,否则UpsertSink永远不会启动,正如scaladoc中所说的那样

等待通过this或异常终止query.stop()查询。如果查询以异常终止,则抛出异常。如果查询已终止,则对此方法的所有后续调用将立即返回(如果查询由stop()终止),或立即抛出异常(如果查询已终止异常)。

以上是关于Spark结构化流多个WriteStream到同一个接收器的主要内容,如果未能解决你的问题,请参考以下文章

是否可以通过 spark 直接将 Writestream 用于 API

在同一个 Spark 会话中运行多个 Spark Kafka 结构化流查询会增加偏移量但显示 numInputRows 0

如何将 Spark 结构化流数据写入 Hive?

如何在单个 Spark 作业中调用多个 writeStream 操作?

[Spark]-结构化流之监控&故障恢复篇

将spark结构化流数据帧转换为JSON