具有数百万条记录的 2 个数据帧之间的 Pyspark 交叉连接

Posted

技术标签:

【中文标题】具有数百万条记录的 2 个数据帧之间的 Pyspark 交叉连接【英文标题】:Pyspark crossjoin between 2 dataframes with millions of records 【发布时间】:2020-05-29 18:44:18 【问题描述】:

我有 2 个数据框 A(3500 万条记录)和 B(30000 条记录)

一个

|Text |
-------
| pqr  |
-------
| xyz  |
------- 

B

|Title |
-------
| a  |
-------
| b  |
-------
| c  |
------- 

下面的dataframe C是A和B交叉连接后得到的。

c = A.crossJoin(B, on = [A.text == B.Title)

C

|text | Title |
---------------
| pqr  | a    |
---------------
| pqr  | b    |
---------------
| pqr  | c    |
---------------
| xyz  | a    |
---------------
| xyz  | b    |
---------------
| xyz  | c    |
---------------

上面的两列都是字符串类型。

我正在执行以下操作并导致 Spark 错误(作业因阶段故障而中止)

display(c.withColumn("Contains", when(col('text').contains(col('Title')), 1).otherwise(0)).filter(col('Contains') == 0).distinct())

关于如何完成此连接以避免结果操作上的 Spark error() 的任何建议?

Spark error message

【问题讨论】:

你真的在这里做交叉连接吗?就像根本没有加入标准一样? 查看您编辑的问题,如果您指定连接列,则不需要交叉连接。我建议您使用 少量的数据对此进行测试。如果 Spark 对这些数据集进行完全交叉连接,如果我的数学正确的话,你最终会得到超过 1 万亿行。 能否在问题中也粘贴 spark 错误? 我有兴趣查看数据框 B 标题列中的任何记录是否作为数据框 A 文本列中的子字符串存在。这就是我做交叉连接的原因。体积非常大。我会尝试使用一个样本,看看它是否有帮助。谢谢 【参考方案1】:

尝试使用broadcast 连接

from pyspark.sql.functions import broadcast
c = broadcast(A).crossJoin(B)

如果您不需要额外的“包含”列,则可以将其过滤为

display(c.filter(col("text").contains(col("Title"))).distinct())

【讨论】:

以上是关于具有数百万条记录的 2 个数据帧之间的 Pyspark 交叉连接的主要内容,如果未能解决你的问题,请参考以下文章

处理具有数百万条记录更新和大量读数的 MySQL 表的最佳方法

即使使用 parallel(8) 提示,具有数百万条记录的表中的 Count(1) 也很慢

识别父数据帧中不存在于java子集数据帧中的记录

将数百批 500k - 300 万条记录插入 PostgreSQL 数据库的最快方法

PairWise 匹配数百万条记录

在没有并行提示和批量收集的情况下删除数百万条记录