Pyspark UDF 酸洗错误,无法酸洗 SwigPyObject 对象

Posted

技术标签:

【中文标题】Pyspark UDF 酸洗错误,无法酸洗 SwigPyObject 对象【英文标题】:Pyspark UDF Pickling error, can't pickle SwigPyObject objects 【发布时间】:2019-10-08 09:04:36 【问题描述】:

我正在尝试将 udf 函数应用于由字符串组成的数据框列。函数使用 TensorFlow GUSE 并将字符串转换为浮点数组。

import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import tf_sentencepiece
# Graph set up.
g = tf.Graph()
with g.as_default():
  text_input = tf.placeholder(dtype=tf.string, shape=[None])
  embed = hub.Module("https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/1")
  embedded_text = embed(text_input)
  init_op = tf.group([tf.global_variables_initializer(), tf.tables_initializer()])
g.finalize()

# Initialize session.
session = tf.Session(graph=g)
session.run(init_op)

def embed_mail(x): 
    embedding = session.run(embedded_text, feed_dict=text_input:[x])
    embedding = flatten(embedding)
    result = [np.float32(i).item() for i in embedding]
    return result

但每当我尝试使用以下方式运行此功能时:

embed_mail_udf = udf(embed_mail, ArrayType(FloatType()))
df = df.withColumn('embedding',embed_mail_udf(df.text))

我不断收到错误消息:无法序列化对象:TypeError:无法腌制 SwigPyObject 对象。我做错了什么?

【问题讨论】:

【参考方案1】:

要在集群上运行 UDF 的代码,Spark 需要能够序列化“附加”到该函数的所有数据。你的 UDF embed_mail 包含对 TF Session 的引用,所以函数是 closure,Spark 首先需要序列化 ​​tf.Session 的内容。我敢打赌,这是问题的原因。不幸的是,我没有使用 TF 的经验,但是您似乎可以在运行 Spark 之前从 TF 获取邮件数据,将其广播然后在您的 udf 中使用?

【讨论】:

以上是关于Pyspark UDF 酸洗错误,无法酸洗 SwigPyObject 对象的主要内容,如果未能解决你的问题,请参考以下文章

Python 酸洗错误:TypeError:对象泡菜未返回列表。 numpy的问题?

Python多处理队列酸洗错误

pickle.loads 给出“模块”对象在 Pyspark Pandas Udf 中没有属性“<ClassName>”

酸洗大型 NumPy 数组

如何在 Python 中解释 Dill 的酸洗跟踪输出? (分析(非)酸洗/(反)序列化瓶颈)

Python酸洗EOF问题