如何在 pySpark 中有效地从字符串数据框中替换多个正则表达式模式的所有实例?

Posted

技术标签:

【中文标题】如何在 pySpark 中有效地从字符串数据框中替换多个正则表达式模式的所有实例?【英文标题】:How can I efficiently replace all instances of multiple regex patterns from a dataframe of strings, in pySpark? 【发布时间】:2019-05-29 20:41:39 【问题描述】:

我在 Hadoop 中有一个表,其中包含 70 亿个字符串,这些字符串本身可以包含任何内容。我需要从包含字符串的列中删除每个名称。一个示例字符串是“约翰去了公园”,我需要从中删除“约翰”,理想情况下只需替换为“[名称]”。

在“John and Mary go to market”的情况下,输出将是“[NAME] and [NAME] go to market”。

为了支持这一点,我有一个最常出现的 20k 名称的有序列表。

我可以访问 Hue(Hive、Impala)和 Zeppelin(Spark、Python 和库)来执行此操作。

我已经在数据库中尝试过,但无法更新列或迭代变量使其无法启动,因此使用 Python 和 PySpark 似乎是最佳选择,尤其是考虑到计算数量(20k 名称* 7bil 个输入字符串)

#nameList contains ['John','Emma',etc]
def removeNames(line, nameList):
    str_line= line[0]
    for name in nameList:
        rx = f"(^| |[[:^alpha:]])(name)( |$|[[:^alpha:]])"
        str_line = re.sub(rx,'[NAME]', str_line)
    str_line= [str_line]
    return tuple(str_line)

df = session.sql("select free_text from table")
rdd = df.rdd.map(lambda line: removeNames(line, nameList))
rdd.toDF().show()

代码正在执行,但即使我将输入文本限制为 1000 行(这对 Spark 来说不算什么)也需要一个半小时,并且这些行实际上并没有在最终输出中被替换。

我想知道的是:为什么 map 实际上没有更新 RDD 的行,我怎样才能提高效率,以便在合理的时间内执行?

这是我第一次发帖,所以如果缺少必要的信息,我会尽可能多地填写。

谢谢!

【问题讨论】:

你可以试试pyspark.sql.functions.regexp_replace(),而不是使用udf:Spark functions vs UDF performance? 您好 pault,感谢您的评论。我尝试在第一个实例中使用 regex_replace 并且它确实具有所需的结果,但是我看不到如何将 map 与它一起使用并并行化操作。我正在遍历名称列表,在整个数据集上运行 regex_replace,然后转到下一个名称。这是 20k 次迭代,由于沿袭导致堆栈溢出。检查点和通过血统进行黑客攻击有效,但性能仍然太慢。 正则表达式功能是一项繁重的操作,因为func/udfregexp_replace 两种解决方案都很慢,也许您可​​以尝试增加作业并行化。为了更准确地回答这个问题,最好具有正在执行作业的集群的一些特征。您的集群目前有多少个节点,每个节点上有哪些可用资源? 您可以尝试创建一个模式,它是您列表中所有值的 OR。像rx = "(^| |[[:^alpha:]])(name)( |$|[[:^alpha:]])".format(name="|".join(nameList)) 这样的东西,然后调用regexp_replace 而不是迭代。 【参考方案1】:

如果您仍然对此感到好奇,通过使用 udf(您的 removeNames 函数)Spark 将您的所有数据序列化到主节点,基本上打败了您使用 Spark 在分布式时尚。正如 cmets 中建议的方法,如果您使用 regexp_replace() 方法,Spark 将能够将所有数据保留在分布式节点上,保持一切分布式并提高性能。

【讨论】:

以上是关于如何在 pySpark 中有效地从字符串数据框中替换多个正则表达式模式的所有实例?的主要内容,如果未能解决你的问题,请参考以下文章

如何有效地从点创建线串?

如何在 PySpark 中为数据框中的所有列替换字符串值与 NULL?

在 PySpark 数据框中拆分字符串

如何对 Pyspark spark.sql 数据框中的数据进行同质化

PYSPARK:如何在 pyspark 数据框中找到两列的余弦相似度?

在单个 spark 数据框中减去两个字符串列的最佳 PySpark 实践是啥?