在 Pyspark 中屏蔽/替换字符串列的内部

Posted

技术标签:

【中文标题】在 Pyspark 中屏蔽/替换字符串列的内部【英文标题】:Mask/replace inner part of string column in Pyspark 【发布时间】:2020-02-25 01:29:41 【问题描述】:

我在数据框中有一个电子邮件列,我想用星号替换其中的一部分。我无法使用 PySpark 函数解决这个问题。

我的电子邮件栏可能是这样的"

email_col
abc123@gmail.com
123abc123@yahoo.com

我想要实现的是:

mod_email_col
ab**23@gmail.com
12*****23@yahoo.com

所以基本上除了前 2 个字符和最后 2 个字符之外,我希望将剩余部分替换为星号。

这是我尝试过的

from pyspark.sql import functions as F

split_email = F.split(df.email_address, "@")
df = df.withColumn('email_part', split_email.getItem(0))
df = df.withColumn('start', df.email_part.substr(0,2))
df = df.withColumn('end', df.email_part.substr(-2,2))

df.withColumn(
    'masked_part', 
     F.expr("regexp_replace(email_part, email_part[email_part.index(start)+len(start):email_part.index(end)], '*')")
).show(n=5)

【问题讨论】:

你应该在@拆分字符串,然后看看我的回答:substring multiple characters from the last index of a pyspark string column using negative indexing 我都试过了。但是我无法弄清楚提取物是如何从第一个 2 到最后一个 2 的。当然,我进行了拆分以获取域之前的部分。立即更新代码 【参考方案1】:

我认为您可以借助以下正则表达式来实现这一点:(?<=.2)\w+(?=.2@)

(?<=.2): 正面向后看 2 个字符 \w+: 任意单词字符 (?=.2@):2 个字符的正向前瞻,后跟文字 @

首先使用regexp_extract 从您的字符串中提取此模式。

from pyspark.sql.functions import regexp_extract, regexp_replace

df = df.withColumn(
    "pattern", 
    regexp_extract("email", r"(?<=.2)\w+(?=.2@)", 0)
)
df.show()
#+-------------------+-------+
#|              email|pattern|
#+-------------------+-------+
#|   abc123@gmail.com|     c1|
#|123abc123@yahoo.com|  3abc1|
#|      abcd@test.com|       |
#+-------------------+-------+

然后使用regexp_replace 创建相同长度的* 的替换。

df = df.withColumn(
    "replacement",
    regexp_replace("pattern", r"\w", "*")
)
df.show()
#+-------------------+-------+-----------+
#|              email|pattern|replacement|
#+-------------------+-------+-----------+
#|   abc123@gmail.com|     c1|         **|
#|123abc123@yahoo.com|  3abc1|      *****|
#|      abcd@test.com|       |           |
#+-------------------+-------+-----------+

接下来使用派生的patternreplacement 列在原始email 列上再次使用regexp_replace

为了安全起见,concat 在替换时从原始模式向后/向前看。为此,我们必须使用expr 才能使用pass the column values as parameters。

from pyspark.sql.functions import concat, expr, lit

df = df.withColumn(
    "mod_email_col",
    expr("regexp_replace(email, concat('(?<=.2)', pattern, '(?=.2@)'), replacement)")
)
df.show()
#+-------------------+-------+-----------+-------------------+
#|              email|pattern|replacement|      mod_email_col|
#+-------------------+-------+-----------+-------------------+
#|   abc123@gmail.com|     c1|         **|   ab**23@gmail.com|
#|123abc123@yahoo.com|  3abc1|      *****|12*****23@yahoo.com|
#|      abcd@test.com|       |           |      abcd@test.com|
#+-------------------+-------+-----------+-------------------+

最后删除中间列:

df = df.drop("pattern", "replacement")
df.show()
#+-------------------+-------------------+
#|              email|      mod_email_col|
#+-------------------+-------------------+
#|   abc123@gmail.com|   ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#|      abcd@test.com|      abcd@test.com|
#+-------------------+-------------------+

注意:我添加了一个测试用例来表明如果电子邮件地址部分为 4 个字符或更少,这将不起作用。


更新:这里有一些方法可以处理电子邮件地址部分少于 4 个字符的边缘情况。

我使用的规则:

电子邮件地址长度超过 5:按上述操作 电子邮件地址长度为 3、4 或 5:保留第一个和最后一个字符,用 * 屏蔽其他字符 电子邮件地址长度为 1 或 2:屏蔽 @ 之前的单个字符

代码:

patA = "regexp_replace(email, concat('(?<=.2)', pattern, '(?=.2@)'), replacement)"
patB = "regexp_replace(email, concat('(?<=.1)', pattern, '(?=.1@)'), replacement)"

from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.functions import concat, expr, length, lit, split, when

