使用 PySpark 数据框的成对列操作(例如点积)
Posted
技术标签:
【中文标题】使用 PySpark 数据框的成对列操作(例如点积)【英文标题】:Pairwise column operations (such as dot product) with a PySpark Dataframe 【发布时间】:2019-12-17 13:37:45 【问题描述】:抱歉,但我是 Spark 的新手,这似乎不是一个简单的操作。
如果我有这样的 PySpark 数据框:
+-----+-----+-----+-----+-----+
| id | A | B | ... | N |
+-----+-----+-----+-----+-----+
| 0 | 0.1 | 0.5 | ... | 0.9 |
| 1 | 0.2 | 0.2 | ... | 0.1 |
| 2 | 0.4 | 0.4 | ... | 0.3 |
| 3 | 0.7 | 0.2 | ... | 0.2 |
+-----+-----+-----+-----+-----+
并且我想在 Dataframe 的列的所有列组合上运行像点积一样的成对(列)操作,我该怎么做?
我想要的是一个像这样的数据框:
+-------+--------+-------+
| n_1 | n_2 | dot |
+-------+--------+-------+
| A | A | 1.3 |
| A | B | 1.9 |
| A | C | 3.6 |
| A | D | 0.7 |
...
| B | A | 4.6 |
...
+-------+--------+-------+
它包含 N x N 列产品及其对应的点产品的每个组合。
仅作记录,我有大约 1800 列 (N),以及多达几百万个 id。
谢谢!
更新:我在上面犯了一个错误。现在我已经澄清我想要的是列产品,而不是行产品。
【问题讨论】:
【参考方案1】:编辑
如您在更新中描述的那样计算所有列之间的组合, 您可以先将每一列与所有其他列相乘,然后再与总和相加。
结果列被命名为X_Y = sum(x*y)
。在此之后,您只需要转置结果 DataFrame。以下示例基于与第一个答案相同的数据:
# get all possible combinations and calculate dot product
products = list()
for c in df.columns:
if c != 'ID':
for c2 in df.columns:
if c2 != 'ID':
products.append(sum(col(c) * col(c2)).alias(f"c_c2"))
dot_sums = df.select(*products)
# transpose columns to rows
col_values = explode(
array(*[struct(lit(c).alias("col_name"), col(c).alias("val")) for c in dot_sums.columns])
).alias("cols_values")
# split the column name to get back the original columns
dot_sums.select(col_values) \
.select(*[split(col("cols_values.col_name"), "_").getItem(0).alias("n_1"),
split(col("cols_values.col_name"), "_").getItem(1).alias("n_2"),
col("cols_values.val").alias("dot")]) \
.show()
+---+---+------------------+
|n_1|n_2| dot|
+---+---+------------------+
| A| A| 0.7|
| A| B| 0.39|
| A| C| 0.37|
| B| A| 0.39|
| B| B|0.4900000000000001|
| B| C|0.6300000000000001|
| C| A| 0.37|
| C| B|0.6300000000000001|
| C| C|0.9500000000000001|
+---+---+------------------+
原答案
一种可能的方法是使用crossJoin
获取id_1 <-> id_2
列的所有组合。
您可以将所有其他列 A to N
放入一个数组中,以便稍后计算点积。
除了以下解决方案,您可能还想看看mlib dot
函数。
这是一个例子:
data = [(0, 0.1, 0.5, 0.9), (1, 0.2, 0.2, 0.1),
(2, 0.4, 0.4, 0.3), (3, 0.7, 0.2, 0.2)
]
df = spark.createDataFrame(data, ["ID", "A", "B", "C"])
df.show()
# get all cols except the ID col
op_cols = [c for c in df.columns if c != 'ID']
# transform those cols to array
df1 = df.select(col("ID").alias("ID_1"), array(*op_cols).alias("other_cols_array1"))
df2 = df.select(col("ID").alias("ID_2"), array(*op_cols).alias("other_cols_array2"))
# crossJoin
matrix = df1.crossJoin(df2)
现在,您有一个 DataFrame matrix
,其中包含 ID_1
、other_cols_array1
、ID_2
、other_cols_array2
列。
因此,您可以像这样使用 UDF 计算每个组合 ID_1 - ID_2
的点积:
dot_product = udf(lambda v1, v2: sum([x*y for x, y in zip(v1, v2)]), DoubleType())
matrix.withColumn("dot", dot_product(col("other_cols_array1"), col("other_cols_array2")))\
.select("ID_1", "ID_2", "dot")\
.show()
【讨论】:
抱歉,我犯了一个错误,我现在澄清一下:我想要垂直(列)点产品,而不是行产品。原始数据帧第一列中的 ID 将丢失,因为点积将对它们求和。尚不确定我是否必须考虑到这一点才能使您的解决方案发挥作用。 @KristianD'Amato 我不确定我是否理解您的更新。您能否添加一个包含几列和所需输出的小示例? 我刚刚这样做了。请参阅 OP 中的第二个数据框。 ID 是A
、B
等,原来我的 0...3
是错误的。【参考方案2】:
如果您只考虑数据(即没有行和列标题),可以使用numpy
分三行完成:
import numpy as np
a = np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]])
dot = np.dot(a,np.transpose(a))
idx = np.triu_indices(len(a))
out = np.array([idx[0],idx[1],dot[idx]]).T
print(out)
给予:
[[ 0 0 14]
[ 0 1 32]
[ 0 2 50]
[ 0 3 68]
[ 1 1 77]
[ 1 2 122]
[ 1 3 167]
[ 2 2 194]
[ 2 3 266]
[ 3 3 365]]
当然,将 PySpark DF 转换为 Numpy 数组或从 Numpy 数组转换是微不足道的......
【讨论】:
如果它只在内存中工作,这将是一个非常聪明的解决方案 - 如果内存不是问题,我会更喜欢 numpy/pandas 而不是 PySpark。【参考方案3】:在 pyspark 中,它没有 scala 中那么优雅。然而,即使以通用方式也是完全可以实现的(无需假设我们知道列数及其名称,它就可以工作)。
我们可以做的是数据帧与自身 (crossJoin
) 的笛卡尔积,然后使用 map/reduce 方案计算点积。
我会这样做:
# creating sample data
data = [(0, .1, .5, .9), (1, .2, .2, .1), (2, .4, .4, .3), (3, .7, .2, .2)]
df = spark.createDataFrame(data, ['id', 'A', 'B', 'C'])
# all the columns but 'id'
cols = [c for c in df.columns if c != 'id']
# the same df with column names suffixed with '_2'
df2 = df.select(*[df[c].alias(c + '_2') for c in df.columns])
# the dot product
products = [F.col(c) * F.col(c+'_2') for c in cols]
dot_product = reduce(lambda a, b: a+b, products).alias('dot')
# and the cross join
df.crossJoin(df2).select(F.col('id'), F.col('id_2'), dot_product).show()
+---+----+-------------------+
| id|id_2| dot|
+---+----+-------------------+
| 0| 0| 1.07|
| 0| 1|0.21000000000000002|
| 0| 2| 0.51|
| 0| 3| 0.35|
| 1| 0|0.21000000000000002|
| 1| 1|0.09000000000000002|
| 1| 2|0.19000000000000003|
| 1| 3| 0.2|
| 2| 0| 0.51|
| 2| 1|0.19000000000000003|
....
【讨论】:
抱歉,我在上面犯了一个错误,我现在澄清一下:我想要垂直(列)点产品,而不是行产品。原始数据帧第一列中的 ID 将丢失,因为点积将对它们求和。尚不确定我是否必须考虑到这一点才能使您的解决方案发挥作用。 您能否提供示例数据(足以了解您想要做什么,而不是更多)和预期输出,以便我们验证我们的解决方案? 我更正了原始帖子中的第二个数据框以反映澄清。谢谢!【参考方案4】:所以基本上你想计算每一列与其他列及其自身的点积。
一种解决方案是首先为每行 i 计算一个列,用于所有 k
和 j
的 col[j]*col[k]
的每个组合。如果您有数千列,那么我不建议这样做,因为 Spark SQL 往往会处理那么多列。
我会首先分解数据框并为每个 id 和列名创建一行。然后我会根据 id 将数据框与自身连接起来。这将产生一个数据框,每个 id 一行,每个列名的组合。
最后,我将两列的值相乘,按两列名称分组,将值相加得到点积。
代码如下所示:
from pyspark.sql import functions as F
data = [(0, .1, .5, .9), (1, .2, .2, .1), (2, .4, .4, .3), (3, .7, .2, .2)]
df = spark.createDataFrame(data, ['id', 'A', 'B', 'C'])
cols = [c for c in df.columns if c != 'id']
flat_df = df.select(F.col('id'),
F.explode(F.array(*[F.struct(F.lit(c).alias('name'),
F.col(c).alias('value')) for c in cols])))
第一部分完成了,数据已经扁平化成这样了:
>>> flat_df.show()
+---+-------+
| id| col|
+---+-------+
| 0|[A,0.1]|
| 0|[B,0.5]|
| 0|[C,0.9]|
| 1|[A,0.2]|
| 1|[B,0.2]|
| 1|[C,0.1]|
| 2|[A,0.4]|
| 2|[B,0.4]|
| 2|[C,0.3]|
| 3|[A,0.7]|
| 3|[B,0.2]|
| 3|[C,0.2]|
+---+-------+
然后是第二部分:连接、乘法、分组和求和:
flat_df_2 = flat_df.select('id',
F.col('col.name').alias('name2'),
F.col('col.value').alias('value2'))
result = flat_df\
.join(flat_df_2, ['id'])\
.withColumn('m', F.col('col.value') * F.col('value2'))\
.groupBy(F.col('col.name').alias('n_1'), F.col('name2').alias('n_2'))\
.agg(F.sum('m').alias('dot'))
产生:
>>> result.show()
+---+---+------------------+
|n_1|n_2| dot|
+---+---+------------------+
| B| C| 0.63|
| A| A| 0.7|
| A| C| 0.37|
| C| B| 0.63|
| C| C|0.9500000000000001|
| C| A| 0.37|
| B| B|0.4900000000000001|
| B| A| 0.39|
| A| B| 0.39|
+---+---+------------------+
【讨论】:
以上是关于使用 PySpark 数据框的成对列操作(例如点积)的主要内容,如果未能解决你的问题,请参考以下文章