在 R/sparkR 中加速大数据的 udf
Posted
技术标签:
【中文标题】在 R/sparkR 中加速大数据的 udf【英文标题】:speeding up udf on large data in R/sparkR 【发布时间】:2017-03-15 22:16:54 【问题描述】:我使用 spark 2.1.0 在具有充足内存的单个 16 核节点上本地运行。我循环了大约 50 个文件,每个文件有 250M 行。在 UDF 阶段,我有一个包含两个字符串的 SparkDataFrame。我应用了一个 UDF,它只是一个表查找,将一列(单元格)替换为相应的“除法”(整数):
cellToDivision <- function(df)
## convert dataframe with cell to the one based on division
division <- distCell[as.character(df$cell)]
data.frame(unname(df$msisdn),
unname(division),
# remove names -> otherwise data.frame tries to
# use NA for rowname for missings and fails
stringsAsFactors=FALSE,
check.rows=FALSE)
distCell
是长度为 ~10000 的整数的命名向量。
我的问题是应用 UDF 的代码:
dapply(df, cellToDivision,
structType(structField("msisdn", "string"),
structField("division", "integer"))
)
运行每个数据集大约需要 2 小时,速度非常慢。如果有 250 个分区,代码只是挂在那里,5000 个分区将在 4 小时内运行,50k 个分区在 2 小时内运行; 500k 个分区似乎给我每个文件 6 小时。
250M 行 x 2 列应该在此节点的能力范围内。我做错了吗? R 中的 UDF-s 在实际大小的数据集上总是那么慢吗?
【问题讨论】:
我知道我可以通过相应的合并来替换表查找。但是,以前我的印象是(在 pyspark 中)广播表格并将字典查找作为 UDF 是首选方式。 使用 UDF 不是首选,目前 R UDF 非常慢(serde 开销)。如果查找足够小,它应该在加入时自动广播(看起来 Spark R 目前没有broadcast
绑定)。
谢谢,所以你基本上说我所看到的是人们对 R 的期望?我猜这是因为当前低效的序列化程序?
我不确定它是否是一致的行为。 R -> JVM 传输在 1.5 或 1.6 中显着减少,但为了更一致的行为,我猜我们必须使用 Arrow。
【参考方案1】:
仅作记录——这个特殊任务很容易通过合并替换 udf 中的表查找来解决:
cdrDist <- cdr %>%
merge(cellDistrict, by="cell", all.x=TRUE)
其中cellDistrict
是一个包含列cell
和division
的SparkDataFrame。现在运行一个数据集需要
【讨论】:
以上是关于在 R/sparkR 中加速大数据的 udf的主要内容,如果未能解决你的问题,请参考以下文章
使用Java继承UDF类或GenericUDF类给Hive3.1.2编写UDF实现编码解码加密解密并运行在USDP大数据集群
使用Java继承UDF类或GenericUDF类给Hive3.1.2编写UDF实现编码解码加密解密并运行在USDP大数据集群