如何在 pySpark 中有效地从字符串数据框中替换多个正则表达式模式的所有实例?
Posted
技术标签:
【中文标题】如何在 pySpark 中有效地从字符串数据框中替换多个正则表达式模式的所有实例?【英文标题】:How can I efficiently replace all instances of multiple regex patterns from a dataframe of strings, in pySpark? 【发布时间】:2019-05-29 20:41:39 【问题描述】:我在 Hadoop 中有一个表,其中包含 70 亿个字符串,这些字符串本身可以包含任何内容。我需要从包含字符串的列中删除每个名称。一个示例字符串是“约翰去了公园”,我需要从中删除“约翰”,理想情况下只需替换为“[名称]”。
在“John and Mary go to market”的情况下,输出将是“[NAME] and [NAME] go to market”。
为了支持这一点,我有一个最常出现的 20k 名称的有序列表。
我可以访问 Hue(Hive、Impala)和 Zeppelin(Spark、Python 和库)来执行此操作。
我已经在数据库中尝试过,但无法更新列或迭代变量使其无法启动,因此使用 Python 和 PySpark 似乎是最佳选择,尤其是考虑到计算数量(20k 名称* 7bil 个输入字符串)
#nameList contains ['John','Emma',etc]
def removeNames(line, nameList):
str_line= line[0]
for name in nameList:
rx = f"(^| |[[:^alpha:]])(name)( |$|[[:^alpha:]])"
str_line = re.sub(rx,'[NAME]', str_line)
str_line= [str_line]
return tuple(str_line)
df = session.sql("select free_text from table")
rdd = df.rdd.map(lambda line: removeNames(line, nameList))
rdd.toDF().show()
代码正在执行,但即使我将输入文本限制为 1000 行(这对 Spark 来说不算什么)也需要一个半小时,并且这些行实际上并没有在最终输出中被替换。
我想知道的是:为什么 map 实际上没有更新 RDD 的行,我怎样才能提高效率,以便在合理的时间内执行?
这是我第一次发帖,所以如果缺少必要的信息,我会尽可能多地填写。
谢谢!
【问题讨论】:
你可以试试pyspark.sql.functions.regexp_replace()
,而不是使用udf
:Spark functions vs UDF performance?
您好 pault,感谢您的评论。我尝试在第一个实例中使用 regex_replace 并且它确实具有所需的结果,但是我看不到如何将 map 与它一起使用并并行化操作。我正在遍历名称列表,在整个数据集上运行 regex_replace,然后转到下一个名称。这是 20k 次迭代,由于沿袭导致堆栈溢出。检查点和通过血统进行黑客攻击有效,但性能仍然太慢。
正则表达式功能是一项繁重的操作,因为func/udf
和regexp_replace
两种解决方案都很慢,也许您可以尝试增加作业并行化。为了更准确地回答这个问题,最好具有正在执行作业的集群的一些特征。您的集群目前有多少个节点,每个节点上有哪些可用资源?
您可以尝试创建一个模式,它是您列表中所有值的 OR。像rx = "(^| |[[:^alpha:]])(name)( |$|[[:^alpha:]])".format(name="|".join(nameList))
这样的东西,然后调用regexp_replace
而不是迭代。
【参考方案1】:
如果您仍然对此感到好奇,通过使用 udf
(您的 removeNames
函数)Spark 将您的所有数据序列化到主节点,基本上打败了您使用 Spark 在分布式时尚。正如 cmets 中建议的方法,如果您使用 regexp_replace()
方法,Spark 将能够将所有数据保留在分布式节点上,保持一切分布式并提高性能。
【讨论】:
以上是关于如何在 pySpark 中有效地从字符串数据框中替换多个正则表达式模式的所有实例?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 PySpark 中为数据框中的所有列替换字符串值与 NULL?
如何对 Pyspark spark.sql 数据框中的数据进行同质化