pyspark 内连接的替代方法来比较 pyspark 中的两个数据帧

Posted

技术标签:

【中文标题】pyspark 内连接的替代方法来比较 pyspark 中的两个数据帧【英文标题】:alternative of pyspark inner join to compare two dataframes in pyspark 【发布时间】:2020-07-21 12:47:44 【问题描述】:

我在 pyspark 中有两个数据框。如下所示,df1 包含来自传感器的整个 long_lat。第二个数据帧 df2 是第一个数据帧的子集,其中 lat-long 值四舍五入到小数点后 2 位,然后删除重复项以保留唯一的 lat_long 数据点。

df1:

+-----------------+---------+-----+--------------------+----------+------------+
|              UID|    label|value|            datetime|  latitude|   longitude|
+-----------------+---------+-----+--------------------+----------+------------+
|1B0545GD6546Y|evnt     | 3644|2020-06-08T23:32:...|40.1172005|-105.0823546|
|1B0545GD6FG67|evnt     | 3644|2020-06-08T23:32:...|40.1172201|-105.0821007|
|15GD6546YFG67|evnt     | 3644|2020-06-08T23:32:...|40.1172396|-105.0818468|
|1BGD6546YFG67|evnt     | 3644|2020-06-08T23:32:...|40.1172613|-105.0815929|
|1BGD6546YFG67|evnt     | 3644|2020-06-08T23:32:...|40.1172808|-105.0813368|
|1B054546YFG67|evnt     | 3644|2020-06-08T23:32:...|40.1173003|-105.0810742|
|1B056546YFG67|evnt     | 3644|2020-06-08T23:32:...| 40.117322|-105.0808073|

df2:

+-------+--------+----------------+--------------+                              
|new_lat|new_long|        lat_long|    State_name|
+-------+--------+----------------+--------------+
|  40.13|  -105.1|[40.13, -105.1] |      Colorado|
|  40.15| -105.11|[40.15, -105.11]|      Colorado|
|  40.12| -105.07|[40.12, -105.07]|      Colorado|
|  40.13| -104.99|[40.13, -104.99]|      Colorado|
|  40.15| -105.09|[40.15, -105.09]|      Colorado|
|  40.15| -105.13|[40.15, -105.13]|      Colorado|
|  40.12| -104.94|[40.12, -104.94]|      Colorado|

因此,df2 的行数比第一个少得多。在 df2 中,我应用了一个 udf 来计算州名。

现在我想在 df1 中填充州名。由于 df2 的 lat_long 值四舍五入到小数点后 2,为了匹配我使用如下阈值,我在这里使用连接操作。

threshold = 0.01

df4 = df1.join(df2)\
        .filter(df2.new_lat - threshold < df1.latitude)\
        .filter(df1.latitude < df2.new_lat + threshold)

还有其他有效的方法可以达到同样的效果吗?因为连接操作是做笛卡尔积,需要时间和大量的任务。

考虑一下,我的 df1 将有 10000 亿条记录。

任何帮助将不胜感激。

【问题讨论】:

为什么不添加过滤条件作为连接条件呢? 【参考方案1】:

每当您将一个大 DataFrame 与一个较小的 DataFrame 连接时,您应该始终尝试执行broadcast join。

如果df2 小到可以广播,那么df1.join(broadcast(df2)) 的性能会更好。

join() 方法的第二个参数应该是连接条件。

def approx_equal(col1, col2, threshold):
    return abs(col1 - col2) < threshold

threshold = lit(0.01)

df4 = df1.join(broadcast(df2), approx_equal(df2.new_lat, df1.latitude, threshold) && approx_equal(df2.new_long, df1. longitude, threshold))

编辑:我将approx_equal 函数添加到quinn,因此您的代码可以更简洁:

import quinn as Q

threshold = lit(0.01)

df4 = df1.join(broadcast(df2), Q.approx_equal(df2.new_lat, df1.latitude, threshold) && Q.approx_equal(df2.new_long, df1. longitude, threshold))

【讨论】:

以上是关于pyspark 内连接的替代方法来比较 pyspark 中的两个数据帧的主要内容,如果未能解决你的问题,请参考以下文章

在pyspark中按行连接字符串

pyspark 中的 df.show() 问题

Pyspark 命令无法识别(Ubuntu)

在 pyspark 中应用用户定义的聚合函数的替代方法

在 python 中使用 pandas,numpy 是不是有 pyspark.ml.feature StringIndexer 的替代方法?

Pyspark - 配置 Amazon Redshift JDBC jar