如何对 Spark 数据集中的列进行舍入?

Posted

技术标签:

【中文标题】如何对 Spark 数据集中的列进行舍入?【英文标题】:How can I round a column in a Spark Dataset? 【发布时间】:2017-07-11 17:03:16 【问题描述】:

使用 Scala Spark,我如何使用类型化 Dataset API 对聚合列进行舍入?

另外,如何通过groupby操作保留数据集的类型?

这是我目前拥有的:

case class MyRow(
  k1: String,
  k2: String,
  c1: Double,
  c2: Double
)

def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = 
import org.apache.spark.sql.expressions.scalalang.typed._
ds.groupByKey(row => (row.k1, row.k2))
  .agg(
    avg(_.c1),
    avg(_.c2)
  )
  .map(r => MyRow(r._1._1, r._1._2, r._2, r._3))

    如果我将avg(_.c1) 替换为round(avg(_.c1)),则会出现类型错误。对我的值进行四舍五入的正确方法是什么? .map(...) 行感觉不对 - 是否有更优雅的方式来保留我的数据集类型?

谢谢!

【问题讨论】:

【参考方案1】:

虽然接受的答案有效且更通用,但在这种情况下,您也可以使用圆形。您只需要在使用.as[T] 进行舍入后输入类型(还需要将类型定义为 avg)。

.agg(
  // Alternative ways to define a type to avg
  round(avg((r: MyRow) => r.c1)).as[Double],
  round(avg[MyRow](_.c2)).as[Double]
)

【讨论】:

【参考方案2】:

使用round 确实会因类型错误而失败,因为agg 需要TypedColumn[IN, OUT] 类型的聚合函数,而round 提供Column(适合在DataFrames 上使用)。

您需要的是一个四舍五入的聚合函数,它在org.apache.spark.sql.expressions.scalalang.typed._ 中没有提供 - 但您可以通过扩展执行平均聚合的类来轻松创建一个:

// Extend TypedAverage - round the result before returning it
class TypedRoundAverage[IN](f: IN => Double) extends TypedAverage[IN](f) 
  override def finish(reduction: (Double, Long)): Double = math.round(super.finish(reduction))


// A nice wrapper to create the TypedRoundAverage for a given function  
def roundAvg[IN](f: IN => Double): TypedColumn[IN, Double] = new TypedRoundAverage(f).toColumn

// Now you can use "roundAvg" instead of "round"  
def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = 
  ds.groupByKey(row => (row.k1, row.k2))
    .agg(
      roundAvg(_.c1),
      roundAvg(_.c2)
    )
    .map  case ((k1, k2), c1, c2) => MyRow(k1, k2, c1, c2)  // just a nicer way to put it

我看不到摆脱 map 操作的方法,因为 group-by 必然返回一个元组,但使用模式匹配可以使它更好一点

【讨论】:

以上是关于如何对 Spark 数据集中的列进行舍入?的主要内容,如果未能解决你的问题,请参考以下文章

如何将具有值的列添加到 Spark Java 中的新数据集?

获取 Apache spark 数据集中包含的列的列数据类型

使用 Spark 过滤大型数据集中的列

如何将一行与 spark 数据集中的所有其他行进行比较?

将值转换为 Spark 数据集中的列(将列的键和值对转换为常规列)[重复]

使用空数据集的Spark SQL连接会导致更大的输出文件大小