从过滤聚合pySpark中获取第一个示例元素
Posted
技术标签:
【中文标题】从过滤聚合pySpark中获取第一个示例元素【英文标题】:Get first example element from filtered aggregation pySpark 【发布时间】:2021-01-18 11:02:48 【问题描述】:表:
+---------+---------+---------+---------+
|id |path |error |message |
+---------+---------+---------+---------+
| 1 | a.a.a| true | "aaa" |
+---------+---------+---------+---------+
| 2 | a.a.a| true | "bbb" |
+---------+---------+---------+---------+
| 2 | a.a.a| true | "bbc" |
+---------+---------+---------+---------+
| 2 | a.a.b| false | "ccc" |
+---------+---------+---------+---------+
我有 pySpark 查询:
data.groupBy('id', 'path')\
.agg(
sum(when(col('error') == 'true', 1).otherwise(0)).alias('count'),
).show()
如何添加具有第一个示例元素的新列 col('error') == 'true'
?我想要一张包含 id, path, count, exampleItem
元素的表格。
函数first().alias('exampleItem')
有效,但返回不一定符合上述条件的元素。
【问题讨论】:
【参考方案1】:您可以做的是使用when
函数定义一个列,该列仅在错误为真的情况下包含消息,否则为空。然后,将 ignorenulls
设置为 true 的 first
函数将提供您所期望的结果。
d = [(1, "a.a.a", True, "aaa"), (2, "a.a.a", True, "bbb"),
(2, "a.a.a", True, "bbc"), (2, "a.a.b", False, "ccc")]
data = spark.createDataFrame(d, ['id', 'path', 'error', 'message'])
data\
.groupBy('id', 'path')\
.agg(F.sum(F.when(F.col('error') == 'true', 1).otherwise(0)).alias('count'),
F.first(F.when(F.col('error'), F.col('message')), ignorenulls=True).alias('exampleItem'))\
.show()
+---+-----+-----+-----------+
| id| path|count|exampleItem|
+---+-----+-----+-----------+
| 2|a.a.a| 2| bbb|
| 1|a.a.a| 1| aaa|
| 2|a.a.b| 0| null|
+---+-----+-----+-----------+
对于最后一行,空值是由于没有消息满足所需的要求。
【讨论】:
【参考方案2】:您可以将first
与ignorenulls=True
选项和case when
语句一起使用,以获取带有error = true
的第一条消息。
from pyspark.sql import functions as F, Window
df2 = df.groupBy(
'id', 'path'
).agg(
F.sum(F.when(F.col('error') == 'true', 1).otherwise(0)).alias('count'),
F.first(
F.when(F.col('error') == 'true', F.col('message')),
ignorenulls=True
).alias('exampleItem')
).orderBy('id', 'path')
df2.show()
+---+-----+-----+-----------+
| id| path|count|exampleItem|
+---+-----+-----+-----------+
| 1|a.a.a| 1| aaa|
| 2|a.a.a| 2| bbb|
| 2|a.a.b| 0| null|
+---+-----+-----+-----------+
但是,请注意,您的示例数据框中没有定义排序列,因此first
没有具体含义,只会返回带有error = true
的随机元素。大概应该有一个时间戳列,可用于对每个 id/path 分区中的数据帧进行排序。
【讨论】:
以上是关于从过滤聚合pySpark中获取第一个示例元素的主要内容,如果未能解决你的问题,请参考以下文章