PySpark UDF 优化挑战使用带有正则表达式的字典(Scala?)

Posted

技术标签:

【中文标题】PySpark UDF 优化挑战使用带有正则表达式的字典(Scala?)【英文标题】:PySpark UDF optimization challenge using a dictionary with regex's (Scala?) 【发布时间】:2020-08-12 04:47:31 【问题描述】:

我正在尝试优化下面的代码 (PySpark UDF)。

它给了我想要的结果(基于我的数据集),但是在非常大的数据集(大约 180M)上它太慢了。

结果(准确性)优于可用的 Python 模块(例如 geotext、hdx-python-country)。所以我不是在寻找另一个模块。

数据帧:

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],   
  ["Kalverstraat Amsterdam","Mary"],   
  ["Kalverstraat Amsterdam, Netherlands","Lex"] 
]).toDF("address","name")

正则表达式.csv:

iso2;keywords
US;\bArizona\b
US;\bTexas\b
US;\bFlorida\b
US;\bChicago\b
US;\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA;\bAlberta\b
CA;\bNova Scotia\b
CA;\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL;\bAmsterdam\b
NL;\Netherlands\b
NL;\bNL$

......<many, many more>

regex.csv 创建一个 Pandas DataFrame,按iso2 分组并加入keywords (\bArizona\b|\bTexas\b\bFlorida\b|\bUS$)。

df = pd.read_csv(regex.csv, sep=';')
df_regex = df.groupby('iso2').agg('keywords': '|'.join ).reset_index()

功能:

def get_iso2(x): 
 
    iso2=
    
    for j, row in df_regex.iterrows():
 
        regex = re.compile(row['keywords'],re.I|re.M)         
        matches = re.finditer(regex, x)
        
        for m in matches:
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1
            
    return [key for key, value in iso2.items() for _ in range(value)]

PySpark UDF:

get_iso2_udf = F.udf(get_iso2, T.ArrayType(T.StringType()))

创建新列:

df_new = df.withColumn('iso2',get_iso2_udf('address')

预期样本输出:

[US,US,NL]
[CA]
[BE,BE,AU]

有些地方出现在多个国家/地区(输入是带有城市、省、州、国家/地区的地址列...)

示例:

3030 Whispering Pines Circle, Prosper Texas, US -> [US,US,US] Kalverstraat 阿姆斯特丹 -> [US,NL] Kalverstraat 阿姆斯特丹,荷兰 -> [US, NL, NL]

也许在 PySpark 中使用 Scala UDF 是一种选择,但我不知道如何。

我们非常感谢您的优化建议!

【问题讨论】:

@anky 编辑了我的问题。我正在使用 PySpark (Spark DataFrame) 在 Spark 集群上运行 Jupyter Notebook。 @anky 正则表达式文件几乎可以解决问题,因为它针对我的数据集进行了优化,并且结果经过高精度验证。 听起来您正在使用类似正则表达式的谓词有效地连接两个表,然后根据第一个表的 ID 进行分组。您可以使用df = df_addresses.crossJoin(df_regex) 连接两个数据框,然后使用df.filter(df('address').rlike(df('keywords')),然后对继承自df_addresses 的ID 列进行分组。这将完全在 Spark 中运行,无需将数据编组到 Python 帮助程序进程或从 Python 助手进程传出。 其实你也可以像df_addresses.join(df_regex, df_addresses('address').rlike(df_regex('keywords')), 'cross')这样一个操作加入过滤。 @JohnDoe - 这是一个很好的问题,如果您添加“我有以下 DataFrame”...“可以使用此代码创建”as described here,您可以做得更好。 【参考方案1】:

IIUC,您可以不使用UDF尝试以下步骤:

from pyspark.sql.functions import expr, first, collect_list, broadcast, monotonically_increasing_id, flatten
import pandas as pd

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],
  ["Kalverstraat Amsterdam","Mary"],
  ["Kalverstraat Amsterdam, Netherlands","Lex"],
  ["xvcv", "ddd"]
]).toDF("address","name")

第 1 步: 将 df_regex 转换为 Spark 数据帧 df1 并将 unique_id 添加到 df

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")

# adjust keywords to uppercase except chars preceded with backslash:
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())

# create regex patterns:
df_regex = df_regex.groupby('iso2').agg('keywords':lambda x: '(?m)' + '|'.join(x)).reset_index()

