Pyspark UDF 中自定义 Python 对象的使用

Posted

技术标签:

【中文标题】Pyspark UDF 中自定义 Python 对象的使用【英文标题】:Usage of custom Python object in Pyspark UDF 【发布时间】:2017-10-11 15:38:45 【问题描述】:

运行以下 PySpark 代码时:

nlp = NLPFunctions()

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))


udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType()))

我收到以下错误: _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

我想这是因为 PySpark 无法序列化这个自定义类。但是如何避免在每次运行 parse_ingredients_line 函数时实例化这个昂贵的对象的开销?

【问题讨论】:

【参考方案1】:

假设您想使用这样定义的Identity 类 (identity.py):

class Identity(object):                   
    def __getstate__(self):
        raise NotImplementedError("Not serializable")

    def identity(self, x):
        return x

例如,您可以使用可调用对象 (f.py) 并将 Identity 实例存储为类成员:

from identity import Identity

class F(object):                          
    identity = None

    def __call__(self, x):
        if not F.identity:
            F.identity = Identity()
        return F.identity.identity(x)

并如下所示使用这些:

from pyspark.sql.functions import udf
import f

sc.addPyFile("identity.py")
sc.addPyFile("f.py")

f_ = udf(f.F())

spark.range(3).select(f_("id")).show()
+-----+
|F(id)|
+-----+
|    0|
|    1|
|    2|
+-----+

或独立函数和闭包:

from pyspark.sql.functions import udf
import identity

sc.addPyFile("identity.py")

def f(): 
    dict_ =                  
    @udf()              
    def f_(x):                 
        if "identity" not in dict_:
            dict_["identity"] = identity.Identity()
        return dict_["identity"].identity(x)
    return f_


spark.range(3).select(f()("id")).show()
+------+
|f_(id)|
+------+
|     0|
|     1|
|     2|
+------+

【讨论】:

我不太明白这个例子。您在哪里表明您能够在 udf 的执行之间保持状态? @Vitaliy 这是标准的 Python 代码——在这两种情况下,我们都将感兴趣的对象保留在外部范围内,因此它的生命周期不限于范围本身。如果您愿意,可以使用nonlocal 代替可变的dict。显然它不能比你无法控制的父解释器活得更久。否则,您可以轻松添加日志记录并使用调试器查看初始化仅在第一次调用时应用。 这很好用!!超快 - 这就是我们使用 spark 的原因 :) user6910411 - 你确定你的代码没有创建 3 个 Identity 类的实例吗?我检查了你的“独立函数和闭包”示例代码,这就是发生在我身上的事情。 @PawełBatko 此代码将创建与 Spark 生成的执行器解释器一样多的 Identity 实例(请记住,这里没有共享内存,每个执行器“线程”实际上是 PySpark 中的一个进程). 所以实际数量将取决于被重用的执行程序的数量 - 上限是任务总数(包括重新启动的任务)。有更复杂的策略,但这些策略超出了这个特定的范围回答。【参考方案2】:

我基于 (https://github.com/scikit-learn/scikit-learn/issues/6975) 通过使 NLPFunctions 类的所有依赖项可序列化来解决它。

【讨论】:

【参考方案3】:

编辑:这个答案是错误的。对象在广播的时候还是先序列化再反序列化,所以不能避免序列化。 (Tips for properly using large broadcast variables?)


尝试使用broadcast variable。

sc = SparkContext()
nlp_broadcast = sc.broadcast(nlp) # Stores nlp in de-serialized format.

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp_broadcast.value.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))

【讨论】:

以上是关于Pyspark UDF 中自定义 Python 对象的使用的主要内容,如果未能解决你的问题,请参考以下文章

udf(用户定义函数)如何在 pyspark 中工作?

从Pyspark UDF调用另一个自定义Python函数

PySpark 自定义 UDF ModuleNotFoundError: No module named

Spark篇---SparkSQL中自定义UDF和UDAF,开窗函数的应用

在 PySpark 中,有没有办法使用运行时给出的 Python 类的函数来动态注册 UDF? [复制]

PySpark UDF 无法识别参数数量