如何在多列上执行 udfs - 动态

Posted

技术标签:

【中文标题】如何在多列上执行 udfs - 动态【英文标题】:How to perform udfs on multiple columns- dynamically 【发布时间】:2018-01-16 06:39:29 【问题描述】:

我有 30 列,例如 DPF_1、DPF_2、DPF_3......DPF_30,我需要在其上应用数据帧。所有 30 列数据类型均为 String。 我的要求是将这 30 列中存在的所有“Na”值转换为“null”

我试过下面的代码,但它不是动态的。

def udf_A(x:StringType()):
    if x == "Na": return "null"
    else:return x
udf_B = udf(udf_A, StringType())

df.withColumn("DPF_1" udf_B("DPF_1"))
df.withColumn("DPF_2" udf_B("DPF_2"))
.
.
repeated till DPF_30 

现在我希望在 pyspark/scala 中以动态方式进行此过程,因为后面的列可能会随着不同的列名而增加。

【问题讨论】:

您总是可以使用regexp_replaceNa 转换为null @philantrovert 我还有很多其他要求。这可以通过使用 UDFS 轻松完成 所以你正试图将 Na 转换为 Null 并使用单个 UDF 做一堆其他的事情? @philantrovert:是的 【参考方案1】:

这里是 Scala 中的解决方案:

// columns which you want to keep 
val colsToSelect : Seq[Column] = ???
// columns which are applied to UDF
val selectUDFs : Seq[Column] = (1 to 30).map(i => udf_B(col(s"DPF_$i")).as(s"DPF_$i"))

df.select((colsToSelect++selectUDFs):_*)

【讨论】:

我认为这些列已经被称为DPF_1。你可以做df.columns.map(c => udf_B(c).as(c) ) 他没有提到他的数据框是否有任何其他列,因此我不想假设任何事情【参考方案2】:

您可以简单地将 30 列数据框 转换为 na dataframe 并将 replace 方法应用为

df.na.replace(df.columns, Map("Na" -> "null"))

您将所有Na 字符串替换为null 字符串。

【讨论】:

我还有很多其他要求。这可以通过使用 UDFS 轻松完成。 好的,所以对于这些要求,您可以编写 udfs 。所有列的要求是否相同?其他要求是什么? 是的,所有其他列的要求相同。需求取决于数据,这些需求主要可以使用 UDFS 中的 If、else 语句来处理。 每次我看到这个downvoters please do comment before you downvote 我都忍不住想downovte ;) 不开玩笑——我们不希望用户解释他们的投票,而且绝对不需要那样做。投票本身是不言自明的——向下箭头有它的title,它是这个答案没有用(对于答案)或这个问题没有显示任何研究工作;不清楚或没用。在某些情况下(答案是完全错误的,但听起来是正确的)提供解释是有道理的,但我一般不会期望。 我能理解你在说什么@user8371915。但是,如果答案没有缺陷,为什么有人会投反对票?【参考方案3】:

在 Scala 中,一种方法是使用过滤器组装列列表并遍历列表以使用您的 UDF 转换 DataFrame:

val cols = df.columns.filter(_.startsWith("DPF_"))

val df2 = cols.foldLeft( df )( (acc, c) => acc.withColumn(c, udf_B(df(c))) )

【讨论】:

【参考方案4】:

试试下面的代码,希望对你有帮助。

def udf_A(x:StringType()):
    if x == "Na": return "null"
    else:return x
udf_B = udf(udf_A, StringType())

import pyspark.sql.functions as psf

for c in df.dtypes:
    if "string" in c[1]:
        df=df.withColumn(c[0],udf_B(psf.col(c[0])))
df.show()

在这里, df.dtypes 为您提供具有列名和数据类型的元组数组

[('DPF_1', 'string'), ('DPF_2', 'string'), ('DPF_3', 'string')... ]

c[0] 表示列名,c[1] 表示数据类型,在您的情况下为 string

【讨论】:

这工作得很好。谢谢,能否请您解释一下dtypes 在上述过程中实际上会做什么

以上是关于如何在多列上执行 udfs - 动态的主要内容,如果未能解决你的问题,请参考以下文章

Hive UDF 返回多列输出

Pyspark udf 在接受多列作为输入的条件定义上返回一列

如何基于查找数据框创建数据框并在特定列中的动态和映射值上创建多列

ExcelDNA 在运行时动态注册 UDF

Pyspark:在UDF中传递多列以及参数

如何在不更改 pvalue 的情况下为 kruskal wallis 测试执行循环比较多列上的 3 个组?