使用 Scala 从 Spark 中列的一系列值中汇总为一个新列
Posted
技术标签:
【中文标题】使用 Scala 从 Spark 中列的一系列值中汇总为一个新列【英文标题】:Sum up into a new column from a range of values of a column in Spark using Scala 【发布时间】:2018-02-07 07:01:45 【问题描述】:我有一个如下所示的数据框
articles
10
99
101
101
10005
1000001
1000001
我想要如下的输出数据框
range sum
1-100 109
101-10000 202
10001-1000000 10005
1000001-100000000 2000002
... ...
如何实现这一点。我是 spark 和 scala 的新手。
【问题讨论】:
总和背后的逻辑尚不清楚。范围代表什么?这是什么的总和? @eliasah - 'range' 我的意思是文章栏的范围。此范围内的 1-100 两行分别为 10 和 99,它们的总和为 109 好的。你试过什么吗?我建议您首先找到您的值所在的范围,然后按范围分组并总结文章 我没有尝试过。你能帮帮我吗? 我给了你一个建议。 【参考方案1】:我建议您首先使用 when
/otherwise
找到值的范围,然后您可以按该 range
分组并在 articles
上执行 sum
聚合:
import org.apache.spark.sql.functions._
df.withColumn("range",
when($"articles" > 0 and $"articles" <= 100, lit("1-100"))
.otherwise(
when($"articles" > 100 and $"articles" <= 10000, lit("101-10000")).otherwise(lit("others"))
)
).groupBy("range").agg(sum($"articles")).orderBy("range").show
// +---------+-------------+
// | range|sum(articles)|
// +---------+-------------+
// | 1-100| 109|
// |101-10000| 202|
// | others| 2010007|
// +---------+-------------+
【讨论】:
【参考方案2】:我会使用 UDF 对文章进行分类(分桶),然后使用普通的 groupBy().agg()
来计算总和。
case class Bucket(start: Long, end: Long)
def contains(l: Long) = start <= l && end >= l
override def toString: String = s"$start - $end"
val buckets = Seq(
Bucket(1L, 100L),
Bucket(101L, 10000L),
Bucket(10001L, 100000L),
Bucket(1000001L, 10000000L)
)
val bucketize = udf((l: Long) => buckets.find(_.contains(l)).map(_.toString))
df
.withColumn("bucket", bucketize($"article"))
.groupBy($"bucket")
.agg(
sum($"article").as("sum")
)
【讨论】:
【参考方案3】:您可以在Dataset
上使用groupByKey
方法轻松定义键控,而不是像通常使用groupBy
那样按单个列值进行分组。以下示例可以在您的spark-shell
上运行,否则请记住创建您的SparkSession
和import org.apache.spark.sql.functions.sum
:
// relevant types: one for actual data, the other to define ranges
final case class Data(articles: Int)
final case class Range(from: Int, to: Int)
// the data we want to process
val dataset = spark.createDataset(
Seq(Data(10), Data(99), Data(101), Data(101), Data(10005), Data(1000001), Data(1000001)))
// the ranges we wanto _bucket_ our data in
val ranges = spark.sparkContext.broadcast(
Seq(Range(1, 100), Range(101, 10000), Range(10001, 1000000), Range(1000001, 100000000)))
// the actual operation: group by range and sum the values in each bucket
dataset.groupByKey
d =>
ranges.value.find(r => d.articles >= r.from && d.articles <= r.to).orNull
.agg(sum("articles").as[Long])
这将是这段 sn-p 代码的输出:
+-------------------+-------------+
| key|sum(articles)|
+-------------------+-------------+
| [1,100]| 109|
| [101,10000]| 202|
| [10001,1000000]| 10005|
|[1000001,100000000]| 2000002|
+-------------------+-------------+
我们做了什么:
定义并广播一组您想要作为键值的范围 使用广播的范围集将数据存储到范围中sum
by articles
并将结果转换为 Long
(键入 Dataset
s 时需要)
不属于特定存储桶的数据将被分组到具有null
范围的行中。
请注意我使用了 bucket 这个词来传达按范围分组的含义,但这与 Hive 分桶无关(在尝试优化 Spark 上的连接时,您可能会听到很多)。
【讨论】:
以上是关于使用 Scala 从 Spark 中列的一系列值中汇总为一个新列的主要内容,如果未能解决你的问题,请参考以下文章
使用 parquet 格式附加 Apache Spark 中列的描述