PySpark UDF 优化挑战

Posted

技术标签:

【中文标题】PySpark UDF 优化挑战【英文标题】:PySpark UDF optimization challenge 【发布时间】:2020-07-10 19:06:47 【问题描述】:

我正在尝试优化下面的代码。当运行 1000 行数据时,大约需要 12 分钟才能完成。我们的用例将要求数据大小在 25K 到 50K 行左右,这将使此实现完全不可行。

import pyspark.sql.types as Types
import numpy
import spacy
from pyspark.sql.functions import udf

inputPath = "s3://myData/part-*.parquet"
df = spark.read.parquet(inputPath)

test_df = df.select('uid', 'content').limit(1000).repartition(10)

# print(df.rdd.getNumPartitions()) -> 4
# print(test_df.rdd.getNumPartitions()) -> 1

def load_glove(fn):
    vector_dict = 
    count = 0
    with open(fn) as inf:
        for line in inf:
            count += 1
            eles = line.strip().split()
            token = eles[0]
            try:
                vector_dict[token] = numpy.array([float(x) for x in eles[1:]])
                assert len(vector_dict[token]) == 300
            except:
                print("Exception in load_glove")
                pass
    return vector_dict

# Returning an Array of doubles from the udf
@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
  # TODO: move the load function out if posible, and remove unused modules 
  # nlp = spacy.load('en', disable=['parser', 'tagger'])
  nlp = spacy.load('en', max_length=6000000)
  gloveEmbeddingsPath = "/home/hadoop/short_glove_1000.300d.txt"
  glove_embeddings_dict = load_glove(gloveEmbeddingsPath)
  spacy_doc = nlp(text)
  doc_vec = numpy.array([0.0] * 300)
  doc_vec = numpy.float32(doc_vec)
  wordcount = 0
  for sentence_id, sentence in enumerate(spacy_doc.sents):
      for word in sentence:
          if word.text in glove_embeddings_dict:
              # Pre-convert to glove dictionary to float32 representations
              doc_vec += numpy.float32(glove_embeddings_dict[word.text])
              wordcount += 1

  # Document Vector is the average of all word vectors in the document
  doc_vec = doc_vec/(1.0 * wordcount)
  return doc_vec.tolist()

spark.udf.register("generateVectorRepresentation", generateVectorRepresentation)

document_vector_df = test_df.withColumn("Glove Document Vector", generateVectorRepresentation('content'))

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_document_vector_df = document_vector_df.toPandas()

# print(pandas_document_vector_df)
pandas_document_vector_df.head()

我想知道你们是否可以帮助回答以下问题

每次迭代都会调用 spacy.load() 和 load_glove() 方法吗? 有没有办法为每个工作节点准备一次 load_glove() 数据,而不是为每行数据准备一次? load_glove 方法返回一个字典对象,它可能大到 5GB。有没有办法在主节点上准备它,然后作为参数传递给 UDF?

感谢您的建议。提前致谢!

【问题讨论】:

【参考方案1】:

是的,在当前实现中,每次运行您的函数时都会执行所有模型加载代码,这远非最优。没有办法将它从驱动程序直接传递到工作节点,但有一种类似的方法 - 在每个工作人员上初始化模型,但只有一次。为此,您必须使用惰性函数,该函数仅在需要实际结果时才会执行 - 因此,在工人身上。

尝试做这样的事情:

# Here we are not loading the model at the loading time, only the worker code
# will invoke this routine and gets the spacy object. Which means we are loading
# new spacy models on every executors.
SPACY_MODEL = None
def get_spacy_model():
    global SPACY_MODEL
    if not SPACY_MODEL:
       _model = spacy.load('en', max_length=6000000)
    SPACY_MODEL = _model
    return SPACY_MODEL

@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
  # TODO: move the load function out if posible, and remove unused modules 
  # nlp = spacy.load('en', disable=['parser', 'tagger'])
  nlp = get_spacy_model()
  # your further processing

我认为您可以尝试将手套加载代码添加到类似的函数中。

您可以尝试在此处阅读更多相关信息:https://haridas.in/run-spacy-jobs-on-apache-spark.html(这不是我的博客,只是在尝试使用 Spacy 模型做同样的事情时发现了此信息)。

【讨论】:

Rai 非常感谢您的回复,这对于加载 spacy 模型和 glove 字典都有效。它带来了巨大的性能提升!感谢您的回复。对上述模式的一个小修正是 SPACY_MODEL = _model 行需要位于 if 语句中。 我尝试使用来自不同库的另一个类似的 nlp 管道进行复制,但遇到了这个问题:***.com/questions/65565976/… UDF 并行运行时会导致竞争条件吗?【参考方案2】:

使 udf-s 如此缓慢的主要原因是它们无法通过 spark 优化(它们被视为黑匣子)。因此,为了使其更快,您需要尽可能多地取出并用香草火花功能代替它。理想的做法是只将 spacy 部分(我不熟悉该模块)留在 udf 中,得到结果 DF,然后使用 vanilla spark 函数进行其余的转换。

例如,load_glove() 将按照其他答案的说明对每一行执行。但是看代码,它的格式好像可以变成301列的dataframe。然后你可以加入它以获得所需的值。 (如果你能让另一个 DF 以 word.text 作为键,没有数据就有点难以判断,但理论上看起来是可能的)。

【讨论】:

感谢您的回复。是的,我也在探索这种方式。

以上是关于PySpark UDF 优化挑战的主要内容,如果未能解决你的问题,请参考以下文章

PySpark UDF 返回可变大小的元组

如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?

udf(用户定义函数)如何在 pyspark 中工作?

Pyspark:从 Python 到 Pyspark 实现 lambda 函数和 udf

Pyspark:访问 UDF 中行内的列

在 PySpark 中重新加载 UDF