旋转流式数据帧 pyspark
Posted
技术标签:
【中文标题】旋转流式数据帧 pyspark【英文标题】:Pivot a streaming dataframe pyspark 【发布时间】:2021-07-14 14:19:38 【问题描述】:我有一个来自 kafka 的流式数据框,我需要旋转两列。这是我目前使用的代码:
streaming_df = streaming_df.groupBy('Id','Date')\
.pivot('Var')\
.agg(first('Val'))
query = streaming_df.limit(5) \
.writeStream \
.outputMode("append") \
.format("memory") \
.queryName("stream") \
.start()
time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()
`
我收到以下错误:
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()
pyspark 版本:3.1.1
任何想法如何使用流数据帧实现数据透视?
【问题讨论】:
是help你吗? 很遗憾没有。 start() 之后出现的更新代码。 @Kafels 您的 sql 命令应该出现在start()
之前,因为它适用于流数据。或者您必须在运行 sql 命令之前 stop()
流式传输。让我知道它是否对您有帮助!
不行,两个版本都试过了。问题是在流式df上应用pivot,因为sql命令适用于其他转换(不包括pivot)。 @DimitriK.Sifoua
我明白了。事实上,当它应用于流数据时,不支持枢轴转换。您必须将 foreachBatch
与用户定义的函数一起使用,该函数将在批处理模式下应用枢轴转换。
【参考方案1】:
在应用于流式数据时,Spark 不支持 pivot
转换。
您可以做的是将foreachBatch
与这样的用户定义函数一起使用:
def apply_pivot(stream_df, batch_id):
# Here your pivot transformation
stream_df \
.groupBy('Id','Date') \
.pivot('Var') \
.agg(first('Val')) \
.write \
.format('memory') \
.outputMode('append') \
.queryName("stream")
query = streaming_df.limit(5) \
.writeStream \
.foreachBatch(apply_pivot) \
.start()
time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()
如果对你有帮助,请告诉我!
【讨论】:
您好,谢谢。一个后续问题-我的输入来自流式 kafka 主题,在接下来的阶段中,我需要将干净+聚合+过滤的数据保存在 MSQL 服务器中。假设我使用 ForeachBatch 在当前微批次上应用 PIVOT 和任何其他聚合函数,这些更改会影响过去运行的先前批次吗?例如,如果我在 col 上计算 avg,这个 avg 是累积的? 您在 foreachbatch 中执行的每个聚合函数当然只适用于当前批次! 那么在 Foreach 内部完成的计算呢?它们是否反映在数据存储(MSQL 服务器)中,更重要的是 - 我如何确保它们根据不同批次之间的每条相关记录进行累积和计算? Foreach 将计算应用于每一行,而 ForeachBatch 将计算应用于行组。如果要在整个流中执行计算,可以使用来自 spark 上下文的累加器。 link以上是关于旋转流式数据帧 pyspark的主要内容,如果未能解决你的问题,请参考以下文章