使用 Spark Scala 运行最后一小时事务的总和

Posted

技术标签:

【中文标题】使用 Spark Scala 运行最后一小时事务的总和【英文标题】:Running Sum of last one hour transaction using Spark Scala 【发布时间】:2016-03-19 00:28:12 【问题描述】:

我想使用 Spark-Scala 计算每笔交易的最后一小时的运行总和。我有以下包含三个字段的数据框,并希望计算第四个字段,如下所示:

Customer     TimeStamp        Tr Last_1Hr_RunningSum
Cust-1  6/1/2015 6:51:55      1        1
Cust-1  6/1/2015 6:58:34      3        4
Cust-1  6/1/2015 7:20:46      3        7
Cust-1  6/1/2015 7:40:45      4       11
Cust-1  6/1/2015 7:55:34      5       15
Cust-1  6/1/2015 8:20:34      0       12
Cust-1  6/1/2015 8:34:34      3       12
Cust-1  6/1/2015 9:35:34      7        7
Cust-1  6/1/2015 9:45:34      3       10
Cust-2  6/1/2015 16:26:34     2        2
Cust-2  6/1/2015 16:35:34     1        3
Cust-2  6/1/2015 17:39:34     3        3
Cust-2  6/1/2015 17:43:34     5        8
Cust-3  6/1/2015 17:17:34     6        6
Cust-3  6/1/2015 17:21:34     4       10
Cust-3  6/1/2015 17:45:34     2       12
Cust-3  6/1/2015 17:56:34     3       15
Cust-3  6/1/2015 18:21:34     4       13
Cust-3  6/1/2015 19:24:34     1        1

我想将“Last_1Hr_RunningSum”计算为新字段,该字段按客户 ID 从每笔交易中回顾一小时并获取一些“Tr”(交易归档)。

    例如:Cust-1 at 6/1/2015 8:20:34 将回溯到 6/1/2015 7:20:46 并取 (0+5+4+3) = 12 的总和。 对于每一行,我想回顾一小时并计算在那一小时内所有交易的总和。

我尝试使用嵌套查询运行 sqlContext.sql,但它给了我错误。 Spark-Scala SQLContext 也不支持窗口函数和分区上的行号。

如何仅使用带有 Spark-Scala 的“TimeStamp”列从“Tr”获取过去一小时的总和。

提前致谢。

【问题讨论】:

您应该显示查询并修复示例数据的缩进和格式,以及预期的数据 @AlbertoBonsanto 我已经修复了示例数据的缩进和格式。输入表将是前三个字段,预期输出表将带有附加字段“Last_1Hr_RunningSum”。 “我尝试使用嵌套查询运行 sqlContext.sql,但它给了我错误”我们是否打算猜测您尝试了什么查询以及您遇到了什么错误? @The Archetypal Paul,不支持 spark sql 中的嵌套查询。 【参考方案1】:

我尝试使用嵌套查询运行 sqlContext.sql,但它给了我错误

您是否尝试过使用加入?

df.registerTempTable("input")

val result = sqlContext.sql("""
        SELECT
           FIRST(a.Customer) AS Customer,
           FIRST(a.Timestamp) AS Timestamp,
           FIRST(a.Tr) AS Tr,
           SUM(b.Tr) AS Last_1Hr_RunningSum
        FROM input a
        JOIN input b ON
          a.Customer = b.Customer
          AND b.Timestamp BETWEEN (a.Timestamp - 3600000) AND a.Timestamp
        GROUP BY a.Customer, a.Timestamp
        ORDER BY a.Customer, a.Timestamp
        """)

result.show()

打印预期结果:

+--------+-------------+---+-------------------+
|Customer|    Timestamp| Tr|Last_1Hr_RunningSum|
+--------+-------------+---+-------------------+
|  Cust-1|1420519915000|  1|                1.0|
|  Cust-1|1420520314000|  3|                4.0|
|  Cust-1|1420521646000|  3|                7.0|
|  Cust-1|1420522845000|  4|               11.0|
|  Cust-1|1420523734000|  5|               15.0|
|  Cust-1|1420525234000|  0|               12.0|
|  Cust-1|1420526074000|  3|               12.0|
|  Cust-1|1420529734000|  7|                7.0|
|  Cust-1|1420530334000|  3|               10.0|
|  Cust-2|1420554394000|  2|                2.0|
|  Cust-2|1420554934000|  1|                3.0|
|  Cust-2|1420558774000|  3|                3.0|
|  Cust-2|1420559014000|  5|                8.0|
|  Cust-3|1420557454000|  6|                6.0|
|  Cust-3|1420557694000|  4|               10.0|
|  Cust-3|1420559134000|  2|               12.0|
|  Cust-3|1420559794000|  3|               15.0|
|  Cust-3|1420561294000|  4|               13.0|
|  Cust-3|1420565074000|  1|                1.0|
+--------+-------------+---+-------------------+

(此解决方案假定时间以毫秒为单位)

【讨论】:

以上是关于使用 Spark Scala 运行最后一小时事务的总和的主要内容,如果未能解决你的问题,请参考以下文章

02环境搭建

02环境搭建

Scala、Apache Spark编写的编译错误保存模型

Scala学习之路 Spark初识

在Windows下用Eclipse开发和运行Spark程序

在Windows下用Eclipse开发和运行Spark程序