在 Spark 数据集中创建具有运行总计的列

Posted

技术标签:

【中文标题】在 Spark 数据集中创建具有运行总计的列【英文标题】:create column with a running total in a Spark Dataset 【发布时间】:2017-10-05 00:16:12 【问题描述】:

假设我们有一个包含两列的 Spark 数据集,比如索引和值,按第一列(索引)排序。

((1, 100), (2, 110), (3, 90), ...)

我们希望有一个包含第三列的数据集,其中包含第二列(值)中的值的运行总计。

((1, 100, 100), (2, 110, 210), (3, 90, 300), ...)

任何建议如何通过数据有效地做到这一点?或者是否有任何可用于此的罐装 CDF 类型函数?

如果需要,可以将 Dataset 转换为 Dataframe 或 RDD 来完成任务,但它必须保持分布式数据结构。也就是说,它不能简单地收集并转换为数组或序列,并且不能使用可变变量(仅限val,没有var)。

【问题讨论】:

【参考方案1】:

但它必须保持分布式数据结构。

不幸的是,你所说的你想要做的事情在 Spark 中是不可能的。如果您愿意将数据集重新分区到单个分区(实际上是将其合并到单个主机上),您可以轻松编写一个函数来做您想做的事情,将增量值保留为一个字段。

由于 Spark 函数在执行时不会在网络上共享状态,因此无法创建您需要保持数据集完全分布的共享状态。

如果您愿意放宽您的要求并允许在一台主机上一次性整合和读取数据,那么您可以通过重新分区到单个分区并应用功能来做您想做的事情。这不会将数据拉到驱动程序上(将其保存在 HDFS/集群中),但仍会在单个执行程序上串行计算输出。例如:

package com.github.nevernaptitsa

import java.io.Serializable
import java.util

import org.apache.spark.sql.Encoders, SparkSession

object SparkTest 

  class RunningSum extends Function[Int, Tuple2[Int, Int]] with Serializable 
    private var runningSum = 0
    override def apply(v1: Int): Tuple2[Int, Int] = 
      runningSum+=v1
      return (v1, runningSum)
    
  

  def main(args: Array[String]): Unit =
    val session = SparkSession.builder()
      .appName("runningSumTest")
      .master("local[*]")
      .getOrCreate()
    import session.implicits._
    session.createDataset(Seq(1,2,3,4,5))
      .repartition(1)
      .map(new RunningSum)
      .show(5)
    session.createDataset(Seq(1,2,3,4,5))
      .map(new RunningSum)
      .show(5)
  


这里的两个语句显示不同的输出,第一个提供正确的输出(串行,因为调用了repartition(1)),第二个提供不正确的输出,因为结果是并行计算的。

第一个语句的结果:

+---+---+
| _1| _2|
+---+---+
|  1|  1|
|  2|  3|
|  3|  6|
|  4| 10|
|  5| 15|
+---+---+

第二个语句的结果:

+---+---+
| _1| _2|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  5|  9|
+---+---+

【讨论】:

感谢您的详细回复,@Ed。我也有同样的怀疑。也许我应该为这个用例研究Greenwald-Khanna 类型的近似方法。 没问题@BahmanEngheta!如果您对我的回答感到满意,您介意将其标记为已接受吗?【参考方案2】:

一位同事提出了以下依赖于RDD.mapPartitionsWithIndex() 方法的建议。 (据我所知,其他数据结构不提供对其分区索引的这种引用。)

val data = sc.parallelize((1 to 5))  // sc is the SparkContext
val partialSums = data.mapPartitionsWithIndex (i, values) => 
    Iterator((i, values.sum))
.collect().toMap  // will in general have size other than data.count
val cumSums = data.mapPartitionsWithIndex (i, values) => 
    val prevSums = (0 until i).map(partialSums).sum
    values.scanLeft(prevSums)(_+_).drop(1)

【讨论】:

以上是关于在 Spark 数据集中创建具有运行总计的列的主要内容,如果未能解决你的问题,请参考以下文章

获取 Apache spark 数据集中包含的列的列数据类型

使用 Spark 过滤大型数据集中的列

如何对 Spark 数据集中的列进行舍入?

如何在 Spark 数据框中添加具有序列值的列?

将值转换为 Spark 数据集中的列(将列的键和值对转换为常规列)[重复]

合并在Apache spark中具有不同列名的两个数据集