如何在数据框中的每一列上运行 udf?
Posted
技术标签:
【中文标题】如何在数据框中的每一列上运行 udf?【英文标题】:How to run udf on every column in a dataframe? 【发布时间】:2018-09-06 20:08:44 【问题描述】:我有一个 UDF:
val TrimText = (s: AnyRef) =>
//does logic returns string
还有一个数据框:
var df = spark.read.option("sep", ",").option("header", "true").csv(root_path + "/" + file)
我想对数据框中每一列的每个值执行TrimText
。
但是,问题是,我的列数是动态的。我知道我可以通过df.columns
获取列列表。但我不确定这将如何帮助我解决我的问题。我该如何解决这个问题?
TLDR 问题 - 当数据帧的列数未知时,对数据帧中的每一列执行 UDF
尝试使用:
df.columns.foldLeft( df )( (accDF, c) =>
accDF.withColumn(c, TrimText(col(c)))
)
抛出此错误:
error: type mismatch;
found : String
required: org.apache.spark.sql.Column
accDF.withColumn(c, TrimText(col(c)))
TrimText
假设返回一个字符串,并期望输入是列中的一个值。因此,它将标准化整个数据帧每一行中的每个值。
【问题讨论】:
【参考方案1】:您可以使用foldLeft
遍历列列表以使用您的UDF 将withColumn
迭代应用到DataFrame:
df.columns.foldLeft( df )( (accDF, c) =>
accDF.withColumn(c, TrimText(col(c)))
)
【讨论】:
@test acc,您需要将TrimText
包裹在udf()
中以使其成为UDF。我还假设该函数具有处理各种数据类型的必要逻辑。
抛出类似的错误,withColumn
在其第二个参数中期待一个字符串值,而udf(TrimText(col(c)))
返回一个org.apache.spark.sql.expressions.UserDefinedFunction
对象
你申请udf()
的方式不对。您要么直接将trimText
定义为udf( (...) => ... )
;或定义val trimTextUdf = udf(trimText)
并像trimTextUdf(col(c))
一样使用它
@testacc:这是最好的方法。例如,请参阅:***.com/questions/44819019/…,了解如何创建和应用 UDF。【参考方案2】:
>> I would like to perform TrimText on every value in every column in the dataframe.
>> I have a dynamic number of columns.
当 sql 函数可用于修剪 UDF 时,可以看到下面的代码适合您吗?
import org.apache.spark.sql.functions._
spark.udf.register("TrimText", (x:String) => ..... )
val df2 = sc.parallelize(List(
(26, true, 60000.00),
(32, false, 35000.00)
)).toDF("age", "education", "income")
val cols2 = df2.columns.toSet
df2.createOrReplaceTempView("table1")
val query = "select " + buildcolumnlst(cols2) + " from table1 "
println(query)
val dfresult = spark.sql(query)
dfresult.show()
def buildcolumnlst(myCols: Set[String]) =
myCols.map(x => "TrimText(" + x + ")" + " as " + x).mkString(",")
结果,
select trim(age) as age,trim(education) as education,trim(income) as income from table1
+---+---------+-------+
|age|education| income|
+---+---------+-------+
| 26| true|60000.0|
| 32| false|35000.0|
+---+---------+-------+
【讨论】:
因为TrimText
的作用远超过trim
文本。它就是它的名字。不受我控制。
好吧,那么你可以在那边的方法“buildcolumnlst”中修补UDF函数,而不是sql函数“trim”。 (对上述代码进行了编辑)【参考方案3】:
val a = sc.parallelize(Seq(("1 "," 2"),(" 3","4"))).toDF()
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
def TrimText(s: Column): Column =
//does logic returns string
trim(s)
a.select(a.columns.map(c => TrimText(col(c))):_*).show
【讨论】:
trim
spark 中的函数与我创建的TrimText
函数不同,该函数可能命名不当,不是一回事。
但是如果你想在列上操作任何东西都没有关系,像这样传递你的参数并且返回类型将是列
好的。我尝试了您的解决方案并引发了此错误 - error: no `: _*' annotation allowed here (such annotations are only allowed in arguments to *-parameters) df = df.select(df.columns.map(c => TrimText(col(c))) : _*)
请分享你的trimtext函数逻辑,它的返回类型应该是一列
它是专有的。它返回一个字符串。我期待一个列值(字符串)。因此,如果我有一百万条记录和 5 列。该函数将被调用 500 万次,每列的每条记录的每个值都调用一次。输出将始终是一个字符串,以上是关于如何在数据框中的每一列上运行 udf?的主要内容,如果未能解决你的问题,请参考以下文章