PySpark UDF 优化挑战
Posted
技术标签:
【中文标题】PySpark UDF 优化挑战【英文标题】:PySpark UDF optimization challenge 【发布时间】:2020-07-10 19:06:47 【问题描述】:我正在尝试优化下面的代码。当运行 1000 行数据时,大约需要 12 分钟才能完成。我们的用例将要求数据大小在 25K 到 50K 行左右,这将使此实现完全不可行。
import pyspark.sql.types as Types
import numpy
import spacy
from pyspark.sql.functions import udf
inputPath = "s3://myData/part-*.parquet"
df = spark.read.parquet(inputPath)
test_df = df.select('uid', 'content').limit(1000).repartition(10)
# print(df.rdd.getNumPartitions()) -> 4
# print(test_df.rdd.getNumPartitions()) -> 1
def load_glove(fn):
vector_dict =
count = 0
with open(fn) as inf:
for line in inf:
count += 1
eles = line.strip().split()
token = eles[0]
try:
vector_dict[token] = numpy.array([float(x) for x in eles[1:]])
assert len(vector_dict[token]) == 300
except:
print("Exception in load_glove")
pass
return vector_dict
# Returning an Array of doubles from the udf
@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
# TODO: move the load function out if posible, and remove unused modules
# nlp = spacy.load('en', disable=['parser', 'tagger'])
nlp = spacy.load('en', max_length=6000000)
gloveEmbeddingsPath = "/home/hadoop/short_glove_1000.300d.txt"
glove_embeddings_dict = load_glove(gloveEmbeddingsPath)
spacy_doc = nlp(text)
doc_vec = numpy.array([0.0] * 300)
doc_vec = numpy.float32(doc_vec)
wordcount = 0
for sentence_id, sentence in enumerate(spacy_doc.sents):
for word in sentence:
if word.text in glove_embeddings_dict:
# Pre-convert to glove dictionary to float32 representations
doc_vec += numpy.float32(glove_embeddings_dict[word.text])
wordcount += 1
# Document Vector is the average of all word vectors in the document
doc_vec = doc_vec/(1.0 * wordcount)
return doc_vec.tolist()
spark.udf.register("generateVectorRepresentation", generateVectorRepresentation)
document_vector_df = test_df.withColumn("Glove Document Vector", generateVectorRepresentation('content'))
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_document_vector_df = document_vector_df.toPandas()
# print(pandas_document_vector_df)
pandas_document_vector_df.head()
我想知道你们是否可以帮助回答以下问题
每次迭代都会调用 spacy.load() 和 load_glove() 方法吗? 有没有办法为每个工作节点准备一次 load_glove() 数据,而不是为每行数据准备一次? load_glove 方法返回一个字典对象,它可能大到 5GB。有没有办法在主节点上准备它,然后作为参数传递给 UDF?
感谢您的建议。提前致谢!
【问题讨论】:
【参考方案1】:是的,在当前实现中,每次运行您的函数时都会执行所有模型加载代码,这远非最优。没有办法将它从驱动程序直接传递到工作节点,但有一种类似的方法 - 在每个工作人员上初始化模型,但只有一次。为此,您必须使用惰性函数,该函数仅在需要实际结果时才会执行 - 因此,在工人身上。
尝试做这样的事情:
# Here we are not loading the model at the loading time, only the worker code
# will invoke this routine and gets the spacy object. Which means we are loading
# new spacy models on every executors.
SPACY_MODEL = None
def get_spacy_model():
global SPACY_MODEL
if not SPACY_MODEL:
_model = spacy.load('en', max_length=6000000)
SPACY_MODEL = _model
return SPACY_MODEL
@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
# TODO: move the load function out if posible, and remove unused modules
# nlp = spacy.load('en', disable=['parser', 'tagger'])
nlp = get_spacy_model()
# your further processing
我认为您可以尝试将手套加载代码添加到类似的函数中。
您可以尝试在此处阅读更多相关信息:https://haridas.in/run-spacy-jobs-on-apache-spark.html(这不是我的博客,只是在尝试使用 Spacy 模型做同样的事情时发现了此信息)。
【讨论】:
Rai 非常感谢您的回复,这对于加载 spacy 模型和 glove 字典都有效。它带来了巨大的性能提升!感谢您的回复。对上述模式的一个小修正是 SPACY_MODEL = _model 行需要位于 if 语句中。 我尝试使用来自不同库的另一个类似的 nlp 管道进行复制,但遇到了这个问题:***.com/questions/65565976/… UDF 并行运行时会导致竞争条件吗?【参考方案2】:使 udf-s 如此缓慢的主要原因是它们无法通过 spark 优化(它们被视为黑匣子)。因此,为了使其更快,您需要尽可能多地取出并用香草火花功能代替它。理想的做法是只将 spacy
部分(我不熟悉该模块)留在 udf 中,得到结果 DF,然后使用 vanilla spark 函数进行其余的转换。
例如,load_glove()
将按照其他答案的说明对每一行执行。但是看代码,它的格式好像可以变成301列的dataframe。然后你可以加入它以获得所需的值。 (如果你能让另一个 DF 以 word.text
作为键,没有数据就有点难以判断,但理论上看起来是可能的)。
【讨论】:
感谢您的回复。是的,我也在探索这种方式。以上是关于PySpark UDF 优化挑战的主要内容,如果未能解决你的问题,请参考以下文章
如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?