Pyspark - 从 DataFrame 列的操作创建新列给出错误“列不可迭代”

Posted

技术标签:

【中文标题】Pyspark - 从 DataFrame 列的操作创建新列给出错误“列不可迭代”【英文标题】:Pyspark - create new column from operations of DataFrame columns gives error "Column is not iterable" 【发布时间】:2017-06-08 01:08:09 【问题描述】:

我有一个 PySpark DataFrame,我尝试了许多示例来展示如何根据现有列的操作创建一个新列,但它们似乎都不起作用。

所以我有一个问题:

1- 为什么这段代码不起作用?

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

sc = SparkContext()
sqlContext = SQLContext(sc)

a = sqlContext.createDataFrame([(5, 5, 3)], ['A', 'B', 'C'])
a.withColumn('my_sum', F.sum(a[col] for col in a.columns)).show()

我得到错误: TypeError: Column is not iterable

编辑:答案 1

我发现了如何进行这项工作。我必须使用原生 Python sum 函数。 a.withColumn('my_sum', F.sum(a[col] for col in a.columns)).show()。它有效,但我不知道为什么。

2- 如果有办法使这个总和起作用,我该如何编写一个udf 函数来做到这一点(并将结果添加到 DataFrame 的新列中)?

import numpy as np
def my_dif(row):
    d = np.diff(row) # creates an array of differences element by element
    return d.mean() # returns the mean of the array

我正在使用 Python 3.6.1 和 Spark 2.1.1。

谢谢!

【问题讨论】:

你找到的答案在哪里? @eliasah,我的答案是使用原生 Python sum 函数。我已经用示例编辑了我的帖子。 【参考方案1】:
a = sqlContext.createDataFrame([(5, 5, 3)], ['A', 'B', 'C'])
a = a.withColumn('my_sum', F.UserDefinedFunction(lambda *args: sum(args), IntegerType())(*a.columns))
a.show()

+---+---+---+------+
|  A|  B|  C|my_sum|
+---+---+---+------+
|  5|  5|  3|    13|
+---+---+---+------+

【讨论】:

谢谢!这对我有用。但是你能解释一下这个例子中的参数是如何工作的吗?我对*args*a.columns 有点困惑。【参考方案2】:

你的问题出在这部分for col in a.columns 因为你不能迭代结果,所以你必须:

a = a.withColumn('my_sum', a.A + a.B + a.C)

【讨论】:

感谢您的回答,dannyeuu。但问题是,在我的真实数据集中,我有数百列,所以我不能明确地一一输入。我是从the version 2 of this post 那里得到的,而且我在许多其他答案中也看到过类似的情况。 类似***.com/questions/36584812/…

以上是关于Pyspark - 从 DataFrame 列的操作创建新列给出错误“列不可迭代”的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:转换DataFrame中给定列的值

计算 PySpark DataFrame 列的模式?

基于pyspark中仅一列的两个DataFrame之间的差异[重复]

跨 PySpark DataFrame 列的字符串匹配

如何将 PySpark Dataframe 列的类型指定为 JSON

PySpark Dataframe:将一个单词附加到列的每个值