df1 = spark.createDataFrame(df_regex)
df1.show(truncate=False)
+----+---------------------------------------------------------------------------------+
|iso2|keywords                                                                         |
+----+---------------------------------------------------------------------------------+
|CA  |(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$                             |
|NL  |(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$                                          |
|US  |(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$|
+----+---------------------------------------------------------------------------------+

df = df.withColumn('id', monotonically_increasing_id())
df.show(truncate=False)
+-----------------------------------------------+----+---+
|address                                        |name|id |
+-----------------------------------------------+----+---+
|3030 Whispering Pines Circle, Prosper Texas, US|John|0  |
|Kalverstraat Amsterdam                         |Mary|1  |
|Kalverstraat Amsterdam, Netherlands            |Lex |2  |
|xvcv                                           |ddd |3  |
+-----------------------------------------------+----+---+

第 2 步: 使用 rlike 将 df_regex 左连接到 df

df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("upper(d1.address) rlike d2.keywords"), "left")
df2.show()
+--------------------+----+---+----+--------------------+
|             address|name| id|iso2|            keywords|
+--------------------+----+---+----+--------------------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|
|                xvcv| ddd|  3|null|                null|
+--------------------+----+---+----+--------------------+

步骤 3: 通过将 d1.address 除以 d2.keywords 来计算 d1.address 中匹配的 d2.keywords 的数量,然后将生成的 Array 的大小减少 1:

df3 = df2.withColumn('num_matches', expr("size(split(upper(d1.address), d2.keywords))-1"))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|         -2|
+--------------------+----+---+----+--------------------+-----------+

第四步:使用array_repeat将iso2的值重复num_matches次(需要Spark 2.4+):

df4 = df3.withColumn("iso2", expr("array_repeat(iso2, num_matches)"))
+--------------------+----+---+------------+--------------------+-----------+
|             address|name| id|        iso2|            keywords|num_matches|
+--------------------+----+---+------------+--------------------+-----------+
|3030 Whispering P...|John|  0|[US, US, US]|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|        [NL]|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|        [US]|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|    [NL, NL]|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|        [US]|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|          []|                null|         -2|
+--------------------+----+---+------------+--------------------+-----------+

第 5 步: groupby 并进行聚合:

df_new = df4 \
    .groupby('id') \
    .agg(
      first('address').alias('address'),
      first('name').alias('name'),
      flatten(collect_list('iso2')).alias('countries')
)
+---+--------------------+----+------------+
| id|             address|name|   countries|
+---+--------------------+----+------------+
|  0|3030 Whispering P...|John|[US, US, US]|
|  1|Kalverstraat Amst...|Mary|    [NL, US]|
|  3|                xvcv| ddd|          []|
|  2|Kalverstraat Amst...| Lex|[NL, NL, US]|
+---+--------------------+----+------------+

替代方案:第 3 步也可以由 Pandas UDF 处理:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas import Series
import re

@pandas_udf("int", PandasUDFType.SCALAR)
def get_num_matches(addr, ptn):
    return Series([ 0 if p is None else len(re.findall(p,s)) for p,s in zip(ptn,addr) ])

df3 = df2.withColumn("num_matches", get_num_matches(expr('upper(address)'), 'keywords'))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|          0|
+--------------------+----+---+----+--------------------+-----------+

注意事项:

    由于不区分大小写的模式匹配代价高昂,我们将关键字的所有字符(锚点或转义字符除外,如\b\B\A\z)转换为大写。 提醒一下,rlikeregexp_replace 中使用的模式是基于 Java 的,而在 pandas_udf 中是基于 Python 的,这在 regex.csv 中设置模式时可能会略有不同。

方法2:使用pandas_udf

由于使用 join 和 groupby 会触发数据混洗,上述方法可能会很慢。为您的测试再提供一种选择:

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")

df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())

df_ptn = spark.sparkContext.broadcast(
    df_regex.groupby('iso2').agg('keywords':lambda x: '(?m)' + '|'.join(x))["keywords"].to_dict()
)
df_ptn.value
#'CA': '(?m)\\bALBERTA\\b|\\bNOVA SCOTIA\\b|\\bNOVA SCOTIA\\b|\\bWHITEHORSE\\b|\\bCA$',
# 'NL': '(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$',
# 'US': '(?m)\\bARIZONA\\b|\\bTEXAS\\b|\\bFLORIDA\\b|\\bCHICAGO\\b|\\bAMSTERDAM\\b|\\bPROSPER\\b|\\bUS$'

# REF: https://***.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
from operator import iconcat
from functools import reduce
from pandas import Series
from pyspark.sql.functions import pandas_udf, PandasUDFType, flatten

def __get_iso2(addr, ptn):   
   return Series([ reduce(iconcat, [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()]) for s in addr ])

get_iso2 = pandas_udf(lambda x:__get_iso2(x, df_ptn), "array<string>", PandasUDFType.SCALAR)

df.withColumn('iso2', get_iso2(expr("upper(address)"))).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+

或者在 pandas_udf 中返回一个数组数组(不带 reduceiconcat)并使用 Spark 执行 flatten

def __get_iso2_2(addr, ptn):
    return Series([ [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()] for s in addr ])

get_iso2_2 = pandas_udf(lambda x:__get_iso2_2(x, df_ptn), "array<array<string>>", PandasUDFType.SCALAR)

df.withColumn('iso2', flatten(get_iso2_2(expr("upper(address)")))).show()

更新:要查找独特的国家/地区,请执行以下操作:

def __get_iso2_3(addr, ptn):
  return Series([ [k for k,v in ptn.value.items() if re.search(v,s)] for s in addr ])

get_iso2_3 = pandas_udf(lambda x:__get_iso2_3(x, df_ptn), "array<string>", PandasUDFType.SCALAR)

df.withColumn('iso2', get_iso2_3(expr("upper(address)"))).show()
+--------------------+----+--------+
|             address|name|    iso2|
+--------------------+----+--------+
|3030 Whispering P...|John|    [US]|
|Kalverstraat Amst...|Mary|[NL, US]|
|Kalverstraat Amst...| Lex|[NL, US]|
|                xvcv| ddd|      []|
+--------------------+----+--------+

方法3:使用列表推导:

类似于 @CronosNull 的 方法,如果 regex.csv 的列表是可管理的,您可以使用列表推导来处理:

from pyspark.sql.functions import size, split, upper, col, array, expr, flatten

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
df_ptn = df_regex.groupby('iso2').agg('keywords':lambda x: '(?m)' + '|'.join(x))["keywords"].to_dict()

df1 = df.select("*", *[ (size(split(upper(col('address')), v))-1).alias(k) for k,v in df_ptn.items()])

df1.select(*df.columns, flatten(array(*[ expr("array_repeat('0',`0`)".format(c)) for c in df_ptn.keys() ])).alias('iso2')).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+

【讨论】:

我试试看。看起来很有希望并且有解释! 第 2 步:我没有预期的匹配 df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("d1.address rlike d2.keywords"), "left") 我将其更改为 df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("d1.address rlike d2.keywords"), "cross"),它看起来已经好多了。我会继续测试并及时通知您。 我忘记在 rlike 中将 (?im) 标志添加到模式的开头:d1.address rlike '(?im)'||d2.keywords。理想情况下,这应该在 df1 中处理。另一个建议是使用i标志的区分大小写的匹配通常很昂贵,最好将关键字中的所有字符转换为df1中的大写,删除(i)标志。并改为对UPPER(address) 执行正则表达式操作。 根据cmets做了一些调整,见Step-1和最后的注释。 我接受了你的回答,因为解释更广泛,方法更灵活。这是一个艰难的选择,因为@CronosNull 也很好地帮助了我,而且他的解决方案有时甚至更快。再次感谢您的时间和耐心!【参考方案2】:

