数据集上的慢联合
Posted
技术标签:
【中文标题】数据集上的慢联合【英文标题】:Slow unions on a Dataset 【发布时间】:2018-05-21 14:45:48 【问题描述】:我使用 Spark 2.1.1。我按小时循环对输入 DS (inputDs) 进行许多连接和选择,如下所示:
val myDs = Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next =>
getDsForOneHour(inputDs, next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
).reduce(_.union(_))
def getDsForOneHour(ds: Dataset[I], year:Int, month:Int, day:Int, hour: Int)(implicit sql: SQLImplicits):Dataset[I]=
ds.where(col("year") === year and col("month") === month and col("day") === day and col("hour") === hour)
我使用 spark-testing-base 运行该代码,完成一个月的操作大约需要 3 分钟(约 30*24 个联合和选择)。这些都是懒惰的操作,我想知道为什么 Spark 构建 myDs 需要这么多时间?
【问题讨论】:
这就是计算执行计划的成本。 Spark unionAll multiple dataframes 应该可以帮到你。 【参考方案1】:我猜它很慢,因为执行计划会针对循环中联合的每个新数据集进行更新。您可以先重写代码来构建过滤器:
def getFilterForOneHour(year:Int, month:Int, day:Int, hour: Int): Column =
col("year") === year and col("month") === month and col("day") === day and col("hour") === hour
val myFilter = Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next =>
getFilterForOneHour(next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
).reduce(_ or _)
val myDs = inputDs.where(myFilter)
编辑: 您还可以尝试进行分组联合(我的情况是批量大小为 50)。我已经运行了一些虚拟内存数据集的测试,在我的情况下,这将性能提高了 8 倍:
val myDs = Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next =>
getDsForOneHour(inputDs, next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
)
.grouped(50).map(dss => dss.reduce(_ union _))
.reduce(_ union _)
【讨论】:
这是一个聪明的解决方案,但我担心它可能无法为我的底层 CassandraDB 存储 PK(年、月、日)创建正确的下推过滤器。但我肯定会针对 DB 检查该解决方案。 有趣!我还注意到 union 工作得很快,直到它达到 X 次迭代,然后它显着减慢。将其分组意味着合并小块并最终加速整个计算。我还检查了为该分组联合生成的 DAG,它们与我的原始版本完全相同。似乎 Spark 在一次处理多个联合方面存在一些内部问题,可能应该在未来的 Spark 版本中报告为需要改进的地方。我非常感谢您的帮助。以上是关于数据集上的慢联合的主要内容,如果未能解决你的问题,请参考以下文章