Pyspark: Filtering Dataframe based on number of null values per row
Posted: 2017-04-10 09:46:13

Question:

I'm working with PySpark and I have a table like this:
id | ClientNum | Value | Date | Age | Country | Job
1 | 19 | A | 1483695000 | 21 | null | null
2 | 19 | A | 1483696500 | 21 | France | null
3 | 19 | A | 1483697800 | 21 | France | Engineer
4 | 19 | B | 1483699000 | 21 | null | null
5 | 19 | B | 1483699500 | 21 | France | null
6 | 19 | B | 1483699800 | 21 | France | Engineer
7 | 24 | C | 1483699200 | null | null | null
8 | 24 | D | 1483699560 | 28 | Spain | null
9 | 24 | D | 1483699840 | 28 | Spain | Student
Based on the Value column, I want to keep, for each ClientNum, the distinct values that carry the most information (Age, Country, Job).
The result should look like this:
ClientNum | Value | Date | Age | Country | Job
19 | A | 1483697800 | 21 | France | Engineer
19 | B | 1483699800 | 21 | France | Engineer
24 | C | 1483699200 | null | null | null
24 | D | 1483699840 | 28 | Spain | Student
Thanks!
Comments:

Try this answer and Also see this too
I can't do it with df.distinct() or df.drop_duplicates(); in my case all the rows are distinct. I only want to keep the distinct values.
That is exactly what those answers do: they get you the distinct values you want to keep.
That solution works with 2 columns to get distinct rows, but when there are more columns, distinct keeps every row because each row is different. I only want the Value column to be distinct, while keeping the other columns.

Answer 1:

Here is a way to do it with a udf that counts the number of non-null values per row, followed by a Window function to filter the data:
Let's first define the udf, which takes an array of columns as its argument and returns the number of non-null values.
from pyspark.sql.functions import array, udf
from pyspark.sql.types import IntegerType

def nullcounter(arr):
    # keep only the non-null entries and return how many there are
    res = [x for x in arr if x is not None]
    return len(res)

# an integer return type keeps the counter column sorting numerically
nullcounter_udf = udf(nullcounter, IntegerType())
Let's add this column to your data:
df = df.withColumn("counter", nullcounter_udf(array(df.columns)))
Now we can partition your data by ClientNum and Value and keep the rows with the highest counter value:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
window = Window.partitionBy(df['ClientNum'], df['Value']).orderBy(df['counter'].desc())
df.select('*', rank().over(window).alias('rank')) \
  .filter(col('rank') == 1) \
  .sort('Value') \
  .show()
+---+---------+-----+----------+----+-------+--------+-------+----+
| id|ClientNum|Value| Date| Age|Country| Job|counter|rank|
+---+---------+-----+----------+----+-------+--------+-------+----+
| 3| 19| A|1483697800| 21| France|Engineer| 8| 1|
| 6| 19| B|1483699800| 21| France|Engineer| 8| 1|
| 7| 24| C|1483699200|null| null| null| 5| 1|
| 9| 24| D|1483699840| 28| Spain| Student| 8| 1|
+---+---------+-----+----------+----+-------+--------+-------+----+
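Note that rank() keeps every row that ties for the highest counter within a group. If exactly one row per (ClientNum, Value) is needed even when counters tie, row_number() is a drop-in alternative; this is a small variant of the code above, not something the answer itself relies on:

from pyspark.sql.functions import row_number

df.select('*', row_number().over(window).alias('rank')) \
  .filter(col('rank') == 1) \
  .sort('Value') \
  .show()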
Data:
df = sc.parallelize([(1, 19, "A", 1483695000, 21, None, None),
(2, 19, "A", 1483696500, 21, "France", None),
(3, 19, "A", 1483697800, 21, "France", "Engineer"),
(4, 19, "B", 1483699000, 21, None, None),
(5, 19, "B", 1483699500, 21, "France", None),
(6, 19, "B", 1483699800, 21, "France", "Engineer"),
(7, 24, "C", 1483699200, None, None, None),
(8, 24, "D", 1483699560, 28, "Spain", None),
(9, 24, "D", 1483699840, 28, "Spain", "Student")]).toDF(["id","ClientNum","Value","Date","Age", "Country", "Job"])
Comments:

Thanks, but with array all the columns need to have the same type, I think: "due to data type mismatch: input to function array should all be the same type".
It will coerce your values to string, but that does not matter for your use case, since we only use it as an intermediate step to count the non-null values. Which Spark version are you using?
I ran into the same problem @Omar14 described (pyspark 2.2.0).
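If you hit that array() type-mismatch error, one workaround (a sketch, not part of the original answer) is to drop the udf and array entirely and build the counter from built-in column expressions, which works whatever the column types are; the Window/rank step afterwards stays the same:

from functools import reduce
from operator import add
from pyspark.sql.functions import when, col

# 1 for every non-null cell, 0 otherwise, summed across all columns of the row
non_null_count = reduce(add, [when(col(c).isNotNull(), 1).otherwise(0) for c in df.columns])
df = df.withColumn("counter", non_null_count)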
Answer 2:

Try this:

// register your dataframe as a temp table
Your_data_frame.registerTempTable("allData")

// find the max Date for each (ClientNum, Value) pair and join it back to the original table
sqlContext.sql("""select a.ClientNum, a.Value, a.Date, a.Age, a.Country, a.Job
  from allData a
  join (select ClientNum, Value, max(Date) as max_date
        from allData
        group by ClientNum, Value) b
    on a.ClientNum = b.ClientNum and a.Value = b.Value and a.Date = b.max_date""").show
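Since the question is tagged pyspark and the snippet above is Scala, here is a rough PySpark equivalent of the same idea, assuming a Spark 2.x SparkSession named spark (on 1.x, sqlContext.sql can be used the same way):

df.createOrReplaceTempView("allData")  # registerTempTable on older versions
spark.sql("""
    select a.ClientNum, a.Value, a.Date, a.Age, a.Country, a.Job
    from allData a
    join (select ClientNum, Value, max(Date) as max_date
          from allData
          group by ClientNum, Value) b
      on a.ClientNum = b.ClientNum and a.Value = b.Value and a.Date = b.max_date
""").show()

Note that this keeps the most recent row per (ClientNum, Value) group, which matches the desired output only if the latest row is also the most complete one.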
Answer 3:

If, like me, you ran into problems with the other answers, here is my solution using a UDF in Python (Spark 2.2.0):
Let's create a dummy dataset:
llist = [(1, 'alice', 'some_field', 'some_field', 'some_field', None), (30, 'bob', 'some_field', None, None, 10), (3, 'charles', 'some_field', None, 'some_other_field', 1111)]
df = sqlContext.createDataFrame(llist, ['id', 'name','field1','field2', 'field3', 'field4'])
df.show()
+---+-------+----------+----------+----------------+------+
| id| name| field1| field2| field3|field4|
+---+-------+----------+----------+----------------+------+
| 1| alice|some_field|some_field| some_field| null|
| 30| bob|some_field| null| null| 10|
| 3|charles|some_field| null|some_other_field| 1111|
+---+-------+----------+----------+----------------+------+
Let's define our UDF to count the None values:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import struct, udf
count_empty_columns = udf(
lambda row: len([x for x in row if x is None]),
IntegerType()
)
We can add a new null_count column based on that UDF:
df = df.withColumn('null_count',
count_empty_columns(struct([df[x] for x in df.columns])))
df.show()
+---+-------+----------+----------+----------------+------+----------+
| id| name| field1| field2| field3|field4|null_count|
+---+-------+----------+----------+----------------+------+----------+
| 1| alice|some_field|some_field| some_field| null| 1|
| 30| bob|some_field| null| null| 10| 2|
| 3|charles|some_field| null|some_other_field| 1111| 1|
+---+-------+----------+----------+----------------+------+----------+
Finally, filter:
df = df.filter(df['null_count'] <= 1)
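To connect this back to the original question, where the goal is one row per (ClientNum, Value) with the fewest nulls rather than a global threshold, the same null_count column can be combined with a window over the question's grouping columns. A sketch under that assumption (ClientNum and Value do not exist in the dummy dataset above):

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# keep, within each (ClientNum, Value) group, the row with the fewest nulls
w = Window.partitionBy('ClientNum', 'Value').orderBy(col('null_count').asc())
result = (df.withColumn('rn', row_number().over(w))
            .filter(col('rn') == 1)
            .drop('rn', 'null_count'))
result.show()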