具有数百万条记录的 2 个数据帧之间的 Pyspark 交叉连接
Posted
技术标签:
【中文标题】具有数百万条记录的 2 个数据帧之间的 Pyspark 交叉连接【英文标题】:Pyspark crossjoin between 2 dataframes with millions of records 【发布时间】:2020-05-29 18:44:18 【问题描述】:我有 2 个数据框 A(3500 万条记录)和 B(30000 条记录)
一个
|Text |
-------
| pqr |
-------
| xyz |
-------
B
|Title |
-------
| a |
-------
| b |
-------
| c |
-------
下面的dataframe C是A和B交叉连接后得到的。
c = A.crossJoin(B, on = [A.text == B.Title)
C
|text | Title |
---------------
| pqr | a |
---------------
| pqr | b |
---------------
| pqr | c |
---------------
| xyz | a |
---------------
| xyz | b |
---------------
| xyz | c |
---------------
上面的两列都是字符串类型。
我正在执行以下操作并导致 Spark 错误(作业因阶段故障而中止)
display(c.withColumn("Contains", when(col('text').contains(col('Title')), 1).otherwise(0)).filter(col('Contains') == 0).distinct())
关于如何完成此连接以避免结果操作上的 Spark error() 的任何建议?
Spark error message
【问题讨论】:
你真的在这里做交叉连接吗?就像根本没有加入标准一样? 查看您编辑的问题,如果您指定连接列,则不需要交叉连接。我建议您使用 少量的数据对此进行测试。如果 Spark 对这些数据集进行完全交叉连接,如果我的数学正确的话,你最终会得到超过 1 万亿行。 能否在问题中也粘贴 spark 错误? 我有兴趣查看数据框 B 标题列中的任何记录是否作为数据框 A 文本列中的子字符串存在。这就是我做交叉连接的原因。体积非常大。我会尝试使用一个样本,看看它是否有帮助。谢谢 【参考方案1】:尝试使用broadcast
连接
from pyspark.sql.functions import broadcast
c = broadcast(A).crossJoin(B)
如果您不需要额外的“包含”列,则可以将其过滤为
display(c.filter(col("text").contains(col("Title"))).distinct())
【讨论】:
以上是关于具有数百万条记录的 2 个数据帧之间的 Pyspark 交叉连接的主要内容,如果未能解决你的问题,请参考以下文章
处理具有数百万条记录更新和大量读数的 MySQL 表的最佳方法
即使使用 parallel(8) 提示,具有数百万条记录的表中的 Count(1) 也很慢