如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?

Posted

技术标签:

【中文标题】如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?【英文标题】:How to add multiple columns to pyspark DF using pandas_udf with multiple source columns? 【发布时间】:2021-01-14 11:21:11 【问题描述】:

我需要根据时区从utc_timestamp 中提取其日期和时间到两个不同的列中。时区名称由配置常量变量中的id 定义。

    Input DF                                           Output DF
+-------------+--+                         +-------------+--+----------+----+
|utc_timestamp|id|                         |utc_timestamp|id|date      |hour|
+-------------+--+                         +-------------+--+----------+----|
|1608000000782|1 |                         |1608000000782|1 |2020-12-14|20  |
+-------------+--+                         +-------------+--+----------+----+
|1608000240782|2 |                         |1608000240782|2 |2020-12-15|11  |
+-------------+--+                         +-------------+--+----------+----+

我有 pandas_udf,它允许我一次提取一列,我必须创建它两次:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DateType, IntegerType 
import pandas as pd
import pytz

TIMEZONE_LIST = 1: 'America/Chicago', 2: 'Asia/Tokyo'


class TimezoneUdfProvider(object):
    def __init__(self):
        self.extract_date_udf = pandas_udf(self._extract_date, DateType(), PandasUDFType.SCALAR)
        self.extract_hour_udf = pandas_udf(self._extract_hour, IntegerType(), PandasUDFType.SCALAR)
        
     def _extract_date(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
         return pd.Series([extract_date(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

     def _extract_hour(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
         return pd.Series([extract_hour(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

def extract_date(utc_timestamp: int, id: str):
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).date()

def extract_hour(utc_timestamp: int, id: str) -> int:
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).hour
    

def extract_from_utc(df: DataFrame) -> DataFrame:
     timezone_udf1 = TimezoneUdfProvider()
     df_with_date = df.withColumn('date', timezone_udf1.extract_date_udf(f.col(utc_timestamp), f.col(id)))
     timezone_udf2 = TimezoneUdfProvider()
     df_with_hour = df_with_date.withColumn('hour', timezone_udf2.extract_hour_udf(f.col(utc_timestamp), f.col(id)))
    return df_with_hour

有没有更好的方法呢?不需要两次使用同一个 udf 提供程序?

【问题讨论】:

【参考方案1】:

您可以在不使用 udf 的情况下使用 spark 内置函数来做到这一点。

我们可以使用create_map 映射字典并创建新的时区列,然后使用from_unixtimefrom_utc_timestamp 将时区作为新映射的列进行转换。一旦我们根据时区获得时间戳,我们就可以获取小时和日期字段。

TIMEZONE_LIST = 1: 'America/Chicago', 2: 'Asia/Tokyo'

import pyspark.sql.functions as F
from itertools import chain

map_exp = F.create_map([F.lit(i) for i in chain(*TIMEZONE_LIST.items())])


final = (df.withColumn("TimeZone", map_exp.getItem(col("id")))
          .withColumn("Timestamp",
   F.from_utc_timestamp(F.from_unixtime(F.col("utc_timestamp")/1000),F.col("TimeZone")))
   .withColumn("date",F.to_date("Timestamp")).withColumn("Hour",F.hour("Timestamp"))
   .drop("Timestamp"))

final.show()

(3) Spark Jobs
final:pyspark.sql.dataframe.DataFrame = [utc_timestamp: long, id: long ... 3 more fields]

+-------------+---+---------------+----------+----+
|utc_timestamp| id|       TimeZone|      date|Hour|
+-------------+---+---------------+----------+----+
|1608000000782|  1|America/Chicago|2020-12-14|  20|
|1608000240782|  2|     Asia/Tokyo|2020-12-15|  11|
+-------------+---+---------------+----------+----+

编辑:将create_map 替换为udf

import pyspark.sql.functions as F
from pyspark.sql.functions import StringType
TIMEZONE_LIST = 1: 'America/Chicago', 2: 'Asia/Tokyo'
def fun(x):
  return TIMEZONE_LIST.get(x,None)
map_udf = F.udf(fun,StringType())


final = (df.withColumn("TimeZone", map_udf("id")).withColumn("Timestamp",
   F.from_utc_timestamp(F.from_unixtime(F.col("utc_timestamp")/1000),F.col("TimeZone")))
   .withColumn("date",F.to_date("Timestamp")).withColumn("Hour",F.hour("Timestamp"))
   .drop("Timestamp"))

final.show()

【讨论】:

create_map 对我不起作用,我收到错误消息:Traceback (most recent call last): File "<input>", line 1, in <module> File "<input>", line 1, in <listcomp> File "/home/ninav/venvs/acutecare-rdb-nlp-processor/lib/python3.6/site-packages/pyspark/sql/functions.py", line 44, in _ jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col) AttributeError: 'NoneType' object has no attribute '_jvm' 我尝试了示例 TIMEZONE_LIST 并收到错误 @NinaVolfenzon 更新了我使用 udf 而不是 create_map 的答案,您可以在共享的示例数据框上尝试第二种方法吗? 我按照您在第一个示例中展示的方式进行了导入。我使用的是 python 3.6,这可能是我出错的原因吗? @NinaVolfenzon 当您使用显示的第二种使用 udf 的方法时,您是否仍然收到错误?

以上是关于如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?的主要内容,如果未能解决你的问题,请参考以下文章

ADF IF 条件 - 计算源列

如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?

ClickHouse:从同列名的select中访问源列

如何在 listView Xamarin Android 中使用具有多个 Textview 列的 ArrayAdapter

SQLFlow的几种关系

通过 vba ms 访问将多值列的数据绑定到组合框中