在 R/sparkR 中加速大数据的 udf

Posted

技术标签:

【中文标题】在 R/sparkR 中加速大数据的 udf【英文标题】:speeding up udf on large data in R/sparkR 【发布时间】:2017-03-15 22:16:54 【问题描述】:

我使用 spark 2.1.0 在具有充足内存的单个 16 核节点上本地运行。我循环了大约 50 个文件,每个文件有 250M 行。在 UDF 阶段,我有一个包含两个字符串的 SparkDataFrame。我应用了一个 UDF,它只是一个表查找,将一列(单元格)替换为相应的“除法”(整数):

cellToDivision <- function(df) 
  ## convert dataframe with cell to the one based on division
  division <- distCell[as.character(df$cell)]
  data.frame(unname(df$msisdn),
             unname(division),
                       # remove names -> otherwise data.frame tries to
                       # use NA for rowname for missings and fails
             stringsAsFactors=FALSE,
             check.rows=FALSE)

distCell 是长度为 ~10000 的整数的命名向量。

我的问题是应用 UDF 的代码:

dapply(df, cellToDivision,
    structType(structField("msisdn", "string"),
               structField("division", "integer"))
    )

运行每个数据集大约需要 2 小时,速度非常慢。如果有 250 个分区,代码只是挂在那里,5000 个分区将在 4 小时内运行,50k 个分区在 2 小时内运行; 500k 个分区似乎给我每个文件 6 小时。

250M 行 x 2 列应该在此节点的能力范围内。我做错了吗? R 中的 UDF-s 在实际大小的数据集上总是那么慢吗?

【问题讨论】:

我知道我可以通过相应的合并来替换表查找。但是,以前我的印象是(在 pyspark 中)广播表格并将字典查找作为 UDF 是首选方式。 使用 UDF 不是首选,目前 R UDF 非常慢(serde 开销)。如果查找足够小,它应该在加入时自动广播(看起来 Spark R 目前没有 broadcast 绑定)。 谢谢,所以你基本上说我所看到的是人们对 R 的期望?我猜这是因为当前低效的序列化程序? 我不确定它是否是一致的行为。 R -> JVM 传输在 1.5 或 1.6 中显着减少,但为了更一致的行为,我猜我们必须使用 Arrow。 【参考方案1】:

仅作记录——这个特殊任务很容易通过合并替换 udf 中的表查找来解决:

cdrDist <- cdr %>%
     merge(cellDistrict, by="cell", all.x=TRUE)

其中cellDistrict 是一个包含列celldivision 的SparkDataFrame。现在运行一个数据集需要

【讨论】:

以上是关于在 R/sparkR 中加速大数据的 udf的主要内容,如果未能解决你的问题,请参考以下文章

Pandas UDF 函数在大数据上完成的时间异常长

使用Java继承UDF类或GenericUDF类给Hive3.1.2编写UDF实现编码解码加密解密并运行在USDP大数据集群

使用Java继承UDF类或GenericUDF类给Hive3.1.2编写UDF实现编码解码加密解密并运行在USDP大数据集群

大数据必知必会系列——萌新提问怎么定义HiveUDF函数?能否给个示例[新星计划]

大数据不是Hadoop,基础软件国产化替代加速

数据存储历史拐点,MIGO新机制加速大数据流转