在两个 Spark RDD(在 PySpark 中)上进行半连接的正确方法是啥?

Posted

技术标签:

【中文标题】在两个 Spark RDD(在 PySpark 中)上进行半连接的正确方法是啥?【英文标题】:What is the right way to do a semi-join on two Spark RDDs (in PySpark)?在两个 Spark RDD(在 PySpark 中)上进行半连接的正确方法是什么? 【发布时间】:2015-06-30 07:21:37 【问题描述】:

在我的 PySpark 应用程序中,我有两个 RDD:

items - 这包含所有有效项目的项目 ID 和项目名称。大约 100000 项。

attributeTable - 包含字段用户 ID、项目 ID 和此组合的属性值,按此顺序排列。这些是系统中每个用户-项目组合的特定属性。此 RDD 有数百行,其中 1000 行。

我想丢弃属性表 RDD 中与项目 RDD 中的有效项目 ID(或名称)不对应的所有行。换句话说,通过项目 ID 进行半连接。例如,如果这些是 R 数据帧,我会做 semi_join(attributeTable, items, by="itemID")

我首先尝试了以下方法,但发现这需要很长时间才能返回(在我的 PC 上的 VM 上运行的本地 Spark 安装上)。可以理解,因为涉及的比较数量如此之多:

# Create a broadcast variable of all valid item IDs for doing filter in the drivers
validItemIDs = sc.broadcast(items.map(lambda (itemID, itemName): itemID)).collect())
attributeTable = attributeTable.filter(lambda (userID, itemID, attributes): itemID in set(validItemIDs.value))

经过一番折腾,我发现以下方法运行得非常快(在我的系统上运行一分钟左右)。

# Create a broadcast variable for item ID to item name mapping (dictionary) 
itemIdToNameMap = sc.broadcast(items.collectAsMap())

# From the attribute table, remove records that don't correspond to a valid item name.
# First go over all records in the table and add a dummy field indicating whether the item name is valid
# Then, filter out all rows with invalid names. Finally, remove the dummy field we added.
attributeTable = (attributeTable
                  .map(lambda (userID, itemID, attributes): (userID, itemID, attributes, itemIdToNameMap.value.get(itemID, 'Invalid')))
                  .filter(lambda (userID, itemID, attributes, itemName): itemName != 'Invalid')
                  .map(lambda (userID, itemID, attributes, itemName): (userID, itemID, attributes)))

虽然这对我的应用程序来说效果很好,但它感觉更像是一种肮脏的解决方法,我很确定在 Spark 中必须有另一种更清洁或惯用正确(并且可能更有效)的方式来做到这一点。你有什么建议?我是 Python 和 Spark 的新手,因此如果您能指出正确的资源,任何 RTFM 建议也会有所帮助。

我的 Spark 版本是 1.3.1。

【问题讨论】:

如果你是Spark 1.3及以上版本,可以考虑使用dataframe和Spark SQL。 我还没有探索过 Spark 中的数据帧。将通过文档。谢谢你的建议。 由于 Spark-sql 已经支持 Joins ,所以你的工作会更简单直接 【参考方案1】:

只需进行常规连接,然后丢弃“查找”关系(在您的情况下为 items rdd)。

如果这些是您的 RDD (取自另一个答案的示例)

items = sc.parallelize([(123, "Item A"), (456, "Item B")])
attributeTable = sc.parallelize([(123456, 123, "Attribute for A")])

那么你会这样做:

attributeTable.keyBy(lambda x: x[1])
  .join(items)
  .map(lambda (key, (attribute, item)): attribute)

因此,您只有来自 attributeTable RDD 的元组,它们在 items RDD 中有相应的条目:

[(123456, 123, 'Attribute for A')]

按照另一个答案中的建议通过leftOuterJoin 进行操作也可以完成这项工作,但效率较低。此外,另一个答案半连接 itemsattributeTable 而不是 attributeTableitems

【讨论】:

【参考方案2】:

正如其他人所指出的,这可能通过利用 DataFrame 最容易实现。但是,您可以通过使用 leftOuterJoinfilter 函数来实现您的预​​期目标。像下面这样有点骇人听闻的东西可能就足够了:

items = sc.parallelize([(123, "Item A"), (456, "Item B")])
attributeTable = sc.parallelize([(123456, 123, "Attribute for A")])
sorted(items.leftOuterJoin(attributeTable.keyBy(lambda x: x[1]))
       .filter(lambda x: x[1][1] is not None)
       .map(lambda x: (x[0], x[1][0])).collect())

返回

[(123, 'Item A')]

【讨论】:

从来没有听说过什么叫做左内连接,但我明白你的意思。但是,我认为 leftOuterJoin API 假定两个 RDD 中的第一个字段都将用于连接,这在我的情况下并不成立。您可以按照加入 API 期望的方式重新排列 RDD,但这与我在问题中提到的方法一样肮脏和低效。此外,Spark 连接操作返回第二个 RDD 中的元素与第一个 RDD 中的元素进行元组,我需要将其过滤掉。我只需要第一个 RDD 中的字段。像这样:en.wikipedia.org/wiki/Relational_algebra#Semijoin 明白了。我稍微更新了代码。可以肯定的是,它与您的代码相似,但我试图避免您正在使用的 collectbroadcast 模式,因为这会产生瓶颈并且如果 attributeTable 很大,则不会扩展。 (也许这不是问题,正如你所说,它永远只有 1000 行。无论如何,它似乎仍然更简单一点。) 这看起来很合理。我没有注意到 keyBy API。我还没有运行你的代码,我无法评论效率。 您介意在您的答案中添加一行(就像 ABC 指出的那样)数据框可能更适合这种事情,所以我可以将您的答案标记为已接受?这将有助于将来登陆此页面的人寻找解决方案。 当然。我刚刚修改了它。如果我能解决它,那么也许我也可以使用 DataFrame 解决方案:-)

以上是关于在两个 Spark RDD(在 PySpark 中)上进行半连接的正确方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 连接两个 RDD 导致一个空 RDD

在 python (pyspark) 中使用 combinebykey spark rdd 计算组上的聚合

如何在 PySpark 中获得不同的字典 RDD?

我可以将 Pyspark RDD 用作 Pandas DataFrame 吗? Pyspark/spark 在数据分析中对 Pandas 的限制?

使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

将 RDD 的值作为变量传递给另一个 RDD - Spark #Pyspark [重复]