如何在 Pyspark 中基于正则表达式条件验证(和删除)列,而无需多次扫描和改组?
Posted
技术标签:
【中文标题】如何在 Pyspark 中基于正则表达式条件验证(和删除)列,而无需多次扫描和改组?【英文标题】:How to validate (and drop) a column based on a regex condition in Pyspark without multiple scanning and shuffling? 【发布时间】:2019-09-30 17:04:47 【问题描述】:我想根据列是否有一个无效条目来验证它们。我的限制是避免洗牌和多次扫描,以使其扩展到 PB。
我尝试使用普通字符串比较来验证列,它有效,但我无法尝试使用正则表达式。我的问题陈述如下:
| Column 1 | Column 2 | Column 3 | Column 4 | Column 5 |
| --------------| --------------| --------------| --------------| --------------|
|(123)-456-7890 | 123-456-7890 |(123)-456-789 | |(123)-456-7890 |
|(123)-456-7890 | 123-4567890 |(123)-456-7890 |(123)-456-7890 | null |
|(123)-456-7890 | 1234567890 |(123)-456-7890 |(123)-456-7890 | null |
有效的格式是:
(xxx)-xxx-xxxx, xxx-xxx-xxxx, xxx-xxxxxxx and xxxxxxxxxx
因此,上述输入的 o/p 应该是:
| Column 1 | Column 2 |
| --------------| --------------|
|(123)-456-7890 | 123-456-7890 |
|(123)-456-7890 | 123-4567890 |
|(123)-456-7890 | 1234567890 |
我目前的代码如下:
import regex as re
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import when
from pyspark.sql import Row
formats = [r'^(?:\(\d3\)-)\d3-\d4$',
r'^(?:\d3-)\d3-\d4$', r'^(?:\d3-)\d7$', r'^\d10$']
def validate_format(number):
length = len(number)
if length == 14:
if (re.match(formats[0], number)):
return True
return False
if length == 12:
if (re.match(formats[1], number)):
return True
return False
if length == 11:
if (re.match(formats[2], number)):
return True
return False
if length == 10:
if (re.match(formats[3], number)):
return True
return False
return False
def create_dataframe(spark):
my_cols = Row("Column1", "Column2", "Column3", "Column4")
row_1 = my_cols('(617)-283-3811', 'Salah', 'Messi', None)
row_2 = my_cols('617-2833811', 'Messi', 'Virgil', 'Messi')
row_3 = my_cols('617-283-3811', 'Ronaldo', 'Messi', 'Ronaldo')
row_seq = [row_1, row_2, row_3]
df = spark.createDataFrame(row_seq)
invalid_counts = invalid_counts_in_df(df)
print(invalid_counts)
def invalid_counts_in_df(df):
invalid_counts = df.select(
*[_sum(when(validate_format(col(c)), lit(0)).otherwise(lit(1))).alias(c) for c in df.columns]).collect()
return invalid_counts
当我像here 那样处理普通字符串时,我成功了。但是,现在我的函数返回一条错误消息:
>>> create_dataframe(spark)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 8, in create_dataframe
File "<stdin>", line 3, in invalid_counts_in_df
File "<stdin>", line 3, in <listcomp>
File "<stdin>", line 2, in validate_format
TypeError: object of type 'Column' has no len()
我没有使用适当的方法来以最有效的方式使列无效或验证。我知道多次扫描和大量洗牌绝对不是要走的路。
我希望找到一种方法来获取所有条目都为有效格式的列。
【问题讨论】:
【参考方案1】:就性能而言,您应该始终尝试使用 pyspark 函数而不是 python 函数。 Pyspark 函数经过优化以利用集群的资源,并且不需要将数据转换为 python 对象。
适用于您的用例的 pyspark 函数是 rlike。看看下面的例子:
from pyspark.sql import Row
my_cols = Row("Column1", "Column2", "Column3", "Column4")
row_1 = my_cols('(617)-283-3811', 'Salah', 'Messi', None)
row_2 = my_cols('617-2833811', 'Messi', 'Virgil', 'Messi')
row_3 = my_cols('617-283-3811', 'Ronaldo', 'Messi', 'Ronaldo')
row_seq = [row_1, row_2, row_3]
df = spark.createDataFrame(row_seq)
numberOfRows = df.count()
#I have simplified your regexes a bit because I don't see a reason
#why you need non capturing groups
expr = "^(\(\d3\)-\d3-\d4)|(\d3-\d3-\d4)|(\d3-\d7)|(\d10)$"
#you can also set it to df.columns
columnsToCheck = ['Column1']
columnsToRemove = []
for col in columnsToCheck:
numberOfMatchingRows = df.filter(df[col].rlike(expr)).count()
if numberOfMatchingRows < numberOfRows:
columnsToRemove.append(col)
df = df.select(*[c for c in df.columns if c not in columnsToRemove])
df.show()
输出:
+--------------+-------+-------+-------+
| Column1|Column2|Column3|Column4|
+--------------+-------+-------+-------+
|(617)-283-3811| Salah| Messi| null|
| 617-2833811| Messi| Virgil| Messi|
| 617-283-3811|Ronaldo| Messi|Ronaldo|
+--------------+-------+-------+-------+
【讨论】:
工作就像一个魅力!感谢您的解释,@cronoik 另外,如果你觉得这个问题足够好,请不要忘记点赞。 :)以上是关于如何在 Pyspark 中基于正则表达式条件验证(和删除)列,而无需多次扫描和改组?的主要内容,如果未能解决你的问题,请参考以下文章
正则表达式在 PySpark Dataframe 列中查找所有不包含 _(Underscore) 和 :(Colon) 的字符串