Pyspark:UDF 将正则表达式应用于数据帧中的每一行

Posted

技术标签:

【中文标题】Pyspark:UDF 将正则表达式应用于数据帧中的每一行【英文标题】:Pyspark: UDF to apply regex to each line in dataframe 【发布时间】:2020-10-15 22:16:57 【问题描述】:

我想检查我的数据框中的每一行是否有任何时髦的字符,这些字符在保存文件时可能会弄乱我的架构。

我读过我的文件:

a = spark.read.csv(
    "s3a://mybucket/ML_teradata_feeds/PTEF/AM_PROGRAM_TUNING_EVENT_FACT_01_TO_10_202009.dat-1.gz").unionAll(spark.read.csv(
    "s3a://mybucket/ML_teradata_feeds/PTEF/AM_PROGRAM_TUNING_EVENT_FACT_01_TO_10_202009.dat-2.gz")).unionAll(spark.read.csv(
    "s3a://mybucket/ML_teradata_feeds/PTEF/AM_PROGRAM_TUNING_EVENT_FACT_01_TO_10_202009.dat-3.gz"))

然后从一个正则表达式制作一个UDF,并通过udf运行每一行以查看该行是否符合正则表达式:

import re
from pyspark.sql import functions as f

regex = re.compile('[0-9]0,19\|[0-9]0,10\|[0-9]0,10\|[0-9]0,19\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,19\|[0-9\-\/]0,10\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9]0,10\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|.*\|.*\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26')

def isInteresting( line ):
    if len(regex.match(line).group(0)) is not None: 
        return True
    else:
        return False

isInterestingUdf = f.udf(isInteresting)
interestingLines = a.withColumn( 'isInteresting', isInterestingUdf('_c0') )

但这只是打印出每一行,而不是过滤掉被正则表达式捕获的行。我错过了什么吗?

【问题讨论】:

【参考方案1】:

首先,您需要修复您的isInteresting 函数。如果不匹配,它将抛出异常。将isInteresting Column 添加到您的DataFrame 后,您需要应用过滤语句isInteresting=True

regex = re.compile('[0-9]0,19\|[0-9]0,10\|[0-9]0,10\|[0-9]0,19\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,19\|[0-9\-\/]0,10\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9]0,10\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|.*\|.*\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26')

def isInteresting( line ):
    if regex.match(line):
        if len(regex.match(line).group(0)): 
            return True
    return False
isInterestingUdf = f.udf(isInteresting)
interestingLines = a.withColumn( 'isInteresting', isInterestingUdf('_c0') )
#filter only interestingLines
interestingLines = interestingLines.filter('isInteresting=True')

编辑: 我的建议是使用 rlike 函数而不是 udf(它的性能要高得多)

import pyspark.sql.functions as f
a.filter(f.col('_c0').rlike('[0-9]0,19\|[0-9]0,10\|[0-9]0,10\|[0-9]0,19\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,19\|[0-9\-\/]0,10\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9]0,10\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|.*\|.*\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26'))

【讨论】:

同意rlike 的使用不仅对可维护性更好,而且对性能更重要!

以上是关于Pyspark:UDF 将正则表达式应用于数据帧中的每一行的主要内容,如果未能解决你的问题,请参考以下文章

在 pyspark UDF 中使用广播数据帧

pyspark udf 的可变参数数量

PySpark 将算法转换为 UDF 并将其应用于 DataFrame

PySpark 结构化流将 udf 应用于窗口

Apache Spark - 将 UDF 的结果分配给多个数据框列

无法在 pyspark 中应用标量 pandas udf