使用 UDF 从 Apache Spark 中的其他列创建新列

Posted

技术标签:

【中文标题】使用 UDF 从 Apache Spark 中的其他列创建新列【英文标题】:Making new Column from other column in Apache Spark using UDF 【发布时间】:2018-10-26 16:17:19 【问题描述】:

我正在尝试从 Apache Spark 中的另一列创建一个新列。

数据(高度缩写)看起来像

Date    Day_of_Week
2018-05-26T00:00:00.000+0000    5
2018-05-05T00:00:00.000+0000    6

应该看起来像

Date    Day_of_Week    Weekday
2018-05-26T00:00:00.000+0000    5    Thursday
2018-05-05T00:00:00.000+0000    6    Friday

我已经尝试了手册中的建议https://docs.databricks.com/spark/latest/spark-sql/udf-python.html#register-the-function-as-a-udf & How to pass a constant value to Python UDF? & PySpark add a column to a DataFrame from a TimeStampType column

导致:

def int2day (day_int):
  if day_int == 1:
    return 'Sunday'
  elif day_int == 2:
    return 'Monday'
  elif day_int == 3:
    return 'Tuesday'
  elif day_int == 4:
    return 'Wednesday'
  elif day_int == 5:
    return 'Thursday'
  elif day_int == 6:
    return 'Friday'
  elif day_int == 7:
    return 'Saturday'
  else:
    return 'FAIL'

spark.udf.register("day", int2day, IntegerType())
df2 = df.withColumn("Day", day("Day_of_Week"))

并给出一个很长的错误

SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 8, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 262, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 257, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 325, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 141, in dump_stream
    self._write_with_length(obj, stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 151, in _write_with_length
    serialized = self.dumps(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 556, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我不知道如何在此处申请 How to pass a constant value to Python UDF?,因为他们的示例要简单得多(仅对或错)

我也尝试过使用地图功能,如PySpark add a column to a DataFrame from a TimeStampType column

但是

df3 = df2.withColumn("weekday", map(lambda x: int2day, col("Date"))) 只是说TypeError: argument 2 to map() must support iteration 但我认为col 确实支持迭代。

我已经阅读了我能找到的所有在线示例。我不知道如何将其他问题提出的问题应用到我的案例中。

如何使用另一列的功能添加另一列?

【问题讨论】:

你shouldn't use a udf for this。请参阅this post 了解如何执行 IF-THEN-ELSE 逻辑。 如果您确实想使用 udf ,则您的语法不正确。返回类型应该是StringType() 而不是整数。有关正确语法的示例,您可以参考this post。 【参考方案1】:

您根本不需要 UDF 来完成您想要做的事情。您可以利用内置的 pyspark date_format 函数提取列中给定日期的一周中每一天的名称。

import pyspark.sql.functions as func
df = df.withColumn("day_of_week", func.date_format(func.col("Date"), "EEEE"))

结果是一个名为day_of_week 的新列添加到您的数据框中,它将根据Date 列中的值显示星期日、星期一、星期二等。

【讨论】:

在此示例中,"Date" 列看起来不像是 DateType。在您可以使用 date_format 之前,StringType 可能是 first needs to be converted。

以上是关于使用 UDF 从 Apache Spark 中的其他列创建新列的主要内容,如果未能解决你的问题,请参考以下文章

Scala Spark 中的 udf 运行时错误

Apache Spark Python UDF 失败

为啥不能在 UDF 中访问数据框? [Apache Spark Scala] [重复]

Apache Spark - UDF 似乎不适用于 spark-submit

更改 Spark GraphFrame 中的字符串列

在Apache Spark中使用UDF