Pyspark循环遍历数据框并减少列值

Posted

技术标签:

【中文标题】Pyspark循环遍历数据框并减少列值【英文标题】:Pyspark loop over dataframe and decrement column value 【发布时间】:2021-07-30 22:07:39 【问题描述】:

我需要帮助在 pyspark 数据帧中逐行循环:

例如:

df1
+---------+
|id|value|
+---------+
|a|100|
|b|100|
|c|100|
+---------+

我需要根据另一个数据帧循环并减少值


df2 

+---------+---------------
|id|value|timestamp
+---------+---------------
|a|20    |2020-01-02 01:30
|a|50    |2020-01-02 05:30
|b|50    |2020-01-15 07:30
|b|80    |2020-02-01 09:30
|c|50    |2020-02-01 09:30
+---------+-------------

基于 udf 或函数的预期输出

customFunction(df1(row_n)))


 df1
+---------+
|id|value|
+---------+
|a|30| ( 100-20 ) ( 80 - 50 )
|b|50| ( 100-50 ) skip operation since lhs < rhs ( 50 - 80 )
|c|50| ( 100 - 50 ) 
+---------+

我如何在 pyspark 中实现这一点?数据帧也将有 > 50k 行

【问题讨论】:

不应该左连接和一些聚合帮助吗? 【参考方案1】:

您可以通过加入两个数据框并使用groupBy 聚合来自df2 的值来确定value 是大于还是小于聚合值来实现此目的。

组合数据帧

input_str1 = """
|a|100
|b|100
|c|100
""".split("|")

input_values1 = list(map(lambda x:x.strip(),input_str1))[1:]

input_list1 = [(x,y) for x,y in zip(input_values1[0::2],input_values1[1::2])]

sparkDF1 = sql.createDataFrame(input_list1,['id','value'])

input_str2 = """
|a|20    |2020-01-02 01:30
|a|50    |2020-01-02 05:30
|b|50    |2020-01-15 07:30
|b|80    |2020-02-01 09:30
|c|50    |2020-02-01 09:30
""".split("|")

input_values2 = list(map(lambda x:x.strip(),input_str2))[1:]

input_list2 = [(x,y,z) for x,y,z in zip(input_values2[0::3]
,input_values2[1::3],input_values2[2::3])]

sparkDF2 = sql.createDataFrame(input_list2,['id','value','timestamp'])

finalDF = (sparkDF1.join(sparkDF2
                        ,sparkDF1['id'] == sparkDF2['id']
                        ,'inner'
                       ).select(sparkDF1["*"],sparkDF2['value'].alias('value_right')))

finalDF.show()

+---+-----+-----------+
| id|value|value_right|
+---+-----+-----------+
|  c|  100|         50|
|  b|  100|         50|
|  b|  100|         80|
|  a|  100|         20|
|  a|  100|         50|
+---+-----+-----------+

分组依据

agg_lst = [
            F.first(F.col('value')).alias('value')
           ,F.sum(F.col('value_right')).alias('sum_value_right')
           ,F.min(F.col('value_right')).alias('min_value_right')
         ]
finalDF = finalDF.groupBy('id').agg(*agg_lst).orderBy('id')


finalDF = finalDF.withColumn('final_value'
                             ,F.when(F.col('value') > F.col('sum_value_right')
                                 ,F.col('value') - F.col('sum_value_right'))\
                             .otherwise(F.col('value') - F.col('min_value_right'))
                            )

finalDF.show()

+---+-----+---------------+---------------+-----------+
| id|value|sum_value_right|min_value_right|final_value|
+---+-----+---------------+---------------+-----------+
|  a|  100|           70.0|             20|       30.0|
|  b|  100|          130.0|             50|       50.0|
|  c|  100|           50.0|             50|       50.0|
+---+-----+---------------+---------------+-----------+

注意 - 如果上述逻辑不适用于您的整个集合,则使用您的自定义逻辑实现 UDF 以及 groupBy 将是理想的解决方案

【讨论】:

谢谢@Vaebhav!

以上是关于Pyspark循环遍历数据框并减少列值的主要内容,如果未能解决你的问题,请参考以下文章

pyspark - 使用最大值为一列创建一个从 0 到该值的行值循环,并为其重复其他列值

根据列值(字符串,子字符串)比较两个数据框并更新另一个列值

PySpark Dataframe 将两列转换为基于第三列值的元组新列

Pyspark - 过滤数据框并创建排名列

循环遍历数据框以更改列值-python [重复]

如何拆分pyspark数据框并创建新列