Pyspark循环遍历数据框并减少列值
Posted
技术标签:
【中文标题】Pyspark循环遍历数据框并减少列值【英文标题】:Pyspark loop over dataframe and decrement column value 【发布时间】:2021-07-30 22:07:39 【问题描述】:我需要帮助在 pyspark 数据帧中逐行循环:
例如:
df1
+---------+
|id|value|
+---------+
|a|100|
|b|100|
|c|100|
+---------+
我需要根据另一个数据帧循环并减少值
df2
+---------+---------------
|id|value|timestamp
+---------+---------------
|a|20 |2020-01-02 01:30
|a|50 |2020-01-02 05:30
|b|50 |2020-01-15 07:30
|b|80 |2020-02-01 09:30
|c|50 |2020-02-01 09:30
+---------+-------------
基于 udf 或函数的预期输出
customFunction(df1(row_n)))
df1
+---------+
|id|value|
+---------+
|a|30| ( 100-20 ) ( 80 - 50 )
|b|50| ( 100-50 ) skip operation since lhs < rhs ( 50 - 80 )
|c|50| ( 100 - 50 )
+---------+
我如何在 pyspark 中实现这一点?数据帧也将有 > 50k 行
【问题讨论】:
不应该左连接和一些聚合帮助吗? 【参考方案1】:您可以通过加入两个数据框并使用groupBy 聚合来自df2
的值来确定value
是大于还是小于聚合值来实现此目的。
组合数据帧
input_str1 = """
|a|100
|b|100
|c|100
""".split("|")
input_values1 = list(map(lambda x:x.strip(),input_str1))[1:]
input_list1 = [(x,y) for x,y in zip(input_values1[0::2],input_values1[1::2])]
sparkDF1 = sql.createDataFrame(input_list1,['id','value'])
input_str2 = """
|a|20 |2020-01-02 01:30
|a|50 |2020-01-02 05:30
|b|50 |2020-01-15 07:30
|b|80 |2020-02-01 09:30
|c|50 |2020-02-01 09:30
""".split("|")
input_values2 = list(map(lambda x:x.strip(),input_str2))[1:]
input_list2 = [(x,y,z) for x,y,z in zip(input_values2[0::3]
,input_values2[1::3],input_values2[2::3])]
sparkDF2 = sql.createDataFrame(input_list2,['id','value','timestamp'])
finalDF = (sparkDF1.join(sparkDF2
,sparkDF1['id'] == sparkDF2['id']
,'inner'
).select(sparkDF1["*"],sparkDF2['value'].alias('value_right')))
finalDF.show()
+---+-----+-----------+
| id|value|value_right|
+---+-----+-----------+
| c| 100| 50|
| b| 100| 50|
| b| 100| 80|
| a| 100| 20|
| a| 100| 50|
+---+-----+-----------+
分组依据
agg_lst = [
F.first(F.col('value')).alias('value')
,F.sum(F.col('value_right')).alias('sum_value_right')
,F.min(F.col('value_right')).alias('min_value_right')
]
finalDF = finalDF.groupBy('id').agg(*agg_lst).orderBy('id')
finalDF = finalDF.withColumn('final_value'
,F.when(F.col('value') > F.col('sum_value_right')
,F.col('value') - F.col('sum_value_right'))\
.otherwise(F.col('value') - F.col('min_value_right'))
)
finalDF.show()
+---+-----+---------------+---------------+-----------+
| id|value|sum_value_right|min_value_right|final_value|
+---+-----+---------------+---------------+-----------+
| a| 100| 70.0| 20| 30.0|
| b| 100| 130.0| 50| 50.0|
| c| 100| 50.0| 50| 50.0|
+---+-----+---------------+---------------+-----------+
注意 - 如果上述逻辑不适用于您的整个集合,则使用您的自定义逻辑实现 UDF 以及 groupBy 将是理想的解决方案
【讨论】:
谢谢@Vaebhav!以上是关于Pyspark循环遍历数据框并减少列值的主要内容,如果未能解决你的问题,请参考以下文章
pyspark - 使用最大值为一列创建一个从 0 到该值的行值循环,并为其重复其他列值