使用 scala 基于 Spark DataFrame 中现有列的聚合添加新列
Posted
技术标签:
【中文标题】使用 scala 基于 Spark DataFrame 中现有列的聚合添加新列【英文标题】:Adding new Columns based on aggregation on existing column in Spark DataFrame using scala 【发布时间】:2016-05-09 03:50:03 【问题描述】:我有一个如下所示的 DataFrame。我需要根据现有列创建一个新列。
col1 col2
a 1
a 2
b 1
c 1
d 1
d 2
输出数据框如下所示
col1 col2 col3 col4
a 1 1 2
a 2 1 2
b 1 0 1
c 1 0 1
d 1 1 2
d 2 1 2
我用来查找 col3 的逻辑是 如果 col1 的计数 > 1 并且 col4 是 col2 的最大值。
我熟悉如何在 sql 中执行此操作。但是很难找到使用数据帧 DSL 的解决方案。任何帮助,将不胜感激。谢谢
【问题讨论】:
【参考方案1】:groupBy col1 和聚合得到count 和max。然后您可以join 将其与原始数据框一起返回以获得您想要的结果
val df2 = df1.groupBy("col1").agg(count() as col3, max("col2") as col4)
val df3 = df1.join(df2, "col1")
【讨论】:
,+1 表示加入和组概念。只是为了澄清 col3 不是 col2 的总和。它是 col2 的计数。如果 col2 >1 它应该是 1 否则它应该是零。没有加入有什么办法吗?当我在大量数据中使用连接时,我面临内存错误。谢谢 是的,我也想知道没有加入的解决方案【参考方案2】:spark df 有一个名为 withColumn 您可以根据需要添加任意数量的派生列。但是该列并未添加到现有的 DF 中,而是创建了一个新的 DF 并添加了列。
例如为数据添加静态日期
val myFormattedData = myData.withColumn("batchdate",addBatchDate(myData("batchdate")))
val addBatchDate = udf (BatchDate: String) => "20160101"
【讨论】:
【参考方案3】:要添加 col3,您可以使用 withcolumn + when/otherwise :
val df2 = df.withColumn("col3",when($"col2" > 1, 1).otherwise(0))
要添加 col4,已经提到的 groupBy/max + join 应该可以完成这项工作:
val df3 = df2.join(df.groupBy("col1").max("col2"), "col1")
【讨论】:
【参考方案4】:要在没有连接的情况下实现这一点,您需要使用count
和max
作为窗口函数。这需要使用Window
创建一个窗口,并告诉count
和max
对这个窗口进行操作。
from pyspark.sql import Window, functions as fn
df = sc.parallelize([
'col1': 'a', 'col2': 1,
'col1': 'a', 'col2': 2,
'col1': 'b', 'col2': 1,
'col1': 'c', 'col2': 1,
'col1': 'd', 'col2': 1,
'col1': 'd', 'col2': 2
]).toDF()
col1_window = Window.partitionBy('col1')
df = df.withColumn('col3', fn.when(fn.count('col1').over(col1_window) > 1, 1).otherwise(0))
df = df.withColumn('col4', fn.max('col2').over(col1_window))
df.orderBy(['col1', 'col2']).show()
【讨论】:
以上是关于使用 scala 基于 Spark DataFrame 中现有列的聚合添加新列的主要内容,如果未能解决你的问题,请参考以下文章
通过python扩展spark mllib 算法包(e.g.基于spark使用孤立森林进行异常检测)
如何使用 Scala/Spark 添加不基于数据框中现有列的新列? [复制]