使用 PySpark 中的列表中的 withColumn 函数动态创建新列

Posted

技术标签:

【中文标题】使用 PySpark 中的列表中的 withColumn 函数动态创建新列【英文标题】:dynamically create new columns using withColumn function from a list in PySpark 【发布时间】:2017-03-27 17:18:55 【问题描述】:

我试图弄清楚如何通过使用 withColumn() 函数并在 pySpark 的 withColumn() 函数中调用 udf 为列表中的每个项目(在这种情况下为 cp_codeset 列表)动态创建列。以下是我编写的代码,但它给了我一个错误。

from pyspark.sql.functions import udf, col, lit
from pyspark.sql import Row
from pyspark.sql.types import IntegerType


codeset = set(cp_codeset['CODE'])

for col_name in cp_codeset.col_names.unique():
    def flag(d):
        if (d in codeset):
            name = cp_codeset[cp_codeset['CODES']==d].col_names
            if(name==col_name):
                return 1
            else:
                return 0

    cpf_udf = udf(flag, IntegerType())

    p.withColumn(col_name, cpf_udf(p.codes)).show()

另一种选择是手动执行,但在这种情况下,我必须编写相同的 udf 函数并使用 withColumn() 函数调用它 75 次(这是 cp_codeset["col_names"] 的大小)

以下是我的两个数据框,我正在尝试了解 result 的显示方式

P(这是 Pyspark 数据帧,这个数据帧太大,熊猫无法处理)

id|codes
1|100
2|102
3|104

cp_codeset(熊猫数据框)

codes| col_names
100|a
101|b
102|c
103|d
104|e
105|f

结果(pyspark 数据帧)

id|codes|a|c|e
1|100   |1|0|0
2|102   |0|1|0   
3|104   |0|0|1

【问题讨论】:

E-num / get Dummies in pyspark的可能重复 that 是你想要的吗?顺便说一句,如果您需要 PySpark 解决方案,为什么要设置 [pandas] 标签?这令人困惑...... @MaxU 很抱歉造成混淆。我只有 cp_codeset 作为 Panda 数据帧,而 P 作为 pyspark 数据帧。我想要 pyspark 数据框的结果。谢谢。 @Viraj,在这种情况下,您有来自piRSquared 和Boud 的两个答案 - 从 Pandas DF 生成 Spark DataFrame 应该非常简单...... 【参考方案1】:

过滤后的数据:

cp_codeset.set_index('codes').loc[p.codes]
Out[44]: 
      col_names
codes          
100           a
102           c
104           e

只需使用get_dummies:

pd.get_dummies(cp_codeset.set_index('codes').loc[p.codes])
Out[45]: 
       col_names_a  col_names_c  col_names_e
codes                                       
100              1            0            0
102              0            1            0
104              0            0            1

【讨论】:

【参考方案2】:

我会使用 get_dummiesjoin + map

m = cp_codeset.set_index('codes').col_names

P.join(pd.get_dummies(P.codes.map(m)))

   id  codes  a  c  e
0   1    100  1  0  0
1   2    102  0  1  0
2   3    104  0  0  1

【讨论】:

这给出了“TypeError: 'Column' object is not callable”的错误。如果我写 P.select("codes").map(m) 那么它给出“AttributeError: 'DataFrame' object has no attribute '_jdf'” @Viraj 你为什么写P.select?这在我的代码中没有。

以上是关于使用 PySpark 中的列表中的 withColumn 函数动态创建新列的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 过滤器使用列表中的startswith

Pyspark:检查数组类型列是不是包含列表中的值[重复]

pyspark中的未嵌套列表

reduceByKey PySpark 中的列表列表

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

是否可以使用 pyspark 过滤 Spark DataFrames 以返回列值在列表中的所有行?