使用 PySpark 数据框的成对列操作(例如点积)

Posted

技术标签:

【中文标题】使用 PySpark 数据框的成对列操作(例如点积)【英文标题】:Pairwise column operations (such as dot product) with a PySpark Dataframe 【发布时间】:2019-12-17 13:37:45 【问题描述】:

抱歉,但我是 Spark 的新手,这似乎不是一个简单的操作。

如果我有这样的 PySpark 数据框:

    +-----+-----+-----+-----+-----+
    |  id |  A  |  B  | ... |  N  |
    +-----+-----+-----+-----+-----+
    |  0  | 0.1 | 0.5 | ... | 0.9 |
    |  1  | 0.2 | 0.2 | ... | 0.1 |
    |  2  | 0.4 | 0.4 | ... | 0.3 |
    |  3  | 0.7 | 0.2 | ... | 0.2 |
    +-----+-----+-----+-----+-----+

并且我想在 Dataframe 的列的所有列组合上运行像点积一样的成对(列)操作,我该怎么做?

我想要的是一个像这样的数据框:

+-------+--------+-------+
|  n_1  |  n_2   |  dot  |
+-------+--------+-------+
|   A   |   A    |  1.3  | 
|   A   |   B    |  1.9  |
|   A   |   C    |  3.6  |
|   A   |   D    |  0.7  |
...
|   B   |   A    |  4.6  |
...
+-------+--------+-------+

它包含 N x N 列产品及其对应的点产品的每个组合。

仅作记录,我有大约 1800 列 (N),以及多达几百万个 id。

谢谢!

更新:我在上面犯了一个错误。现在我已经澄清我想要的是列产品,而不是行产品。

【问题讨论】:

【参考方案1】:

编辑

如您在更新中描述的那样计算所有列之间的组合, 您可以先将每一列与所有其他列相乘,然后再与总和相加。

结果列被命名为X_Y = sum(x*y)。在此之后,您只需要转置结果 DataFrame。以下示例基于与第一个答案相同的数据:

# get all possible combinations and calculate dot product
products = list()
for c in df.columns:
    if c != 'ID':
        for c2 in df.columns:
            if c2 != 'ID':
                products.append(sum(col(c) * col(c2)).alias(f"c_c2"))

dot_sums = df.select(*products)

# transpose columns to rows
col_values = explode(
    array(*[struct(lit(c).alias("col_name"), col(c).alias("val")) for c in dot_sums.columns])
).alias("cols_values")

# split the column name to get back the original columns
dot_sums.select(col_values) \
    .select(*[split(col("cols_values.col_name"), "_").getItem(0).alias("n_1"),
              split(col("cols_values.col_name"), "_").getItem(1).alias("n_2"),
              col("cols_values.val").alias("dot")]) \
    .show()

+---+---+------------------+
|n_1|n_2|               dot|
+---+---+------------------+
|  A|  A|               0.7|
|  A|  B|              0.39|
|  A|  C|              0.37|
|  B|  A|              0.39|
|  B|  B|0.4900000000000001|
|  B|  C|0.6300000000000001|
|  C|  A|              0.37|
|  C|  B|0.6300000000000001|
|  C|  C|0.9500000000000001|
+---+---+------------------+

原答案

一种可能的方法是使用crossJoin 获取id_1 <-> id_2 列的所有组合。 您可以将所有其他列 A to N 放入一个数组中,以便稍后计算点积。

除了以下解决方案,您可能还想看看mlib dot 函数。

这是一个例子:

data = [(0, 0.1, 0.5, 0.9), (1, 0.2, 0.2, 0.1),
        (2, 0.4, 0.4, 0.3), (3, 0.7, 0.2, 0.2)
        ]

df = spark.createDataFrame(data, ["ID", "A", "B", "C"])
df.show()

# get all cols except the ID col
op_cols = [c for c in df.columns if c != 'ID']

# transform those cols to array
df1 = df.select(col("ID").alias("ID_1"), array(*op_cols).alias("other_cols_array1"))
df2 = df.select(col("ID").alias("ID_2"), array(*op_cols).alias("other_cols_array2"))

# crossJoin
matrix = df1.crossJoin(df2)

现在,您有一个 DataFrame matrix,其中包含 ID_1other_cols_array1ID_2other_cols_array2 列。

因此,您可以像这样使用 UDF 计算每个组合 ID_1 - ID_2 的点积:

dot_product = udf(lambda v1, v2: sum([x*y for x, y in zip(v1, v2)]), DoubleType())

matrix.withColumn("dot", dot_product(col("other_cols_array1"), col("other_cols_array2")))\
      .select("ID_1", "ID_2", "dot")\
      .show()

【讨论】:

抱歉,我犯了一个错误,我现在澄清一下:我想要垂直(列)点产品,而不是行产品。原始数据帧第一列中的 ID 将丢失,因为点积将对它们求和。尚不确定我是否必须考虑到这一点才能使您的解决方案发挥作用。 @KristianD'Amato 我不确定我是否理解您的更新。您能否添加一个包含几列和所需输出的小示例? 我刚刚这样做了。请参阅 OP 中的第二个数据框。 ID 是 AB 等,原来我的 0...3 是错误的。【参考方案2】:

如果您只考虑数据(即没有行和列标题),可以使用numpy 分三行完成:

import numpy as np

a = np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]])
dot = np.dot(a,np.transpose(a))
idx = np.triu_indices(len(a))
out = np.array([idx[0],idx[1],dot[idx]]).T

