Pyspark 数组列 - 用默认值替换空元素

Posted

技术标签:

【中文标题】Pyspark 数组列 - 用默认值替换空元素【英文标题】:Pyspark Array Column - Replace Empty Elements with Default Value 【发布时间】:2021-02-09 03:32:01 【问题描述】:

我有一个数据框,其中有一列是字符串数组。数组的某些元素可能会丢失,如下所示:

-------------|-------------------------------
ID           |array_list                      
---------------------------------------------
38292786     |[AAA,, JLT]                    |
38292787     |[DFG]                          |
38292788     |[SHJ, QKJ, AAA, YTR, CBM]      |
38292789     |[DUY, ANK, QJK, POI, CNM, ADD] |
38292790     |[]                             |
38292791     |[]                             |
38292792     |[,,, HKJ]                      |

我想用默认值“ZZZ”替换缺失的元素。有没有办法做到这一点?我尝试了以下代码,它使用了转换函数和正则表达式:

import pyspark.sql.functions as F
from pyspark.sql.dataframe import DataFrame

def transform(self, f):
    return f(self)

DataFrame.transform = transform  
  
df = df.withColumn("array_list2", F.expr("transform(array_list, x -> regexp_replace(x, '', 'ZZZ'))")) 

这不会产生错误,但会产生废话。我在想我只是不知道识别数组缺失元素的正确方法 - 谁能帮帮我?

在生产中,我们的数据大约有 1000 万行,我试图避免使用 explode 或 UDF(不确定是否可以避免同时使用这两种方法,只需要代码尽可能高效地运行)。我正在使用 Spark 2.4.4

这是我希望输出的样子:

-------------|-------------------------------|-------------------------------
ID           |array_list                     | array_list2
---------------------------------------------|-------------------------------
38292786     |[AAA,, JLT]                    |[AAA, ZZZ, JLT]                     
38292787     |[DFG]                          |[DFG]                          
38292788     |[SHJ, QKJ, AAA, YTR, CBM]      |[SHJ, QKJ, AAA, YTR, CBM]      
38292789     |[DUY, ANK, QJK, POI, CNM, ADD] |[DUY, ANK, QJK, POI, CNM, ADD] 
38292790     |[]                             |[ZZZ]                             
38292791     |[]                             |[ZZZ]   
38292792     |[,,, HKJ]                      |[ZZZ, ZZZ, ZZZ, HKJ]           

【问题讨论】:

你试过类似 regexp_replace(x, '^(?![\s\S])', 'ZZZ') 吗? @ggagliano,有趣的是,使用它会将 ZZZ 作为前缀添加到所有非缺失元素! 啊哈哈不错!然后试着去掉开头的^ UDF 的声明在这里不是真正的问题。 【参考方案1】:

regex_replace 在字符级别起作用。

我也无法让它与转换一起使用,但在第一个回答者的帮助下,我使用了 UDF - 没那么容易。

这是我的数据示例,您可以定制。

%python

from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col

concat_udf = udf(
    lambda con_str, arr: [
        x if x is not None else con_str for x in arr or [None]
    ],
    ArrayType(StringType()),
)

arrayData = [
        ('James',['Java','Scala']),
        ('Michael',['Spark','Java',None]),
        ('Robert',['CSharp','']),
        ('Washington',None),
        ('Jefferson',['1','2'])]

df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages'])
df = df.withColumn("knownLanguages", concat_udf(lit("ZZZ"), col("knownLanguages")))
df.show()

返回:

+----------+------------------+
|      name|    knownLanguages|
+----------+------------------+
|     James|     [Java, Scala]|
|   Michael|[Spark, Java, ZZZ]|
|    Robert|        [CSharp, ]|
|Washington|             [ZZZ]|
| Jefferson|            [1, 2]|
+----------+------------------+

这很困难,得到了第一个回答者的帮助。

【讨论】:

您问了一个关于如何编写 UDF 以使用我的两个答案来回答另一个问题的问题? XD 所以,你得到了 25 分,因为我知道这是可能的,但我还不知道该怎么做,而且必须是可能的,并且可以适用于这里而不是变换。那么,有什么大不了的呢?人们使用彼此的工作。它被称为可重用性,就像 API 一样。欢迎来到现实世界。 这不是批评,我只是觉得这很有趣,而且你做得很好:) 我非常感谢您的意见。【参考方案2】:

我正在考虑一些事情,但我不确定它是否有效。

from pyspark.sql import functions as F

df.withColumn("array_list2", F.split(F.array_join("array_list", ",", "ZZZ"), ","))

首先,我将值连接为带有分隔符, 的字符串(希望您的字符串中没有它,但您可以使用其他东西)。我使用null_replacement 选项来填充null 值。然后我按照相同的分隔符进行拆分。


编辑:根据@thebluephantom 评论,您可以试试这个解决方案:

df.withColumn(
    "array_list_2", F.expr(" transform(array_list, x -> coalesce(x, 'ZZZ'))")
).show()

SQL 内置 transform 不适合我,所以我无法尝试,但希望你会得到你想要的结果。

【讨论】:

在 scala 中,您可以使用 transform,但显然在 python 中不可用。否则,UDF。 转换确实存在于 pyspark 中。 @thebluephantom 这个transform 或者这个transform ? df = df.withColumn("knownLanguages2", F.expr(" transform(knownLanguages, x -> upper(x))")) 嗯。这会将每个单独的字符作为一个元素放在新的数组列中(例如,[A, A, A, |, Z, Z, Z, |, J, L, T])。但它确实摆脱了我的空值,所以我认为你可能是在正确的轨道上,我会玩这个代码。谢谢!

以上是关于Pyspark 数组列 - 用默认值替换空元素的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:如何为数组列中的每个元素添加值?

在 PySpark 中使用列条件替换空值

Pyspark:用字典中的值替换列的值

Pyspark Dataframe Imputations - 根据指定条件用列平均值替换未知和缺失值

Python - 读取文本并写入 csv。将空列替换为默认的“N/A”值

Pyspark:用同名的另一列替换行值