使用 Spark 过滤大型数据集中的列
Posted
技术标签:
【中文标题】使用 Spark 过滤大型数据集中的列【英文标题】:Filter columns in large dataset with Spark 【发布时间】:2015-09-08 14:30:03 【问题描述】:我有一个 1,000,000 行 x 大约 390,000 列的数据集。字段都是二进制的,要么是0,要么是1。数据很稀疏。
我一直在使用 Spark 来处理这些数据。我当前的任务是过滤数据——我只想要预先选择的 1000 列中的数据。这是我用来完成此任务的当前代码:
val result = bigdata.map(_.zipWithIndex.filtercase (value, index) => selectedColumns.contains(index))
bigdata
只是一个RDD[Array[Int]]
但是,此代码需要很长时间才能运行。我确信有一种更有效的方法来过滤我的数据集,它不涉及单独进入和过滤每一行。将我的数据加载到 DataFrame 中,并通过 DataFrame API 进行操作会使事情变得更快/更容易吗?我应该研究基于列存储的数据库吗?
【问题讨论】:
【参考方案1】:您可以先提高过滤器的效率。请注意:
您的RDD
包含Array[Int]
。这意味着您可以在 O(1) 时间内访问每一行的第 n 个元素
#selectedColumns
考虑到这两个事实,很明显迭代每一行的所有元素是没有意义的,更不用说contains
调用了。相反,您可以简单地将map
替换为selectedColumns
// Optional if selectedColumns are not ordered
val orderedSelectedColumns = selectedColumns.toList.sorted.toArray
rdd.map(row => selectedColumns.map(row))
比较时间复杂度:
zipWithIndex
+ filter
(假设 contains
为 O(1) 时的最佳情况) - O(#rows * # columns)
map
- O(#rows * #selectedColumns)
【讨论】:
我必须使用selectedColumns.map(row)).deep
,这样当我将结果保存为文本文件时,实际数据会被保存(而不是内存位置)【参考方案2】:
加速执行的最简单方法是使用 partitionBy 并行化它:
bigdata.partitionBy(new HashPartitioner(numPartitions)).foreachPartition(...)
foreachPartition 接收一个迭代器,您可以在其上进行映射和过滤。
numPartitions 是一个 val,您可以设置所需的并行分区数量。
【讨论】:
所以你是说简单地重新分区到更多的分区会加快我的数据过滤速度? 是的,如果满足以下条件:a) 您的数据源是可拆分的(例如 HDFS 可分块)并且 b) 您指定了大量的执行器 - 这个数字应该反映节点的数量在您的集群中并反映您的分区以上是关于使用 Spark 过滤大型数据集中的列的主要内容,如果未能解决你的问题,请参考以下文章
使用空数据集的Spark SQL连接会导致更大的输出文件大小
获取 Apache spark 数据集中包含的列的列数据类型