如何根据 PySpark 数据框的另一列中的值修改列? F.当边缘情况

Posted

技术标签:

【中文标题】如何根据 PySpark 数据框的另一列中的值修改列? F.当边缘情况【英文标题】:How to modify a column based on the values in another column of a PySpark dataframe? F.when edge case 【发布时间】:2019-04-24 18:28:54 【问题描述】:

我想浏览 pyspark 数据框中的每一行,并根据另一列的内容更改一列的值。我将其更改为的值也是基于要更改的列的当前值。

具体来说,我有一列包含 DenseVectors,另一列包含我需要的向量的索引。

或者,我也可以将 DenseVector 替换为 DenseVector 中两个值中较大的一个。

我主要尝试将 F.when() 与 withColumn 结合使用,但我在使用 F.when() 的第二个元素时遇到了麻烦,因为我想存储向量的正确索引,但不能直接索引列。

   a                        b  
1  DenseVector([0.1, 0.9])  1.0
2  DenseVector([0.6, 0.4])  0.0
.
.
.
df = df.withColumn('a', F.when(df.b == 0.0, df.a[0])
                  .otherwise(df.a[1])

【问题讨论】:

你能解释一下这是什么意思吗:I want to store the correct index of the vector, but cannot directly index on a column. @JayramKumar 我想将向量的第 0 个或第一个索引存储在“a”中,具体取决于“b”列中该行是 1.0 还是 0.0。但是,在上述代码的上下文中,我无法使用 df.a[1] 之类的东西对给定行的“a”中的向量进行索引。根据F.when 通常的工作方式,您希望能够对向量进行索引,但在这种情况下您不能。 【参考方案1】:

按照this question 的答案中找到的信息,我能够得出一个解决方案。

Spark 中似乎存在一个错误,它不允许您对 Spark 数据帧中包含的 DenseVectors 进行索引。这可以通过创建一个用户定义的函数来解决,该函数可以像 numpy 数组一样访问元素。

from pyspark.sql import functions as F
from pyspark.sql import types as T

firstelement=F.udf(lambda v:float(v[0]),T.FloatType())
secondelement=F.udf(lambda v:float(v[1]),T.FloatType())


df = df.withColumn('a', F.when(df['b'] == 0.0, 
     firstelement('a')).otherwise(secondelement('a'))

【讨论】:

以上是关于如何根据 PySpark 数据框的另一列中的值修改列? F.当边缘情况的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:如何根据另一列中的匹配值从数组中的第一次出现中选择直到最后的值

基于另一列中的值的一列上的pyspark滞后函数

如何计算包含一组列中的值和 Pandas 数据框中另一列中的另一个值的行数?

Pyspark数据帧:根据另一列的值提取列

Pyspark 通过使用另一列中的值替换 Spark 数据框列中的字符串

如果元素存在于数据框的另一列中,则删除列表元素