Spark createDataFrame(df.rdd, df.schema) vs checkPoint 打破血统

Posted

技术标签:

【中文标题】Spark createDataFrame(df.rdd, df.schema) vs checkPoint 打破血统【英文标题】:Spark createDataFrame(df.rdd, df.schema) vs checkPoint for breaking lineage 【发布时间】:2019-09-02 00:37:52 【问题描述】:

我正在使用

val df=longLineageCalculation(....)
val newDf=sparkSession.createDataFrame(df.rdd, df.schema)
newDf.join......

为了在计算计划时节省时间,但是文档说检查点是“切割”血统的建议方法。但我不想付出将 RDD 保存到磁盘的代价.

我的过程是一个批处理过程,它的时间不长,可以毫无问题地重新启动,所以检查点对我没有好处(我认为)。

使用“我的”方法会出现什么问题? (文档建议使用更昂贵的检查点,而不是这种用于破坏血统的检查点,我想知道原因)

我只能猜测,如果某个节点在我的“血统中断”之后出现故障,也许我的进程会失败,而检查点的进程会正常工作? (如果 DF 被缓存而不是检查点呢?)

谢谢!

编辑:

来自 SMaZ 的回答,我自己的知识和他提供的文章。使用 createDataframe (这是一个 Dev-API,因此使用“我的”/您自己的风险)会将沿袭保留在内存中(对我来说不是问题,因为我没有内存问题并且沿袭不大)。

有了这个,看起来(未经 100% 测试)Spark 应该能够在失败时重建所需的一切。

由于我没有在以下执行中使用数据,因此我将使用 cache+createDataframe 与检查点(如果我没记错的话,是 实际上是缓存+saveToHDFS+"createDataFrame")。

我的流程并不那么关键(如果它崩溃了),因为用户总是期待结果并且他们手动启动它,所以如果它出现问题,他们可以重新启动(+Spark 将重新启动它)或打电话给我,所以无论如何我可以承担一些风险,但我 99% 确信没有风险:)

【问题讨论】:

你看过这个question吗? 是的,我做到了,但没有答案,我想知道这种方法的缺点,因为官方文档建议使用较慢的方法。 虽然重读,但迭代读取似乎需要检查点,这种方法对于独立读取应该没问题(来自 Garren 的评论)。但是,我仍然很好奇,如果在 createDataFrame 提供的“InMemoryTable”已经计算完毕后发生崩溃会发生什么(它是缓存的,还是 Spark 无论如何都能够重建它?)。 @BiS:my method 是什么意思?你的意思是用sparkSession.createDataFrame创建? @SMaZ 是的,使用没有检查点的数据框创建它 【参考方案1】:

让我从下面的行创建数据框开始:

val newDf=sparkSession.createDataFrame(df.rdd, df.schema)

如果我们仔细研究SparkSession 类,那么这个方法会被@DeveloperApi 注释。要了解此注释的含义,请查看DeveloperApi 类中的以下几行

面向开发人员的较低级别、不稳定的 API。

开发人员 API 可能会在 Spark 的次要版本中更改或被删除。

因此不建议将此方法用于生产解决方案,在开源世界中称为风险自负实施。

但是,让我们更深入地了解当我们从 RDD 调用 createDataframe 时会发生什么。它正在调用internalCreateDataFrame 私有方法并创建LogicalRDD

LogicalRDD 在以下情况下创建:

数据集被请求到检查点 请求 SparkSession 从内部二进制行的 RDD 创建 DataFrame

所以它与checkpoint 操作无异,无需物理保存数据集。它只是从内部二进制行和模式的 RDD 创建 DataFrame。这可能会截断内存中的谱系,但不会截断物理级别。

所以我相信这只是创建另一个 RDD 的开销,并且不能用作checkpoint 的替代品

现在,检查点是截断沿袭图并将其保存到可靠的分布式/本地文件系统的过程。

为什么要检查站?

如果计算需要很长时间沿袭太长依赖太多的RDD

保留繁重的沿袭信息会带来内存成本。

即使在 Spark 应用程序终止后,检查点文件也不会自动删除,因此我们可以将其用于其他进程

