UDF 无法在 spark scala 中获取文件名

Posted

技术标签:

【中文标题】UDF 无法在 spark scala 中获取文件名【英文标题】:UDF is not working to get file name in spark scala 【发布时间】:2017-10-06 13:14:00 【问题描述】:

这就是我在 spark 数据帧中使用 UDF 的方式 ..

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    import org.apache.spark. SparkConf, SparkContext 
    import java.sql.Date, Timestamp
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.udf

import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))


val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN")

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)

df1result.withColumn("DataPartition", get_cus_val(input_file_name)).show()

但是当我运行它时,我得到以下错误

<console>:545: error: not found: value get_cus_val
       df1result.withColumn("DataPartition", get_cus_val(input_file_name)).show() 

但是如果我这样做的话,我可以得到带有完整路径的文件名..

df1result.withColumn("DataPartition", input_file_name).show()

知道我错过了什么吗?

【问题讨论】:

【参考方案1】:

这不起作用,因为您只注册了 SQL 函数。你可以试试

val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))

df1result.selectExpr("*", "get_cus_val(input_file_name) as DataPartition").show()

【讨论】:

【参考方案2】:

你可以试试这个。它对我有用。

df.withColumn("file_name",callUDF("get_cus_val", input_file_name()))

【讨论】:

以上是关于UDF 无法在 spark scala 中获取文件名的主要内容,如果未能解决你的问题,请参考以下文章

SPARK 数据框错误:在使用 UDF 拆分列中的字符串时无法转换为 scala.Function2

为啥不能在 UDF 中访问数据框? [Apache Spark Scala] [重复]

如何使用替代方法解决重载方法值寄存器,UDF Spark scala

使用 Scala 从 Spark 的 withColumn 中调用 udf 时出错

Scala中的Spark分组映射UDF

使用 scala 在 spark sql 中编写 UDF