如何使方法 JSON 可序列化以在自定义 Pyspark 转换器中使用
Posted
技术标签:
【中文标题】如何使方法 JSON 可序列化以在自定义 Pyspark 转换器中使用【英文标题】:How to make a method JSON serializable to be used in a Custom Pyspark Transformer 【发布时间】:2020-05-12 19:35:09 【问题描述】:我正在使用 pyspark(2.3.0) api 来创建自定义转换器。我想创建一个简单的转换器,它可以将任何函数作为参数。我尝试在 TypeConverters 中使用身份。该代码有效。我面临的唯一问题是我无法保存它。它抛出函数对象不是 JSON 可序列化的错误。有没有办法解决这个问题?
我在 param 中发送一个函数对象,因为我想用它来处理 _transform
方法中的数据帧。
所以问题是如何修改此代码,以便我可以通过将其设置为 PipelineModel
对象中的阶段并使用该对象的 pyspark ML 编写器来保存转换器。
这是我从Create a custom Transformer in PySpark ML借来的自定义转换器代码
functions_dict = dict()
def add_to_dict(f):
functions_dict[f.__name__] = f
return f
@add_to_dict
def my_foo( df):
return df.withColumn("dummy", lit(3))
class MyTransformer(
Transformer,
# Credits https://***.com/a/52467470
# by https://***.com/users/234944/benjamin-manns
DefaultParamsReadable, DefaultParamsWritable):
param_dict = Param(Params._dummy(), "param_dict", "function object",
typeConverter=TypeConverters.identity)
@keyword_only
def __init__(self, param_dict=None):
super(MyTransformer, self).__init__()
self.param_dict = Param(self, "param_dict", "")
self._setDefault(param_dict=param_dict)
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, param_dict=None):
kwargs = self._input_kwargs
return self._set(**kwargs)
def setParamdict(self, value):
return self._set(param_dict=value)
def getParamdict(self):
return self.getOrDefault(self.param_dict)
def _transform(self, dataset):
df = self.getParamdict()(dataset)
return df
spark = (SparkSession.builder.appName("spark_run")
.enableHiveSupport()
.getOrCreate()
)
df = spark.sql("select * from table_name")
t1 = MyTransformer(param_dict=functions_dict["my_foo"])
df3 = t1.transform(df)
df3.printSchema()
df3.show()
stages = [t1]
pmodel = PipelineModel(stages=stages)
pmodel.write().overwrite().save("mytransformer")
pmodel1 = PipelineModel.load("mytransformer")
df2 = pmodel1.transform(df)
df2.printSchema()
df2.show()
我无法保存我的 PipelineModel。我收到以下错误。
Traceback (most recent call last):
File "/Users/code_v1/test.py", line 81, in <module>
pmodel.write().overwrite().save("mytransformer")
File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/util.py", line 135, in save
self.saveImpl(path)
File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/pipeline.py", line 226, in saveImpl
PipelineSharedReadWrite.saveImpl(self.instance, stages, self.sc, path)
File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/pipeline.py", line 363, in saveImpl
.getStagePath(stage.uid, index, len(stages), stagesDir))
File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/util.py", line 135, in save
self.saveImpl(path)
File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/util.py", line 384, in saveImpl
DefaultParamsWriter.saveMetadata(self.instance, path, self.sc)
File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/util.py", line 403, in saveMetadata
paramMap)
File "/Users/anaconda3/envs/spark23/lib/python2.7/site-packages/pyspark/ml/util.py", line 427, in _get_metadata_to_save
return json.dumps(basicMetadata, separators=[',', ':'])
File "/Users/anaconda3/envs/spark23/lib/python2.7/json/__init__.py", line 251, in dumps
sort_keys=sort_keys, **kw).encode(obj)
File "/Users/anaconda3/envs/spark23/lib/python2.7/json/encoder.py", line 207, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/Users/anaconda3/envs/spark23/lib/python2.7/json/encoder.py", line 270, in iterencode
return _iterencode(o, 0)
File "/Users/anaconda3/envs/spark23/lib/python2.7/json/encoder.py", line 184, in default
raise TypeError(repr(o) + " is not JSON serializable")
TypeError: <function my_foo at 0x7f7f384049d0> is not JSON serializable
【问题讨论】:
【参考方案1】:简短的回答是函数不能被序列化。但是,可能解决此问题的一种方法是将字符串映射到有效函数,该函数可以在here 中找到。然后,一旦您加载了所有内容,您就可以将字符串链接回您定义的方法
【讨论】:
以上是关于如何使方法 JSON 可序列化以在自定义 Pyspark 转换器中使用的主要内容,如果未能解决你的问题,请参考以下文章
JSON.Net无法在自定义JsonConverter中反序列化json数组
如何添加 CSS 过渡以在自定义模式窗口上创建打开/关闭效果
Django 如何将自定义变量传递给上下文以在自定义管理模板中使用?
Java Jackson如何在自定义序列化程序中为对象使用默认序列化程序