旋转流式数据帧 pyspark

Posted

技术标签:

【中文标题】旋转流式数据帧 pyspark【英文标题】:Pivot a streaming dataframe pyspark 【发布时间】:2021-07-14 14:19:38 【问题描述】:

我有一个来自 kafka 的流式数据框,我需要旋转两列。这是我目前使用的代码:

streaming_df = streaming_df.groupBy('Id','Date')\
            .pivot('Var')\
            .agg(first('Val'))

query = streaming_df.limit(5) \
            .writeStream \
            .outputMode("append") \
            .format("memory") \
            .queryName("stream") \
            .start()

time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()

`

我收到以下错误: pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

pyspark 版本:3.1.1

任何想法如何使用流数据帧实现数据透视?

【问题讨论】:

是help你吗? 很遗憾没有。 start() 之后出现的更新代码。 @Kafels 您的 sql 命令应该出现在 start() 之前,因为它适用于流数据。或者您必须在运行 sql 命令之前 stop() 流式传输。让我知道它是否对您有帮助! 不行,两个版本都试过了。问题是在流式df上应用pivot,因为sql命令适用于其他转换(不包括pivot)。 @DimitriK.Sifoua 我明白了。事实上,当它应用于流数据时,不支持枢轴转换。您必须将 foreachBatch 与用户定义的函数一起使用,该函数将在批处理模式下应用枢轴转换。 【参考方案1】:

在应用于流式数据时,Spark 不支持 pivot 转换。

您可以做的是将foreachBatch 与这样的用户定义函数一起使用:

def apply_pivot(stream_df, batch_id):
    # Here your pivot transformation
    stream_df \
        .groupBy('Id','Date') \
        .pivot('Var') \
        .agg(first('Val')) \
        .write \
        .format('memory') \
        .outputMode('append') \
        .queryName("stream")

query = streaming_df.limit(5) \
    .writeStream \
    .foreachBatch(apply_pivot) \
    .start()

time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()

如果对你有帮助,请告诉我!

【讨论】:

您好,谢谢。一个后续问题-我的输入来自流式 kafka 主题,在接下来的阶段中,我需要将干净+聚合+过滤的数据保存在 MSQL 服务器中。假设我使用 ForeachBatch 在当前微批次上应用 PIVOT 和任何其他聚合函数,这些更改会影响过去运行的先前批次吗?例如,如果我在 col 上计算 avg,这个 avg 是累积的? 您在 foreachbatch 中执行的每个聚合函数当然只适用于当前批次! 那么在 Foreach 内部完成的计算呢?它们是否反映在数据存储(MSQL 服务器)中,更重要的是 - 我如何确保它们根据不同批次之间的每条相关记录进行累积和计算? Foreach 将计算应用于每一行,而 ForeachBatch 将计算应用于行组。如果要在整个流中执行计算,可以使用来自 spark 上下文的累加器。 link

以上是关于旋转流式数据帧 pyspark的主要内容,如果未能解决你的问题,请参考以下文章

Android:编码的相机预览帧总是横向的。如何旋转?

使用流式数据帧进行内部连接

如何将流式数据帧写入 PostgreSQL?

如何在火花流中刷新加载的数据帧内容?

如何将流式 DataFrame 转换为常规批处理 DataFrame? [关闭]

将视频帧转换为流式视频