使用 Spark 过滤大型数据集中的列

Posted

技术标签:

【中文标题】使用 Spark 过滤大型数据集中的列【英文标题】:Filter columns in large dataset with Spark 【发布时间】:2015-09-08 14:30:03 【问题描述】:

我有一个 1,000,000 行 x 大约 390,000 列的数据集。字段都是二进制的,要么是0,要么是1。数据很稀疏。

我一直在使用 Spark 来处理这些数据。我当前的任务是过滤数据——我只想要预先选择的 1000 列中的数据。这是我用来完成此任务的当前代码:

val result = bigdata.map(_.zipWithIndex.filtercase (value, index) => selectedColumns.contains(index))

bigdata 只是一个RDD[Array[Int]]

但是,此代码需要很长时间才能运行。我确信有一种更有效的方法来过滤我的数据集,它不涉及单独进入和过滤每一行。将我的数据加载到 DataFrame 中,并通过 DataFrame API 进行操作会使事情变得更快/更容易吗?我应该研究基于列存储的数据库吗?

【问题讨论】:

【参考方案1】:

您可以先提高过滤器的效率。请注意:

您的RDD 包含Array[Int]。这意味着您可以在 O(1) 时间内访问每一行的第 n 个元素 #selectedColumns

考虑到这两个事实,很明显迭代每一行的所有元素是没有意义的,更不用说contains 调用了。相反,您可以简单地将map 替换为selectedColumns

// Optional if selectedColumns are not ordered
val orderedSelectedColumns = selectedColumns.toList.sorted.toArray
rdd.map(row => selectedColumns.map(row))

比较时间复杂度:

    zipWithIndex + filter(假设 containsO(1) 时的最佳情况) - O(#rows * # columns) map - O(#rows * #selectedColumns)

【讨论】:

我必须使用selectedColumns.map(row)).deep,这样当我将结果保存为文本文件时,实际数据会被保存(而不是内存位置)【参考方案2】:

加速执行的最简单方法是使用 partitionBy 并行化它:

bigdata.partitionBy(new HashPartitioner(numPartitions)).foreachPartition(...)

foreachPartition 接收一个迭代器,您可以在其上进行映射和过滤。

numPartitions 是一个 val,您可以设置所需的并行分区数量。

【讨论】:

所以你是说简单地重新分区到更多的分区会加快我的数据过滤速度? 是的,如果满足以下条件:a) 您的数据源是可拆分的(例如 HDFS 可分块)并且 b) 您指定了大量的执行器 - 这个数字应该反映节点的数量在您的集群中并反映您的分区

以上是关于使用 Spark 过滤大型数据集中的列的主要内容,如果未能解决你的问题,请参考以下文章

使用空数据集的Spark SQL连接会导致更大的输出文件大小

在 Spark 数据集中创建具有运行总计的列

获取 Apache spark 数据集中包含的列的列数据类型

如何对 Spark 数据集中的列进行舍入?

如何将具有值的列添加到 Spark Java 中的新数据集?

您可以将公式应用于数据透视表的计数过滤器吗?尝试在大型数据集中查找重复项