Spark Scala:如何转换 DF 中的列

Posted

技术标签:

【中文标题】Spark Scala:如何转换 DF 中的列【英文标题】:Spark Scala: How to transform a column in a DF 【发布时间】:2016-05-04 23:44:46 【问题描述】:

我在 Spark 中有一个数据框,其中包含许多列和一个我定义的 udf。我想要返回相同的数据框,除了转换一列。此外,我的 udf 接受一个字符串并返回一个时间戳。是否有捷径可寻?我试过了

val test = myDF.select("my_column").rdd.map(r => getTimestamp(r)) 

但这会返回一个 RDD,并且只返回转换后的列。

【问题讨论】:

【参考方案1】:

如果你真的需要使用你的函数,我可以建议两个选项:

1) 使用地图/toDF:

import org.apache.spark.sql.Row
import sqlContext.implicits._

def getTimestamp: (String => java.sql.Timestamp) = // your function here

val test = myDF.select("my_column").rdd.map 
  case Row(string_val: String) => (string_val, getTimestamp(string_val))
.toDF("my_column", "new_column")

2) 使用 UDF (UserDefinedFunction):

import org.apache.spark.sql.functions._

def getTimestamp: (String => java.sql.Timestamp) = // your function here

val newCol = udf(getTimestamp).apply(col("my_column")) // creates the new column
val test = myDF.withColumn("new_column", newCol) // adds the new column to original DF

this nice article by Bill Chambers 中有更多关于 Spark SQL UDF 的详细信息。


或者

如果您只想将 StringType 列转换为 TimestampType 列,您可以使用自 Spark SQL 1.5 起提供的 unix_timestamp column function:

val test = myDF
  .withColumn("new_column", unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm").cast("timestamp"))

注意:对于 spark 1.5.x,需要在转换为时间戳之前将 unix_timestamp 的结果乘以 1000(问题 SPARK-11724)。结果代码将是:

val test = myDF
  .withColumn("new_column", (unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm") *1000L).cast("timestamp"))

编辑:添加 udf 选项

【讨论】:

感谢您的帮助。我遇到的唯一问题是当我执行 df.withColumn("start_date", unix_timestamp(df1("start_date"), "yyyy-MM-dd HH:mm:ss").cast("timestamp")) ,我的日期转换错误。例如:2013-08-12 06:40:54 转换为 1970-01-16 22:18:09.654。你碰巧知道会发生什么吗? 对于 spark 1.5,你必须在施法前乘以 1000

以上是关于Spark Scala:如何转换 DF 中的列的主要内容,如果未能解决你的问题,请参考以下文章

如何在 if-else 条件下的列中使用 Spark 值 - Scala

过滤包含Scala Spark数据帧中数组的列中的数组长度[重复]

Scala - 如果 DF1 中的数据与 DF2 中的列匹配,则从 DF1 中删除记录 [重复]

Scala(Spark)连接数据框中的列[重复]

SPARK 数据框错误:在使用 UDF 拆分列中的字符串时无法转换为 scala.Function2

使用 pyspark 将 Spark 数据框中的列转换为数组 [重复]