使用 Scala 从 Spark 中列的一系列值中汇总为一个新列

Posted

技术标签:

【中文标题】使用 Scala 从 Spark 中列的一系列值中汇总为一个新列【英文标题】:Sum up into a new column from a range of values of a column in Spark using Scala 【发布时间】:2018-02-07 07:01:45 【问题描述】:

我有一个如下所示的数据框

articles
10
99
101
101
10005
1000001
1000001

我想要如下的输出数据框

range              sum
1-100              109
101-10000          202
10001-1000000      10005
1000001-100000000  2000002
...                ...

如何实现这一点。我是 spark 和 scala 的新手。

【问题讨论】:

总和背后的逻辑尚不清楚。范围代表什么?这是什么的总和? @eliasah - 'range' 我的意思是文章栏的范围。此范围内的 1-100 两行分别为 10 和 99,它们的总和为 109 好的。你试过什么吗?我建议您首先找到您的值所在的范围,然后按范围分组并总结文章 我没有尝试过。你能帮帮我吗? 我给了你一个建议。 【参考方案1】:

我建议您首先使用 when/otherwise 找到值的范围,然后您可以按该 range 分组并在 articles 上执行 sum 聚合:

import org.apache.spark.sql.functions._

df.withColumn("range", 
          when($"articles" >  0 and $"articles" <= 100, lit("1-100"))
            .otherwise(
              when($"articles" > 100 and $"articles" <= 10000, lit("101-10000")).otherwise(lit("others"))
            )
         ).groupBy("range").agg(sum($"articles")).orderBy("range").show

// +---------+-------------+
// |    range|sum(articles)|
// +---------+-------------+
// |    1-100|          109|
// |101-10000|          202|
// |   others|      2010007|
// +---------+-------------+

【讨论】:

【参考方案2】:

我会使用 UDF 对文章进行分类(分桶),然后使用普通的 groupBy().agg() 来计算总和。

case class Bucket(start: Long, end: Long) 
  def contains(l: Long) = start <= l && end >= l
  override def toString: String = s"$start - $end"


val buckets = Seq(
  Bucket(1L, 100L),
  Bucket(101L, 10000L),
  Bucket(10001L, 100000L),
  Bucket(1000001L, 10000000L)
)

val bucketize = udf((l: Long) => buckets.find(_.contains(l)).map(_.toString))

df
  .withColumn("bucket", bucketize($"article"))
  .groupBy($"bucket")
  .agg(
    sum($"article").as("sum")
  )

【讨论】:

【参考方案3】:

您可以在Dataset 上使用groupByKey 方法轻松定义键控,而不是像通常使用groupBy 那样按单个列值进行分组。以下示例可以在您的spark-shell 上运行,否则请记住创建您的SparkSessionimport org.apache.spark.sql.functions.sum

// relevant types: one for actual data, the other to define ranges
final case class Data(articles: Int)
final case class Range(from: Int, to: Int)

// the data we want to process
val dataset = spark.createDataset(
  Seq(Data(10), Data(99), Data(101), Data(101), Data(10005), Data(1000001), Data(1000001)))

// the ranges we wanto _bucket_ our data in
val ranges = spark.sparkContext.broadcast(
  Seq(Range(1, 100), Range(101, 10000), Range(10001, 1000000), Range(1000001, 100000000)))

// the actual operation: group by range and sum the values in each bucket
dataset.groupByKey 
  d =>
    ranges.value.find(r => d.articles >= r.from && d.articles <= r.to).orNull
.agg(sum("articles").as[Long])

这将是这段 sn-p 代码的输出:

+-------------------+-------------+
|                key|sum(articles)|
+-------------------+-------------+
|            [1,100]|          109|
|        [101,10000]|          202|
|    [10001,1000000]|        10005|
|[1000001,100000000]|      2000002|
+-------------------+-------------+

我们做了什么:

定义并广播一组您想要作为键值的范围 使用广播的范围集将数据存储到范围中 sum by articles 并将结果转换为 Long(键入 Datasets 时需要)

不属于特定存储桶的数据将被分组到具有null范围的行中。

请注意我使用了 bucket 这个词来传达按范围分组的含义,但这与 Hive 分桶无关(在尝试优化 Spark 上的连接时,您可能会听到很多)。

【讨论】:

以上是关于使用 Scala 从 Spark 中列的一系列值中汇总为一个新列的主要内容,如果未能解决你的问题,请参考以下文章

使用 parquet 格式附加 Apache Spark 中列的描述

Apache Spark(Java)中列的自定义处理

Spark Scala聚合函数,用于查找组中列值的出现次数

我可以更改 Spark 数据框中列的可空性吗?

无法使用 spark scala 从数据集中的行中获取第一列的值

使用 lapply 更改列表元素中列的格式