计算每行并在 DataFrame PySpark 中添加新列 - 更好的解决方案?
Posted
技术标签:
【中文标题】计算每行并在 DataFrame PySpark 中添加新列 - 更好的解决方案?【英文标题】:Calculate per row and add new column in DataFrame PySpark - better solution? 【发布时间】:2017-01-21 14:28:09 【问题描述】:我在 PySpark 中使用数据框 我有以下任务:检查每列中有多少“次”值对于所有列都大于 2。对于 u1 它是 0,对于 u2 => 2 等等
user a b c d times
u1 1 0 1 0 0
u2 0 1 4 3 2
u3 2 1 7 0 1
我的解决方案如下。它有效,我不确定这是不是最好的方法,也没有尝试过真正的大数据。我不喜欢转换为 rdd 并返回数据框。有更好的吗?我一开始是按每列的 UDF 计算的,但没有找到一种方法来累积和汇总每行的所有结果:
def calculate_times(row):
times = 0
for index, item in enumerate(row):
if not isinstance(item, basestring):
if item > 2:
times = times+1
return times
def add_column(pair):
return dict(pair[0].asDict().items() + [("is_outlier", pair[1])])
def calculate_times_for_all(df):
rdd_with_times = df.map(lambda row: (calculate_times(row))
rdd_final = df.rdd.zip(rdd_with_times).map(add_column)
df_final = sqlContext.createDataFrame(rdd_final)
return df_final
对于这个解决方案,我使用了这个主题 How do you add a numpy.array as a new column to a pyspark.SQL DataFrame?
谢谢!
【问题讨论】:
【参考方案1】:这只是一个简单的单行。示例数据:
df = sc.parallelize([
("u1", 1, 0, 1, 0), ("u2", 0, 1, 4, 3), ("u3", 2, 1, 7, 0)
]).toDF(["user", "a", "b", "c", "d"])
withColumn
:
df.withColumn("times", sum((df[c] > 2).cast("int") for c in df.columns[1:]))
结果:
+----+---+---+---+---+-----+
|user| a| b| c| d|times|
+----+---+---+---+---+-----+
| u1| 1| 0| 1| 0| 0|
| u2| 0| 1| 4| 3| 2|
| u3| 2| 1| 7| 0| 1|
+----+---+---+---+---+-----+
注意:
它的列是nullable
,你应该纠正它,例如使用coalesce
:
from pyspark.sql.functions import coalesce
sum(coalesce((df[c] > 2).cast("int"), 0) for c in df.columns[1:])
【讨论】:
以上是关于计算每行并在 DataFrame PySpark 中添加新列 - 更好的解决方案?的主要内容,如果未能解决你的问题,请参考以下文章
PySpark 无法计算 Koalas DataFrame 中的按列标准差