如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?
Posted
技术标签:
【中文标题】如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?【英文标题】:How to add multiple columns to pyspark DF using pandas_udf with multiple source columns? 【发布时间】:2021-01-14 11:21:11 【问题描述】:我需要根据时区从utc_timestamp
中提取其日期和时间到两个不同的列中。时区名称由配置常量变量中的id
定义。
Input DF Output DF
+-------------+--+ +-------------+--+----------+----+
|utc_timestamp|id| |utc_timestamp|id|date |hour|
+-------------+--+ +-------------+--+----------+----|
|1608000000782|1 | |1608000000782|1 |2020-12-14|20 |
+-------------+--+ +-------------+--+----------+----+
|1608000240782|2 | |1608000240782|2 |2020-12-15|11 |
+-------------+--+ +-------------+--+----------+----+
我有 pandas_udf,它允许我一次提取一列,我必须创建它两次:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DateType, IntegerType
import pandas as pd
import pytz
TIMEZONE_LIST = 1: 'America/Chicago', 2: 'Asia/Tokyo'
class TimezoneUdfProvider(object):
def __init__(self):
self.extract_date_udf = pandas_udf(self._extract_date, DateType(), PandasUDFType.SCALAR)
self.extract_hour_udf = pandas_udf(self._extract_hour, IntegerType(), PandasUDFType.SCALAR)
def _extract_date(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
return pd.Series([extract_date(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])
def _extract_hour(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
return pd.Series([extract_hour(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])
def extract_date(utc_timestamp: int, id: str):
timezone_name = TIMEZONE_LIST[id]
timezone_nw = pytz.timezone(timezone_name)
return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).date()
def extract_hour(utc_timestamp: int, id: str) -> int:
timezone_name = TIMEZONE_LIST[id]
timezone_nw = pytz.timezone(timezone_name)
return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).hour
def extract_from_utc(df: DataFrame) -> DataFrame:
timezone_udf1 = TimezoneUdfProvider()
df_with_date = df.withColumn('date', timezone_udf1.extract_date_udf(f.col(utc_timestamp), f.col(id)))
timezone_udf2 = TimezoneUdfProvider()
df_with_hour = df_with_date.withColumn('hour', timezone_udf2.extract_hour_udf(f.col(utc_timestamp), f.col(id)))
return df_with_hour
有没有更好的方法呢?不需要两次使用同一个 udf 提供程序?
【问题讨论】:
【参考方案1】:您可以在不使用 udf 的情况下使用 spark 内置函数来做到这一点。
我们可以使用create_map
映射字典并创建新的时区列,然后使用from_unixtime
和from_utc_timestamp
将时区作为新映射的列进行转换。一旦我们根据时区获得时间戳,我们就可以获取小时和日期字段。
TIMEZONE_LIST = 1: 'America/Chicago', 2: 'Asia/Tokyo'
import pyspark.sql.functions as F
from itertools import chain
map_exp = F.create_map([F.lit(i) for i in chain(*TIMEZONE_LIST.items())])
final = (df.withColumn("TimeZone", map_exp.getItem(col("id")))
.withColumn("Timestamp",
F.from_utc_timestamp(F.from_unixtime(F.col("utc_timestamp")/1000),F.col("TimeZone")))
.withColumn("date",F.to_date("Timestamp")).withColumn("Hour",F.hour("Timestamp"))
.drop("Timestamp"))
final.show()
(3) Spark Jobs
final:pyspark.sql.dataframe.DataFrame = [utc_timestamp: long, id: long ... 3 more fields]
+-------------+---+---------------+----------+----+
|utc_timestamp| id| TimeZone| date|Hour|
+-------------+---+---------------+----------+----+
|1608000000782| 1|America/Chicago|2020-12-14| 20|
|1608000240782| 2| Asia/Tokyo|2020-12-15| 11|
+-------------+---+---------------+----------+----+
编辑:将create_map
替换为udf
:
import pyspark.sql.functions as F
from pyspark.sql.functions import StringType
TIMEZONE_LIST = 1: 'America/Chicago', 2: 'Asia/Tokyo'
def fun(x):
return TIMEZONE_LIST.get(x,None)
map_udf = F.udf(fun,StringType())
final = (df.withColumn("TimeZone", map_udf("id")).withColumn("Timestamp",
F.from_utc_timestamp(F.from_unixtime(F.col("utc_timestamp")/1000),F.col("TimeZone")))
.withColumn("date",F.to_date("Timestamp")).withColumn("Hour",F.hour("Timestamp"))
.drop("Timestamp"))
final.show()
【讨论】:
create_map
对我不起作用,我收到错误消息:Traceback (most recent call last): File "<input>", line 1, in <module> File "<input>", line 1, in <listcomp> File "/home/ninav/venvs/acutecare-rdb-nlp-processor/lib/python3.6/site-packages/pyspark/sql/functions.py", line 44, in _ jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col) AttributeError: 'NoneType' object has no attribute '_jvm'
我尝试了示例 TIMEZONE_LIST 并收到错误
@NinaVolfenzon 更新了我使用 udf 而不是 create_map
的答案,您可以在共享的示例数据框上尝试第二种方法吗?
我按照您在第一个示例中展示的方式进行了导入。我使用的是 python 3.6,这可能是我出错的原因吗?
@NinaVolfenzon 当您使用显示的第二种使用 udf 的方法时,您是否仍然收到错误?以上是关于如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?
如何在 listView Xamarin Android 中使用具有多个 Textview 列的 ArrayAdapter