如何在单个 Spark 作业中调用多个 writeStream 操作?

Posted

技术标签:

【中文标题】如何在单个 Spark 作业中调用多个 writeStream 操作?【英文标题】:How do you call multiple writeStream operations within a single Spark Job? 【发布时间】:2020-07-22 20:21:08 【问题描述】:

我正在尝试编写一个 Spark Structured Streaming 作业,该作业从 Kafka 主题读取并通过 writeStream 操作写入单独的路径(在执行一些转换之后)。但是,当我运行以下代码时,只有第一个 writeStream 被执行,第二个被忽略。

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

write_one = df.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
  .start() \
  .awaitTermination()

// transform df to df2

write_two = df2.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
  .start() \
  .awaitTermination()

我最初认为我的问题与这个post 有关,但是在将我的代码更改为以下内容后:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

write_one = df.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
  .start() 

// transform df to df2 
 
write_two = df2.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
  .start()

write_one.awaitTermination()
write_two.awaitTermination()

我收到以下错误:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

我不确定为什么start()awaitTermination() 之间的附加代码会导致上述错误(但我认为这可能是一个单独的问题,在answer 中引用到上面的同一篇文章)。在同一个作业中调用多个 writeStream 操作的正确方法是什么?最好在foreachBatch 调用的函数中同时写入这两个函数,还是有更好的方法来实现这一点?

【问题讨论】:

【参考方案1】:

Spark 文档说,如果您需要写入多个位置,则需要使用 foreachBatch 方法。

您的代码应该类似于:

streamingDF.writeStream.foreachBatch  (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()

注意:需要persist 以防止重新计算。

您可以查看更多:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

【讨论】:

这是否意味着我将有一个用于两个写入的检查点位置?如果其中一个写入失败,而另一个写入失败怎么办。最好为每个写入设置单独的检查点,以便在发生故障时每个都可以独立恢复?【参考方案2】:

您只是不为每个流查询调用 awaiTermination(),而只是通过 spark 会话调用一个,例如:

spark.streams.awaitAnyTermination()

【讨论】:

以上是关于如何在单个 Spark 作业中调用多个 writeStream 操作?的主要内容,如果未能解决你的问题,请参考以下文章

如何在单个 Spark 作业中摄取不同的 Spark 数据帧

Spark:从具有不同内存/核心配置的单个JVM作业同时启动

对于 YARN 中的单个队列,如何将 state=RUNNING 中的 spark 应用程序数量限制为 1?

如何对多个 Spark 作业并行执行多个 Kafka 主题

如何在 azure devops YAML 管道中将单个代理用于多个作业/阶段

Spark-java 多线程与运行单个 Spark 作业