如何在数据框中的每一列上运行 udf?

Posted

技术标签:

【中文标题】如何在数据框中的每一列上运行 udf?【英文标题】:How to run udf on every column in a dataframe? 【发布时间】:2018-09-06 20:08:44 【问题描述】:

我有一个 UDF:

val TrimText = (s: AnyRef) => 
    //does logic returns string

还有一个数据框:

var df = spark.read.option("sep", ",").option("header", "true").csv(root_path + "/" + file)

我想对数据框中每一列的每个值执行TrimText

但是,问题是,我的列数是动态的。我知道我可以通过df.columns 获取列列表。但我不确定这将如何帮助我解决我的问题。我该如何解决这个问题?

TLDR 问题 - 当数据帧的列数未知时,对数据帧中的每一列执行 UDF


尝试使用:

df.columns.foldLeft( df )( (accDF, c) =>
  accDF.withColumn(c, TrimText(col(c)))
)

抛出此错误:

error: type mismatch;
 found   : String
 required: org.apache.spark.sql.Column
accDF.withColumn(c, TrimText(col(c)))

TrimText 假设返回一个字符串,并期望输入是列中的一个值。因此,它将标准化整个数据帧每一行中的每个值。

【问题讨论】:

【参考方案1】:

您可以使用foldLeft 遍历列列表以使用您的UDF 将withColumn 迭代应用到DataFrame:

df.columns.foldLeft( df )( (accDF, c) =>
  accDF.withColumn(c, TrimText(col(c)))
)

【讨论】:

@test acc,您需要将TrimText 包裹在udf() 中以使其成为UDF。我还假设该函数具有处理各种数据类型的必要逻辑。 抛出类似的错误,withColumn 在其第二个参数中期待一个字符串值,而udf(TrimText(col(c))) 返回一个org.apache.spark.sql.expressions.UserDefinedFunction 对象 你申请udf()的方式不对。您要么直接将trimText 定义为udf( (...) => ... );或定义val trimTextUdf = udf(trimText) 并像trimTextUdf(col(c)) 一样使用它 @testacc:这是最好的方法。例如,请参阅:***.com/questions/44819019/…,了解如何创建和应用 UDF。【参考方案2】:
>> I would like to perform TrimText on every value in every column in the dataframe.
>> I have a dynamic number of columns.

当 sql 函数可用于修剪 UDF 时,可以看到下面的代码适合您吗?

import org.apache.spark.sql.functions._

spark.udf.register("TrimText", (x:String) =>  ..... )

val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols2 = df2.columns.toSet
df2.createOrReplaceTempView("table1")

val query = "select " + buildcolumnlst(cols2) + " from table1 "
println(query)
val dfresult = spark.sql(query)
dfresult.show()

def buildcolumnlst(myCols: Set[String]) = 
  myCols.map(x => "TrimText(" + x + ")" + " as " + x).mkString(",") 

结果,

select trim(age) as age,trim(education) as education,trim(income) as income from table1 
+---+---------+-------+
|age|education| income|
+---+---------+-------+
| 26|     true|60000.0|
| 32|    false|35000.0|
+---+---------+-------+

【讨论】:

因为TrimText 的作用远超过trim 文本。它就是它的名字。不受我控制。 好吧,那么你可以在那边的方法“buildcolumnlst”中修补UDF函数,而不是sql函数“trim”。 (对上述代码进行了编辑)【参考方案3】:
val a = sc.parallelize(Seq(("1 "," 2"),(" 3","4"))).toDF()
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
def TrimText(s: Column): Column = 
//does logic returns string
  trim(s)

a.select(a.columns.map(c => TrimText(col(c))):_*).show

【讨论】:

trimspark 中的函数与我创建的TrimText 函数不同,该函数可能命名不当,不是一回事。 但是如果你想在列上操作任何东西都没有关系,像这样传递你的参数并且返回类型将是列 好的。我尝试了您的解决方案并引发了此错误 - error: no `: _*' annotation allowed here (such annotations are only allowed in arguments to *-parameters) df = df.select(df.columns.map(c => TrimText(col(c))) : _*) 请分享你的trimtext函数逻辑,它的返回类型应该是一列 它是专有的。它返回一个字符串。我期待一个列值(字符串)。因此,如果我有一百万条记录和 5 列。该函数将被调用 500 万次,每列的每条记录的每个值都调用一次。输出将始终是一个字符串,

以上是关于如何在数据框中的每一列上运行 udf?的主要内容,如果未能解决你的问题,请参考以下文章

请问如何获得GridView选中行的每一列的信息?

将函数应用于数据框中的每一列,观察每一列现有的数据类型

用 Python 用该列的平均值减去数据框中的每一列

如何计算熊猫数据框中每一列的唯一性?

如何在不指定每一列的情况下将整行作为参数传递给 Spark(Java)中的 UDF?

R - 对于数据框中的每一行,如何检查是不是至少有一列不是 NA? [复制]