Apache Spark Group By(获取组中的第一个和最后一个值)

Posted

技术标签:

【中文标题】Apache Spark Group By(获取组中的第一个和最后一个值)【英文标题】:Apache Spark Group By (get First & Last values in Group) 【发布时间】:2020-12-14 17:39:36 【问题描述】:

我正在学校云上的 VM 集群上运行 hadoop(老实说,我不知道具体细节)。我正在使用 apache spark 与 hadoop 对话并运行我当前的代码。

我一直在尝试对我的数据进行一些聚合,以找到一个小时/天/月的总消耗值(数据中的 ENERGY_READING 列)

CONSUMPTION.tsv已经完成了一些操作

+--------+-------------------+----+--------------+
|HOUSE_ID|CONDATE            |HOUR|ENERGY_READING|
+--------+-------------------+----+--------------+
|9       |2015-05-30 00:00:00|0   |11000.001444  |
|9       |2015-05-30 00:00:10|0   |11000.002888  |
|9       |2015-05-30 00:00:20|0   |11000.004332  |
|9       |2015-05-30 00:00:30|0   |11000.005776  |
|9       |2015-05-30 00:00:40|0   |11000.00722   |
|9       |2015-05-30 00:00:50|0   |11000.008664  |
|9       |2015-05-30 00:01:00|0   |11000.010108  |
|9       |2015-05-30 00:01:10|0   |11000.011552  |
|9       |2015-05-30 00:01:20|0   |11000.012996  |
|9       |2015-05-30 00:01:30|0   |11000.01444   |
|9       |2015-05-30 00:01:40|0   |11000.015884  |
|9       |2015-05-30 00:01:50|0   |11000.017328  |
|9       |2015-05-30 00:02:00|0   |11000.018772  |
|9       |2015-05-30 00:02:10|0   |11000.020216  |
|9       |2015-05-30 00:02:20|0   |11000.02166   |
|9       |2015-05-30 00:02:30|0   |11000.023104  |
|9       |2015-05-30 00:02:40|0   |11000.024548  |
|9       |2015-05-30 00:02:50|0   |11000.025992  |
|9       |2015-05-30 00:03:00|0   |11000.027436  |
|9       |2015-05-30 00:03:10|0   |11000.02888   |
+--------+-------------------+----+--------------+

Java 类

StructType schema = new StructType()
                .add("LOG_ID",IntegerType)
                .add("HOUSE_ID", IntegerType)
                .add("CONDATE", StringType)
                .add("ENERGY_READING", DoubleType)
                .add("FLAG", IntegerType);

        Dataset<Row> data = spark.read()
                .option("header", true)
                .option("delimiter", "\t")
                .option("mode","DROPMALFORMED")
                .schema(schema)
                .csv("hdfs://hd-master:9820/CONSUMPTION.tsv");

        data = data.withColumn("CONDATE", functions.to_timestamp(functions.col("CONDATE"),"yy-MM-dd HH:mm:ss.SSSSSSSSS").cast(TimestampType));

        data = data.withColumn("HOUR", functions.hour(functions.col("CONDATE")));

        Dataset<Row> df = data.select("HOUSE_ID","CONDATE","HOUR","ENERGY_READING");

所以我拥有的数据每 10 秒递增一次。我想获取每个小时/天/月的第一个和最后一个值。

基本上我想要的是一天 11000.001444 的第一个值,在这种情况下,最后一个值可以说是 11000.01444。然后从第一个中减去第二个,得到该小时/天/月的总消耗量。

这会给我一个输出

HOUSE_ID   CONDATE      HOUR       ENERGY_READING
  9        15-05-30      0              0.013
  9        15-05-30      1              ...

【问题讨论】:

如果你得到每个小时的最后一个值,你正在做 00:50 - 00:00、01:50 - 01:00 等,你会错过 00 期间消耗的能量: 50 到 01:00、01:50 到 02:00 等。相反,您想从 00:00 的值中减去 01:00 的值吗? (即 01:00 - 00:00)。在这种情况下,您将需要每个组的第一个值。 我明白你在说什么。所以基本上每组的第一个值将从前一组中减去。那么我该如何编写这样的代码呢? 是的,完全正确。那是你想要做的吗? 是的,这与我所做的想法完全相同,只是方法不同,无论如何都会产生相对相同的结果。 【参考方案1】:

下面的代码将按分钟分组并计算该分钟的消耗:

import org.apache.spark.sql.expressions.Window

Dataset<Row> df2 = df.groupBy(
    functions.col("HOUSE_ID"),
    functions.minute(col("CONDATE")).alias("minute")
).agg(
    functions.min("ENERGY_READING").alias("ENERGY_READING")
).withColumn(
    "LAG_ENERGY_READING",
    functions.lag(functions.col("ENERGY_READING"), 1).over(Window.partitionBy("HOUSE_ID").orderBy("minute"))
).withColumn(
    "consumption",
    functions.expr("ENERGY_READING - LAG_ENERGY_READING")
)

【讨论】:

运算符'-'不能应用于'org.apache.spark.sql.Column'、'org.apache.spark.sql.Column' 确实是:'( 是的,我测试一下。给我几分钟 不用担心。对我来说有点过程。创建一个jar文件,然后将其传输到服务器然后运行它,刚才我忘记打印输出了。 让我们continue this discussion in chat。

以上是关于Apache Spark Group By(获取组中的第一个和最后一个值)的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Apache Spark 中的 Group By Operation 形成的每个子集上应用用户定义函数?

org.apache.spark.sql.AnalysisException:表达式 't2.`sum_click_passed`' 既不在 group by 中,也不是聚合函数

获取每个组的第一个和最后一个值 – dplyr group_by 与 last() 和 first()

如果存在多个值,Apache Pig Group by 和过滤器?

从 group by 获取列描述

如何在 spark 数据集上使用 group by