使用复杂的条件逻辑加入 Pyspark 数据帧(可能使用地图代替)

Posted

技术标签:

【中文标题】使用复杂的条件逻辑加入 Pyspark 数据帧(可能使用地图代替)【英文标题】:Joining Pyspark Dataframes Using Complex Conditional Logic (Perhaps Using a Map Instead) 【发布时间】:2020-04-10 19:52:45 【问题描述】:

下午好。

我正在尝试在 Pyspark 中执行连接,该连接使用一组复杂的条件来生成单个值。

我试图实现的最小示例可能如下所示。想象一组可以在离散时间发生的事件(在t=0t=40 之间)。每个事件都有一组三个独立的布尔属性,用于描述事件的性质。每个属性的触发都有一些与时间相关的值,包含在查找表中。对于每个事件,我想确定该事件的所有相关值的总和。

我的第一个数据框df_1 是一个事件列表、事件发生的时间以及与之关联的一系列布尔属性:

+-------------+------------+------------+------------+------------+
| EVENT_INDEX | EVENT_TIME | PROPERTY_1 | PROPERTY_2 | PROPERTY_3 |
+-------------+------------+------------+------------+------------+
|   Event_1   |     13     |     1      |      0     |     1      |
|   Event_2   |     24     |     0      |      1     |     1      |
|   Event_3   |     35     |     1      |      0     |     0      |
+-------------+------------+------------+------------+------------+

第二个数据框df_2 是一个查找表,它描述了在特定时间特定属性为 TRUE 的相关值。由于所有时间桶中有许多重复值,因此此数据帧的格式是属性具有特定值的包含时间范围。时间范围的大小不一致,并且在不同属性之间可能会有很大差异:

+------------+----------+---------------+-------+
| START_TIME | END_TIME | PROPERTY_NAME | VALUE |
+------------+----------+---------------+-------+
|      0     |    18    |  PROPERTY_1   |  0.1  |
|     19     |    40    |  PROPERTY_1   |  0.8  |
|      0     |    20    |  PROPERTY_2   |  0.7  |
|     20     |    24    |  PROPERTY_2   |  0.3  |
|     25     |    40    |  PROPERTY_2   |  0.7  |
|      0     |    40    |  PROPERTY_3   |  0.5  |
+------------+----------+---------------+-------+

期望的输出: 由于Event_1 发生在时间t=13,触发PROPERTY_1PROPERTY_3,根据df_2 的值的预期总和应该是0.1(来自PROPERTY_1 0-18 桶)+ 0.5(来自PROPERTY_3 0-40 桶)= 0.6。同样,Event_2 的值应为 0.3(请记住,桶的开始/结束时间包括在内,所以这来自 20-24 桶)+ 0.5 = 0.8。最后,Event_3 = 0.8。

+-------------+------------+------------+------------+------------+-------------+
| EVENT_INDEX | EVENT_TIME | PROPERTY_1 | PROPERTY_2 | PROPERTY_3 | TOTAL_VALUE |
+-------------+------------+------------+------------+------------+-------------+
|   Event_1   |     13     |     1      |      0     |     1      |     0.6     |
|   Event_2   |     24     |     0      |      1     |     1      |     0.8     |
|   Event_3   |     35     |     1      |      0     |     0      |     0.8     |
+-------------+------------+------------+------------+------------+-------------+

对于我的初始测试数据集,在事件数据框 df_1 中有大约 20,000 个事件分布在 2000 个时间桶中。每个事件都有约 44 个属性,查找表 df_2 的长度约为 53,000。由于我想将这个过程扩展到更多的数据(可能是几个数量级),我对这个问题的可并行解决方案非常感兴趣。例如,我想将 df_2 总结为一个 Python 字典,考虑到数据量,我无法将其广播给我的执行者。

由于我试图在df_1 的每一行中添加一列,因此我尝试使用类似于以下内容的嵌套映射来完成任务:

def calculate_value(df_2):
    def _calculate_value(row):
        row_dict = row.asDict()
        rolling_value = 0.0
        for property_name in [key for key in row_dict.keys() if "PROPERTY" in key]:
            additional_value = (
                df_2
                .filter(
                    (pyspark.sql.functions.col("PROPERTY_NAME") == property_name)
                    & (pyspark.sql.functions.col("START_BUCKET") <= row_dict["EVENT_TIME"])
                    & (pyspark.sql.functions.col("END_BUCKET") >= row_dict["EVENT_TIME"])
                )
                .select("VALUE")
                .collect()
            )[0][0]
            rolling_value += additional_value
        return pyspark.sql.Row(**row_dict)
    return _calculate_value

此代码能够在驱动程序上执行连接(通过运行calculate_value(df_2)(df_1.rdd.take(1)[0])),但是当我尝试执行并行化映射时:

(
    df_1
    .rdd
    .map(calculate_value(df_2))
)

