从过滤聚合pySpark中获取第一个示例元素

Posted

技术标签:

【中文标题】从过滤聚合pySpark中获取第一个示例元素【英文标题】:Get first example element from filtered aggregation pySpark 【发布时间】:2021-01-18 11:02:48 【问题描述】:

表:

+---------+---------+---------+---------+
|id       |path     |error    |message  |
+---------+---------+---------+---------+
|      1  |    a.a.a|   true  |   "aaa" |
+---------+---------+---------+---------+
|      2  |    a.a.a|   true  |   "bbb" |
+---------+---------+---------+---------+
|      2  |    a.a.a|   true  |   "bbc" |
+---------+---------+---------+---------+
|      2  |    a.a.b|   false |   "ccc" |
+---------+---------+---------+---------+

我有 pySpark 查询:

data.groupBy('id', 'path')\
.agg(
    sum(when(col('error') == 'true', 1).otherwise(0)).alias('count'),
).show()

如何添加具有第一个示例元素的新列 col('error') == 'true'?我想要一张包含 id, path, count, exampleItem 元素的表格。 函数first().alias('exampleItem') 有效,但返回不一定符合上述条件的元素。

【问题讨论】:

【参考方案1】:

您可以做的是使用when 函数定义一个列,该列仅在错误为真的情况下包含消息,否则为空。然后,将 ignorenulls 设置为 true 的 first 函数将提供您所期望的结果。

d = [(1, "a.a.a", True, "aaa"), (2, "a.a.a", True, "bbb"),
     (2, "a.a.a", True, "bbc"), (2, "a.a.b", False, "ccc")]
data = spark.createDataFrame(d, ['id', 'path', 'error', 'message'])

data\
    .groupBy('id', 'path')\
    .agg(F.sum(F.when(F.col('error') == 'true', 1).otherwise(0)).alias('count'),
         F.first(F.when(F.col('error'), F.col('message')), ignorenulls=True).alias('exampleItem'))\
    .show()
+---+-----+-----+-----------+
| id| path|count|exampleItem|
+---+-----+-----+-----------+
|  2|a.a.a|    2|        bbb|
|  1|a.a.a|    1|        aaa|
|  2|a.a.b|    0|       null|
+---+-----+-----+-----------+

对于最后一行,空值是由于没有消息满足所需的要求。

【讨论】:

【参考方案2】:

您可以将firstignorenulls=True 选项和case when 语句一起使用,以获取带有error = true 的第一条消息。

from pyspark.sql import functions as F, Window

df2 = df.groupBy(
    'id', 'path'
).agg(
    F.sum(F.when(F.col('error') == 'true', 1).otherwise(0)).alias('count'),
    F.first(
        F.when(F.col('error') == 'true', F.col('message')),
        ignorenulls=True
    ).alias('exampleItem')
).orderBy('id', 'path')

df2.show()
+---+-----+-----+-----------+
| id| path|count|exampleItem|
+---+-----+-----+-----------+
|  1|a.a.a|    1|        aaa|
|  2|a.a.a|    2|        bbb|
|  2|a.a.b|    0|       null|
+---+-----+-----+-----------+

但是,请注意,您的示例数据框中没有定义排序列,因此first 没有具体含义,只会返回带有error = true 的随机元素。大概应该有一个时间戳列,可用于对每个 id/path 分区中的数据帧进行排序。

【讨论】:

以上是关于从过滤聚合pySpark中获取第一个示例元素的主要内容,如果未能解决你的问题,请参考以下文章

获取 PySpark 列中列表列表中第一个元素的最大值

通过第三个索引从一个索引聚合和过滤到另一个索引

js删除数组第一个元素怎么写

PySpark-如何从此数据框中过滤行

Pyspark 过滤器使用列表中的startswith

聚合 SQL 函数以仅从每个组中获取第一个