Spark Scala:如何转换 DF 中的列
Posted
技术标签:
【中文标题】Spark Scala:如何转换 DF 中的列【英文标题】:Spark Scala: How to transform a column in a DF 【发布时间】:2016-05-04 23:44:46 【问题描述】:我在 Spark 中有一个数据框,其中包含许多列和一个我定义的 udf。我想要返回相同的数据框,除了转换一列。此外,我的 udf 接受一个字符串并返回一个时间戳。是否有捷径可寻?我试过了
val test = myDF.select("my_column").rdd.map(r => getTimestamp(r))
但这会返回一个 RDD,并且只返回转换后的列。
【问题讨论】:
【参考方案1】:如果你真的需要使用你的函数,我可以建议两个选项:
1) 使用地图/toDF:
import org.apache.spark.sql.Row
import sqlContext.implicits._
def getTimestamp: (String => java.sql.Timestamp) = // your function here
val test = myDF.select("my_column").rdd.map
case Row(string_val: String) => (string_val, getTimestamp(string_val))
.toDF("my_column", "new_column")
2) 使用 UDF (UserDefinedFunction
):
import org.apache.spark.sql.functions._
def getTimestamp: (String => java.sql.Timestamp) = // your function here
val newCol = udf(getTimestamp).apply(col("my_column")) // creates the new column
val test = myDF.withColumn("new_column", newCol) // adds the new column to original DF
this nice article by Bill Chambers 中有更多关于 Spark SQL UDF 的详细信息。
或者,
如果您只想将 StringType
列转换为 TimestampType
列,您可以使用自 Spark SQL 1.5 起提供的 unix_timestamp
column function:
val test = myDF
.withColumn("new_column", unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm").cast("timestamp"))
注意:对于 spark 1.5.x,需要在转换为时间戳之前将 unix_timestamp
的结果乘以 1000
(问题 SPARK-11724)。结果代码将是:
val test = myDF
.withColumn("new_column", (unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm") *1000L).cast("timestamp"))
编辑:添加 udf 选项
【讨论】:
感谢您的帮助。我遇到的唯一问题是当我执行 df.withColumn("start_date", unix_timestamp(df1("start_date"), "yyyy-MM-dd HH:mm:ss").cast("timestamp")) ,我的日期转换错误。例如:2013-08-12 06:40:54 转换为 1970-01-16 22:18:09.654。你碰巧知道会发生什么吗? 对于 spark 1.5,你必须在施法前乘以 1000以上是关于Spark Scala:如何转换 DF 中的列的主要内容,如果未能解决你的问题,请参考以下文章
如何在 if-else 条件下的列中使用 Spark 值 - Scala
过滤包含Scala Spark数据帧中数组的列中的数组长度[重复]
Scala - 如果 DF1 中的数据与 DF2 中的列匹配,则从 DF1 中删除记录 [重复]