两个数据帧的数组列的平均值并在pyspark中找到最大索引

Posted

技术标签:

【中文标题】两个数据帧的数组列的平均值并在pyspark中找到最大索引【英文标题】:Average of array column of two dataframes and find the maximum index in pyspark 【发布时间】:2021-11-19 09:51:27 【问题描述】:

我想在执行一些操作后合并两个数据帧的列值以在 pyspark 中创建一个新的数据帧。每个数据帧的列都是具有整数值的向量。完成的操作是取数据帧向量中每个值的平均值,并找到创建的新向量的最大元素的索引。

数据框1:

       |id| |value1 |
       |:.| |:......|
       | 0| |[0,1,2]|
       | 1| |[3,4,5]|

数据框2:

        |id| |value2 |
        |:.| |:......|
        | 0| |[1,2,3]|
        | 1| |[4,5,6]| 
         
         
         

数据框3:

         |value3       |
         |:............|
         |[0.5,1.5,2.5]|
         |[3.5,4.5,5.5]|

数据框4:

         |value4|
         |:.....|
         |2     |
         |2     |

Dataframe3 是通过对 dataframe 1 和 2 的每个向量的每个元素取平均值得到的,即:dataframe3 的第一个向量 [0.5,1.5,2.5] 由 [0+1/2,1+2/2 得到, 2+3/2]。 Dataframe4是通过取每个向量的最大值的索引得到的。即;取 dataframe3[0.5,1.5,2.5] 的第一个向量,最大值为 2.5,它出现在索引 2 处,因此 Dataframe4 中的第一个元素为 2。我们如何在 pyspark 中实现这一点。

V1:

           +--------------------------------------+---+
           |p1                                    |id |
           +--------------------------------------+---+
           |[0.01426862, 0.010903089, 0.9748283]  |0  |
           |[0.068229124, 0.89613986, 0.035630997]|1  |
           +--------------------------------------+---+

V2:

           +-------------------------+---+
           |p2                       |id |
           +-------------------------+---+
           |[0.0, 0.0, 1.0]          |0  |
           |[2.8160464E-27, 1.0, 0.0]|1  |
           +-------------------------+---+

当使用 df3 = v1.join(v2,on="id") 时

df3= 这就是我得到的

           +-------------------------------------+---------------+
           |p1                                   |p2             |
           +-------------------------------------+---------------+
           |[0.02203844, 0.010056663, 0.9679049] |[0.0, 0.0, 1.0]|
           |[0.039553806, 0.015186918, 0.9452593]|[0.0, 0.0, 1.0]|
           +-------------------------------------+---------------+

什么时候

     df3 = df3.withColumn( "p3", F.expr("transform(arrays_zip(p1, p2), x -> (x.p1 + x.p2) / 2)"),)
     df4 = df3.withColumn("p4",F.expr("array_position(p3, array_max(p3))"))

p3 是平均值吗?我将 df4 的所有值都设为零

【问题讨论】:

你需要数据帧 3 的中间状态吗?还是只是数据框 4 中的最终状态? 我没有加入数据框。我只有 dataframe 1 和 dataframe2 。我想知道如何获取数据框 3 和 4 @Steven。是的,我想知道如何获取数据框 3 和 4 【参考方案1】:

首先,我重新创建您的测试数据:

a = [
    [0, [0,1,2]],
    [1, [3,4,5]],
]
b = ["id", "value1"]
df1 = spark.createDataFrame(a,b)

c = [
    [0, [1,2,3]],
    [1, [4,5,6]],
]
d = ["id", "value2"]
df2 = spark.createDataFrame(c,d)

然后,我处理数据:

    加入
df3 = df1.join(df2, on="id")

df3.show()
+---+---------+---------+                                                       
| id|   value1|   value2|
+---+---------+---------+
|  0|[0, 1, 2]|[1, 2, 3]|
|  1|[3, 4, 5]|[4, 5, 6]|
+---+---------+---------+
    创建平均数组
from pyspark.sql import functions as F, types as T

@F.udf(T.ArrayType(T.FloatType()))
def avg_array(array1, array2):
    return list(map(lambda x: (x[0] + x[1]) / 2, zip(array1, array2)))

df3 = df3.withColumn("value3", avg_array(F.col("value1"), F.col("value2")))

# OR without UDF 

df3 = df3.withColumn(
    "value3",
    F.expr("transform(arrays_zip(value1, value2), x -> (x.value1 + x.value2) / 2)"),
)

df3.show()
+---+---------+---------+---------------+                                       
| id|   value1|   value2|         value3|
+---+---------+---------+---------------+
|  0|[0, 1, 2]|[1, 2, 3]|[0.5, 1.5, 2.5]|
|  1|[3, 4, 5]|[4, 5, 6]|[3.5, 4.5, 5.5]|
+---+---------+---------+---------------+
    获取索引(array_position 从 1 开始,如果需要,你可以做一个-1
df4 = df3.withColumn("value4",F.expr("array_position(value3, array_max(value3))"))

df4.show()
+---+---------+---------+---------------+------+                                
| id|   value1|   value2|         value3|value4|
+---+---------+---------+---------------+------+
|  0|[0, 1, 2]|[1, 2, 3]|[0.5, 1.5, 2.5]|     3|
|  1|[3, 4, 5]|[4, 5, 6]|[3.5, 4.5, 5.5]|     3|
+---+---------+---------+---------------+------+

【讨论】:

df3 也可以在没有UDF 的情况下计算,例如:df3 = df3.withColumn("value3", F.expr("transform(arrays_zip(value1, value2), x -> (x.value1 + x.value2) / 2)")) @Anu 你看到需要加入 @Steven 是的。实际上,我的数组值包含浮点值,其中包含科学记数法指数。对于那个数组数据框,当我执行联合时,我没有得到正确的结果。能否请你帮忙。我已经编辑了我的问题。

以上是关于两个数据帧的数组列的平均值并在pyspark中找到最大索引的主要内容,如果未能解决你的问题,请参考以下文章

如何根据pyspark中的索引查找数组列的平均值

如何在 PySpark 中找到数组数组的平均值

解析 Pyspark 数据帧的 json 列,其中一个键值为 None

在 pyspark 中,如何创建一个数组列,它是两个或多个数组列的总和?

将一个数据帧的数组列与scala中另一个数据帧的数组列的子集进行比较

如何将多个功能应用于dask数据帧的多个块?