使用复杂的条件逻辑加入 Pyspark 数据帧(可能使用地图代替)
Posted
技术标签:
【中文标题】使用复杂的条件逻辑加入 Pyspark 数据帧(可能使用地图代替)【英文标题】:Joining Pyspark Dataframes Using Complex Conditional Logic (Perhaps Using a Map Instead) 【发布时间】:2020-04-10 19:52:45 【问题描述】:下午好。
我正在尝试在 Pyspark 中执行连接,该连接使用一组复杂的条件来生成单个值。
我试图实现的最小示例可能如下所示。想象一组可以在离散时间发生的事件(在t=0
和t=40
之间)。每个事件都有一组三个独立的布尔属性,用于描述事件的性质。每个属性的触发都有一些与时间相关的值,包含在查找表中。对于每个事件,我想确定该事件的所有相关值的总和。
我的第一个数据框df_1
是一个事件列表、事件发生的时间以及与之关联的一系列布尔属性:
+-------------+------------+------------+------------+------------+
| EVENT_INDEX | EVENT_TIME | PROPERTY_1 | PROPERTY_2 | PROPERTY_3 |
+-------------+------------+------------+------------+------------+
| Event_1 | 13 | 1 | 0 | 1 |
| Event_2 | 24 | 0 | 1 | 1 |
| Event_3 | 35 | 1 | 0 | 0 |
+-------------+------------+------------+------------+------------+
第二个数据框df_2
是一个查找表,它描述了在特定时间特定属性为 TRUE 的相关值。由于所有时间桶中有许多重复值,因此此数据帧的格式是属性具有特定值的包含时间范围。时间范围的大小不一致,并且在不同属性之间可能会有很大差异:
+------------+----------+---------------+-------+
| START_TIME | END_TIME | PROPERTY_NAME | VALUE |
+------------+----------+---------------+-------+
| 0 | 18 | PROPERTY_1 | 0.1 |
| 19 | 40 | PROPERTY_1 | 0.8 |
| 0 | 20 | PROPERTY_2 | 0.7 |
| 20 | 24 | PROPERTY_2 | 0.3 |
| 25 | 40 | PROPERTY_2 | 0.7 |
| 0 | 40 | PROPERTY_3 | 0.5 |
+------------+----------+---------------+-------+
期望的输出:
由于Event_1
发生在时间t=13
,触发PROPERTY_1
和PROPERTY_3
,根据df_2
的值的预期总和应该是0.1(来自PROPERTY_1
0-18 桶)+ 0.5(来自PROPERTY_3
0-40 桶)= 0.6。同样,Event_2
的值应为 0.3(请记住,桶的开始/结束时间包括在内,所以这来自 20-24 桶)+ 0.5 = 0.8。最后,Event_3
= 0.8。
+-------------+------------+------------+------------+------------+-------------+
| EVENT_INDEX | EVENT_TIME | PROPERTY_1 | PROPERTY_2 | PROPERTY_3 | TOTAL_VALUE |
+-------------+------------+------------+------------+------------+-------------+
| Event_1 | 13 | 1 | 0 | 1 | 0.6 |
| Event_2 | 24 | 0 | 1 | 1 | 0.8 |
| Event_3 | 35 | 1 | 0 | 0 | 0.8 |
+-------------+------------+------------+------------+------------+-------------+
对于我的初始测试数据集,在事件数据框 df_1
中有大约 20,000 个事件分布在 2000 个时间桶中。每个事件都有约 44 个属性,查找表 df_2
的长度约为 53,000。由于我想将这个过程扩展到更多的数据(可能是几个数量级),我对这个问题的可并行解决方案非常感兴趣。例如,我想将 df_2
总结为一个 Python 字典,考虑到数据量,我无法将其广播给我的执行者。
由于我试图在df_1
的每一行中添加一列,因此我尝试使用类似于以下内容的嵌套映射来完成任务:
def calculate_value(df_2):
def _calculate_value(row):
row_dict = row.asDict()
rolling_value = 0.0
for property_name in [key for key in row_dict.keys() if "PROPERTY" in key]:
additional_value = (
df_2
.filter(
(pyspark.sql.functions.col("PROPERTY_NAME") == property_name)
& (pyspark.sql.functions.col("START_BUCKET") <= row_dict["EVENT_TIME"])
& (pyspark.sql.functions.col("END_BUCKET") >= row_dict["EVENT_TIME"])
)
.select("VALUE")
.collect()
)[0][0]
rolling_value += additional_value
return pyspark.sql.Row(**row_dict)
return _calculate_value
此代码能够在驱动程序上执行连接(通过运行calculate_value(df_2)(df_1.rdd.take(1)[0])
),但是当我尝试执行并行化映射时:
(
df_1
.rdd
.map(calculate_value(df_2))
)
我收到一个 Py4JError 指示它无法序列化数据框对象df_2
。这在 *** 的其他地方得到验证,例如Pyspark: PicklingError: Could not serialize object:.
我选择使用映射而不是连接,因为我要在 df_1
中的每一行中添加一列,并且考虑到编码识别 df_2
中正确行所需的复杂逻辑以加起来的困难对于每个给定的事件(首先,检查哪些属性在df_1
中触发并且为TRUE,然后在df_2
中选择这些属性,向下选择仅与给定事件时间相关的属性和值,然后将所有事件相加)。
我正在尝试以可持续、可扩展的方式重新配置df_2
,以实现更简单的连接/映射,但我不确定如何最好地进行。
任何建议将不胜感激。
【问题讨论】:
你的spark version
是什么?
嗨 Mohammad,我的 spark 版本是 2.4.4。
【参考方案1】:
Sample DataFrames:
df1.show()
+-----------+----------+----------+----------+----------+
|EVENT_INDEX|EVENT_TIME|PROPERTY_1|PROPERTY_2|PROPERTY_3|
+-----------+----------+----------+----------+----------+
| Event_1| 13| 1| 0| 1|
| Event_2| 24| 0| 1| 1|
| Event_3| 35| 1| 0| 0|
+-----------+----------+----------+----------+----------+
df2.show()
+----------+--------+-------------+-----+
|START_TIME|END_TIME|PROPERTY_NAME|VALUE|
+----------+--------+-------------+-----+
| 0| 18| PROPERTY_1| 0.1|
| 19| 40| PROPERTY_1| 0.8|
| 0| 20| PROPERTY_2| 0.7|
| 20| 24| PROPERTY_2| 0.3|
| 25| 40| PROPERTY_2| 0.7|
| 0| 40| PROPERTY_3| 0.5|
+----------+--------+-------------+-----+
这适用于 Spark2.4+
使用 DataframeAPI
。(非常可扩展,因为它只使用内置函数,并且对于尽可能多的属性都是动态的列)
只要属性列以'PROPERTY_'
开头,它就适用于尽可能多的属性,因为它是动态的。首先,我将使用 arrays_zip
和 array
和 explode
使用 将所有属性列折叠成 2 列的行element_at
给我们PROPERY_NAME,PROPERTY_VALUE
。在 join
之前,我将过滤以仅保留 PROPERY_VALUE=1
所在的所有行。连接将在 range of time
上进行,其中 PROPERTY
(具有所有折叠的属性行)=@987654335 @(df2)。这将确保我们只获得求和所需的所有行。然后我执行 groupBy
和 agg
来选择我们所有的必填列并得到我们的总和 作为 TOTAL_VALUE.
from pyspark.sql import functions as F
df1.withColumn("PROPERTIES",\
F.explode(F.arrays_zip(F.array([F.array(F.lit(x),F.col(x)) for x in df1.columns if x.startswith("PROPERTY_")]))))\
.select("EVENT_INDEX", "EVENT_TIME","PROPERTIES.*",\
*[x for x in df1.columns if x.startswith("PROPERTY_")]).withColumn("PROPERTY", F.element_at("0",1))\
.withColumn("PROPERTY_VALUE", F.element_at("0",2)).drop("0")\
.filter('PROPERTY_VALUE=1').join(df2, (df1.EVENT_TIME>=df2.START_TIME) & (df1.EVENT_TIME<=df2.END_TIME)& \
(F.col("PROPERTY")==df2.PROPERTY_NAME)).groupBy("EVENT_INDEX").agg(F.first("EVENT_TIME").alias("EVENT_TIME"),\
*[F.first(x).alias(x) for x in df1.columns if x.startswith("PROPERTY_")],\
(F.sum("VALUE").alias("TOTAL_VALUE"))).orderBy("EVENT_TIME").show()
+-----------+----------+----------+----------+----------+-----------+
|EVENT_INDEX|EVENT_TIME|PROPERTY_1|PROPERTY_2|PROPERTY_3|TOTAL_VALUE|
+-----------+----------+----------+----------+----------+-----------+
| Event_1| 13| 1| 0| 1| 0.6|
| Event_2| 24| 0| 1| 1| 0.8|
| Event_3| 35| 1| 0| 0| 0.8|
+-----------+----------+----------+----------+----------+-----------+
【讨论】:
感谢您的回复!感谢您花时间教我这个新的 Spark 功能。 我很高兴为您提供帮助!以上是关于使用复杂的条件逻辑加入 Pyspark 数据帧(可能使用地图代替)的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:加入 2 个数据帧以仅从第 2 个数据帧获取新记录(历史化)
如果在 pyspark 数据帧中后跟连续 5 个“0”,则在条件下获取第一个“1”
为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?