带有数据框查询的 PySpark UDF 函数?

Posted

技术标签:

【中文标题】带有数据框查询的 PySpark UDF 函数?【英文标题】:PySpark UDF function with data frame query? 【发布时间】:2019-02-02 11:55:50 【问题描述】:

我有另一个解决方案,但我更喜欢使用 PySpark 2.3 来做。

我有一个这样的二维 PySpark 数据框:

Date       | ID
---------- | ----
08/31/2018 | 10
09/31/2018 | 10
09/01/2018 | null
09/01/2018 | null
09/01/2018 | 12

我想通过查找过去最接近的值来替换ID null 值,或者如果该值为 null,则向前看(如果再次为 null,则设置默认值)

我曾设想用.withColumn 添加一个新列,并使用一个UDF 函数来查询数据框本身。

类似的伪代码(不完美,但它是主要思想):

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def return_value(value,date):

    if value is not null:
        return val

    value1 = df.filter(df['date']<= date).select(df['value']).collect()

    if (value1)[0][0] is not null:
        return (value1)[0][0]

    value2 = df.filter(tdf['date']>= date).select(df['value']).collect()
        return (value2)[0][0]


value_udf = udf(return_value,StringType())
new_df = tr.withColumn("new_value", value_udf(df.value,df.date))

但它不起作用。我完全走错了路吗?是否只能在 UDF 函数中查询 Spark 数据帧?我错过了一个更简单的解决方案吗?

【问题讨论】:

在您的示例中,您有 3 行具有相同的日期,其中 2 行带有空值。在这种情况下,您试图获得的预期结果是什么?您想从 09/31/2018 行中为两个空值获取 10 还是只为第一个空值和 12(从最后一行)获取第二个空记录?看着你的熊猫代码,我假设是前者。 【参考方案1】:

创建具有一列的新数据框 - 所有日期的唯一列表:

datesDF = yourDF.select('Date').distinct()

创建另一个由日期和 ID 组成的,但只有那些没有空值的。并且还让我们只保留每个日期的第一次(无论是第一次)出现的 ID(从您的示例来看,每个日期可以有多行)

noNullsDF = yourDF.dropna().dropDuplicates(subset='Date')

现在让我们加入这两个,这样我们就可以得到所有日期的列表,其中包含我们所拥有的任何值(或 null)

joinedDF = datesDF.join(noNullsDF, 'Date', 'left')

现在对于每个日期,使用窗口函数从上一个日期和下一个日期获取 ID 的值,还可以重命名我们的 ID 列,以便以后加入问题会更少:

from pyspark.sql.window import Window
from pyspark.sql import functions as f
w = Window.orderBy('Date')

joinedDF = joinedDF.withColumn('previousID',f.lag('ID').over(w)) 
                   .withColumn('nextID',f.lead('ID').over(w))
                   .withColumnRenamed('ID','newID') 

现在让我们按日期将它加入到我们原来的 Dataframe 中

yourDF = yourDF.join(joinedDF, 'Date', 'left')

现在我们的 Dataframe 有 4 个 ID 列:

    原身份证 newID - 给定日期的任何非空值的 ID(如果有或为空) previousID - 上一日期的 ID(如果有或 null,则为非 null) nextID - 下一个日期的 ID(如果有,则为非 null 或 null)

现在我们需要将它们按顺序组合成 finalID:

    如果不为空,则为原始值 如果结果不为空,则为当前日期的值(如果存在任何非空值(这与您的问题相反,但熊猫代码建议您进行 如果不为空,则为上一个日期的值 如果不为空,则为下一个日期的值 一些默认值

我们只是通过合并来做到这一点:

default = 0
finalDF = yourDF.select('Date', 
                        'ID',
                        f.coalesce('ID',
                                   'newID',
                                   'previousID',
                                   'nextID',
                                   f.lit(default)).alias('finalID')
                       )

【讨论】:

以上是关于带有数据框查询的 PySpark UDF 函数?的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:使用带有参数的UDF创建一个新列[重复]

pyspark:将多个数据框字段传递给 udf

Pyspark:访问 UDF 中行内的列

pyspark 数据框 UDF 异常处理

Pyspark:将UDF的结果迭代地写回数据框不会产生预期的结果

在 for 循环中使用 udf 在 Pyspark 中创建多个列