我们可以在 Spark DataFrame 列中使用 Pandas 函数吗?如果是这样,怎么做?

Posted

技术标签:

【中文标题】我们可以在 Spark DataFrame 列中使用 Pandas 函数吗?如果是这样,怎么做?【英文标题】:Can we use a Pandas function in a Spark DataFrame column ? If so, how? 【发布时间】:2016-06-10 21:46:01 【问题描述】:

我有一个名为“pd_df”的熊猫数据框。

我想修改它的列,所以我这样做:

    import pandas as pd

    pd_df['notification_dt'] = pd.to_datetime(pd_df['notification_dt'], format="%Y-%m-%d")

有效。

在同一个数据库上,我创建了一个名为“spark_df”的 spark 数据框

我希望在它的列上使用相同的函数 (pd.to_datatime) 来执行相同的操作。所以我就这么做了。

    from pyspark.sql.functions import UserDefinedFunction

    from pyspark.sql.types import TimestampType

    udf = UserDefinedFunction(lambda x: pd.to_datetime(x, format="%Y-%m-%d"), TimestampType())

    spark_df2 = spark_df.withColumn("notification_dt1", (udf(spark_df["notification_dt"])))

据我说,它应该可以工作。然而在

   spark_df.show()

大约一分钟后我遇到以下错误:

【问题讨论】:

【参考方案1】:

所以,解决了。

 udf = UserDefinedFunction(lambda x: pd.to_datetime(x, format="%Y-%m-%d"), TimestampType())

应该是

 udf = UserDefinedFunction(lambda x: str(pd.to_datetime(x, format="%Y-%m-%d")), TimestampType())

基本上是无法将结果转换为 TimestampType()

【讨论】:

以上是关于我们可以在 Spark DataFrame 列中使用 Pandas 函数吗?如果是这样,怎么做?的主要内容,如果未能解决你的问题,请参考以下文章

Spark DataFrame ArrayType 或 MapType 用于检查列中的值

将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中

使用 Spark Dataframe 列中的数据作为条件或输入另一个列表达式

Spark- How to concatenate DataFrame columns

使用 Spark 和 Scala 清理大小约为 40GB 的 CSV/Dataframe

Spark Scala 聚合组 Dataframe