Pyspark UDF unable to use large dictionary

【Title】Pyspark UDF unable to use large dictionary 【Posted】2019-08-19 15:55:00 【Question】:

I have a dictionary whose keys are words and whose values are arrays of 300 floats. I am unable to use this dictionary in my pyspark UDF. When the dictionary holds 2 million keys it does not work, but when I reduce it to 200K keys it works.

Here is my code for the function that I convert into a UDF:

import numpy as np

def get_sentence_vector(sentence, dictionary_containing_word_vectors):
    cleanedSentence = list(clean_text(sentence))
    words_vector_list = np.zeros(300)  # 300-dimensional vector
    for x in cleanedSentence:
        try:
            words_vector_list = np.add(words_vector_list, dictionary_containing_word_vectors[str(x)])
        except Exception as e:
            print("Exception caught while finding word vector from FastText pretrained model dictionary: ", e)
    return words_vector_list.tolist()
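For illustration, here is a runnable sketch of the same summation against a toy 4-dimensional dictionary. The `clean_text` below is a stand-in tokenizer (the real one is not shown in the question), and the toy words and vectors are made up:

```python
import numpy as np

def clean_text(sentence):
    # stand-in tokenizer; the question's real clean_text is not shown
    return sentence.lower().split()

def get_sentence_vector(sentence, dictionary_containing_word_vectors, dim=4):
    # sum the vectors of every word found in the dictionary;
    # unknown words contribute nothing to the result
    words_vector_list = np.zeros(dim)
    for x in clean_text(sentence):
        try:
            words_vector_list = np.add(words_vector_list, dictionary_containing_word_vectors[str(x)])
        except KeyError:
            pass  # word not in the pretrained dictionary
    return words_vector_list.tolist()

toy_dict = {"good": [1.0, 0.0, 0.0, 1.0], "morning": [0.0, 2.0, 0.0, 0.0]}
vec = get_sentence_vector("Good morning", toy_dict)
# vec is the element-wise sum of the two word vectors: [1.0, 2.0, 0.0, 1.0]
```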

Here is my UDF:

get_sentence_vector_udf = F.udf(lambda val: get_sentence_vector(val, fast_text_dictionary), ArrayType(FloatType()))

And this is how I call the udf to add the column to my dataframe:

dmp_df_with_vectors = df.filter(df.item_name.isNotNull()).withColumn("sentence_vector", get_sentence_vector_udf(df.item_name))

Here is the stack trace of the error:

Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/broadcast.py", line 83, in dump
    pickle.dump(value, f, 2)
SystemError: error return without exception set
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 1957, in wrapper
    return udf_obj(*args)
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 1916, in __call__
    judf = self._judf
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 1900, in _judf
    self._judf_placeholder = self._create_judf()
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 1909, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 1866, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2377, in _prepare_for_python_RDD
    broadcast = sc.broadcast(pickled_command)
  File "/usr/lib/spark/python/pyspark/context.py", line 799, in broadcast
    return Broadcast(self, value, self._pickled_broadcast_vars)
  File "/usr/lib/spark/python/pyspark/broadcast.py", line 74, in __init__
    self._path = self.dump(value, f)
  File "/usr/lib/spark/python/pyspark/broadcast.py", line 90, in dump
    raise pickle.PicklingError(msg)
cPickle.PicklingError: Could not serialize broadcast: SystemError: error return without exception set

【Comments】:

It's a serialization problem: cPickle cannot handle very large objects. Try a broadcast variable and see if it helps.

【Answer 1】:

How large is your fast_text_dictionary in the 2M-key case? It is probably too big. Try to broadcast it before running the udf. For example:

broadcastVar = sc.broadcast(fast_text_dictionary)

Then use broadcastVar in your udf.

See the documentation for broadcast.

【Discussion】:

Hi, the size of fast_text_dictionary is 100663576 bytes. When broadcasting it I get the following error: py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readBroadcastFromFile. : java.lang.NullPointerException
