Pyspark - 如何将函数仅应用于 DataFrame 中的列子集?

Posted

技术标签:

【中文标题】Pyspark - 如何将函数仅应用于 DataFrame 中的列子集?【英文标题】:Pyspark - How to apply a function only to a subset of columns in a DataFrame? 【发布时间】:2020-07-01 19:41:02 【问题描述】:

我想用不同的方法将函数应用于Spark DataFrame 的某些列:fnfn1。我是这样做的:

def fn(column):
    return(x*2)

udf_1 = udf(fn, DecimalType())

def fn1(column):
    return(x*3)

udf_2 = udf(fn1, DecimalType())
    
def process_df1(df, col_name):
    df1 = df.withColumn(col_name, udf_1(col_name))
    return df1

def process_df2(df, col_name):
    df2 = df.withColumn(col_name, udf_2(col_name))
    return df2

对于单个列,它可以正常工作。但现在我得到了dicts 中的list,其中包含有关各个列的信息:

cols_info = ['col_name': 'metric_1', 'process': 'True', 'method':'simple', 'col_name': 'metric_2', 'process': 'False', 'method':'hash'] 

我应该如何解析cols_info 列表并将上述逻辑仅应用于具有process:True 并使用必需的method 的列?

首先想到的是过滤掉带有process:False的列

list(filter(lambda col_info: col_info['process'] == 'True', cols_info))

但我在这里仍然缺少更通用的方法。

【问题讨论】:

【参考方案1】:

selectExpr 函数在这里很有用

import pyspark.sql.functions as F
from pyspark.sql.window import Window
#Test data
tst = sqlContext.createDataFrame([(1,2,3,4),(1,3,4,1),(1,4,5,5),(1,6,7,8),(2,1,9,2),(2,2,9,9)],schema=['col1','col2','col3','col4'])    

def fn(x):
    return(x*2)

def fn1(x):
    return(x*3)

sqlContext.udf.register("fn1", fn)
sqlContext.udf.register("fn2", fn1)

cols_info =['col_name':'col1','encrypt':False,,'col_name':'col2','encrypt':True,'method':'fn1','col_name':'col3','encrypt':True,'method':'fn2']
# determine which columns have any of the encryption
modified_columns = [x['col_name'] for x in cols_info if x['encrypt']]
# select which colulmns have to be retained
columns_retain = list(set(tst.columns)-set(modified_columns))
#%
expr =columns_retain+[((x['method'])+'('+(x['col_name'])+') as '+ x['col_name']) for x in cols_info if x['encrypt']]
#%
tst_res = tst.selectExpr(*expr)

结果将是:

+----+----+----+----+
|col4|col1|col2|col3|
+----+----+----+----+
|   4|   1|   4|   9|
|   1|   1|   6|  12|
|   5|   1|   8|  15|
|   8|   1|  12|  21|
|   2|   2|   2|  27|
|   9|   2|   4|  27|
+----+----+----+----+

【讨论】:

谢谢。但是如何将 withColumn 用于整个 DF,然后将函数应用于特定列? 或者选择一个DF的所有列,包括修改后的原始名称列? 你的意思是,你想将这些列合并到整个数据框?这里不需要withColumn,可以在expr函数中添加已有的列。查看我编辑的答案,让我知道它是否有效 是的,我需要保留整个 DF,并且列应保留其原始名称(col2,col3),其中包含修改后的值。不保留旧值 您的意思是,您必须将计算列重命名为现有列?然后你可以像这样在你的 expr 中包含 as 语句: expr =[((x['method'])+'('+(x['col_name'])+') as '+ x['col_name'] ) 对于 cols_info 中的 x 如果 x['encrypt']]。请注意,我们这里的 expr 中没有包含 tst.columns。

以上是关于Pyspark - 如何将函数仅应用于 DataFrame 中的列子集?的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 将模式应用于 csv - 仅返回空值

如何按行将函数应用于 PySpark 数据帧的一组列?

如何使用 pyspark.resultiterable.ResultIterable 对象

将用户定义的函数应用于 PySpark 数据帧并返回字典

PySpark 结构化流将 udf 应用于窗口

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?