PySpark 按最接近的时间值连接两个数据帧
Posted
技术标签:
【中文标题】PySpark 按最接近的时间值连接两个数据帧【英文标题】:PySpark joining two data frames by closest time value 【发布时间】:2016-11-16 12:24:08 【问题描述】:我有两个数据框(tx_df 和 login_df)。 第一个包含 player_id、tx_id 和 tx_time 列,而第二个包含 player_id 和 login_time。
我想要做的是使用 player_id 列加入这两个数据框,但除此之外,只加入来自 login_df 的最新登录行。 比如有这样的tx_df:
pid_1, txid_1, '2016-11-16 00:01:00'
pid_1, txid_2, '2016-11-16 00:01:02'
pid_1, txid_3, '2016-11-16 00:02:15'
pid_1, txid_4, '2016-11-16 00:02:16'
pid_1, txid_5, '2016-11-16 00:02:17'
和 login_df 是这样的:
pid_1, '2016-11-16 00:02:10'
pid_1, '2016-11-16 00:00:55'
pid_1, '2016-11-13 00:03:00'
pid_1, '2016-11-10 16:30:00'
我希望生成的数据框如下所示:
pid_1, txid_1, '2016-11-16 00:01:00', pid_1, '2016-11-16 00:00:55'
pid_1, txid_2, '2016-11-16 00:01:02', pid_1, '2016-11-16 00:00:55'
pid_1, txid_3, '2016-11-16 00:02:15', pid_1, '2016-11-16 00:02:10'
pid_1, txid_4, '2016-11-16 00:02:16', pid_1, '2016-11-16 00:02:10'
pid_1, txid_5, '2016-11-16 00:02:17', pid_1, '2016-11-16 00:02:10'
我不是强制绑定到数据帧的,因此我们将不胜感激如何使用 RDD 或任何其他方法很好地完成它。
我担心数据爆炸,因为 tx_df 可以为每个玩家 id 拥有数千个交易条目(然后是数千个玩家 id),而 login_df 也可以有未知数量的玩家登录信息。简单地在 player_id 上加入这两个会创建一个巨大的数据框,因为笛卡尔积是不可接受的。
注意:我正在使用 Python API for Spark。
【问题讨论】:
【参考方案1】:为了将来的参考,我设法用稍微不同的方法解决了这个问题。 我很幸运,第二个数据帧小到可以广播它。更准确地说,我广播了值的哈希图,但这只是因为我发现它非常适合这个目的。 (见:broadcast variables in Spark)
然后,我像这样遍历第一个数据帧的行
tx_df.rdd.map(my_map_function)
在 my_map_function 中,我访问了广播的 hasmap,确实需要排序和其他操作,最后选择了要附加到第一个数据帧的行的值。
作为广播值的哈希图的一个很好的副作用,我能够删除数据帧的连接并加快处理速度。 在这样做之前,脚本有
将数据加载到数据帧中 将数据帧合并成一个大帧 过滤掉不需要的大数据帧行这个广播解决方案后,脚本有
将数据加载到数据帧中 第二个的广播值 仅迭代第一个,直接访问第二个的值并将它们附加到当前行第二种方法不需要过滤,因为已经提取了正确的值,因此脚本执行速度更快。
【讨论】:
以上是关于PySpark 按最接近的时间值连接两个数据帧的主要内容,如果未能解决你的问题,请参考以下文章
pyspark 内连接的替代方法来比较 pyspark 中的两个数据帧
Pyspark:内部连接两个 pyspark 数据帧并从第一个数据帧中选择所有列,从第二个数据帧中选择几列