如何在 Pyspark 中基于正则表达式条件验证(和删除)列,而无需多次扫描和改组?

Posted

技术标签:

【中文标题】如何在 Pyspark 中基于正则表达式条件验证(和删除)列,而无需多次扫描和改组?【英文标题】:How to validate (and drop) a column based on a regex condition in Pyspark without multiple scanning and shuffling? 【发布时间】:2019-09-30 17:04:47 【问题描述】:

我想根据列是否有一个无效条目来验证它们。我的限制是避免洗牌和多次扫描,以使其扩展到 PB。

我尝试使用普通字符串比较来验证列,它有效,但我无法尝试使用正则表达式。我的问题陈述如下:


| Column 1      | Column 2      | Column 3      | Column 4      | Column 5      |
| --------------| --------------| --------------| --------------| --------------|
|(123)-456-7890 | 123-456-7890  |(123)-456-789  |               |(123)-456-7890 |
|(123)-456-7890 | 123-4567890   |(123)-456-7890 |(123)-456-7890 | null          |
|(123)-456-7890 | 1234567890    |(123)-456-7890 |(123)-456-7890 | null          |

有效的格式是:

(xxx)-xxx-xxxx, xxx-xxx-xxxx, xxx-xxxxxxx and xxxxxxxxxx

因此,上述输入的 o/p 应该是:

| Column 1      | Column 2      |
| --------------| --------------| 
|(123)-456-7890 | 123-456-7890  |
|(123)-456-7890 | 123-4567890   |
|(123)-456-7890 | 1234567890    |

我目前的代码如下:

import regex as re
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import when
from pyspark.sql import Row

formats = [r'^(?:\(\d3\)-)\d3-\d4$',
           r'^(?:\d3-)\d3-\d4$', r'^(?:\d3-)\d7$', r'^\d10$']


def validate_format(number):
    length = len(number)
    if length == 14:
        if (re.match(formats[0], number)):
            return True
        return False
    if length == 12:
        if (re.match(formats[1], number)):
            return True
        return False
    if length == 11:
        if (re.match(formats[2], number)):
            return True
        return False
    if length == 10:
        if (re.match(formats[3], number)):
            return True
        return False
    return False


def create_dataframe(spark):
    my_cols = Row("Column1", "Column2", "Column3", "Column4")
    row_1 = my_cols('(617)-283-3811', 'Salah', 'Messi', None)
    row_2 = my_cols('617-2833811', 'Messi', 'Virgil', 'Messi')
    row_3 = my_cols('617-283-3811', 'Ronaldo', 'Messi', 'Ronaldo')
    row_seq = [row_1, row_2, row_3]
    df = spark.createDataFrame(row_seq)
    invalid_counts = invalid_counts_in_df(df)
    print(invalid_counts)


def invalid_counts_in_df(df):
    invalid_counts = df.select(
        *[_sum(when(validate_format(col(c)), lit(0)).otherwise(lit(1))).alias(c) for c in df.columns]).collect()
    return invalid_counts

当我像here 那样处理普通字符串时,我成功了。但是,现在我的函数返回一条错误消息:

>>> create_dataframe(spark)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 8, in create_dataframe
  File "<stdin>", line 3, in invalid_counts_in_df
  File "<stdin>", line 3, in <listcomp>
  File "<stdin>", line 2, in validate_format
TypeError: object of type 'Column' has no len()

我没有使用适当的方法来以最有效的方式使列无效或验证。我知道多次扫描和大量洗牌绝对不是要走的路。

我希望找到一种方法来获取所有条目都为有效格式的列。

【问题讨论】:

【参考方案1】:

就性能而言,您应该始终尝试使用 pyspark 函数而不是 python 函数。 Pyspark 函数经过优化以利用集群的资源,并且不需要将数据转换为 python 对象。

适用于您的用例的 pyspark 函数是 rlike。看看下面的例子:

from pyspark.sql import Row

my_cols = Row("Column1", "Column2", "Column3", "Column4")
row_1 = my_cols('(617)-283-3811', 'Salah', 'Messi', None)
row_2 = my_cols('617-2833811', 'Messi', 'Virgil', 'Messi')
row_3 = my_cols('617-283-3811', 'Ronaldo', 'Messi', 'Ronaldo')
row_seq = [row_1, row_2, row_3]

df = spark.createDataFrame(row_seq)

numberOfRows = df.count()

#I have simplified your regexes a bit because I don't see a reason 
#why you need non capturing groups 
expr = "^(\(\d3\)-\d3-\d4)|(\d3-\d3-\d4)|(\d3-\d7)|(\d10)$"

#you can also set it to df.columns
columnsToCheck = ['Column1']
columnsToRemove = []

for col in columnsToCheck:
    numberOfMatchingRows = df.filter(df[col].rlike(expr)).count()
    if numberOfMatchingRows < numberOfRows:
        columnsToRemove.append(col)

df = df.select(*[c for c in df.columns if c not in columnsToRemove])
df.show()

输出:

+--------------+-------+-------+-------+
|       Column1|Column2|Column3|Column4|
+--------------+-------+-------+-------+ 
|(617)-283-3811|  Salah|  Messi|   null| 
|   617-2833811|  Messi| Virgil|  Messi| 
|  617-283-3811|Ronaldo|  Messi|Ronaldo| 
+--------------+-------+-------+-------+

【讨论】:

工作就像一个魅力!感谢您的解释,@cronoik 另外,如果你觉得这个问题足够好,请不要忘记点赞。 :)

以上是关于如何在 Pyspark 中基于正则表达式条件验证(和删除)列,而无需多次扫描和改组?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 正则表达式以两个条件提取字符串

基于数据的 ajv 条件模式验证

正则表达式在 PySpark Dataframe 列中查找所有不包含 _(Underscore) 和 :(Colon) 的字符串

无法读取基于正则表达式的文件 spark

如何在 pySpark 中有效地从字符串数据框中替换多个正则表达式模式的所有实例?

如何使用 java 正则表达式验证字符串?