注意:根据cmets编辑

我喜欢@jxc 的方法。我采取了稍微不同的方式,仍然没有使用 UDF,也不需要广播正则表达式数据集(您只在驱动程序中使用它)。

设置场景

import re
from io import StringIO
from pyspark.sql.functions import (
    split,
    regexp_replace,
    regexp_extract,
    col,
    size,
    concat,
    lit,
    when,
    array,
    expr,
    array_repeat,
    regexp_extract,
    array_join,
)
from pyspark.sql import DataFrame
import pandas as pd
df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],   
  ["Kalverstraat Amsterdam","Mary"],   
  ["Kalverstraat Amsterdam, Netherlands","Lex"] 
]).toDF("address","name")

sample_data = r"""iso2;keywords
US;\bArizona\b
US:\bTexas\b
US:\bFlorida\b
US;\bChicago\b
US:\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA:\bAlberta\b
CA:\bNova Scotia\b
CA:\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL:\bAmsterdam\b
NL:\bNetherlands\b
NL;\bNL$"""
replace_pd = pd.read_csv(StringIO(sample_data),delimiter='[;:]', engine='python')
#Resample to have a similar number of rows
replace_pd = replace_pd.append([replace_pd]*10000)

通过正则表达式字典的每一行创建一个新列

def replace_dict(df: DataFrame, column: str, replace_pd: pd.DataFrame)->DataFrame:
    """
    returns a dataframe with the required transformations 
    to have a list of iso2 codes, and its number of repeats, based on the column (e.g. address) selected
    """
    _df = (
        df.withColumn("words", col(column))
    )
    #For each row in the csv create a new column
    # it will contains the new value if the original
    # column contains a matching string. 
    i = 0
    cols = []
    #grouping by iso2 code
    grouped_df = replace_pd.groupby('iso2').agg('keywords':lambda x: '(?im)' + '|'.join(x)).reset_index()
    for index, row in grouped_df.iterrows():
        key = row.keywords
        value = row.iso2
        _cr = value
        _df = _df.withColumn(_cr, size(split(col("words"), f"(key)"))-1)
        cols.append(_cr)
        i += 1
    # Join the aux columns, removing the empty strings. 
    _df = _df.withColumn("iso2", array(*[when(col(x)>0,concat(lit(x),lit(":"),col(x))) for x in cols])).withColumn(
        "iso2", expr(r"filter( iso2, x->x NOT rlike '^\s*$')")
    )
    _df = _df.drop("words",*cols) #drop the aux columns.
    return _df

