在 PySpark 中使用日期滚动连接?
Posted
技术标签:
【中文标题】在 PySpark 中使用日期滚动连接?【英文标题】:Rolling join using dates in PySpark? 【发布时间】:2021-01-14 14:58:46 【问题描述】:我正在尝试在两个 PySpark 数据帧之间进行连接,连接一个键,但是第一个表的日期应该总是在第二个表的日期之后。举个例子。我们正在尝试加入两个表:
表 1:
Date1 value1 key
13 Feb 2020 1 a
01 Mar 2020 2 a
31 Mar 2020 3 a
15 Apr 2020 4 a
表 2:
Date2 value2 key
10 Feb 2020 11 a
15 Mar 2020 22 a
加入后,结果应该是这样的:
Date1 value1 value2 key
13 Feb 2020 1 11 a
01 Mar 2020 2 null a
31 Mar 2020 3 22 a
15 Apr 2020 4 null a
有什么想法吗?
【问题讨论】:
【参考方案1】:这是一个有趣的连接。我的做法是先在key上join,选择最早的日期,找到最早的日期后进行self join。
from pyspark.sql import functions as F, Window
# Clean up date format first
df3 = df1.withColumn('Date1', F.to_date('Date1', 'dd MMM yyyy'))
df4 = df2.withColumn('Date2', F.to_date('Date2', 'dd MMM yyyy'))
result = (df3.join(df4, 'key')
.filter('Date1 > Date2')
.withColumn('rn', F.row_number().over(Window.partitionBy('Date2').orderBy('Date1')))
.filter('rn = 1')
.drop('key', 'rn', 'Date2')
.join(df3, ['Date1', 'value1'], 'right')
)
result.show()
+----------+------+------+---+
|Date1 |value1|value2|key|
+----------+------+------+---+
|2020-02-13|1 |11 |a |
|2020-03-01|2 |null |a |
|2020-03-31|3 |22 |a |
|2020-04-15|4 |null |a |
+----------+------+------+---+
【讨论】:
这很复杂...有什么办法可以避免按 value2 分组吗?真实案例有更多列,value2 在这里只是一个说明。为什么我们按 value2 分组? @Ehrendil 那是因为我们要匹配每个 Date2/value2 对应的最早的 Date1。 谢谢。我的 F 模块似乎没有 array_min。这似乎是一个较新的功能。没有它我能做什么? @Ehrendil 我已经清理了我的答案。不再按 value2 分组,也不再有 array_min。让我知道它是否有效! 花了一些时间来适应我的实际用例,但这非常有效。谢谢! 【参考方案2】:你可以试试window lag功能,它是scala的,python版本也差不多。
// change col names for union all and add extra col to indentify dataset
val df1A = df1.toDF("Date","value","key").withColumn("df",lit(1))
val df2A = df2.toDF("Date","value","key").withColumn("df",lit(2))
import org.apache.spark.sql.expressions.Window
df1A.unionAll(df2A)
.withColumn("value2",lag(array('value,'df),1) over Window.partitionBy('key).orderBy(to_date('Date,"dd MMM yyyy")))
.filter('df===1)
.withColumn("value2",when(element_at('value2,2)===2,element_at('value2,1)))
.drop("df")
.show
输出:
+-----------+-----+---+------+
| Date|value|key|value2|
+-----------+-----+---+------+
|13 Feb 2020| 1| a| 11|
|01 Mar 2020| 2| a| null|
|31 Mar 2020| 3| a| 22|
|15 Apr 2020| 4| a| null|
+-----------+-----+---+------+
【讨论】:
以上是关于在 PySpark 中使用日期滚动连接?的主要内容,如果未能解决你的问题,请参考以下文章
使用 Jupyter Notebook 为 PySpark 内核设置 spark.app.name
如何在不使用 StandardScaler 的情况下标准化 PySpark 中的列?
我可以将 Pyspark RDD 用作 Pandas DataFrame 吗? Pyspark/spark 在数据分析中对 Pandas 的限制?