如何与前一组(月)的同一行执行自连接以在 Pyspark 中引入其他列
Posted
技术标签:
【中文标题】如何与前一组(月)的同一行执行自连接以在 Pyspark 中引入其他列【英文标题】:How to perform self join with same row of previous group(month) to bring in additional columns in Pyspark 【发布时间】:2020-07-20 18:23:56 【问题描述】:继续 *** 问题,因为它没有正确解释该问题:How to perform self join with same row of previous group(month) to bring in additional columns with different expressions in Pyspark(为了更好的解释)
基于以下输入,我必须根据输出数据最后一列中添加的公式推导出输出(显示)以进行解释。基本上,我需要根据 Input 中的值 1 ....n 列创建新的 opt 1 ....n 列。
输入:
|Month_no|value1 |value2 |
| 01 |10 |20 |
| 01 |20 |30 |
| 02 |30 |40 |
| 02 |40 |50 |
| 03 |50 |60 |
| 03 |60 |70 |
| 04 |70 |80 |
| 04 |80 |90 |
输出:
|Month_no|value1 |value2 |opt1 |opt2 |formula(just for explanation) to calculate opt1 an opt 2
| 01 |10 |20 |10 |20 |same as value 1 or value 2 for 01
| 01 |20 |30 |20 |30 |same as value 1 or value 2 for 01
| 02 |30 |40 |40 |60 |add 02 value 1 or value 2 + previous month 01 opt1 or opt 2 value
| 02 |40 |50 |60 |80 |add 02 value 1 or value 2 + previous month 01 opt1 or opt 2 value
| 03 |50 |60 |90 |120 |add 03 value 1 or value 2 + previous month 02 opt1 or opt 2 value
| 03 |60 |70 |120 |150 |add 03 value 1 or value 2 + previous month 02 opt1 or opt 2 value
| 04 |70 |80 |70 |80 |same as value 1 or value 2 for 04
| 04 |80 |90 |80 |90 |same as value 1 or value 2 for 04
我的编码方式:
从输入数据帧中获取不同月份# id 的列表。
month_list = sorted([row['Month_no'] for row in df.select(df.Month_no).distint().collect()])
使用 for 循环,遍历 month_list 变量
for date in month_list:
if date is in 01 or 04 or 07 or 10:
# Iterating on the value columns and creating the new opt1...n columns as same as value1....n values
else:
Filter the data for the current iteration
df_present = df.filter(df.Month_no == "02")
df_previous = df.filter(df.Month_no == "01") # present month - 1
# doing left join considering the value1...n columns from df_present and opt1...n columns (renamed clmn) from df_previous data to calculate the addition.
df_joined = df_present.join(df_previous, on="key_column", left).select(df_present.columns, df_prevous_renamed_columns.columns)
以相同的方式迭代循环并合并数据,以便上个月计算的新列对当前月份的迭代有用。然后返回最终的联合数据帧。
但是,使用 for 循环会消耗更多时间来计算大量记录的数据。要求我们是否可以采用任何其他方法??
【问题讨论】:
您是否有一列允许我们对数据框进行排序(可能是value1
)?
@cronoik,我们可以做到。
【参考方案1】:
您可以使用lag 窗口函数,它允许您访问当前行之前的一行:
import pyspark.sql.functions as F
from pyspark.sql import Window
l = [( '01' ,10 ,20 )
,( '01' ,20 ,30 )
,( '02' ,30 ,40 )
,( '02' ,40 ,50 )
,( '03' ,50 ,60 )
,( '03' ,60 ,70 )
,( '04' ,70 ,80 )
,( '04' ,80 ,90 )]
df = spark.createDataFrame(l, ['Month_no','value1', 'value2' ])
df = df.withColumn('opt1', df.value1)
df = df.withColumn('opt2', df.value2)
w = Window.orderBy('value1')
#We use lag(COLUMN, 2) to get the row before the previous row
calculationRequiredMonthsE = ['02', '05', '08', '11']
calculationRequiredMonthsU = ['03', '06', '09', '12']
df = df.withColumn('opt1', F.when(F.col('Month_no').isin(calculationRequiredMonthsE), F.col('opt1') + F.lag('opt1', 2).over(w)).otherwise(F.col('opt1')))
df = df.withColumn('opt2', F.when(F.col('Month_no').isin(calculationRequiredMonthsE), F.col('opt2') + F.lag('opt2', 2).over(w)).otherwise(F.col('opt2')))
df = df.withColumn('opt1', F.when(F.col('Month_no').isin(calculationRequiredMonthsU), F.col('opt1') + F.lag('opt1', 2).over(w)).otherwise(F.col('opt1')))
df = df.withColumn('opt2', F.when(F.col('Month_no').isin(calculationRequiredMonthsU), F.col('opt2') + F.lag('opt2', 2).over(w)).otherwise(F.col('opt2')))
df.show()
输出:
+--------+------+------+----+----+
|Month_no|value1|value2|opt1|opt2|
+--------+------+------+----+----+
| 01| 10| 20| 10| 20|
| 01| 20| 30| 20| 30|
| 02| 30| 40| 40| 60|
| 02| 40| 50| 60| 80|
| 03| 50| 60| 90| 120|
| 03| 60| 70| 120| 150|
| 04| 70| 80| 70| 80|
| 04| 80| 90| 80| 90|
+--------+------+------+----+----+
请记住,没有partitionBy 的窗口可能会导致性能问题。如果您有另一个允许分区的列(例如年份),您应该始终使用它。
【讨论】:
感谢您使用正确的火花线进行解释。我明白了做事的方式。但是,我可以看到使用 for 循环可能会导致顺序过程出现问题。即使我有一个用 for 循环测试的方法,它也没有很好的性能优势。请推荐? @Rocky1989 我已移除循环以减少执行计划。以上是关于如何与前一组(月)的同一行执行自连接以在 Pyspark 中引入其他列的主要内容,如果未能解决你的问题,请参考以下文章
如何使通过公共相关 ID 链接的变量显示在 Splunk 表的同一行上?
链表习题-寻找单链表中数据域大小为k的结点,并与前一结点交换,如果前一结点存在的情况下
如何设置 Kafka 连接器以在 Debezium 中使用自定义转换?
Java/Bukkit (Minecraft) - 如果文件包含玩家名,则检查存储在“:”之后的同一行中的密码并执行某些操作