运行测试

replace_dict(df,'address', replace_pd).show(truncate=False)

这给了你一个结果:

+--------------------+----+------------+
|             address|name|        iso2|
+--------------------+----+------------+
|3030 Whispering P...|John|      [US:3]|
|Kalverstraat Amst...|Mary|[NL:1, US:1]|
|Kalverstraat Amst...| Lex|[NL:2, US:1]|
+--------------------+----+------------+

它应该比其他替代方案更快(所有转换都很窄),但这取决于您的 regex.csv 文件的大小(因为它会创建大量稀疏列)。

【讨论】:

非常感谢。我继续测试@jxc 的方法,然后从你的方法开始。 regex.csv 非常大(所有国家/地区) regex.csv 分隔符都是分号。注意到了!! 由于技术问题,我不得不推迟测试。感谢您的帮助,希望能够尽快重新开始测试 它已经运行了一段时间,但我在 Spark UI 中没有看到任何工作。只有 1 个活跃的驱动程序和 1 个死的执行人。 Regex.csv 包含 164.059 行 我接受了@jxc 的回答,因为解释更广泛,方法更灵活。这是一个艰难的选择,因为您也很好地帮助了我,而且您的解决方案有时甚至更快。再次感谢您的时间和耐心!【参考方案3】:

您需要将df_regex 广播到集群中的所有节点,以便每个核心可以并行处理数据。

df_regex_b = spark.sparkContext.broadcast(df_regex)

更新get_iso2 以使用广播变量:

def get_iso2(x, df_regex_b): 
 
    iso2=
    
    for j, row in df_regex_b.value.iterrows():
 
        regex = re.compile(row['keywords'],re.I|re.M)         
        matches = re.finditer(regex, x)
        
        for m in matches:
            iso2[row['iso2'] = iso2.get(row['iso2'], 0) + 1
            
    return [key for key, value in iso2.items() for _ in range(value)]

使用嵌套函数定义 UDF:

def get_iso2_udf(mapping):
    def f(x):
        return get_iso2(x, mapping)
    return F.udf(f)

【讨论】:

非常感谢。我目前有一个工作正在运行,但一旦它准备好我就会试一试。事实上,我想将df_regex 广播到集群中的所有节点。我的印象是这已经发生了。 它正在运行。我假设我必须将 mapping 更改为 df_regex_breturn F.udf(f) 更改为 return F.udf(f, T.ArrayType(T.StringType())) 不幸的是它没有更快。 这很慢,因为每个分区中的数据都从 JVM 编组到辅助 Python 进程,并且执行了 for 循环。 Python 循环并不是最快的东西,这就是为什么人们应该以编写 Pandas UDF 为目标。但与在 JVM 中执行所有操作相比,即使是 Pandas UDF 也很慢。

以上是关于PySpark UDF 优化挑战使用带有正则表达式的字典(Scala?)的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:UDF 将正则表达式应用于数据帧中的每一行

pyspark udf 的可变参数数量

使用 Scala 类作为带有 pyspark 的 UDF

Pyspark:使用带有参数的UDF创建一个新列[重复]

带有 PySpark 2.4 的 Pandas UDF [重复]

带有数据框查询的 PySpark UDF 函数?