两个数据帧的数组列的平均值并在pyspark中找到最大索引
Posted
技术标签:
【中文标题】两个数据帧的数组列的平均值并在pyspark中找到最大索引【英文标题】:Average of array column of two dataframes and find the maximum index in pyspark 【发布时间】:2021-11-19 09:51:27 【问题描述】:我想在执行一些操作后合并两个数据帧的列值以在 pyspark 中创建一个新的数据帧。每个数据帧的列都是具有整数值的向量。完成的操作是取数据帧向量中每个值的平均值,并找到创建的新向量的最大元素的索引。
数据框1:
|id| |value1 |
|:.| |:......|
| 0| |[0,1,2]|
| 1| |[3,4,5]|
数据框2:
|id| |value2 |
|:.| |:......|
| 0| |[1,2,3]|
| 1| |[4,5,6]|
数据框3:
|value3 |
|:............|
|[0.5,1.5,2.5]|
|[3.5,4.5,5.5]|
数据框4:
|value4|
|:.....|
|2 |
|2 |
Dataframe3 是通过对 dataframe 1 和 2 的每个向量的每个元素取平均值得到的,即:dataframe3 的第一个向量 [0.5,1.5,2.5] 由 [0+1/2,1+2/2 得到, 2+3/2]。 Dataframe4是通过取每个向量的最大值的索引得到的。即;取 dataframe3[0.5,1.5,2.5] 的第一个向量,最大值为 2.5,它出现在索引 2 处,因此 Dataframe4 中的第一个元素为 2。我们如何在 pyspark 中实现这一点。
V1:
+--------------------------------------+---+
|p1 |id |
+--------------------------------------+---+
|[0.01426862, 0.010903089, 0.9748283] |0 |
|[0.068229124, 0.89613986, 0.035630997]|1 |
+--------------------------------------+---+
V2:
+-------------------------+---+
|p2 |id |
+-------------------------+---+
|[0.0, 0.0, 1.0] |0 |
|[2.8160464E-27, 1.0, 0.0]|1 |
+-------------------------+---+
当使用 df3 = v1.join(v2,on="id") 时
df3= 这就是我得到的
+-------------------------------------+---------------+
|p1 |p2 |
+-------------------------------------+---------------+
|[0.02203844, 0.010056663, 0.9679049] |[0.0, 0.0, 1.0]|
|[0.039553806, 0.015186918, 0.9452593]|[0.0, 0.0, 1.0]|
+-------------------------------------+---------------+
什么时候
df3 = df3.withColumn( "p3", F.expr("transform(arrays_zip(p1, p2), x -> (x.p1 + x.p2) / 2)"),)
df4 = df3.withColumn("p4",F.expr("array_position(p3, array_max(p3))"))
p3 是平均值吗?我将 df4 的所有值都设为零
【问题讨论】:
你需要数据帧 3 的中间状态吗?还是只是数据框 4 中的最终状态? 我没有加入数据框。我只有 dataframe 1 和 dataframe2 。我想知道如何获取数据框 3 和 4 @Steven。是的,我想知道如何获取数据框 3 和 4 【参考方案1】:首先,我重新创建您的测试数据:
a = [
[0, [0,1,2]],
[1, [3,4,5]],
]
b = ["id", "value1"]
df1 = spark.createDataFrame(a,b)
c = [
[0, [1,2,3]],
[1, [4,5,6]],
]
d = ["id", "value2"]
df2 = spark.createDataFrame(c,d)
然后,我处理数据:
-
加入
df3 = df1.join(df2, on="id")
df3.show()
+---+---------+---------+
| id| value1| value2|
+---+---------+---------+
| 0|[0, 1, 2]|[1, 2, 3]|
| 1|[3, 4, 5]|[4, 5, 6]|
+---+---------+---------+
-
创建平均数组
from pyspark.sql import functions as F, types as T
@F.udf(T.ArrayType(T.FloatType()))
def avg_array(array1, array2):
return list(map(lambda x: (x[0] + x[1]) / 2, zip(array1, array2)))
df3 = df3.withColumn("value3", avg_array(F.col("value1"), F.col("value2")))
# OR without UDF
df3 = df3.withColumn(
"value3",
F.expr("transform(arrays_zip(value1, value2), x -> (x.value1 + x.value2) / 2)"),
)
df3.show()
+---+---------+---------+---------------+
| id| value1| value2| value3|
+---+---------+---------+---------------+
| 0|[0, 1, 2]|[1, 2, 3]|[0.5, 1.5, 2.5]|
| 1|[3, 4, 5]|[4, 5, 6]|[3.5, 4.5, 5.5]|
+---+---------+---------+---------------+
-
获取索引(
array_position
从 1 开始,如果需要,你可以做一个-1
)
df4 = df3.withColumn("value4",F.expr("array_position(value3, array_max(value3))"))
df4.show()
+---+---------+---------+---------------+------+
| id| value1| value2| value3|value4|
+---+---------+---------+---------------+------+
| 0|[0, 1, 2]|[1, 2, 3]|[0.5, 1.5, 2.5]| 3|
| 1|[3, 4, 5]|[4, 5, 6]|[3.5, 4.5, 5.5]| 3|
+---+---------+---------+---------------+------+
【讨论】:
df3
也可以在没有UDF
的情况下计算,例如:df3 = df3.withColumn("value3", F.expr("transform(arrays_zip(value1, value2), x -> (x.value1 + x.value2) / 2)"))
@Anu 你看到需要加入
@Steven 是的。实际上,我的数组值包含浮点值,其中包含科学记数法指数。对于那个数组数据框,当我执行联合时,我没有得到正确的结果。能否请你帮忙。我已经编辑了我的问题。以上是关于两个数据帧的数组列的平均值并在pyspark中找到最大索引的主要内容,如果未能解决你的问题,请参考以下文章
解析 Pyspark 数据帧的 json 列,其中一个键值为 None
在 pyspark 中,如何创建一个数组列,它是两个或多个数组列的总和?