如何与前一组(月)的同一行执行自连接以在 Pyspark 中引入其他列

Posted

技术标签:

【中文标题】如何与前一组(月)的同一行执行自连接以在 Pyspark 中引入其他列【英文标题】:How to perform self join with same row of previous group(month) to bring in additional columns in Pyspark 【发布时间】:2020-07-20 18:23:56 【问题描述】:

继续 *** 问题,因为它没有正确解释该问题:How to perform self join with same row of previous group(month) to bring in additional columns with different expressions in Pyspark(为了更好的解释)

基于以下输入,我必须根据输出数据最后一列中添加的公式推导出输出(显示)以进行解释。基本上,我需要根据 Input 中的值 1 ....n 列创建新的 opt 1 ....n 列。

输入:

|Month_no|value1 |value2 |
|  01    |10     |20     |
|  01    |20     |30     |
|  02    |30     |40     |
|  02    |40     |50     |
|  03    |50     |60     |
|  03    |60     |70     |
|  04    |70     |80     |
|  04    |80     |90     |

输出:

|Month_no|value1 |value2 |opt1   |opt2  |formula(just for explanation) to calculate opt1 an opt 2
|  01    |10     |20     |10     |20    |same as value 1 or value 2 for 01
|  01    |20     |30     |20     |30    |same as value 1 or value 2 for 01
|  02    |30     |40     |40     |60    |add 02 value 1 or value 2 + previous month 01 opt1 or opt 2 value
|  02    |40     |50     |60     |80    |add 02 value 1 or value 2 + previous month 01 opt1 or opt 2 value
|  03    |50     |60     |90     |120   |add 03 value 1 or value 2 + previous month 02 opt1 or opt 2 value
|  03    |60     |70     |120    |150   |add 03 value 1 or value 2 + previous month 02 opt1 or opt 2 value
|  04    |70     |80     |70     |80    |same as value 1 or value 2 for 04
|  04    |80     |90     |80     |90    |same as value 1 or value 2 for 04

我的编码方式:

从输入数据帧中获取不同月份# id 的列表。

month_list = sorted([row['Month_no'] for row in df.select(df.Month_no).distint().collect()])

使用 for 循环,遍历 month_list 变量

for date in month_list:
  if date is in 01 or 04 or 07 or 10:
     # Iterating on the value columns and creating the new opt1...n columns as same as value1....n values
  else:
     Filter the data for the current iteration
     df_present = df.filter(df.Month_no == "02")
     df_previous = df.filter(df.Month_no == "01")  # present month - 1
     # doing left join considering the value1...n columns from df_present and opt1...n columns (renamed clmn) from df_previous data to calculate the addition.
     df_joined = df_present.join(df_previous, on="key_column", left).select(df_present.columns, df_prevous_renamed_columns.columns)

以相同的方式迭代循环并合并数据,以便上个月计算的新列对当前月份的迭代有用。然后返回最终的联合数据帧。

但是,使用 for 循环会消耗更多时间来计算大量记录的数据。要求我们是否可以采用任何其他方法??

【问题讨论】:

您是否有一列允许我们对数据框进行排序(可能是value1)? @cronoik,我们可以做到。 【参考方案1】:

您可以使用lag 窗口函数,它允许您访问当前行之前的一行:

import pyspark.sql.functions as F
from pyspark.sql import Window

l = [( '01'    ,10     ,20     )
,(  '01'   ,20     ,30     )
,(  '02'    ,30     ,40     )
,(  '02'    ,40     ,50     )
,(  '03'    ,50     ,60     )
,(  '03'    ,60     ,70     )
,(  '04'    ,70     ,80     )
,(  '04'    ,80     ,90    )]

df = spark.createDataFrame(l, ['Month_no','value1', 'value2' ])

df = df.withColumn('opt1', df.value1)
df = df.withColumn('opt2', df.value2)    

w = Window.orderBy('value1')
    
#We use lag(COLUMN, 2) to get the row before the previous row
calculationRequiredMonthsE = ['02', '05', '08', '11']
calculationRequiredMonthsU = ['03', '06', '09', '12']

df = df.withColumn('opt1', F.when(F.col('Month_no').isin(calculationRequiredMonthsE), F.col('opt1') + F.lag('opt1', 2).over(w)).otherwise(F.col('opt1')))
df = df.withColumn('opt2', F.when(F.col('Month_no').isin(calculationRequiredMonthsE), F.col('opt2') + F.lag('opt2', 2).over(w)).otherwise(F.col('opt2')))
df = df.withColumn('opt1', F.when(F.col('Month_no').isin(calculationRequiredMonthsU), F.col('opt1') + F.lag('opt1', 2).over(w)).otherwise(F.col('opt1')))
df = df.withColumn('opt2', F.when(F.col('Month_no').isin(calculationRequiredMonthsU), F.col('opt2') + F.lag('opt2', 2).over(w)).otherwise(F.col('opt2')))

df.show()

输出:

+--------+------+------+----+----+
|Month_no|value1|value2|opt1|opt2|
+--------+------+------+----+----+
|      01|    10|    20|  10|  20|
|      01|    20|    30|  20|  30|
|      02|    30|    40|  40|  60|
|      02|    40|    50|  60|  80|
|      03|    50|    60|  90| 120|
|      03|    60|    70| 120| 150|
|      04|    70|    80|  70|  80|
|      04|    80|    90|  80|  90|
+--------+------+------+----+----+

请记住,没有partitionBy 的窗口可能会导致性能问题。如果您有另一个允许分区的列(例如年份),您应该始终使用它。

【讨论】:

感谢您使用正确的火花线进行解释。我明白了做事的方式。但是,我可以看到使用 for 循环可能会导致顺序过程出现问题。即使我有一个用 for 循环测试的方法,它也没有很好的性能优势。请推荐? @Rocky1989 我已移除循环以减少执行计划。

以上是关于如何与前一组(月)的同一行执行自连接以在 Pyspark 中引入其他列的主要内容,如果未能解决你的问题,请参考以下文章

如何使通过公共相关 ID 链接的变量显示在 Splunk 表的同一行上?

链表习题-寻找单链表中数据域大小为k的结点,并与前一结点交换,如果前一结点存在的情况下

如何检查新的输入行是不是与前一行(数组)冗余

如何仅选择至少一个值与前一行发生更改的行

如何设置 Kafka 连接器以在 Debezium 中使用自定义转换?

Java/Bukkit (Minecraft) - 如果文件包含玩家名,则检查存储在“:”之后的同一行中的密码并执行某些操作