如何使用 Pyspark 中的 Graphframes 和 Spark Dataframe 中的原始数据获取连接的组件?

Posted

技术标签:

【中文标题】如何使用 Pyspark 中的 Graphframes 和 Spark Dataframe 中的原始数据获取连接的组件?【英文标题】:How to Get Connected Component with Graphframes in Pyspark and Raw Data in Spark Dataframe? 【发布时间】:2020-12-28 16:31:10 【问题描述】:

我有一个如下所示的 spark 数据框:

+--+-----+---------+
|id|phone|  address|
+--+-----+---------+
| 0|  123| james st|
| 1|  177|avenue st|
| 2|  123|spring st|
| 3|  999|avenue st|
| 4|  678|  5th ave|
+--+-----+---------+

我正在尝试使用graphframes 包来识别 ids 使用 phoneaddress 从上面的 spark 数据框中的连接组件。所以这个数据框可以看作是图的vertices数据框。

我想知道创建图表的 edges 数据框以馈送到graphframes 中的connectedComponents() 函数的最佳方法是什么?

理想情况下,edges 数据框应如下所示:

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| 0 |  2|  same_phone|
| 1 |  3|same_address|
+---+---+------------+

最后,connectedComponents() 结果应该如下所示。 id 0 & 1 基于 same_phone 关系在同一个组件中,1 & 3 基于 1 & 3 strong>same_address 关系。然后,这会将 4 作为另一个组件,与其他 ids 没有任何联系。

+---+-------------------+
|id |connected_component|
+---+-------------------+
|0  |1                  |
|1  |2                  |
|2  |1                  |
|3  |2                  |
|4  |3                  |
+---+-------------------+

提前致谢!

【问题讨论】:

如果还有一行 id=5、phone=123 和 address=avenue st,那么想要的connected_component 会是什么? 这里有一个很好的例子来解释如何找到图中的连通分量:towardsdatascience.com/… 嗨@jxc,在这种情况下,id (0, 1, 2, 3, 5) 将在同一个组件中。它将 id 4 留在一个单独的组件中。 @user238607。是的,这是一个很好的参考。我调查了它和其他一些。他们都通过手动创建边缘数据框来创建边缘列表。但是,这在我的情况下不起作用,因为我的顶点数据有几百万条记录。 【参考方案1】:
from functools import reduce

edges = reduce(
    lambda x, y: x.union(y),
    [df.alias('t1')
       .join(df.alias('t2'), c)
       .filter('t1.id < t2.id')
       .selectExpr('t1.id src', 't2.id dst', "'same_%s' relationship"% c) for c in df.columns[1:]
    ]
)

edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  0|  2|  same_phone|
|  1|  3|same_address|
+---+---+------------+
import pyspark.sql.functions as F
from pyspark.sql.window import Window

connect = edges.select(
    F.array_sort(F.array('src', 'dst')).alias('arr')
).distinct().union(
    df.join(edges, (df.id == edges.src) | (df.id == edges.dst), 'anti').select(F.array('id'))
).withColumn(
    'connected_component', 
    F.row_number().over(Window.orderBy('arr'))
).select(F.explode('arr').alias('id'), 'connected_component')

connect.show()
+---+-------------------+
| id|connected_component|
+---+-------------------+
|  0|                  1|
|  2|                  1|
|  1|                  2|
|  3|                  2|
|  4|                  3|
+---+-------------------+

【讨论】:

这个需要 graphframe.connectedComponents 作为 OP 提到的,从他的另一篇文章中,现在应该很容易解决这个任务。 感谢两位回答我的问题。这个和我们在另一篇文章中讨论的自我加入解决方案都适用于我的任务!

以上是关于如何使用 Pyspark 中的 Graphframes 和 Spark Dataframe 中的原始数据获取连接的组件?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用pyspark流计算csv文件中的条目数

如何在 pyspark 中的多个键上使用 GroupByKey?

如何阅读pyspark中的特定列?

如何使用 pyspark 在 aws 胶水中展平嵌套 json 中的数组?

如何删除 Spark 表列中的空格(Pyspark)

如何使用 Pyspark 在 Databricks 中合并 Hive 表中的记录?