总结scala中每个序列的连续值

Posted

技术标签:

【中文标题】总结scala中每个序列的连续值【英文标题】:Summing up consecutive values for each sequence in scala 【发布时间】:2019-09-06 12:27:06 【问题描述】:

我有一个数据集,其中的序列号是 0 和 1。

Category   Value    Sequences
  1         10        0
  1         11        1
  1         13        1
  1         16        1
  1         20        0
  1         21        0
  1         22        1
  1         25        1
  1         27        1
  1         29        1
  1         30        0
  1         32        1
  1         34        1
  1         35        1
  1         38        0

这里序列中的 1 列出现三次。我需要单独总结那个序列值。

我正在尝试使用以下代码:

%livy2.spark
import org.apache.spark.rdd.RDD

val df = df.select( $"Category", $"Value", $"Sequences").rdd.groupBy(x => 
(x.getInt(0)) 
 ).map(
   x =>  
      val Category= x(0).getInt(0)
      val Value= x(0).getInt(1)
      val Sequences = x(0).getInt(2)
      for (i <- x.indices)
         val vi = x(i).getFloat(4)
         if (vi(0) >0 )                 
              
               summing+  = Value//  
             
  (Category, summing)
 
 
 )
 df_new.take(10).foreach(println) 

当我编写此代码时,会发生错误,说明该语句不完整。 df值代表我最初给出的数据集。

预期的输出是:

Category   summing 
  1         40
  1         103
  1         101

我不知道我在哪里落后。如果有人能帮助我学习这个新事物,那就太好了。

【问题讨论】:

【参考方案1】:

可以通过为每一行分配唯一 ID,然后将每个单元包含在由下一个零唯一 ID 指定的组中:

val df = Seq(
  (1, 10, 0),
  (1, 11, 1),
  (1, 13, 1),
  (1, 16, 1),
  (1, 20, 0),
  (1, 21, 0),
  (1, 22, 1),
  (1, 25, 1),
  (1, 27, 1),
  (1, 29, 1),
  (1, 30, 0),
  (1, 32, 1),
  (1, 34, 1),
  (1, 35, 1),
  (1, 38, 0)
).toDF("Category", "Value", "Sequences")

// assign each row unique id
val zipped = df.withColumn("zip", monotonically_increasing_id())

// Make range from zero to next zero
val categoryWindow = Window.partitionBy("Category").orderBy($"zip")
val groups = zipped
  .filter($"Sequences" === 0)
  .withColumn("rangeEnd", lead($"zip", 1).over(categoryWindow))
  .withColumnRenamed("zip", "rangeStart")

println("Groups:")
groups.show(false)

// Assign range for each unit
val joinCondition = ($"units.zip" > $"groups.rangeStart").and($"units.zip" < $"groups.rangeEnd")
val unitsByRange = zipped
  .filter($"Sequences" === 1).alias("units")
  .join(groups.alias("groups"), joinCondition, "left")
  .select("units.Category", "units.Value", "groups.rangeStart")

println("Units in groups:")
unitsByRange.show(false)

// Group by range
val result = unitsByRange
  .groupBy($"Category", $"rangeStart")
  .agg(sum("Value").alias("summing"))
  .orderBy("rangeStart")
  .drop("rangeStart")

println("Result:")
result.show(false)

输出:

Groups:
+--------+-----+---------+----------+----------+
|Category|Value|Sequences|rangeStart|rangeEnd  |
+--------+-----+---------+----------+----------+
|1       |10   |0        |0         |4         |
|1       |20   |0        |4         |5         |
|1       |21   |0        |5         |8589934595|
|1       |30   |0        |8589934595|8589934599|
|1       |38   |0        |8589934599|null      |
+--------+-----+---------+----------+----------+

Units in groups:
+--------+-----+----------+
|Category|Value|rangeStart|
+--------+-----+----------+
|1       |11   |0         |
|1       |13   |0         |
|1       |16   |0         |
|1       |22   |5         |
|1       |25   |5         |
|1       |27   |5         |
|1       |29   |5         |
|1       |32   |8589934595|
|1       |34   |8589934595|
|1       |35   |8589934595|
+--------+-----+----------+

Result:
+--------+-------+
|Category|summing|
+--------+-------+
|1       |40     |
|1       |103    |
|1       |101    |
+--------+-------+

【讨论】:

以上是关于总结scala中每个序列的连续值的主要内容,如果未能解决你的问题,请参考以下文章

将每个连续序列减少到它的值和长度

动态规划总结最长子序列

Java 求解最长连续递增序列

算法小总结最大连续子序列和最大连续子矩阵的关系与实现

LeetCode--53 最大连续子序列(总结)

求最短连续子序列