print(out)

给予:

[[  0   0  14]
 [  0   1  32]
 [  0   2  50]
 [  0   3  68]
 [  1   1  77]
 [  1   2 122]
 [  1   3 167]
 [  2   2 194]
 [  2   3 266]
 [  3   3 365]]

当然,将 PySpark DF 转换为 Numpy 数组或从 Numpy 数组转换是微不足道的......

【讨论】:

如果它只在内存中工作,这将是一个非常聪明的解决方案 - 如果内存不是问题,我会更喜欢 numpy/pandas 而不是 PySpark。【参考方案3】:

在 pyspark 中,它没有 scala 中那么优雅。然而,即使以通用方式也是完全可以实现的(无需假设我们知道列数及其名称,它就可以工作)。

我们可以做的是数据帧与自身 (crossJoin) 的笛卡尔积,然后使用 map/reduce 方案计算点积。

我会这样做:

# creating sample data
data = [(0, .1, .5, .9), (1, .2, .2, .1), (2, .4, .4, .3), (3, .7, .2, .2)]
df = spark.createDataFrame(data, ['id', 'A', 'B', 'C'])

# all the columns but 'id'
cols = [c for c in df.columns if c != 'id']

# the same df with column names suffixed with '_2'
df2 = df.select(*[df[c].alias(c + '_2') for c in df.columns])

# the dot product
products = [F.col(c) * F.col(c+'_2') for c in cols]
dot_product = reduce(lambda a, b: a+b, products).alias('dot')

# and the cross join
df.crossJoin(df2).select(F.col('id'), F.col('id_2'), dot_product).show()
+---+----+-------------------+                                                  
| id|id_2|                dot|
+---+----+-------------------+
|  0|   0|               1.07|
|  0|   1|0.21000000000000002|
|  0|   2|               0.51|
|  0|   3|               0.35|
|  1|   0|0.21000000000000002|
|  1|   1|0.09000000000000002|
|  1|   2|0.19000000000000003|
|  1|   3|                0.2|
|  2|   0|               0.51|
|  2|   1|0.19000000000000003|
....

【讨论】:

抱歉,我在上面犯了一个错误,我现在澄清一下:我想要垂直(列)点产品,而不是行产品。原始数据帧第一列中的 ID 将丢失,因为点积将对它们求和。尚不确定我是否必须考虑到这一点才能使您的解决方案发挥作用。 您能否提供示例数据(足以了解您想要做什么,而不是更多)和预期输出,以便我们验证我们的解决方案? 我更正了原始帖子中的第二个数据框以反映澄清。谢谢!【参考方案4】:

所以基本上你想计算每一列与其他列及其自身的点积。

一种解决方案是首先为每行 i 计算一个列,用于所有 kjcol[j]*col[k] 的每个组合。如果您有数千列,那么我不建议这样做,因为 Spark SQL 往往会处理那么多列。

我会首先分解数据框并为每个 id 和列名创建一行。然后我会根据 id 将数据框与自身连接起来。这将产生一个数据框,每个 id 一行,每个列名的组合。

最后,我将两列的值相乘,按两列名称分组,将值相加得到点积。

代码如下所示:

from pyspark.sql import functions as F

data = [(0, .1, .5, .9), (1, .2, .2, .1), (2, .4, .4, .3), (3, .7, .2, .2)]
df = spark.createDataFrame(data, ['id', 'A', 'B', 'C'])
cols = [c for c in df.columns if c != 'id']

flat_df = df.select(F.col('id'),
          F.explode(F.array(*[F.struct(F.lit(c).alias('name'),
                                       F.col(c).alias('value')) for c in cols])))

第一部分完成了,数据已经扁平化成这样了:

>>> flat_df.show()
+---+-------+
| id|    col|
+---+-------+
|  0|[A,0.1]|
|  0|[B,0.5]|
|  0|[C,0.9]|
|  1|[A,0.2]|
|  1|[B,0.2]|
|  1|[C,0.1]|
|  2|[A,0.4]|
|  2|[B,0.4]|
|  2|[C,0.3]|
|  3|[A,0.7]|
|  3|[B,0.2]|
|  3|[C,0.2]|
+---+-------+

然后是第二部分:连接、乘法、分组和求和:

flat_df_2 = flat_df.select('id',
                           F.col('col.name').alias('name2'),
                           F.col('col.value').alias('value2'))
result = flat_df\
    .join(flat_df_2, ['id'])\
    .withColumn('m', F.col('col.value') * F.col('value2'))\
    .groupBy(F.col('col.name').alias('n_1'), F.col('name2').alias('n_2'))\
    .agg(F.sum('m').alias('dot'))

产生:

>>> result.show()
+---+---+------------------+
|n_1|n_2|               dot|
+---+---+------------------+
|  B|  C|              0.63|
|  A|  A|               0.7|
|  A|  C|              0.37|
|  C|  B|              0.63|
|  C|  C|0.9500000000000001|
|  C|  A|              0.37|
|  B|  B|0.4900000000000001|
|  B|  A|              0.39|
|  A|  B|              0.39|
+---+---+------------------+

【讨论】:

以上是关于使用 PySpark 数据框的成对列操作(例如点积)的主要内容,如果未能解决你的问题,请参考以下文章

几种分布的成对图形比较

数据框列中的字符串列表行之间的成对距离

处理 NaN 的成对距离

最后获取带有附加项的成对迭代器

pyspark数据框的区分大小写的列删除操作?

非常大的数据集中的成对距离