如何在多列上执行 udfs - 动态
Posted
技术标签:
【中文标题】如何在多列上执行 udfs - 动态【英文标题】:How to perform udfs on multiple columns- dynamically 【发布时间】:2018-01-16 06:39:29 【问题描述】:我有 30 列,例如 DPF_1、DPF_2、DPF_3......DPF_30,我需要在其上应用数据帧。所有 30 列数据类型均为 String
。
我的要求是将这 30 列中存在的所有“Na”值转换为“null”
我试过下面的代码,但它不是动态的。
def udf_A(x:StringType()):
if x == "Na": return "null"
else:return x
udf_B = udf(udf_A, StringType())
df.withColumn("DPF_1" udf_B("DPF_1"))
df.withColumn("DPF_2" udf_B("DPF_2"))
.
.
repeated till DPF_30
现在我希望在 pyspark/scala 中以动态方式进行此过程,因为后面的列可能会随着不同的列名而增加。
【问题讨论】:
您总是可以使用regexp_replace
将Na
转换为null
@philantrovert 我还有很多其他要求。这可以通过使用 UDFS 轻松完成
所以你正试图将 Na 转换为 Null 并使用单个 UDF 做一堆其他的事情?
@philantrovert:是的
【参考方案1】:
这里是 Scala 中的解决方案:
// columns which you want to keep
val colsToSelect : Seq[Column] = ???
// columns which are applied to UDF
val selectUDFs : Seq[Column] = (1 to 30).map(i => udf_B(col(s"DPF_$i")).as(s"DPF_$i"))
df.select((colsToSelect++selectUDFs):_*)
【讨论】:
我认为这些列已经被称为DPF_1
。你可以做df.columns.map(c => udf_B(c).as(c) )
他没有提到他的数据框是否有任何其他列,因此我不想假设任何事情【参考方案2】:
您可以简单地将 30 列数据框 转换为 na
dataframe
并将 replace
方法应用为
df.na.replace(df.columns, Map("Na" -> "null"))
您将所有Na
字符串替换为null
字符串。
【讨论】:
我还有很多其他要求。这可以通过使用 UDFS 轻松完成。 好的,所以对于这些要求,您可以编写 udfs 。所有列的要求是否相同?其他要求是什么? 是的,所有其他列的要求相同。需求取决于数据,这些需求主要可以使用 UDFS 中的 If、else 语句来处理。 每次我看到这个downvoters please do comment before you downvote
我都忍不住想downovte ;) 不开玩笑——我们不希望用户解释他们的投票,而且绝对不需要那样做。投票本身是不言自明的——向下箭头有它的title
,它是这个答案没有用(对于答案)或这个问题没有显示任何研究工作;不清楚或没用。在某些情况下(答案是完全错误的,但听起来是正确的)提供解释是有道理的,但我一般不会期望。
我能理解你在说什么@user8371915。但是,如果答案没有缺陷,为什么有人会投反对票?【参考方案3】:
在 Scala 中,一种方法是使用过滤器组装列列表并遍历列表以使用您的 UDF 转换 DataFrame:
val cols = df.columns.filter(_.startsWith("DPF_"))
val df2 = cols.foldLeft( df )( (acc, c) => acc.withColumn(c, udf_B(df(c))) )
【讨论】:
【参考方案4】:试试下面的代码,希望对你有帮助。
def udf_A(x:StringType()):
if x == "Na": return "null"
else:return x
udf_B = udf(udf_A, StringType())
import pyspark.sql.functions as psf
for c in df.dtypes:
if "string" in c[1]:
df=df.withColumn(c[0],udf_B(psf.col(c[0])))
df.show()
在这里,
df.dtypes
为您提供具有列名和数据类型的元组数组
[('DPF_1', 'string'), ('DPF_2', 'string'), ('DPF_3', 'string')... ]
c[0]
表示列名,c[1]
表示数据类型,在您的情况下为 string
。
【讨论】:
这工作得很好。谢谢,能否请您解释一下dtypes
在上述过程中实际上会做什么以上是关于如何在多列上执行 udfs - 动态的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark udf 在接受多列作为输入的条件定义上返回一列