在现有列的基础上在 DataFrame 中添加新列

Posted

技术标签:

【中文标题】在现有列的基础上在 DataFrame 中添加新列【英文标题】:Add new column in DataFrame base on existing column 【发布时间】:2015-07-06 17:09:53 【问题描述】:

我有一个包含日期时间列的 csv 文件:“2011-05-02T04:52:09+00:00”。

我正在使用 scala,文件被加载到 spark DataFrame 中,我可以使用 jodas time 来解析日期:

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = new SQLContext(sc).load("com.databricks.spark.csv", Map("path" -> "data.csv", "header" -> "true")) 
val d = org.joda.time.format.DateTimeFormat.forPattern("yyyy-mm-dd'T'kk:mm:ssZ")

我想根据日期时间字段创建新列以进行时间序列分析。

在 DataFrame 中,如何根据另一列的值创建一列?

我注意到 DataFrame 具有以下功能:df.withColumn("dt",column),有没有办法根据现有列的值创建列?

谢谢

【问题讨论】:

您需要创建一个UDF并注册它。请看***.com/questions/29479872/… 【参考方案1】:
import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

val d = DateTimeFormat.forPattern("yyyy-mm-dd'T'kk:mm:ssZ")
val dtFunc: (String => Date) = (arg1: String) => DateTime.parse(arg1, d).toDate
val x = df.withColumn("dt", callUDF(dtFunc, DateType, col("dt_string")))

callUDFcol 包含在functions 中,如import 所示

col("dt_string") 中的 dt_string 是您要转换的 df 的原始列名称。

或者,您可以将最后一条语句替换为:

val dtFunc2 = udf(dtFunc)
val x = df.withColumn("dt", dtFunc2(col("dt_string")))

【讨论】:

您好,感谢您的帖子。我实际上正在按照您的建议进行操作,但出现以下错误:“scala.MatchError: java.util.Date (of class scala.reflect.internal.Types$TypeRef$$anon$6)”

以上是关于在现有列的基础上在 DataFrame 中添加新列的主要内容,如果未能解决你的问题,请参考以下文章

Flex3 AdvancedDataGrid:如何在现有列的基础上添加新列?

如何在 Spark SQL 中向现有 Dataframe 添加新列

如何更改 pandas DataFrame 的最后 7 列的列名? [复制]

Pyspark - 从 DataFrame 列的操作创建新列给出错误“列不可迭代”

合并两个具有重叠 MultiIndex 列的 DataFrame

Scala DataFrame,将非空列的值复制到新列中