如何使方法 JSON 可序列化以在自定义 Pyspark 转换器中使用

Posted

技术标签:

【中文标题】如何使方法 JSON 可序列化以在自定义 Pyspark 转换器中使用【英文标题】:How to make a method JSON serializable to be used in a Custom Pyspark Transformer 【发布时间】:2020-05-12 19:35:09 【问题描述】:

我正在使用 pyspark(2.3.0) api 来创建自定义转换器。我想创建一个简单的转换器,它可以将任何函数作为参数。我尝试在 TypeConverters 中使用身份。该代码有效。我面临的唯一问题是我无法保存它。它抛出函数对象不是 JSON 可序列化的错误。有没有办法解决这个问题?

我在 param 中发送一个函数对象,因为我想用它来处理 _transform 方法中的数据帧。 所以问题是如何修改此代码,以便我可以通过将其设置为 PipelineModel 对象中的阶段并使用该对象的 pyspark ML 编写器来保存转换器。

这是我从Create a custom Transformer in PySpark ML借来的自定义转换器代码

functions_dict = dict()
def add_to_dict(f):
    functions_dict[f.__name__] = f
    return f

@add_to_dict
def my_foo( df):
    return df.withColumn("dummy", lit(3))



class MyTransformer(
        Transformer,
        # Credits https://***.com/a/52467470
        # by https://***.com/users/234944/benjamin-manns
        DefaultParamsReadable, DefaultParamsWritable):

    param_dict = Param(Params._dummy(), "param_dict", "function object",
                      typeConverter=TypeConverters.identity)


    @keyword_only
    def __init__(self, param_dict=None):
        super(MyTransformer, self).__init__()
        self.param_dict = Param(self, "param_dict", "")
        self._setDefault(param_dict=param_dict)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, param_dict=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setParamdict(self, value):
        return self._set(param_dict=value)

    def getParamdict(self):
        return self.getOrDefault(self.param_dict)

    def _transform(self, dataset):
        df = self.getParamdict()(dataset)
        return df


spark = (SparkSession.builder.appName("spark_run")
                 .enableHiveSupport()
                 .getOrCreate()
                 )

df = spark.sql("select * from table_name")
t1 = MyTransformer(param_dict=functions_dict["my_foo"])
df3 = t1.transform(df)
df3.printSchema()
df3.show()

stages = [t1]
pmodel = PipelineModel(stages=stages)
pmodel.write().overwrite().save("mytransformer")

pmodel1 = PipelineModel.load("mytransformer")
df2 = pmodel1.transform(df)
df2.printSchema()
df2.show()

我无法保存我的 PipelineModel。我收到以下错误。

Traceback (most recent call last):
  File "/Users/code_v1/test.py", line 81, in <module>
    pmodel.write().overwrite().save("mytransformer")
  File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/util.py", line 135, in save
    self.saveImpl(path)
  File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/pipeline.py", line 226, in saveImpl
    PipelineSharedReadWrite.saveImpl(self.instance, stages, self.sc, path)
  File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/pipeline.py", line 363, in saveImpl
    .getStagePath(stage.uid, index, len(stages), stagesDir))
  File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/util.py", line 135, in save
    self.saveImpl(path)
  File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/util.py", line 384, in saveImpl
    DefaultParamsWriter.saveMetadata(self.instance, path, self.sc)
  File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/util.py", line 403, in saveMetadata
    paramMap)
  File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/util.py", line 427, in _get_metadata_to_save
    return json.dumps(basicMetadata, separators=[',',  ':'])
  File "/Users/anaconda3/envs/spark23/lib/python2.7/json/__init__.py", line 251, in dumps
    sort_keys=sort_keys, **kw).encode(obj)
  File "/Users/anaconda3/envs/spark23/lib/python2.7/json/encoder.py", line 207, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/Users/anaconda3/envs/spark23/lib/python2.7/json/encoder.py", line 270, in iterencode
    return _iterencode(o, 0)
  File "/Users/anaconda3/envs/spark23/lib/python2.7/json/encoder.py", line 184, in default
    raise TypeError(repr(o) + " is not JSON serializable")
TypeError: <function my_foo at 0x7f7f384049d0> is not JSON serializable

【问题讨论】:

【参考方案1】:

简短的回答是函数不能被序列化。但是,可能解决此问题的一种方法是将字符串映射到有效函数,该函数可以在here 中找到。然后,一旦您加载了所有内容,您就可以将字符串链接回您定义的方法

【讨论】:

以上是关于如何使方法 JSON 可序列化以在自定义 Pyspark 转换器中使用的主要内容,如果未能解决你的问题,请参考以下文章

JSON.Net无法在自定义JsonConverter中反序列化json数组

如何添加 CSS 过渡以在自定义模式窗口上创建打开/关闭效果

Django 如何将自定义变量传递给上下文以在自定义管理模板中使用?

Java Jackson如何在自定义序列化程序中为对象使用默认序列化程序

在自定义JsonConverter中反序列化嵌套对象List

如何添加类型约束以在泛型方法中包含任何可序列化的内容?