使用 SparkR 的逐行计算

Posted

技术标签:

【中文标题】使用 SparkR 的逐行计算【英文标题】:Rowwise calculation using SparkR 【发布时间】:2018-11-01 19:00:09 【问题描述】:

这是我的玩具数据框。

library(tibble); library(SparkR)

df <- tibble::tribble(
  ~var1, ~var2, ~maxofvar1var2,
  1L,    1L,    1L,
  2L,    1L,    2L,
  2L,    3L,    3L,
  NA,    2L,    2L,
  1L,    4L,    4L,
  8L,    5L,    8L)

df <- df %>% as.DataFrame()

如何使用 SparkR 计算行计算以获得 var1 和 var2 的最大值,如上面 df 中的第三个变量所示?如果 SparkR 中没有 rowwise 函数,我怎样才能得到想要的输出?

【问题讨论】:

【参考方案1】:

一组列中的To get a maximum value 使用SparkR::greatest

df %>% withColumn("maxOfVars", greatest(df$var1, df$var2))

以及一般情况下的高阶函数,例如 aggregate(Spark 2.4 或更高版本),用于组装数据。

df %>% withColumn("theLastVar", expr("aggregate(array(var1, var2), (x, y) -> y)"))

或(与版本无关的)表达式组合:

scols <- c("var1", "var2") %>% purrr::map(column)

sumOfVars <- scols %>%
  purrr::map(function(x) coalesce(x, lit(0)))  %>%
  purrr::reduce(function(x, y) x + y, .init=lit(0))

countOfVars <- scols %>% 
  purrr::map(function(x) ifelse(isNotNull(x), lit(1), lit(0))) %>%
  purrr::reduce(
    function(x, y) x + y, .init=lit(0))

df %>% withColumn("meanOfVars", sumOfVars / countOfVars)

【讨论】:

Greatest 是一个有趣的函数名称!谢谢!

以上是关于使用 SparkR 的逐行计算的主要内容,如果未能解决你的问题,请参考以下文章

PySpark DataFrame的逐行聚合

来自谷歌云视觉 API OCR 的逐行数据

使用 SparkR 计算地理距离

Numpy 中的逐行索引

pandas 获得两列或多列的逐行最小值

大文件的逐行操作