如何在单个 Spark 作业中调用多个 writeStream 操作?
Posted
技术标签:
【中文标题】如何在单个 Spark 作业中调用多个 writeStream 操作?【英文标题】:How do you call multiple writeStream operations within a single Spark Job? 【发布时间】:2020-07-22 20:21:08 【问题描述】:我正在尝试编写一个 Spark Structured Streaming 作业,该作业从 Kafka 主题读取并通过 writeStream
操作写入单独的路径(在执行一些转换之后)。但是,当我运行以下代码时,只有第一个 writeStream
被执行,第二个被忽略。
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
write_one = df.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
.start() \
.awaitTermination()
// transform df to df2
write_two = df2.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
.start() \
.awaitTermination()
我最初认为我的问题与这个post 有关,但是在将我的代码更改为以下内容后:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
write_one = df.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
.start()
// transform df to df2
write_two = df2.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
.start()
write_one.awaitTermination()
write_two.awaitTermination()
我收到以下错误:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
我不确定为什么start()
和awaitTermination()
之间的附加代码会导致上述错误(但我认为这可能是一个单独的问题,在answer 中引用到上面的同一篇文章)。在同一个作业中调用多个 writeStream
操作的正确方法是什么?最好在foreachBatch
调用的函数中同时写入这两个函数,还是有更好的方法来实现这一点?
【问题讨论】:
【参考方案1】:Spark 文档说,如果您需要写入多个位置,则需要使用 foreachBatch
方法。
您的代码应该类似于:
streamingDF.writeStream.foreachBatch (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
注意:需要persist
以防止重新计算。
您可以查看更多:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
【讨论】:
这是否意味着我将有一个用于两个写入的检查点位置?如果其中一个写入失败,而另一个写入失败怎么办。最好为每个写入设置单独的检查点,以便在发生故障时每个都可以独立恢复?【参考方案2】:您只是不为每个流查询调用 awaiTermination()
,而只是通过 spark 会话调用一个,例如:
spark.streams.awaitAnyTermination()
【讨论】:
以上是关于如何在单个 Spark 作业中调用多个 writeStream 操作?的主要内容,如果未能解决你的问题,请参考以下文章
如何在单个 Spark 作业中摄取不同的 Spark 数据帧
Spark:从具有不同内存/核心配置的单个JVM作业同时启动
对于 YARN 中的单个队列,如何将 state=RUNNING 中的 spark 应用程序数量限制为 1?