PySpark UDF 优化挑战使用带有正则表达式的字典(Scala?)
Posted
技术标签:
【中文标题】PySpark UDF 优化挑战使用带有正则表达式的字典(Scala?)【英文标题】:PySpark UDF optimization challenge using a dictionary with regex's (Scala?) 【发布时间】:2020-08-12 04:47:31 【问题描述】:我正在尝试优化下面的代码 (PySpark UDF)。
它给了我想要的结果(基于我的数据集),但是在非常大的数据集(大约 180M)上它太慢了。
结果(准确性)优于可用的 Python 模块(例如 geotext、hdx-python-country)。所以我不是在寻找另一个模块。
数据帧:
df = spark.createDataFrame([
["3030 Whispering Pines Circle, Prosper Texas, US","John"],
["Kalverstraat Amsterdam","Mary"],
["Kalverstraat Amsterdam, Netherlands","Lex"]
]).toDF("address","name")
正则表达式.csv:
iso2;keywords
US;\bArizona\b
US;\bTexas\b
US;\bFlorida\b
US;\bChicago\b
US;\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA;\bAlberta\b
CA;\bNova Scotia\b
CA;\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL;\bAmsterdam\b
NL;\Netherlands\b
NL;\bNL$
......<many, many more>
从regex.csv
创建一个 Pandas DataFrame,按iso2
分组并加入keywords
(\bArizona\b|\bTexas\b\bFlorida\b|\bUS$
)。
df = pd.read_csv(regex.csv, sep=';')
df_regex = df.groupby('iso2').agg('keywords': '|'.join ).reset_index()
功能:
def get_iso2(x):
iso2=
for j, row in df_regex.iterrows():
regex = re.compile(row['keywords'],re.I|re.M)
matches = re.finditer(regex, x)
for m in matches:
iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1
return [key for key, value in iso2.items() for _ in range(value)]
PySpark UDF:
get_iso2_udf = F.udf(get_iso2, T.ArrayType(T.StringType()))
创建新列:
df_new = df.withColumn('iso2',get_iso2_udf('address')
预期样本输出:
[US,US,NL]
[CA]
[BE,BE,AU]
有些地方出现在多个国家/地区(输入是带有城市、省、州、国家/地区的地址列...)
示例:
3030 Whispering Pines Circle, Prosper Texas, US -> [US,US,US]
Kalverstraat 阿姆斯特丹 -> [US,NL]
Kalverstraat 阿姆斯特丹,荷兰 -> [US, NL, NL]
也许在 PySpark 中使用 Scala UDF 是一种选择,但我不知道如何。
我们非常感谢您的优化建议!
【问题讨论】:
@anky 编辑了我的问题。我正在使用 PySpark (Spark DataFrame) 在 Spark 集群上运行 Jupyter Notebook。 @anky 正则表达式文件几乎可以解决问题,因为它针对我的数据集进行了优化,并且结果经过高精度验证。 听起来您正在使用类似正则表达式的谓词有效地连接两个表,然后根据第一个表的 ID 进行分组。您可以使用df = df_addresses.crossJoin(df_regex)
连接两个数据框,然后使用df.filter(df('address').rlike(df('keywords'))
,然后对继承自df_addresses
的ID 列进行分组。这将完全在 Spark 中运行,无需将数据编组到 Python 帮助程序进程或从 Python 助手进程传出。
其实你也可以像df_addresses.join(df_regex, df_addresses('address').rlike(df_regex('keywords')), 'cross')
这样一个操作加入过滤。
@JohnDoe - 这是一个很好的问题,如果您添加“我有以下 DataFrame”...“可以使用此代码创建”as described here,您可以做得更好。
【参考方案1】:
IIUC,您可以不使用UDF尝试以下步骤:
from pyspark.sql.functions import expr, first, collect_list, broadcast, monotonically_increasing_id, flatten
import pandas as pd
df = spark.createDataFrame([
["3030 Whispering Pines Circle, Prosper Texas, US","John"],
["Kalverstraat Amsterdam","Mary"],
["Kalverstraat Amsterdam, Netherlands","Lex"],
["xvcv", "ddd"]
]).toDF("address","name")
第 1 步: 将 df_regex 转换为 Spark 数据帧 df1
并将 unique_id 添加到 df
。
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
# adjust keywords to uppercase except chars preceded with backslash:
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
# create regex patterns:
df_regex = df_regex.groupby('iso2').agg('keywords':lambda x: '(?m)' + '|'.join(x)).reset_index()
df1 = spark.createDataFrame(df_regex)
df1.show(truncate=False)
+----+---------------------------------------------------------------------------------+
|iso2|keywords |
+----+---------------------------------------------------------------------------------+
|CA |(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$ |
|NL |(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$ |
|US |(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$|
+----+---------------------------------------------------------------------------------+
df = df.withColumn('id', monotonically_increasing_id())
df.show(truncate=False)
+-----------------------------------------------+----+---+
|address |name|id |
+-----------------------------------------------+----+---+
|3030 Whispering Pines Circle, Prosper Texas, US|John|0 |
|Kalverstraat Amsterdam |Mary|1 |
|Kalverstraat Amsterdam, Netherlands |Lex |2 |
|xvcv |ddd |3 |
+-----------------------------------------------+----+---+
第 2 步: 使用 rlike 将 df_regex 左连接到 df
df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("upper(d1.address) rlike d2.keywords"), "left")
df2.show()
+--------------------+----+---+----+--------------------+
| address|name| id|iso2| keywords|
+--------------------+----+---+----+--------------------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...|
| xvcv| ddd| 3|null| null|
+--------------------+----+---+----+--------------------+
步骤 3: 通过将 d1.address
除以 d2.keywords
来计算 d1.address
中匹配的 d2.keywords
的数量,然后将生成的 Array 的大小减少 1:
df3 = df2.withColumn('num_matches', expr("size(split(upper(d1.address), d2.keywords))-1"))
+--------------------+----+---+----+--------------------+-----------+
| address|name| id|iso2| keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3|null| null| -2|
+--------------------+----+---+----+--------------------+-----------+
第四步:使用array_repeat将iso2
的值重复num_matches
次(需要Spark 2.4+):
df4 = df3.withColumn("iso2", expr("array_repeat(iso2, num_matches)"))
+--------------------+----+---+------------+--------------------+-----------+
| address|name| id| iso2| keywords|num_matches|
+--------------------+----+---+------------+--------------------+-----------+
|3030 Whispering P...|John| 0|[US, US, US]|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| [NL]|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| [US]|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| [NL, NL]|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| [US]|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3| []| null| -2|
+--------------------+----+---+------------+--------------------+-----------+
第 5 步: groupby 并进行聚合:
df_new = df4 \
.groupby('id') \
.agg(
first('address').alias('address'),
first('name').alias('name'),
flatten(collect_list('iso2')).alias('countries')
)
+---+--------------------+----+------------+
| id| address|name| countries|
+---+--------------------+----+------------+
| 0|3030 Whispering P...|John|[US, US, US]|
| 1|Kalverstraat Amst...|Mary| [NL, US]|
| 3| xvcv| ddd| []|
| 2|Kalverstraat Amst...| Lex|[NL, NL, US]|
+---+--------------------+----+------------+
替代方案:第 3 步也可以由 Pandas UDF 处理:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas import Series
import re
@pandas_udf("int", PandasUDFType.SCALAR)
def get_num_matches(addr, ptn):
return Series([ 0 if p is None else len(re.findall(p,s)) for p,s in zip(ptn,addr) ])
df3 = df2.withColumn("num_matches", get_num_matches(expr('upper(address)'), 'keywords'))
+--------------------+----+---+----+--------------------+-----------+
| address|name| id|iso2| keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3|null| null| 0|
+--------------------+----+---+----+--------------------+-----------+
注意事项:
-
由于不区分大小写的模式匹配代价高昂,我们将关键字的所有字符(锚点或转义字符除外,如
\b
、\B
、\A
、\z
)转换为大写。
提醒一下,rlike
和 regexp_replace
中使用的模式是基于 Java 的,而在 pandas_udf 中是基于 Python 的,这在 regex.csv 中设置模式时可能会略有不同。
方法2:使用pandas_udf
由于使用 join 和 groupby 会触发数据混洗,上述方法可能会很慢。为您的测试再提供一种选择:
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
df_ptn = spark.sparkContext.broadcast(
df_regex.groupby('iso2').agg('keywords':lambda x: '(?m)' + '|'.join(x))["keywords"].to_dict()
)
df_ptn.value
#'CA': '(?m)\\bALBERTA\\b|\\bNOVA SCOTIA\\b|\\bNOVA SCOTIA\\b|\\bWHITEHORSE\\b|\\bCA$',
# 'NL': '(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$',
# 'US': '(?m)\\bARIZONA\\b|\\bTEXAS\\b|\\bFLORIDA\\b|\\bCHICAGO\\b|\\bAMSTERDAM\\b|\\bPROSPER\\b|\\bUS$'
# REF: https://***.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
from operator import iconcat
from functools import reduce
from pandas import Series
from pyspark.sql.functions import pandas_udf, PandasUDFType, flatten
def __get_iso2(addr, ptn):
return Series([ reduce(iconcat, [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()]) for s in addr ])
get_iso2 = pandas_udf(lambda x:__get_iso2(x, df_ptn), "array<string>", PandasUDFType.SCALAR)
df.withColumn('iso2', get_iso2(expr("upper(address)"))).show()
+--------------------+----+---+------------+
| address|name| id| iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John| 0|[US, US, US]|
|Kalverstraat Amst...|Mary| 1| [NL, US]|
|Kalverstraat Amst...| Lex| 2|[NL, NL, US]|
| xvcv| ddd| 3| []|
+--------------------+----+---+------------+
或者在 pandas_udf 中返回一个数组数组(不带 reduce
和 iconcat
)并使用 Spark 执行 flatten
:
def __get_iso2_2(addr, ptn):
return Series([ [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()] for s in addr ])
get_iso2_2 = pandas_udf(lambda x:__get_iso2_2(x, df_ptn), "array<array<string>>", PandasUDFType.SCALAR)
df.withColumn('iso2', flatten(get_iso2_2(expr("upper(address)")))).show()
更新:要查找独特的国家/地区,请执行以下操作:
def __get_iso2_3(addr, ptn):
return Series([ [k for k,v in ptn.value.items() if re.search(v,s)] for s in addr ])
get_iso2_3 = pandas_udf(lambda x:__get_iso2_3(x, df_ptn), "array<string>", PandasUDFType.SCALAR)
df.withColumn('iso2', get_iso2_3(expr("upper(address)"))).show()
+--------------------+----+--------+
| address|name| iso2|
+--------------------+----+--------+
|3030 Whispering P...|John| [US]|
|Kalverstraat Amst...|Mary|[NL, US]|
|Kalverstraat Amst...| Lex|[NL, US]|
| xvcv| ddd| []|
+--------------------+----+--------+
方法3:使用列表推导:
类似于 @CronosNull 的 方法,如果 regex.csv 的列表是可管理的,您可以使用列表推导来处理:
from pyspark.sql.functions import size, split, upper, col, array, expr, flatten
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
df_ptn = df_regex.groupby('iso2').agg('keywords':lambda x: '(?m)' + '|'.join(x))["keywords"].to_dict()
df1 = df.select("*", *[ (size(split(upper(col('address')), v))-1).alias(k) for k,v in df_ptn.items()])
df1.select(*df.columns, flatten(array(*[ expr("array_repeat('0',`0`)".format(c)) for c in df_ptn.keys() ])).alias('iso2')).show()
+--------------------+----+---+------------+
| address|name| id| iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John| 0|[US, US, US]|
|Kalverstraat Amst...|Mary| 1| [NL, US]|
|Kalverstraat Amst...| Lex| 2|[NL, NL, US]|
| xvcv| ddd| 3| []|
+--------------------+----+---+------------+
【讨论】:
我试试看。看起来很有希望并且有解释! 第 2 步:我没有预期的匹配df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("d1.address rlike d2.keywords"), "left")
我将其更改为 df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("d1.address rlike d2.keywords"), "cross")
,它看起来已经好多了。我会继续测试并及时通知您。
我忘记在 rlike 中将 (?im) 标志添加到模式的开头:d1.address rlike '(?im)'||d2.keywords
。理想情况下,这应该在 df1 中处理。另一个建议是使用i
标志的区分大小写的匹配通常很昂贵,最好将关键字中的所有字符转换为df1中的大写,删除(i)标志。并改为对UPPER(address)
执行正则表达式操作。
根据cmets做了一些调整,见Step-1和最后的注释。
我接受了你的回答,因为解释更广泛,方法更灵活。这是一个艰难的选择,因为@CronosNull 也很好地帮助了我,而且他的解决方案有时甚至更快。再次感谢您的时间和耐心!【参考方案2】:
注意:根据cmets编辑
我喜欢@jxc 的方法。我采取了稍微不同的方式,仍然没有使用 UDF,也不需要广播正则表达式数据集(您只在驱动程序中使用它)。
设置场景
import re
from io import StringIO
from pyspark.sql.functions import (
split,
regexp_replace,
regexp_extract,
col,
size,
concat,
lit,
when,
array,
expr,
array_repeat,
regexp_extract,
array_join,
)
from pyspark.sql import DataFrame
import pandas as pd
df = spark.createDataFrame([
["3030 Whispering Pines Circle, Prosper Texas, US","John"],
["Kalverstraat Amsterdam","Mary"],
["Kalverstraat Amsterdam, Netherlands","Lex"]
]).toDF("address","name")
sample_data = r"""iso2;keywords
US;\bArizona\b
US:\bTexas\b
US:\bFlorida\b
US;\bChicago\b
US:\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA:\bAlberta\b
CA:\bNova Scotia\b
CA:\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL:\bAmsterdam\b
NL:\bNetherlands\b
NL;\bNL$"""
replace_pd = pd.read_csv(StringIO(sample_data),delimiter='[;:]', engine='python')
#Resample to have a similar number of rows
replace_pd = replace_pd.append([replace_pd]*10000)
通过正则表达式字典的每一行创建一个新列
def replace_dict(df: DataFrame, column: str, replace_pd: pd.DataFrame)->DataFrame:
"""
returns a dataframe with the required transformations
to have a list of iso2 codes, and its number of repeats, based on the column (e.g. address) selected
"""
_df = (
df.withColumn("words", col(column))
)
#For each row in the csv create a new column
# it will contains the new value if the original
# column contains a matching string.
i = 0
cols = []
#grouping by iso2 code
grouped_df = replace_pd.groupby('iso2').agg('keywords':lambda x: '(?im)' + '|'.join(x)).reset_index()
for index, row in grouped_df.iterrows():
key = row.keywords
value = row.iso2
_cr = value
_df = _df.withColumn(_cr, size(split(col("words"), f"(key)"))-1)
cols.append(_cr)
i += 1
# Join the aux columns, removing the empty strings.
_df = _df.withColumn("iso2", array(*[when(col(x)>0,concat(lit(x),lit(":"),col(x))) for x in cols])).withColumn(
"iso2", expr(r"filter( iso2, x->x NOT rlike '^\s*$')")
)
_df = _df.drop("words",*cols) #drop the aux columns.
return _df
运行测试
replace_dict(df,'address', replace_pd).show(truncate=False)
这给了你一个结果:
+--------------------+----+------------+
| address|name| iso2|
+--------------------+----+------------+
|3030 Whispering P...|John| [US:3]|
|Kalverstraat Amst...|Mary|[NL:1, US:1]|
|Kalverstraat Amst...| Lex|[NL:2, US:1]|
+--------------------+----+------------+
它应该比其他替代方案更快(所有转换都很窄),但这取决于您的 regex.csv 文件的大小(因为它会创建大量稀疏列)。
【讨论】:
非常感谢。我继续测试@jxc 的方法,然后从你的方法开始。 regex.csv 非常大(所有国家/地区)regex.csv
分隔符都是分号。注意到了!!
由于技术问题,我不得不推迟测试。感谢您的帮助,希望能够尽快重新开始测试
它已经运行了一段时间,但我在 Spark UI 中没有看到任何工作。只有 1 个活跃的驱动程序和 1 个死的执行人。 Regex.csv 包含 164.059 行
我接受了@jxc 的回答,因为解释更广泛,方法更灵活。这是一个艰难的选择,因为您也很好地帮助了我,而且您的解决方案有时甚至更快。再次感谢您的时间和耐心!【参考方案3】:
您需要将df_regex
广播到集群中的所有节点,以便每个核心可以并行处理数据。
df_regex_b = spark.sparkContext.broadcast(df_regex)
更新get_iso2
以使用广播变量:
def get_iso2(x, df_regex_b):
iso2=
for j, row in df_regex_b.value.iterrows():
regex = re.compile(row['keywords'],re.I|re.M)
matches = re.finditer(regex, x)
for m in matches:
iso2[row['iso2'] = iso2.get(row['iso2'], 0) + 1
return [key for key, value in iso2.items() for _ in range(value)]
使用嵌套函数定义 UDF:
def get_iso2_udf(mapping):
def f(x):
return get_iso2(x, mapping)
return F.udf(f)
【讨论】:
非常感谢。我目前有一个工作正在运行,但一旦它准备好我就会试一试。事实上,我想将df_regex
广播到集群中的所有节点。我的印象是这已经发生了。
它正在运行。我假设我必须将 mapping
更改为 df_regex_b
和 return F.udf(f)
更改为 return F.udf(f, T.ArrayType(T.StringType()))
不幸的是它没有更快。
这很慢,因为每个分区中的数据都从 JVM 编组到辅助 Python 进程,并且执行了 for
循环。 Python 循环并不是最快的东西,这就是为什么人们应该以编写 Pandas UDF 为目标。但与在 JVM 中执行所有操作相比,即使是 Pandas UDF 也很慢。以上是关于PySpark UDF 优化挑战使用带有正则表达式的字典(Scala?)的主要内容,如果未能解决你的问题,请参考以下文章