数据集上的慢联合

Posted

技术标签:

【中文标题】数据集上的慢联合【英文标题】:Slow unions on a Dataset 【发布时间】:2018-05-21 14:45:48 【问题描述】:

我使用 Spark 2.1.1。我按小时循环对输入 DS (inputDs) 进行许多连接和选择,如下所示:

val myDs =  Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next => 
getDsForOneHour(inputDs, next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
).reduce(_.union(_))

def getDsForOneHour(ds: Dataset[I], year:Int, month:Int, day:Int, hour: Int)(implicit sql: SQLImplicits):Dataset[I]= 
ds.where(col("year") === year and col("month") ===  month and col("day") ===  day and col("hour") === hour)

我使用 spark-testing-base 运行该代码,完成一个月的操作大约需要 3 分钟(约 30*24 个联合和选择)。这些都是懒惰的操作,我想知道为什么 Spark 构建 myDs 需要这么多时间?

【问题讨论】:

这就是计算执行计划的成本。 Spark unionAll multiple dataframes 应该可以帮到你。 【参考方案1】:

我猜它很慢,因为执行计划会针对循环中联合的每个新数据集进行更新。您可以先重写代码来构建过滤器:

def getFilterForOneHour(year:Int, month:Int, day:Int, hour: Int): Column = 
  col("year") === year and col("month") ===  month and col("day") ===  day and col("hour") === hour
 


val myFilter =  Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next => 
getFilterForOneHour(next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
).reduce(_ or _)

val myDs = inputDs.where(myFilter)

编辑: 您还可以尝试进行分组联合(我的情况是批量大小为 50)。我已经运行了一些虚拟内存数据集的测试,在我的情况下,这将性能提高了 8 倍:

val myDs =  Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next => 
getDsForOneHour(inputDs, next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
)
.grouped(50).map(dss => dss.reduce(_ union _))
.reduce(_ union _)

【讨论】:

这是一个聪明的解决方案,但我担心它可能无法为我的底层 CassandraDB 存储 PK(年、月、日)创建正确的下推过滤器。但我肯定会针对 DB 检查该解决方案。 有趣!我还注意到 union 工作得很快,直到它达到 X 次迭代,然后它显着减慢。将其分组意味着合并小块并最终加速整个计算。我还检查了为该分组联合生成的 DAG,它们与我的原始版本完全相同。似乎 Spark 在一次处理多个联合方面存在一些内部问题,可能应该在未来的 Spark 版本中报告为需要改进的地方。我非常感谢您的帮助。

以上是关于数据集上的慢联合的主要内容,如果未能解决你的问题,请参考以下文章

python计算多个模型在不同数据集上的预测概率获取每个数据集上的最优模型多个最优模型的ROC曲线进行对比分析

稀疏数据集上的光谱聚类

不平衡数据集上的一类文本分类

报告服务:连接数据集上的所有字段

大型数据集上的核心数据获取请求缓慢

为啥这个 Iris 数据集上的 silhouette_score 总是返回 0?