字符串中的 Pyspark 双字符替换避免某些单词而不映射到 pandas 或 rdd

Posted

技术标签:

【中文标题】字符串中的 Pyspark 双字符替换避免某些单词而不映射到 pandas 或 rdd【英文标题】:Pyspark double character replacement in strings avoiding certain words without mapping to pandas or rdd 【发布时间】:2021-03-15 12:59:26 【问题描述】:

我继承了一个程序,它修改了 pyspark 数据帧中的一些字符串。其中一个步骤涉及从字符串中的某些单词中删除双/三/等字母,并附加一个例外列表,即使它们有重复的字母,它们也会被单独留下。目前,这是通过使用 udf 将数据帧转换为 pandas 来完成的,然后将自定义函数应用于生成的 pandas 数据帧中的字符串,然后再读回 pyspark。不幸的是,对要求的更改意味着代码在任何情况下都不能使用 pandas udf 或映射到 rdd。我需要直接在 pyspark 中执行相同的功能。

连续字符删除函数逐字读取字符串,检查该单词是否在例外列表中,如果不在,则逐字符移动,将其与前一个字符进行比较,如果匹配,则检查是否逐个字符创建一个新单词,省略重复。

下面是当前实现的 MWE,在 pyspark 数据帧转换为 pandas 之后。

import pandas as pd


exception_list = ['ACCOUNTING', 'LOOK', 'FOOOOO']

cols = ['input']
data = [
    ["BOOK TOOK LOOK HOUSE SHOOK"],
    ["ACCOUNTING SHEEP"],
    ["FOO FOOO FOOOO FOOOOO FOOOOOO"]
]

df = pd.DataFrame(data, columns=cols)
df.head()


def drop_consecutive_chars(phrase, exception):
    if phrase == '':
        return phrase
    else:
        new_phrase = []
        for word in phrase.split():
            if word not in exception:
                prev = word[0]
                new_word = prev
                for char in word[1:]:
                    if char != prev:
                        new_word += char
                    prev = char
            else:
                new_word = word
            new_phrase += [new_word]
        new_phrase = ' '.join(new_phrase)
        return new_phrase


df['output'] = df['input'].apply(drop_consecutive_chars,
                                 exception=exception_list)

df.head()

在 pyspark 中有什么方法可以做到这一点?我愿意使用 RegexTokenizer 之类的东西并稍后将其加入,以及创建额外的真值列,这些列随后会被删除。它只需要在数据框不离开 pyspark 或映射到其他任何东西的情况下完成。

【问题讨论】:

【参考方案1】:

与我之前的回答类似,您可以使用高阶函数来表示您的 Python 代码的逻辑:

import pyspark.sql.functions as F

df2 = sdf.withColumn(
    'exception_list', 
    F.array(*[F.lit(w) for w in exception_list])
).withColumn(
    'output', 
    F.expr("""
        concat_ws(' ', 
            transform(
                split(input, ' '), 
                w -> case when array_contains(exception_list, w) 
                     then w 
                     else concat_ws('', 
                         transform(
                             split(w, ''), 
                             (c, i) -> case when i = 0 or c != split(w, '')[i-1]
                                            then c
                                            else ''
                                            end
                         )
                     )
                     end
            )
       )
    """)
).drop('exception_list')

df2.show(truncate=False)
+-----------------------------+-----------------------+
|input                        |output                 |
+-----------------------------+-----------------------+
|BOOK TOOK LOOK HOUSE SHOOK   |BOK TOK LOOK HOUSE SHOK|
|ACCOUNTING SHEEP             |ACCOUNTING SHEP        |
|FOO FOOO FOOOO FOOOOO FOOOOOO|FO FO FO FOOOOO FO     |
+-----------------------------+-----------------------+

【讨论】:

我感觉答案会很相似。不幸的是,我不知道表达式代码(我假设这是 Scala?)。我尝试了一段时间来让这样的事情起作用,但是以错误的方式进行。再次感谢你。现在将进行测试。 这不是 Scala - 它是 SQL。但是函数式编程方法(转换/过滤/聚合)非常类似于 Scala 嗯,我收到一个错误:AnalysisException:“lambda 函数参数 '2' 的数量与高阶函数 '1' 预期的参数数量不匹配。它指向表达式内部的某处,可能在 w -> array_contains 行,但我不能确定。 你的 spark 版本是什么? 2.4? 我将代码更改为与 spark 2.4 兼容。

以上是关于字符串中的 Pyspark 双字符替换避免某些单词而不映射到 pandas 或 rdd的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark - 用 pyspark 中的第一个单词替换 2 个或更多连续单词

如何替换/删除 PySpark RDD 中的正则表达式?

替换不需要的字符时如何防止某些单词一起运行?

用双引号替换单引号并排除某些元素

替换innerHTML中的字符串

查找和替换文件中的单词/行