如何在 pyspark 操作中轻松使用我的自定义类方法?

Posted

技术标签:

【中文标题】如何在 pyspark 操作中轻松使用我的自定义类方法?【英文标题】:How can i easily use my custom class methods among pyspark operations? 【发布时间】:2019-01-29 00:18:18 【问题描述】:

我有一个 Age 类、一个 csv 文件和一个 pyspark 运行时会话

ages.csv

Name;Age
alpha;noise20noise
beta;noi 3 sE 0
gamma;n 4 oi 0 se
phi;n50ise
detla;3no5ise
kappa;No 4 i 5 sE
omega;25noIsE

这实际上读作(在解析 Age 列之后):

Name;Age
alpha;20
beta;30
gamma;40
phi;50
detla;35
kappa;45
omega;25

定义类:年龄 年龄.py

import re

class Age:
    # age is a number representing the age of a person
    def __init__(self, age):
        self.age = age

    def __eq__(self, other):
        return self.age == self.__parse(other)

    def __lt__(self, other):
        return self.age < self.__parse(other)

    def __gt__(self, other):
        return self.age > self.__parse(other)

    def __le__(self, other):
        return self.age <= self.__parse(other)

    def __ge__(self, other):
        return self.age >= self.__parse(other)

    def __parse(self, age):
        return int(''.join(re.findall(r'\d', age)))

# Let's test this class
if __name__ == '__main__':
    print(Age(18) == 'noise18noise')
    print(Age(18) <= 'aka 1 fakj 8 jal')
    print(Age(18) >= 'jaa 18 ka')
    print(Age(18) < '1 kda 9')
    print(Age(18) > 'akfa 1 na 7 noise')

Output:
True
True
True
True
True

测试确实有效。我想在pyspark中使用它

运行 pyspark,读取ages.csv 并导入 Age

Using Python version 3.6.7 (default, Oct 23 2018 19:16:44)
SparkSession available as 'spark'.
>>> ages = spark.read.csv('ages.csv', sep=';', header=True)
19/01/28 14:44:18 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
>>> ages.show()
+-----+------------+
| Name|         Age|
+-----+------------+
|alpha|noise20noise|
| beta|  noi 3 sE 0|
|gamma| n 4 oi 0 se|
|  phi|      n50ise|
|detla|     3no5ise|
|kappa| No 4 i 5 sE|
|omega|     25noIsE|
+-----+------------+

现在我想获取所有年龄为 20 岁的人,例如

>>> from age import Age
>>> ages.filter(ages.Age == Age(20)).show()

这是我得到的错误

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/column.py", line 116, in _
    njc = getattr(self._jc, name)(jc)
File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1248, in __call__
File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1218, in _build_args
File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1218, in <listcomp>
File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 298, in get_command_part
AttributeError: 'Age' object has no attribute '_get_object_id'

所以我的第一个问题是如何解决这个错误

这是我第一次尝试解决这个问题:我将class Age的定义更改为扩展str,如下所示:

年龄.py

...
class Age(str):
    ....

作为第二次尝试:

>>> ages.filter(ages.Age == Age(20)).show()
+----+---+
|Name|Age|
+----+---+
+----+---+

尽管如此,我们仍然有:

>>> 'noise20noise' == Age(20)
True

如您所见,AttributeError: 'Age' object has no attribute '_get_object_id' 消失了,但它没有计算出正确的答案,这是我的第二个问题

这又是我的尝试: 我使用 pyspark 用户定义函数

>>> import pyspark.sql.functions as F
>>> import pyspark.sql.types as T
>>> eq20 = F.udf(lambda c: c == Age(20), T.BooleanType())
>>> ages.filter(eq20(ages.Age)).show()
+-----+------------+
| Name|         Age|
+-----+------------+
|alpha|noise20noise|
+-----+------------+

现在可以了。 但事情是这样的: 我最喜欢第一个成语

>>> ages.filter(ages.Age == Age(20)).show()

更简单,更具表现力。我不想每次都定义像eq20, eq21, less_than50, greater_than30, etc 这样的函数

我可以在 Age 类本身中进行该定义,但我不知道该怎么做。尽管如此,这是我迄今为止尝试使用python decorator

年龄.py

# other imports here
...

import pyspark.sql.functions as F
import pyspark.sql.types as T

def connect_to_pyspark(function):
    return F.udf(function, T.BooleanType())

class Age(str):
    ...

    @connect_to_pyspark
    def __eq__(self, other):
        return self.age == self.__parse(other)

    ...
    # do the same decorator for the other comparative methods

再次测试:

>>> ages.filter(ages.Age == Age(20)).show()
+----+---+
|Name|Age|
+----+---+
+----+---+

而且它不起作用。还是我的装饰器写得不好?

如何解决这一切? 我对第一个问题的解决方案是否足够好?如果没有,应该怎么做?如果是,如何解决第二个问题?

【问题讨论】:

【参考方案1】:

获取ages.Age == Age(20) 将非常困难,因为spark 不遵守python 实现__eq__ 的约定。稍后会详细介绍,但如果你可以做Age(20) == ages.Age,那么你有一些选择。恕我直言,最简单的方法是仅将解析逻辑包装在 udf 中:

parse_udf = F.udf(..., T.IntegerType())
class Age:
    ...
    def __eq__(self, other: Column):
        return F.lit(self.age) == parse_udf(other)

请注意,Age 不是 str 的子类,这只会造成伤害。如果你想使用装饰器,你的装饰器不应该返回udf,它应该返回一个应用udf的函数。像这样:

import re
import pyspark.sql.functions as F
import pyspark.sql.types as T

def connect_to_pyspark(function):
  def helper(age, other):
    myUdf = F.udf(lambda item_from_other: function(age, item_from_other), T.BooleanType())
    return myUdf(other)
  return helper

class Age:

    def __init__(self, age):
      self.age = 45

    def __parse(self, other):
      return int(''.join(re.findall(r'\d', other)))

    @connect_to_pyspark
    def __eq__(self, other):
        return self.age == self.__parse(other)

ages.withColumn("eq20", Age(20) == df.Age).show()

详细了解为什么需要使用 Ages(20) == ages.Age。在python中,如果你做a == b并且a的类不知道如何与b比较,它应该返回NotImplemented然后python会尝试b.__eq__(a),但spark永远不会返回NotImplemented所以__eq__Age 只有在表达式中首先出现时才会被调用:(。

【讨论】:

这仅适用于Age(20) == df.Age 的一种方式,但不适用于另一种方式df.Age == Age(20) 我仍然收到以下错误AttributeError: 'Age' object has no attribute '_get_object_id' 。后者是我真正需要的,因为那时我什至可以做df.Age.between(Age(20), Age(30)) 之类的事情。我希望这可以解决。 @mctrjalloh 在 spark 邮件列表中提出它,如果不更新 pyspark 就无法解决这个问题:( 好的。我会尝试提出这个问题。

以上是关于如何在 pyspark 操作中轻松使用我的自定义类方法?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 PySpark 中的自定义函数在同一 ML 管道中传递分桶器?

在 PySpark 中写入数据帧的自定义文件名

如何响应从 xib 加载的自定义视图中发生的操作?

如何在 PySpark 中创建自定义 Estimator

PySpark 中 JDBC 上的自定义分区

如何在 Apache Spark (pyspark) 中使用自定义类?