如果在之前进行预处理,则数据处理时间太长

Posted

技术标签:

【中文标题】如果在之前进行预处理,则数据处理时间太长【英文标题】:Data-processing takes too long if pre-processing just before 【发布时间】:2017-08-23 07:51:16 【问题描述】:

所以我一直在尝试对数据集执行 cumsum 操作。我想强调的是,我希望我的 cumsum 发生在我的数据集的分区上(例如,随着时间的推移,对于 personA 的 feature1 的 cumsum)。

我知道该怎么做,而且它可以完美地“独立”运行——我稍后会解释这部分。这是执行此操作的代码:

// it's admitted that this DF contains all data I need
// with one column/possible value, with only 1/0 in each line
// 1 <-> feature has the value
// 0 <-> feature doesn't contain the value
// this DF is the one I get after the one-hot operation
// this operation is performed to apply ML algorithms on features
// having simultaneously multiple values
df_after_onehot.createOrReplaceTempView("test_table")

// @param DataFrame containing all possibles values eg. A, B, C
def cumSumForFeatures(values: DataFrame) = 
  values
    .map(value => "CAST(sum(" + value(0) + ") OVER (PARTITION BY person ORDER BY date) as Integer) as sum_" + value(0))
    .reduce(_+ ", " +_)


val req = "SELECT *, " + cumSumForFeatures(possible_segments) + " FROM test_table"
// val req = "SELECT * FROM test_table"
println("executing: " + req)

val data_after_cumsum = sqLContext.sql(req).orderBy("person", "date")
data_after_cumsum.show(10, false)

当我尝试使用之前的一些预处理执行相同的操作(例如 one-hot 操作或之前添加计算特征)时,就会出现问题。我尝试了一个非常小的数据集,但它不起作用。

这是打印的堆栈跟踪(至少应该让您感兴趣的部分):

Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

[Executor task launch worker-3] ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded

所以它似乎与 GC 问题/JVM 堆大小有关?我只是不明白它与我的预处理有什么关系?

我尝试在不再使用的 DF 上执行 unpersist 操作。 我尝试修改我机器上的选项(例如 -Xmx2048m)。 在 AWS 上部署后,问题仍然存在。

我的 pom.xml 的摘录(适用于 Java、Spark、Scala 版本):

<spark.version>2.1.0</spark.version>
<scala.version>2.10.4</scala.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

您知道如何解决我的问题吗? 谢谢

【问题讨论】:

@som-snytt 我更新了提供 Scala、jdk 和 Spark 版本的问题 Spark 作业因 OOM 失败的方式有多种:一种常见的模式是在驱动程序上收集过多数据,或者在一组后的单个工作人员上收集过多数据被过度代表。如果没有有关您的转换的更多详细信息,很难为您提供帮助。作为起点,您应该查看 Spark GUI 并检查数据量是否在执行程序之间均匀分布。 @FurryMachine 谢谢!我一直在研究这个,但是当我在本地启动我的工作时也会发生这种情况。当我通过预处理(包括简单的特征计算 + one-hot)启动我的工作时会发生这种情况,如果我自己启动它就不会发生(我预处理,我将数据存储在我的驱动器上,然后启动另一个执行 cum-sum 的 JVM 实例)。我正在尝试执行的转换是一个 cum-sum 操作(每个人的每个特征随时间的累积和,它应该在 O(n) 中完成)。 哎呀,这样的事情可能发生在 Spark 上,而且没有太多工作可以找到原因。我猜想在链接操作时会发生某种不幸的优化。如果检查点您的中间结果有效,我会说“去做”。有时,仅仅添加一个persist() 我们有时只是一个show(),也可能会改变优化器的行为。这有时会让人非常沮丧。这就是为什么我总是喜欢简单的步骤并存储中间结果,这样我就可以分别查看、调试和修复每个部分。但这并不总是很有意义。 【参考方案1】:

据我了解,我认为我们可能有两个原因:

JVM 的堆溢出,因为保留在内存中但不再使用的数据帧 cum-sum 请求可能太大而无法用剩余的少量 RAM 处理 显示/打印操作会增加作业所需的步骤数,并且可能会干扰 Spark 的内部优化

考虑到这一点,我决定“取消持久化”不再使用的 DataFrame。这似乎没有太大变化。

然后,我决定删除所有不必要的显示/打印操作。这大大提高了步数。

我将代码更改为更实用,但我保留了 3 个单独的值以帮助调试。这并没有太大变化,但我的代码更干净了。

最后,这是帮助我解决问题的东西。我没有让我的请求一次性通过数据集,而是将特征列表划分为多个切片:

def listOfSlices[T](list: List[T], sizeOfSlices: Int): List[List[T]] =
    (for (i <- 0 until list.length by sizeOfSlices) yield list.slice(i, i+sizeOfSlices)).toList

我对每个切片执行请求,使用映射操作。然后我将它们连接在一起,得到我的最终 DataFrame。这样,我有点分散计算,而且似乎这种方式更有效。

val possible_features_slices = listOfSlices[String](possible_features, 5)

val df_cum_sum = possible_features_slices
  .map(possible_features_slice =>
    dfWithCumSum(sqLContext, my_df, possible_segments_slice, "feature", "time")) // operation described in the original post
  .foldLeft[DataFrame](null)((a, b) => if (a == null) b else if (b == null) a else a.join(b, Seq("person", "list_features", "time")))

我只是真的想强调一下,我仍然不了解我的问题背后的原因,我仍然期待这个级别的答案。

【讨论】:

以上是关于如果在之前进行预处理,则数据处理时间太长的主要内容,如果未能解决你的问题,请参考以下文章

在应用程序启动之前执行完成处理程序

Solr 索引花费的时间太长

MySQL对时间戳的转换处理

如果不存在文件,则处理 gsutil ls 和 rm 命令错误

对于PyCharm某些库没有自动提示的处理

登录token过期时间