Pyspark:根据每行空值的数量过滤数据框

Posted

技术标签:

【中文标题】Pyspark:根据每行空值的数量过滤数据框【英文标题】:Pyspark: Filtering Dataframe based on number of null values per row 【发布时间】:2017-04-10 09:46:13 【问题描述】:

我正在使用 pyspark 我有一张这样的表:

id |  ClientNum  | Value |      Date     | Age   |   Country  |   Job
 1 |      19     |   A   |   1483695000  |  21   |    null    |   null
 2 |      19     |   A   |   1483696500  |  21   |    France  |   null
 3 |      19     |   A   |   1483697800  |  21   |    France  |  Engineer
 4 |      19     |   B   |   1483699000  |  21   |    null    |   null
 5 |      19     |   B   |   1483699500  |  21   |    France  |   null
 6 |      19     |   B   |   1483699800  |  21   |    France  |  Engineer
 7 |      24     |   C   |   1483699200  |  null |    null    |   null
 8 |      24     |   D   |   1483699560  |  28   |    Spain   |   null
 9 |      24     |   D   |   1483699840  |  28   |    Spain   |  Student

基于列 Value,我想为每个 ClientNum 保留不同的值,其中指定了最多的信息(年龄、国家、工作)。

结果应该是这样的:

   ClientNum  | Value |      Date     | Age   |   Country  |   Job
       19     |   A   |   1483697800  |  21   |    France  |  Engineer
       19     |   B   |   1483699800  |  21   |    France  |  Engineer
       24     |   C   |   1483699200  | null  |    null    |   null
       24     |   D   |   1483699840  |  28   |    Spain   |  Student

谢谢!

【问题讨论】:

Try this answer 和 Also see this too 我不能用 df.distinct() 或 df.drop_duplicates() 来做,在我的例子中所有的行都是不同的。我只想保留不同的值。 这就是这些答案的内容。让您获得想要保留的独特价值观。 此解决方案适用于 2 列,以获得不同的行。但是在我们有多个列的情况下, distinct 将保留所有列,因为每一行都是不同的。我只希望列值不同,并保留其他列。 【参考方案1】:

这是一种使用udf 计算每行非空值数量,然后使用Window 函数过滤数据的方法:

让我们首先定义udf,它将列的array 作为参数,并为我们提供非空值的数量作为结果。

from pyspark.sql.functions import array

def nullcounter(arr):

  res = [x for x in arr if x != None]
  return(len(res))

nullcounter_udf = udf(nullcounter)

让我们将此列添加到您的数据中:

df = df.withColumn("counter", nullcounter_udf(array(df.columns)))

现在我们可以按ClientNumValue 对您的数据进行分区,并保留counter 值最高的行:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

window = Window.partitionBy(df['ClientNum'], df['Value']).orderBy(df['counter'].desc())

df.select('*', rank().over(window).alias('rank')) \
  .filter(col('rank') == 1) \
  .sort('Value') \
  .show() 
+---+---------+-----+----------+----+-------+--------+-------+----+
| id|ClientNum|Value|      Date| Age|Country|     Job|counter|rank|
+---+---------+-----+----------+----+-------+--------+-------+----+
|  3|       19|    A|1483697800|  21| France|Engineer|      8|   1|
|  6|       19|    B|1483699800|  21| France|Engineer|      8|   1|
|  7|       24|    C|1483699200|null|   null|    null|      5|   1|
|  9|       24|    D|1483699840|  28|  Spain| Student|      8|   1|
+---+---------+-----+----------+----+-------+--------+-------+----+

数据

df = sc.parallelize([(1, 19, "A", 1483695000, 21, None, None),
(2, 19, "A", 1483696500, 21, "France", None),
(3, 19, "A", 1483697800, 21, "France", "Engineer"),
(4, 19, "B", 1483699000, 21, None, None),
(5, 19, "B", 1483699500, 21, "France", None),
(6, 19, "B", 1483699800, 21, "France", "Engineer"),
(7, 24, "C", 1483699200, None, None, None),
(8, 24, "D", 1483699560, 28, "Spain", None),
(9, 24, "D", 1483699840, 28, "Spain", "Student")]).toDF(["id","ClientNum","Value","Date","Age", "Country", "Job"])

【讨论】:

谢谢,但是对于数组,所有列都必须具有我认为的类型。由于数据类型不匹配:函数数组的输入应该都是相同的类型。 它会将您的值强制转换为字符串,但这对您的用例并不重要,因为我们仅将其用作计算非空值长度的中间步骤。你用的是什么火花版本? 我遇到了@Omar14 (pyspark 2.2.0) 描述的相同问题【参考方案2】:

试试这个:

    val df = Your_data_frame.registerTempTable("allData") // register your dataframe as a temp table

// we are finding max of date for each clientNum and value and join back to the original table.  

    sqlContext.sql("select a.ClientNum, a.Value, a.Date, a.Age, a.Country, a.Job from allData a
    join
    (select ClientNum, Value, max(Date) as max_date from allData group by ClientNum, Value) b
    on a.ClientNum = b.ClientNum and a.Value = b.Value and a.Date = b.max_date").show

【讨论】:

【参考方案3】:

如果像我一样,您对其他答案有疑问,这是我在 Python 中使用 UDF (spark 2.2.0) 的解决方案:

让我们创建一个虚拟数据集:

llist = [(1, 'alice', 'some_field', 'some_field', 'some_field', None), (30, 'bob', 'some_field', None, None, 10), (3, 'charles', 'some_field', None, 'some_other_field', 1111)]
df = sqlContext.createDataFrame(llist, ['id', 'name','field1','field2', 'field3', 'field4'])

df.show()

+---+-------+----------+----------+----------------+------+
| id|   name|    field1|    field2|          field3|field4|
+---+-------+----------+----------+----------------+------+
|  1|  alice|some_field|some_field|      some_field|  null|
| 30|    bob|some_field|      null|            null|    10|
|  3|charles|some_field|      null|some_other_field|  1111|
+---+-------+----------+----------+----------------+------+

让我们定义我们的 UDF 来计算 None 值:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import struct, udf

count_empty_columns = udf(
                        lambda row: len([x for x in row if x is None]), 
                        IntegerType()
                      )

我们可以基于该 UDF 添加一个新列 null_count

df = df.withColumn('null_count',
        count_empty_columns(struct([df[x] for x in df.columns])))

df.show()

+---+-------+----------+----------+----------------+------+----------+
| id|   name|    field1|    field2|          field3|field4|null_count|
+---+-------+----------+----------+----------------+------+----------+
|  1|  alice|some_field|some_field|      some_field|  null|         1|
| 30|    bob|some_field|      null|            null|    10|         2|
|  3|charles|some_field|      null|some_other_field|  1111|         1|
+---+-------+----------+----------+----------------+------+----------+

最后过滤:

df = df.filter(df['null_count'] <= 1)

【讨论】:

以上是关于Pyspark:根据每行空值的数量过滤数据框的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark - 计算每个数据框列中的空值数量

计算Spark DataFrame中的非空值的数量

通过包含空值的列表过滤熊猫数据框

在pyspark中将值随机更改为空值的最有效方法是啥?

将 Pyspark 数据框转换为具有实际值的列表

如何使用过滤器从scala中的数据框中获取包含空值的行集