如何计算数据集中的连续性?

Posted

技术标签:

【中文标题】如何计算数据集中的连续性?【英文标题】:How to calculate consecutiveness in Dataset? 【发布时间】:2017-05-28 08:12:46 【问题描述】:

我有数据集,如果它满足某些状态,我需要计算数据的连续性。示例数据集如下。用例是,如果交换 id 连续处于风险和不稳定状态,则将该周的计数增加 1 并与数据集合并。我正在尝试使用 Spark。

Date    Exchange Id Status  Consecutiveness
5/05/2017   a   RISKY   0
5/05/2017   b   Stable  0
5/05/2017   c   Stable  0
5/05/2017   d   UNSTABLE    0
5/05/2017   e   UNKNOWN 0
5/05/2017   f   UNKNOWN 0
6/05/2017   a   RISKY   1
6/05/2017   b   Stable  0
6/05/2017   c   Stable  0
6/05/2017   d   UNSTABLE    1
6/05/2017   e   UNSTABLE    1
6/05/2017   f   UNKNOWN 0

我的方法如下。

    为具有风险和不稳定的当前日期交换创建数据框 条件 为前一个日期创建另一个数据框,以便交易所具有 有风险且不稳定 加入 2 个数据框并获得不符合条件的交换 更新当前日期的连续性 与原始数据集合并。

我正在尝试执行以下命令。但是,遇到问题,无法继续进行 3,4,5

case class Telecom(Date: String, Exchange: String, Stability: String, Cosecutive: Int)

val emp1  = sc.textFile("file:/// Filename").map(_.split(",")).map(emp1=>Telecom(emp1(0),emp1(1),emp1(2),emp1(4).trim.toInt)).toDF()

val PreviousWeek = sqlContext.sql("select * from T1 limit 10")

emp1.registerTempTable("T1")

val FailPreviousWeek = sqlContext.sql("Select Exchange, Count  from T1 where Date = '5/05/2017' and Stability in ('RISKY','UNSTABLE')")

val FailCurrentWeek = sqlContext.sql("Select Exchange, Count  from T1 where Date = '6/05/2017' and Stability in ('RISKY','UNSTABLE')")

FailCurrentWeek.join(FailPreviousWeek, FailCurrentWeek("Exchange") === FailPreviousWeek("Exchange"))

val UpdateCurrentWeek = FailCurrentWeek.select($"Exchange",$"Count" +1)

Val UpdateDataSet = emp1.join(UpdateCurrentWeek)

  val UpdateCurrentWeek = FailCurrentWeek.select($"Exchange".alias("Exchangeid"),$"Count" +1)

【问题讨论】:

为什么e 一周的6/05/20171 【参考方案1】:

这对我的心爱的 窗口聚合函数来说是一个完美的案例。

我认为lag(带有when)功能可以做到:

lag(columnName: String, offset: Int): Column 返回当前行之前偏移行的值,如果当前行之前有少于偏移行,则返回null

import org.apache.spark.sql.expressions.Window
val exchIds = Window.partitionBy("Exchange_Id").orderBy("Date")

val cc = when(lower($"Status") === "risky" && $"lag" === $"Status", 1).
  when(lower($"Status") === "unstable" && $"lag" === $"Status", 1).
  otherwise(0)

val solution = input.
  withColumn("lag", lag("Status", 1) over exchIds).
  withColumn("Consecutiveness", cc).
  orderBy("Date", "Exchange_Id").
  select("Date", "Exchange_Id", "Status", "Consecutiveness")
scala> solution.show
+---------+-----------+--------+---------------+
|     Date|Exchange_Id|  Status|Consecutiveness|
+---------+-----------+--------+---------------+
|5/05/2017|          a|   RISKY|              0|
|5/05/2017|          b|  Stable|              0|
|5/05/2017|          c|  Stable|              0|
|5/05/2017|          d|UNSTABLE|              0|
|5/05/2017|          e| UNKNOWN|              0|
|5/05/2017|          f| UNKNOWN|              0|
|6/05/2017|          a|   RISKY|              1|
|6/05/2017|          b|  Stable|              0|
|6/05/2017|          c|  Stable|              0|
|6/05/2017|          d|UNSTABLE|              1|
|6/05/2017|          e|UNSTABLE|              0|
|6/05/2017|          f| UNKNOWN|              0|
+---------+-----------+--------+---------------+

【讨论】:

你好亚采克。非常感谢。知道这个功能真是太好了。 虽然有一个问题 - 如果 a 的下一个日期 (07/05/2017) 的状态变得稳定,那么它应该再次变为 0,然后在 2017 年 8 月 5 日再次变为有风险,那么它应该再次从 1 开始,而不是 2。你能用这段代码试试这个,让我知道。谢谢。【参考方案2】:

我终于用了Hive Window的多循环分区功能。

    第一步将标志转换为boolean 在一个循环中计算带有标志值的连续性 然后再考虑一个窗口分区 循环中的连续性。

这可以像使用 Spark SQL 一样完成。

【讨论】:

以上是关于如何计算数据集中的连续性?的主要内容,如果未能解决你的问题,请参考以下文章

sas如何计算相邻观测的差值

如何计算决策树的 AUC?

非离散(连续)数据的信息增益计算

错误修复程序和测试集如何不兼容?

如何使用 data.table 有效地计算一个数据集中的 GPS 点与另一个数据集中的 GPS 点之间的距离

从数据集中计算最近的地理位置