如何使用 PySpark 进行嵌套的 for-each 循环

Posted

技术标签:

【中文标题】如何使用 PySpark 进行嵌套的 for-each 循环【英文标题】:how to do a nested for-each loop with PySpark 【发布时间】:2016-08-25 22:54:29 【问题描述】:

想象一个大型数据集(>40GB parquet 文件),其中包含数千个变量的值观察值作为三元组(变量、时间戳、值)

现在考虑一个您只对 500 个变量的子集感兴趣的查询。并且您想要检索特定时间点(观察窗口或时间范围)的这些变量的观察结果(值 --> 时间序列)。这样有一个开始和结束时间。

如果没有分布式计算 (Spark),您可以这样编写代码:

for var_ in variables_of_interest:
    for incident in incidents:

        var_df = df_all.filter(
            (df.Variable == var_)
            & (df.Time > incident.startTime)
            & (df.Time < incident.endTime))

我的问题是:如何使用 Spark/PySpark 做到这一点?我在想:

    以某种方式将事件与变量结合起来,然后过滤数据框。 在过滤变量观测值 (df_all) 时广播事件数据帧并在地图函数中使用它。 以某种方式使用 RDD.cartasian 或 RDD.mapParitions(备注:parquet 文件是按变量分区保存的)。

预期的输出应该是:

incident1 --> dataframe 1
incident2 --> dataframe 2
...

其中数据框 1 包含事件 1 时间范围内的所有变量及其观测值,数据框 2 包含事件 2 时间范围内的所有变量。

我希望你明白了。

更新

我尝试根据想法 #1 和 zero323 给出的答案中的代码编写解决方案。工作很好,但我想知道如何在最后一步将它聚合/分组到事件中?我尝试为每个事件添加一个序列号,但在最后一步出现错误。如果您可以查看和/或完成代码,那就太好了。因此,我上传了示例数据和脚本。环境为 Spark 1.4 (PySpark):

事件:incidents.csv 变量值观测数据(77MB):parameters_sample.csv(放到HDFS) Jupyter 笔记本:nested_for_loop_optimized.ipynb Python 脚本:nested_for_loop_optimized.py 脚本的PDF导出:nested_for_loop_optimized.pdf

【问题讨论】:

不是一个大数据集,它甚至不是一个 tera。虽然很大! :) 到目前为止你尝试过什么..? 我阅读了大量的帖子和示例只是为了了解如何解决它。但是还没有实现任何东西。但我的第一个镜头是使用 isin 函数将变量过滤到 df2,然后广播事件数据帧并在 df2 上使用 map。但不确定如何为每个事件获取(屈服)这些数据框(观察)。不知何故卡住了。 join 看起来像是一个明智的起始点。您有足够的能力避免笛卡尔积,并且有 500 条记录,这可以很容易地优化为广播加入。 有什么例子吗?我今天会尝试编写一个代码。 我在上面添加了示例代码和脚本。请查看。 【参考方案1】:

一般来说,只有第一种方法对我来说是明智的。记录数量和分布的精确连接策略,但您可以创建***数据框:

ref = sc.parallelize([(var_, incident) 
    for var_ in variables_of_interest:
    for incident in incidents
]).toDF(["var_", "incident"])

只需join

same_var = col("Variable") == col("var_")
same_time = col("Time").between(
    col("incident.startTime"),
    col("incident.endTime")
)

ref.join(df.alias("df"), same_var &  same_time)

或针对特定分区执行连接:

incidents_ = sc.parallelize([
   (incident, ) for incident in incidents
]).toDF(["incident"])

for var_ in variables_of_interest:
    df = spark.read.parquet("/some/path/Variable=0".format(var_))
    df.join(incidents_, same_time)

可选marking one side as small enough to be broadcasted。

【讨论】:

嗯,谢谢你的例子。需要一些时间来理解它。你能在 Skype 上联系我吗?只是为了讨论一些细节? nagilo12345 抱歉@Matthias,我没有帐户了。 嗨,零。我尝试在脚本中使用您的代码并且到目前为止工作正常。我唯一没有得到的是如何为每个事件添加一个数字,我可以在最后的连接步骤之后使用它来按事件编号从结果帧中选择数据。您可以在上面更新的问题中找到脚本。请审核,谢谢! 对不起,我最近不在这儿。有空我会去看看。 是的,stackoverlow 不是最快的沟通方式;) 无论如何感谢您的支持。

以上是关于如何使用 PySpark 进行嵌套的 for-each 循环的主要内容,如果未能解决你的问题,请参考以下文章

如何使用pyspark从xml的每个嵌套节点创建一个表

如何使用 pyspark 在 aws 胶水中展平嵌套 json 中的数组?

复杂和嵌套的 json 数据集如何与 pyspark 一起使用

PySpark在嵌套数组中反转StringIndexer

PySpark:如何更新嵌套列?

pyspark中的未嵌套列表