使用“我的”方法会出现什么问题? (文档 建议检查点,它更昂贵,而不是这个 断血脉,我想知道原因)

This 文章会给出关于缓存和检查点的详细信息。 IIUC,您的问题更多是关于我们应该在哪里使用检查点。让我们讨论一些检查点有用的实际场景

    让我们假设我们有一个数据集,我们希望在该数据集上执行 100 次迭代操作,并且每次迭代都将最后一次迭代结果作为输入(Spark MLlib 用例)。现在,在这个迭代过程中,谱系将在此期间增长。这里定期检查点数据集(假设每 10 次迭代)将确保万一发生任何故障,我们可以从最后一个故障点开始该过程。 让我们举一些批处理的例子。想象一下,我们有一批正在创建一个具有繁重血统或复杂计算的主数据集。现在经过一些固定的时间间隔,我们得到了一些应该使用早期计算的主数据集的数据。在这里,如果我们检查我们的主数据集,那么它可以用于来自不同sparkSession 的所有后续进程。

我的过程是一个批处理过程,时间不长,可以 重新启动没有问题,所以检查点对我没有好处(我 想想)。

没错,如果您的进程不是重计算/大血统,那么就没有检查点。 拇指规则是,如果您的数据集没有被多次使用,并且可以比用于检查点/缓存的时间和资源更快地重新构建,那么我们应该避免它。它将为您的流程提供更多资源。

【讨论】:

CreateDataframe 对我来说就足够了(因为我只需要在内存中截断沿袭,不需要在未来的计算中节省内存或重用数据。我需要在下一个动作的计划计算中节省时间)。从文章和您的回答中,我几乎可以肯定它没有风险(忽略它是一个开发 API 的事实),因为 Spark 无论如何都会知道血统,因为它仍然“在内存中”(所以如果它崩溃,它会重新计算它或接受它如果缓存已缓存,则从缓存中提取),但它只是不将其与“动作”的其余部分混合,因此计划计算更短。 我的意思是,问题不在于节省执行时间或是否使用检查点,而是更多关于催化剂处理时间的节省,通过这些解决(通过检查点或 createDataframe)。我只是想知道 createDataframe 方法是否为当前执行引入了任何风险(例如在执行程序失败的情况下无法重新处理 RDD/从缓存中重新读取它),猜测这不是问题。 @BiS :了解您正在尝试做的事情。只有一件事,如果你有足够的资源并且无论如何你都要缓存,那么你也不需要做createDataFrame。如果数据集被缓存,血统将不是问题,因为它将使用 InMemoryTableScan InMemoryRelation 计划, 嗯,根据我的经验,它确实有所作为,缓存以某种方式保持了完整的计划,即使它在某些时候解析为 InMemoryTableScan(我在 Spark UI 中看到),我保存像 2- 3 分钟的执行时间(10-11 分钟)通过强制截断计划(与仅缓存相比)。如果计划在计算后匹配,则执行 AFAIK Spark 缓存“解析”(我猜它可能在物理计划部分已解决),我已经看到计划不使用缓存 DF 的情况,因为 Catalyst 优化改变了它们(不确定那是否是一个错误,但它也迫使我进行截断)。 @BiS:很有趣。将在此调试更多内容。【参考方案2】:

我认为sparkSession.createDataFrame(df.rdd, df.schema)会影响spark的容错性。

checkpoint()会将RDD保存在hdfs或s3中,因此如果发生故障,它将从最后一个检查点数据中恢复。

如果是createDataFrame(),它只会破坏谱系图。

【讨论】:

谢谢!根据我的经验,spark 似乎能够在崩溃时重建那些。我认为驱动程序以某种方式保留了这些 InMemoryTables 的血统,以防它们必须重新购买。不是 100% 确定,但从来没有遇到过任何问题(我的大部分程序都是完全可重复的)。

以上是关于Spark createDataFrame(df.rdd, df.schema) vs checkPoint 打破血统的主要内容,如果未能解决你的问题,请参考以下文章

07 从RDD创建DataFrame

07 从RDD创建DataFrame

07 从RDD创建DataFrame

07 从RDD创建DataFrame

07 从RDD创建DataFrame

07 从RDD创建DataFrame