Pyspark:UDF 将正则表达式应用于数据帧中的每一行
Posted
技术标签:
【中文标题】Pyspark:UDF 将正则表达式应用于数据帧中的每一行【英文标题】:Pyspark: UDF to apply regex to each line in dataframe 【发布时间】:2020-10-15 22:16:57 【问题描述】:我想检查我的数据框中的每一行是否有任何时髦的字符,这些字符在保存文件时可能会弄乱我的架构。
我读过我的文件:
a = spark.read.csv(
"s3a://mybucket/ML_teradata_feeds/PTEF/AM_PROGRAM_TUNING_EVENT_FACT_01_TO_10_202009.dat-1.gz").unionAll(spark.read.csv(
"s3a://mybucket/ML_teradata_feeds/PTEF/AM_PROGRAM_TUNING_EVENT_FACT_01_TO_10_202009.dat-2.gz")).unionAll(spark.read.csv(
"s3a://mybucket/ML_teradata_feeds/PTEF/AM_PROGRAM_TUNING_EVENT_FACT_01_TO_10_202009.dat-3.gz"))
然后从一个正则表达式制作一个UDF,并通过udf运行每一行以查看该行是否符合正则表达式:
import re
from pyspark.sql import functions as f
regex = re.compile('[0-9]0,19\|[0-9]0,10\|[0-9]0,10\|[0-9]0,19\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,19\|[0-9\-\/]0,10\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9]0,10\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|.*\|.*\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26')
def isInteresting( line ):
if len(regex.match(line).group(0)) is not None:
return True
else:
return False
isInterestingUdf = f.udf(isInteresting)
interestingLines = a.withColumn( 'isInteresting', isInterestingUdf('_c0') )
但这只是打印出每一行,而不是过滤掉被正则表达式捕获的行。我错过了什么吗?
【问题讨论】:
【参考方案1】:首先,您需要修复您的isInteresting
函数。如果不匹配,它将抛出异常。将isInteresting
Column 添加到您的DataFrame 后,您需要应用过滤语句isInteresting=True
regex = re.compile('[0-9]0,19\|[0-9]0,10\|[0-9]0,10\|[0-9]0,19\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,19\|[0-9\-\/]0,10\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9]0,10\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|.*\|.*\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26')
def isInteresting( line ):
if regex.match(line):
if len(regex.match(line).group(0)):
return True
return False
isInterestingUdf = f.udf(isInteresting)
interestingLines = a.withColumn( 'isInteresting', isInterestingUdf('_c0') )
#filter only interestingLines
interestingLines = interestingLines.filter('isInteresting=True')
编辑:
我的建议是使用 rlike
函数而不是 udf(它的性能要高得多)
import pyspark.sql.functions as f
a.filter(f.col('_c0').rlike('[0-9]0,19\|[0-9]0,10\|[0-9]0,10\|[0-9]0,19\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,10\|[0-9]0,19\|[0-9\-\/]0,10\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9]0,10\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|.*\|.*\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26\|[0-9\s\-:\.]0,26'))
【讨论】:
同意rlike
的使用不仅对可维护性更好,而且对性能更重要!以上是关于Pyspark:UDF 将正则表达式应用于数据帧中的每一行的主要内容,如果未能解决你的问题,请参考以下文章
PySpark 将算法转换为 UDF 并将其应用于 DataFrame