我收到一个 Py4JError 指示它无法序列化数据框对象df_2。这在 *** 的其他地方得到验证,例如Pyspark: PicklingError: Could not serialize object:.

我选择使用映射而不是连接,因为我要在 df_1 中的每一行中添加一列,并且考虑到编码识别 df_2 中正确行所需的复杂逻辑以加起来的困难对于每个给定的事件(首先,检查哪些属性在df_1 中触发并且为TRUE,然后在df_2 中选择这些属性,向下选择仅与给定事件时间相关的属性和值,然后将所有事件相加)。

我正在尝试以可持续、可扩展的方式重新配置df_2,以实现更简单的连接/映射,但我不确定如何最好地进行。

任何建议将不胜感激。

【问题讨论】:

你的spark version是什么? 嗨 Mohammad,我的 spark 版本是 2.4.4。 【参考方案1】:

Sample DataFrames:

df1.show()
+-----------+----------+----------+----------+----------+
|EVENT_INDEX|EVENT_TIME|PROPERTY_1|PROPERTY_2|PROPERTY_3|
+-----------+----------+----------+----------+----------+
|    Event_1|        13|         1|         0|         1|
|    Event_2|        24|         0|         1|         1|
|    Event_3|        35|         1|         0|         0|
+-----------+----------+----------+----------+----------+

df2.show()
+----------+--------+-------------+-----+
|START_TIME|END_TIME|PROPERTY_NAME|VALUE|
+----------+--------+-------------+-----+
|         0|      18|   PROPERTY_1|  0.1|
|        19|      40|   PROPERTY_1|  0.8|
|         0|      20|   PROPERTY_2|  0.7|
|        20|      24|   PROPERTY_2|  0.3|
|        25|      40|   PROPERTY_2|  0.7|
|         0|      40|   PROPERTY_3|  0.5|
+----------+--------+-------------+-----+

这适用于 Spark2.4+ 使用 DataframeAPI(非常可扩展,因为它只使用内置函数,并且对于尽可能多的属性都是动态的列)

只要属性列以'PROPERTY_' 开头,它就适用于尽可能多的属性,因为它是动态的。首先,我将使用 arrays_ziparrayexplode 使用 将所有属性列折叠成 2 列的行element_at 给我们PROPERY_NAME,PROPERTY_VALUE。在 join 之前,我将过滤以仅保留 PROPERY_VALUE=1 所在的所有行。连接将在 range of time 上进行,其中 PROPERTY(具有所有折叠的属性行)=@987654335 @df2)。这将确保我们只获得求和所需的所有行。然后我执行 groupByagg 来选择我们所有的必填列并得到我们的总和 作为 TOTAL_VALUE.

from pyspark.sql import functions as F
df1.withColumn("PROPERTIES",\
F.explode(F.arrays_zip(F.array([F.array(F.lit(x),F.col(x)) for x in df1.columns if x.startswith("PROPERTY_")]))))\
.select("EVENT_INDEX", "EVENT_TIME","PROPERTIES.*",\
       *[x for x in df1.columns if x.startswith("PROPERTY_")]).withColumn("PROPERTY", F.element_at("0",1))\
                                                    .withColumn("PROPERTY_VALUE", F.element_at("0",2)).drop("0")\
.filter('PROPERTY_VALUE=1').join(df2, (df1.EVENT_TIME>=df2.START_TIME) & (df1.EVENT_TIME<=df2.END_TIME)& \
(F.col("PROPERTY")==df2.PROPERTY_NAME)).groupBy("EVENT_INDEX").agg(F.first("EVENT_TIME").alias("EVENT_TIME"),\
*[F.first(x).alias(x) for x in df1.columns if x.startswith("PROPERTY_")],\
(F.sum("VALUE").alias("TOTAL_VALUE"))).orderBy("EVENT_TIME").show()

+-----------+----------+----------+----------+----------+-----------+
|EVENT_INDEX|EVENT_TIME|PROPERTY_1|PROPERTY_2|PROPERTY_3|TOTAL_VALUE|
+-----------+----------+----------+----------+----------+-----------+
|    Event_1|        13|         1|         0|         1|        0.6|
|    Event_2|        24|         0|         1|         1|        0.8|
|    Event_3|        35|         1|         0|         0|        0.8|
+-----------+----------+----------+----------+----------+-----------+

【讨论】:

感谢您的回复!感谢您花时间教我这个新的 Spark 功能。 我很高兴为您提供帮助!

以上是关于使用复杂的条件逻辑加入 Pyspark 数据帧(可能使用地图代替)的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:加入 2 个数据帧以仅从第 2 个数据帧获取新记录(历史化)

加入两个 Pyspark 数据帧的两种方法有啥区别

如果在 pyspark 数据帧中后跟连续 5 个“0”,则在条件下获取第一个“1”

为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?

优先加入 PySpark 数据帧

按复杂标准合并/加入 2 个数据帧