Pyspark 自加入创建网络数据

Posted

技术标签:

【中文标题】Pyspark 自加入创建网络数据【英文标题】:Pyspark self join to create network data 【发布时间】:2018-07-18 21:29:31 【问题描述】:

我有一个包含 3000 万个观测值的 spark 表。

DF = sc.parallelize([ 
[('comp1'),('P1'), '2016-01-01'],
[('comp1'),('P4'),'2015-01-01'],
[('comp2'),('P1'),'2017-01-01'],
[('comp2'),('P2'),'2015-01-01'],
[('comp2'),('P4'),'2016-01-01'],
[('comp3'),('P3'),'2014-01-01'],
[('comp1'),('P2'),'2016-01-01'],
[('comp3'),('P2'),'2017-01-01']
]).toDF(["company", "Project",'Date'])
DF.show()

我想创建一个有向网络数据集来计算过去 5 年公司之间每个项目的移动。当我在我的表上进行自连接时,它会创建不在数据集中的边:

DF.alias('l').join(DF.alias('r'), on='Project')\
.where('r.Date > l.Date')\
.select(F.col('l.company').alias('company1'), 
F.col('r.company').alias('company2'), 'l.Project')\
.show()

+--------+--------+-------+
|company1|company2|Project|
+--------+--------+-------+
|   comp1|   comp2|     P1|
|   comp1|   comp3|     P2|
|   comp2|   comp1|     P2|
|   comp2|   comp3|     P2| #This is wrong
|   comp1|   comp2|     P4|
+--------+--------+-------+

我尝试创建一个计数器并在 where 子句中添加另一个条件:

DF =DF.withColumn("row_num", 
F.row_number().over(Window.partitionBy("Project"))).orderBy('Project', 
'Date')

DF.alias('l').join(DF.alias('r'), on='Project')\
.where(('r.Date > l.Date')& ('r.row_num  - l.row_num < 2' ))\
.select(F.col('l.company').alias('company1'), 
F.col('r.company').alias('company2'), 'l.Project')\
.show()

但我得到这个错误:

TypeError: unsupported operand type(s) for &: 'str' and 'str'

如何更改 Where 子句中的条件以更正此问题?

我在集群上工作,无法安装库,我只安装了 networkx,我的 Spark 版本是 1.6

【问题讨论】:

你能发布等效的 Pandas 代码吗? @pissall 请看下面的链接:***.com/questions/22979480/… 您对词典的表现不满意吗?在 Pyspark 中,您只需要获取 2 个数据框,将它们加入项目和日期,在 company1_col、company2_col 上调用 groupby count() @pissall 自联接在这里不起作用,因为它会创建实际上不在数据中的边。 【参考方案1】:

我找到了一种方法来创建我正在寻找的输出:

df_lag = DF.withColumn('comp1',F.lag(DF['company']).\
                   over(Window.partitionBy("Project")))\
.filter(~F.isnull(F.col('comp1'))).select(F.col('comp1'), 
F.col('company').alias('comp2'),F.col('project')).show()


+-----+-----+-------+
|comp1|comp2|project|
+-----+-----+-------+
|comp1|comp2|     P1|
|comp2|comp1|     P2|
|comp1|comp3|     P2|
|comp1|comp2|     P4|
+-----+-----+-------+

【讨论】:

以上是关于Pyspark 自加入创建网络数据的主要内容,如果未能解决你的问题,请参考以下文章

加入两个数据框和结果数据框在 PySpark 中包含不重复的项目?

PySpark 函数基于多列数据框创建自定义输出

docker存储与网络

Pyspark 与 AWS Glue 在多个列上加入创建重复项

加入两个数据框pyspark

优先加入 PySpark 数据帧