Pyspark 数组列 - 用默认值替换空元素
Posted
技术标签:
【中文标题】Pyspark 数组列 - 用默认值替换空元素【英文标题】:Pyspark Array Column - Replace Empty Elements with Default Value 【发布时间】:2021-02-09 03:32:01 【问题描述】:我有一个数据框,其中有一列是字符串数组。数组的某些元素可能会丢失,如下所示:
-------------|-------------------------------
ID |array_list
---------------------------------------------
38292786 |[AAA,, JLT] |
38292787 |[DFG] |
38292788 |[SHJ, QKJ, AAA, YTR, CBM] |
38292789 |[DUY, ANK, QJK, POI, CNM, ADD] |
38292790 |[] |
38292791 |[] |
38292792 |[,,, HKJ] |
我想用默认值“ZZZ”替换缺失的元素。有没有办法做到这一点?我尝试了以下代码,它使用了转换函数和正则表达式:
import pyspark.sql.functions as F
from pyspark.sql.dataframe import DataFrame
def transform(self, f):
return f(self)
DataFrame.transform = transform
df = df.withColumn("array_list2", F.expr("transform(array_list, x -> regexp_replace(x, '', 'ZZZ'))"))
这不会产生错误,但会产生废话。我在想我只是不知道识别数组缺失元素的正确方法 - 谁能帮帮我?
在生产中,我们的数据大约有 1000 万行,我试图避免使用 explode 或 UDF(不确定是否可以避免同时使用这两种方法,只需要代码尽可能高效地运行)。我正在使用 Spark 2.4.4
这是我希望输出的样子:
-------------|-------------------------------|-------------------------------
ID |array_list | array_list2
---------------------------------------------|-------------------------------
38292786 |[AAA,, JLT] |[AAA, ZZZ, JLT]
38292787 |[DFG] |[DFG]
38292788 |[SHJ, QKJ, AAA, YTR, CBM] |[SHJ, QKJ, AAA, YTR, CBM]
38292789 |[DUY, ANK, QJK, POI, CNM, ADD] |[DUY, ANK, QJK, POI, CNM, ADD]
38292790 |[] |[ZZZ]
38292791 |[] |[ZZZ]
38292792 |[,,, HKJ] |[ZZZ, ZZZ, ZZZ, HKJ]
【问题讨论】:
你试过类似 regexp_replace(x, '^(?![\s\S])', 'ZZZ') 吗? @ggagliano,有趣的是,使用它会将 ZZZ 作为前缀添加到所有非缺失元素! 啊哈哈不错!然后试着去掉开头的^ UDF 的声明在这里不是真正的问题。 【参考方案1】:regex_replace 在字符级别起作用。
我也无法让它与转换一起使用,但在第一个回答者的帮助下,我使用了 UDF - 没那么容易。
这是我的数据示例,您可以定制。
%python
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col
concat_udf = udf(
lambda con_str, arr: [
x if x is not None else con_str for x in arr or [None]
],
ArrayType(StringType()),
)
arrayData = [
('James',['Java','Scala']),
('Michael',['Spark','Java',None]),
('Robert',['CSharp','']),
('Washington',None),
('Jefferson',['1','2'])]
df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages'])
df = df.withColumn("knownLanguages", concat_udf(lit("ZZZ"), col("knownLanguages")))
df.show()
返回:
+----------+------------------+
| name| knownLanguages|
+----------+------------------+
| James| [Java, Scala]|
| Michael|[Spark, Java, ZZZ]|
| Robert| [CSharp, ]|
|Washington| [ZZZ]|
| Jefferson| [1, 2]|
+----------+------------------+
这很困难,得到了第一个回答者的帮助。
【讨论】:
您问了一个关于如何编写 UDF 以使用我的两个答案来回答另一个问题的问题? XD 所以,你得到了 25 分,因为我知道这是可能的,但我还不知道该怎么做,而且必须是可能的,并且可以适用于这里而不是变换。那么,有什么大不了的呢?人们使用彼此的工作。它被称为可重用性,就像 API 一样。欢迎来到现实世界。 这不是批评,我只是觉得这很有趣,而且你做得很好:) 我非常感谢您的意见。【参考方案2】:我正在考虑一些事情,但我不确定它是否有效。
from pyspark.sql import functions as F
df.withColumn("array_list2", F.split(F.array_join("array_list", ",", "ZZZ"), ","))
首先,我将值连接为带有分隔符,
的字符串(希望您的字符串中没有它,但您可以使用其他东西)。我使用null_replacement
选项来填充null
值。然后我按照相同的分隔符进行拆分。
编辑:根据@thebluephantom 评论,您可以试试这个解决方案:
df.withColumn(
"array_list_2", F.expr(" transform(array_list, x -> coalesce(x, 'ZZZ'))")
).show()
SQL 内置 transform
不适合我,所以我无法尝试,但希望你会得到你想要的结果。
【讨论】:
在 scala 中,您可以使用 transform,但显然在 python 中不可用。否则,UDF。 转换确实存在于 pyspark 中。 @thebluephantom 这个transform 或者这个transform ? df = df.withColumn("knownLanguages2", F.expr(" transform(knownLanguages, x -> upper(x))")) 嗯。这会将每个单独的字符作为一个元素放在新的数组列中(例如,[A, A, A, |, Z, Z, Z, |, J, L, T])。但它确实摆脱了我的空值,所以我认为你可能是在正确的轨道上,我会玩这个代码。谢谢!以上是关于Pyspark 数组列 - 用默认值替换空元素的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark Dataframe Imputations - 根据指定条件用列平均值替换未知和缺失值