在 pyspark UDF 中使用类方法

Posted

技术标签:

【中文标题】在 pyspark UDF 中使用类方法【英文标题】:Using a class method inside of a pyspark UDF 【发布时间】:2019-11-25 17:52:53 【问题描述】:

你好数据工程师!

我正在尝试使用名为 Astral 的类中的方法编写 pyspark udf

这里是 udf:

def time_from_solar_noon(d, y):
    noon = astral.Astral().solar_noon_utc
    time = noon(d, y)
    return time 

solarNoon = F.udf(lambda d, y: time_from_solar_noon(d,y), TimestampType())

现在我理解它的方式,该类将为我的数据框中的每一行实例化,从而导致工作非常缓慢。

如果我从我的函数中取出类实例化:

noon = astral.Astral().solar_noon_utc

def time_from_solar_noon(d, y):
    time = noon(d, y)
    return time 

我收到以下错误消息:

  [Previous line repeated 326 more times]
    RecursionError: maximum recursion depth exceeded while calling a Python object

所以这是我的问题,我认为应该可以通过执行程序/线程至少有一个类实例化,而不是在我的数据框中逐行实例化,我该怎么做?

感谢您的帮助

【问题讨论】:

你可以试试astral_ = astral.Astral(); solarNoon = F.udf(astral_.solar_noon_utc, TimestampType()) 其他类可以按照你说的方式实例化。导致RecursionError 的行是File "/opt/conda/lib/python3.7/site-packages/astral.py", line 1576, in __getattr__ for name, value in self._groups.items(): @cylim 你能开发吗,这来自Geocoder类,在这种情况下我什至没有使用(我已经有纬度/经度)数据,我将用简化的功能制作一个叉子 @Manu Valdés 我尝试过并得到同样的错误 您是否考虑过使用 mapPartitions 而不是 UDF? 【参考方案1】:

就像使用数据库连接一样,您可以使用mapPartitions 仅实例化有限数量的这些类实例:

In [1]: from datetime import date
   ...: from astral import Astral
   ...: 
   ...: df = spark.createDataFrame(
   ...:     ((date(2019, 10, 4), 0),
   ...:      (date(2019, 10, 4), 19)),
   ...:     schema=("date", "longitude"))
   ...: 
   ...: 
   ...: def solar_noon(rows):
   ...:     a = Astral()  # initialize the class once per partition
   ...:     return ((a.solar_noon_utc(date=r.date, longitude=r.longitude), *r)
   ...:             for r in rows)  # reuses the same Astral instance for all rows in this partition
   ...: 
   ...: 
   ...: (df.rdd
   ...:  .mapPartitions(solar_noon)
   ...:  .toDF(schema=("solar_noon_utc", *df.columns))
   ...:  .show()
   ...:  )
   ...: 
   ...:  
+-------------------+----------+---------+                                      
|     solar_noon_utc|      date|longitude|
+-------------------+----------+---------+
|2019-10-04 13:48:58|2019-10-04|        0|
|2019-10-04 12:32:58|2019-10-04|       19|
+-------------------+----------+---------+

这是相当有效的,因为函数 (solar_noon) 被分配给每个工作人员,并且每个分区只初始化一次该类,它可以容纳许多行。

【讨论】:

以上是关于在 pyspark UDF 中使用类方法的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark withcolumn中使用udf和class

在 PySpark Pandas UDF 中指定用户定义函数的正确方法

如何在 pyspark 操作中轻松使用我的自定义类方法?

SparkSession 中的 udf 和 pyspark.sql.functions 中的 udf 有啥区别

pickle.loads 给出“模块”对象在 Pyspark Pandas Udf 中没有属性“<ClassName>”

在 PySpark 中,有没有办法使用运行时给出的 Python 类的函数来动态注册 UDF? [复制]