df.withColumn("address_part", split("email", "@").getItem(0))\
.withColumn(
    "pattern", 
    when(
        length("address_part") > 5, 
        regexp_extract("email", r"(?<=.2)\w+(?=.2@)", 0)
    ).otherwise(
        regexp_extract("email", r"(?<=.1)\w+(?=.1@)", 0)
    )
).withColumn(
    "replacement", regexp_replace("pattern", r"\w", "*")
).withColumn(
    "mod_email_col",
    when(
        length("address_part") > 5, expr(patA)
    ).when(
        length("address_part") > 3, expr(patB)
    ).otherwise(regexp_replace('email', '\w(?=@)', '*'))
).drop("pattern", "replacement", "address_part").show()
#+-------------------+-------------------+
#|              email|      mod_email_col|
#+-------------------+-------------------+
#|   abc123@gmail.com|   ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#|     abcde@test.com|     a***e@test.com|
#|      abcd@test.com|      a**d@test.com|
#|        ab@test.com|        a*@test.com|
#|         a@test.com|         *@test.com|
#+-------------------+-------------------+

【讨论】:

哇,这太漂亮了。太感谢了!如果我想包含一个只有 4 个字符的字符,比如取第一个字符和最后一个字符并替换两者之间的字符怎么办? @ManasJani 在这种情况下使用 when 检查替换字符串的长度。如果它 > 0 这样做,否则使用不同的模式(未经测试,我认为这就像将 2s 更改为 1s 一样简单。您还必须处理电子邮件只有 2 个字符长的情况.. 所以如果我想保持动态,那么我将不得不使用 when 条件之类的东西? @ManasJani 我发布了一个更新来处理你的边缘情况 非常感谢你,真的帮助我更好地理解了正则表达式。如何将函数参数传递给上面的表达式?喜欢def mask_email(df, email_col):?将值传递给 email_col arg 时出现错误。【参考方案2】:

您的问题可以使用一些字符串操作来简化(Spark SQL 函数:instr、concat、left、repeat、substr):

首先找到@在邮件字符串中的位置:pos_at = instr(email_col, '@'),那么用户名部分的长度就是pos_at - 1。如果我们将N=2作为要保留的字符数,那么要屏蔽的字符数应该是pos_at - 1 - 2*N,在代码中,我们有:

from pyspark.sql.functions import instr, expr

df = spark.createDataFrame(
        [(e,) for e in ['abc123@gmail.com', '123abc123@yahoo.com', 'abd@gmail.com']]
      , ['email_col']
)

# set N=2 as a parameter in the SQL expression
N = 2

df.withColumn('pos_at', instr('email_col', '@')) \
  .withColumn('new_col', expr("""
        CONCAT(LEFT(email_col,0), REPEAT('*', pos_at-1-2*0), SUBSTR(email_col, pos_at-0))
   """.format(N))).show(truncate=False)
#+-------------------+------+-------------------+
#|email_col          |pos_at|new_col            |
#+-------------------+------+-------------------+
#|abc123@gmail.com   |7     |ab**23@gmail.com   |
#|123abc123@yahoo.com|10    |12*****23@yahoo.com|
#|abd@gmail.com      |4     |abbd@gmail.com     |
#+-------------------+------+-------------------+

注意pos_at - 1 &lt;= 2*N时最后一行的问题,必须单独处理。如果我定义以下逻辑:

if `pos_at - 1 <= 2*N`:   keep the first char and mask the rest
otherwise: keep the original processing routine

整个处理过程可以包含在一个带有两个参数(column_nameN)的 lambda 函数中

# in the SQL expression, 0 is column_name and 1 is N
mask_email = lambda col_name, N: expr("""

  IF(INSTR(0, '@') <= 1*2+1
    , CONCAT(LEFT(0,1), REPEAT('*', INSTR(0, '@')-2), SUBSTR(0, INSTR(0, '@')))
    , CONCAT(LEFT(0,1), REPEAT('*', INSTR(0, '@')-1-2*1), SUBSTR(0, INSTR(0, '@')-1))
  ) as `0_masked`

""".format(col_name, N))

df.select('*', mask_email('email_col', 2)).show()
#+-------------------+-------------------+
#|          email_col|   email_col_masked|
#+-------------------+-------------------+
#|   abc123@gmail.com|   ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#|      abd@gmail.com|      a**@gmail.com|
#+-------------------+-------------------+

【讨论】:

以上是关于在 Pyspark 中屏蔽/替换字符串列的内部的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark中将字符串列转换为ArrayType

pyspark用正则表达式替换正则表达式

如何在字符串列中应用正则表达式替换

如何在 DataFrame 的字符串列中应用正则表达式替换?

如何查找和替换字符串列中数字之间的空格?

删除 Pandas 中字符串列中的 Dash/Dots 并将其替换为 Null