我只需要在 pyspark 数据框中附加那些具有非空值的人

Posted

技术标签:

【中文标题】我只需要在 pyspark 数据框中附加那些具有非空值的人【英文标题】:I need to append only those who has non null values in pyspark dataframe 【发布时间】:2019-08-23 13:43:02 【问题描述】:

我的 pyspark 数据框 (df) 具有以下示例表 (table1): id, col1, col2, col3 1, abc, null, def 2、空、def、abc 3、def、abc、null

我试图通过忽略空值附加所有列来获取新列(最终)。 我尝试了 pyspark 代码并使用了 f.array(col1, col2, col3)。值正在附加,但它不会忽略空值。我也尝试过 UDF 仅附加非空列,但它不起作用。

import pyspark.sql.functions as f    
df = spark.table('table1')
df = df.withColumn('final', f.array(col1,col2,col3))

Actual result:
id, col1, col2, col3, final
1, abc, null, def, [abc,,def]
2, null, def, abc, [,def, abc]
3, def, abc, null, [def,abc,,]

expected result:
id, col1, col2, col3, final
1, abc, null, def, [abc,def]
2, null, def, abc, [def, abc]
3, def, abc, null, [def,abc]


my col1, col2, col3 schema are as below:
where as col1 name is applications


applications: struct (nullable = false)
    applicationid: string (nullable = true)
    createdat: string (nullable = true)
    updatedat: string (nullable = true)
    source_name: string (nullable = true)
    status: string (nullable = true)
    creditbureautypeid: string (nullable = true)
    score: integer (nullable = true)
    applicationcreditreportid: string (nullable = true)
    firstname: string (nullable = false)
    lastname: string (nullable = false)
    dateofbirth: string (nullable = false)
    accounts: array (nullable = true)
        element: struct (containsNull = true)
        applicationcreditreportaccountid: string (nullable = true)
        currentbalance: integer (nullable = true)
        institutionid: string (nullable = true)
        accounttypeid: string (nullable = true)
        dayspastdue: integer (nullable = true)
        institution_name: string (nullable = true)
        account_type_name: string (nullable = true) 

如果问题不清楚或需要更多信息,请告诉我。 任何帮助,将不胜感激。 :)

【问题讨论】:

其实pyspark的版本大概不会有什么影响:How to remove nulls with array_remove Spark SQL Built-in Function 【参考方案1】:

从 Spark 2.4 开始,您可以使用高阶函数来执行此操作(不需要 UDF)。在 PySpark 中,查询可能如下所示:

result = (
    df
    .withColumn("temp", f.array("col1", "col2", "col3"))
    .withColumn("final", f.expr("FILTER(temp, x -> x is not null)"))
    .drop("temp")
)

【讨论】:

伟大的@David。它的作品:) 我尝试了多种解决方案,但这就像一个魅力。非常感谢【参考方案2】:

使用 UDF

from pyspark.sql.functions import udf, array

def join_columns(row_list):
    return [cell_val for cell_val in row_list if cell_val is not None]

join_udf = udf(join_columns)

df = spark.table('table1')
df = df.withColumn('final', join_udf(array(col1,col2,col3))

不仅适用于 3 列,还适用于多列,只需编辑数组内的列。

【讨论】:

感谢您的回复!它的工作但是我的列架构很复杂。我以字符串列为例。当我定义函数时,我也必须给出返回类型。我已经提到了我的列架构。【参考方案3】:

您可以如下定义自己的UDF

def only_not_null(st,nd,rd):
   return [x for x in  locals().values() if x is not None]  # Take non empty columns

然后调用:

df = spark.table('table1')
df = df.withColumn('final', f.udf(only_not_null)(col1,col2,col3))

【讨论】:

你为什么要使用locals().values() 同时使用 if x 是不正确的,因为这会过滤掉任何 Falsey 值,例如 0''

以上是关于我只需要在 pyspark 数据框中附加那些具有非空值的人的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:如何将现有非空列的元组列表作为数据框中的列值之一返回

PySpark Dataframe:将一个单词附加到列的每个值

PySpark:基于数据框中具有 UUID 的列添加新列

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

如何在 pyspark 数据框中读取 csv 文件时读取选定的列?

Pyspark 在数据框中合并 WrappedArrays