如何计算数据集中的连续性?
Posted
技术标签:
【中文标题】如何计算数据集中的连续性?【英文标题】:How to calculate consecutiveness in Dataset? 【发布时间】:2017-05-28 08:12:46 【问题描述】:我有数据集,如果它满足某些状态,我需要计算数据的连续性。示例数据集如下。用例是,如果交换 id 连续处于风险和不稳定状态,则将该周的计数增加 1 并与数据集合并。我正在尝试使用 Spark。
Date Exchange Id Status Consecutiveness
5/05/2017 a RISKY 0
5/05/2017 b Stable 0
5/05/2017 c Stable 0
5/05/2017 d UNSTABLE 0
5/05/2017 e UNKNOWN 0
5/05/2017 f UNKNOWN 0
6/05/2017 a RISKY 1
6/05/2017 b Stable 0
6/05/2017 c Stable 0
6/05/2017 d UNSTABLE 1
6/05/2017 e UNSTABLE 1
6/05/2017 f UNKNOWN 0
我的方法如下。
-
为具有风险和不稳定的当前日期交换创建数据框
条件
为前一个日期创建另一个数据框,以便交易所具有
有风险且不稳定
加入 2 个数据框并获得不符合条件的交换
更新当前日期的连续性
与原始数据集合并。
我正在尝试执行以下命令。但是,遇到问题,无法继续进行 3,4,5
case class Telecom(Date: String, Exchange: String, Stability: String, Cosecutive: Int)
val emp1 = sc.textFile("file:/// Filename").map(_.split(",")).map(emp1=>Telecom(emp1(0),emp1(1),emp1(2),emp1(4).trim.toInt)).toDF()
val PreviousWeek = sqlContext.sql("select * from T1 limit 10")
emp1.registerTempTable("T1")
val FailPreviousWeek = sqlContext.sql("Select Exchange, Count from T1 where Date = '5/05/2017' and Stability in ('RISKY','UNSTABLE')")
val FailCurrentWeek = sqlContext.sql("Select Exchange, Count from T1 where Date = '6/05/2017' and Stability in ('RISKY','UNSTABLE')")
FailCurrentWeek.join(FailPreviousWeek, FailCurrentWeek("Exchange") === FailPreviousWeek("Exchange"))
val UpdateCurrentWeek = FailCurrentWeek.select($"Exchange",$"Count" +1)
Val UpdateDataSet = emp1.join(UpdateCurrentWeek)
val UpdateCurrentWeek = FailCurrentWeek.select($"Exchange".alias("Exchangeid"),$"Count" +1)
【问题讨论】:
为什么e
一周的6/05/2017
是1
?
【参考方案1】:
这对我的心爱的 窗口聚合函数来说是一个完美的案例。
我认为lag(带有when
)功能可以做到:
lag(columnName: String, offset: Int): Column 返回当前行之前偏移行的值,如果当前行之前有少于偏移行,则返回
null
。
import org.apache.spark.sql.expressions.Window
val exchIds = Window.partitionBy("Exchange_Id").orderBy("Date")
val cc = when(lower($"Status") === "risky" && $"lag" === $"Status", 1).
when(lower($"Status") === "unstable" && $"lag" === $"Status", 1).
otherwise(0)
val solution = input.
withColumn("lag", lag("Status", 1) over exchIds).
withColumn("Consecutiveness", cc).
orderBy("Date", "Exchange_Id").
select("Date", "Exchange_Id", "Status", "Consecutiveness")
scala> solution.show
+---------+-----------+--------+---------------+
| Date|Exchange_Id| Status|Consecutiveness|
+---------+-----------+--------+---------------+
|5/05/2017| a| RISKY| 0|
|5/05/2017| b| Stable| 0|
|5/05/2017| c| Stable| 0|
|5/05/2017| d|UNSTABLE| 0|
|5/05/2017| e| UNKNOWN| 0|
|5/05/2017| f| UNKNOWN| 0|
|6/05/2017| a| RISKY| 1|
|6/05/2017| b| Stable| 0|
|6/05/2017| c| Stable| 0|
|6/05/2017| d|UNSTABLE| 1|
|6/05/2017| e|UNSTABLE| 0|
|6/05/2017| f| UNKNOWN| 0|
+---------+-----------+--------+---------------+
【讨论】:
你好亚采克。非常感谢。知道这个功能真是太好了。 虽然有一个问题 - 如果 a 的下一个日期 (07/05/2017) 的状态变得稳定,那么它应该再次变为 0,然后在 2017 年 8 月 5 日再次变为有风险,那么它应该再次从 1 开始,而不是 2。你能用这段代码试试这个,让我知道。谢谢。【参考方案2】:我终于用了Hive Window的多循环分区功能。
-
第一步将标志转换为boolean
在一个循环中计算带有标志值的连续性
然后再考虑一个窗口分区
循环中的连续性。
这可以像使用 Spark SQL 一样完成。
【讨论】:
以上是关于如何计算数据集中的连续性?的主要内容,如果未能解决你的问题,请参考以下文章