Spark createDataFrame(df.rdd, df.schema) vs checkPoint 打破血统
Posted
技术标签:
【中文标题】Spark createDataFrame(df.rdd, df.schema) vs checkPoint 打破血统【英文标题】:Spark createDataFrame(df.rdd, df.schema) vs checkPoint for breaking lineage 【发布时间】:2019-09-02 00:37:52 【问题描述】:我正在使用
val df=longLineageCalculation(....)
val newDf=sparkSession.createDataFrame(df.rdd, df.schema)
newDf.join......
为了在计算计划时节省时间,但是文档说检查点是“切割”血统的建议方法。但我不想付出将 RDD 保存到磁盘的代价.
我的过程是一个批处理过程,它的时间不长,可以毫无问题地重新启动,所以检查点对我没有好处(我认为)。
使用“我的”方法会出现什么问题? (文档建议使用更昂贵的检查点,而不是这种用于破坏血统的检查点,我想知道原因)
我只能猜测,如果某个节点在我的“血统中断”之后出现故障,也许我的进程会失败,而检查点的进程会正常工作? (如果 DF 被缓存而不是检查点呢?)
谢谢!
编辑:
来自 SMaZ 的回答,我自己的知识和他提供的文章。使用 createDataframe (这是一个 Dev-API,因此使用“我的”/您自己的风险)会将沿袭保留在内存中(对我来说不是问题,因为我没有内存问题并且沿袭不大)。
有了这个,看起来(未经 100% 测试)Spark 应该能够在失败时重建所需的一切。
由于我没有在以下执行中使用数据,因此我将使用 cache+createDataframe 与检查点(如果我没记错的话,是 实际上是缓存+saveToHDFS+"createDataFrame")。
我的流程并不那么关键(如果它崩溃了),因为用户总是期待结果并且他们手动启动它,所以如果它出现问题,他们可以重新启动(+Spark 将重新启动它)或打电话给我,所以无论如何我可以承担一些风险,但我 99% 确信没有风险:)
【问题讨论】:
你看过这个question吗? 是的,我做到了,但没有答案,我想知道这种方法的缺点,因为官方文档建议使用较慢的方法。 虽然重读,但迭代读取似乎需要检查点,这种方法对于独立读取应该没问题(来自 Garren 的评论)。但是,我仍然很好奇,如果在 createDataFrame 提供的“InMemoryTable”已经计算完毕后发生崩溃会发生什么(它是缓存的,还是 Spark 无论如何都能够重建它?)。 @BiS:my method
是什么意思?你的意思是用sparkSession.createDataFrame
创建?
@SMaZ 是的,使用没有检查点的数据框创建它
【参考方案1】:
让我从下面的行创建数据框开始:
val newDf=sparkSession.createDataFrame(df.rdd, df.schema)
如果我们仔细研究SparkSession 类,那么这个方法会被@DeveloperApi
注释。要了解此注释的含义,请查看DeveloperApi 类中的以下几行
面向开发人员的较低级别、不稳定的 API。
开发人员 API 可能会在 Spark 的次要版本中更改或被删除。
因此不建议将此方法用于生产解决方案,在开源世界中称为风险自负实施。
但是,让我们更深入地了解当我们从 RDD 调用 createDataframe
时会发生什么。它正在调用internalCreateDataFrame
私有方法并创建LogicalRDD
。
LogicalRDD 在以下情况下创建:
数据集被请求到检查点 请求 SparkSession 从内部二进制行的 RDD 创建 DataFrame所以它与checkpoint
操作无异,无需物理保存数据集。它只是从内部二进制行和模式的 RDD 创建 DataFrame。这可能会截断内存中的谱系,但不会截断物理级别。
所以我相信这只是创建另一个 RDD 的开销,并且不能用作checkpoint
的替代品。
现在,检查点是截断沿袭图并将其保存到可靠的分布式/本地文件系统的过程。
为什么要检查站?
如果计算需要很长时间或沿袭太长或依赖太多的RDD
保留繁重的沿袭信息会带来内存成本。
即使在 Spark 应用程序终止后,检查点文件也不会自动删除,因此我们可以将其用于其他进程
使用“我的”方法会出现什么问题? (文档 建议检查点,它更昂贵,而不是这个 断血脉,我想知道原因)
This 文章会给出关于缓存和检查点的详细信息。 IIUC,您的问题更多是关于我们应该在哪里使用检查点。让我们讨论一些检查点有用的实际场景
-
让我们假设我们有一个数据集,我们希望在该数据集上执行 100 次迭代操作,并且每次迭代都将最后一次迭代结果作为输入(
Spark MLlib
用例)。现在,在这个迭代过程中,谱系将在此期间增长。这里定期检查点数据集(假设每 10 次迭代)将确保万一发生任何故障,我们可以从最后一个故障点开始该过程。
让我们举一些批处理的例子。想象一下,我们有一批正在创建一个具有繁重血统或复杂计算的主数据集。现在经过一些固定的时间间隔,我们得到了一些应该使用早期计算的主数据集的数据。在这里,如果我们检查我们的主数据集,那么它可以用于来自不同sparkSession
的所有后续进程。
我的过程是一个批处理过程,时间不长,可以 重新启动没有问题,所以检查点对我没有好处(我 想想)。
没错,如果您的进程不是重计算/大血统,那么就没有检查点。 拇指规则是,如果您的数据集没有被多次使用,并且可以比用于检查点/缓存的时间和资源更快地重新构建,那么我们应该避免它。它将为您的流程提供更多资源。
【讨论】:
CreateDataframe 对我来说就足够了(因为我只需要在内存中截断沿袭,不需要在未来的计算中节省内存或重用数据。我需要在下一个动作的计划计算中节省时间)。从文章和您的回答中,我几乎可以肯定它没有风险(忽略它是一个开发 API 的事实),因为 Spark 无论如何都会知道血统,因为它仍然“在内存中”(所以如果它崩溃,它会重新计算它或接受它如果缓存已缓存,则从缓存中提取),但它只是不将其与“动作”的其余部分混合,因此计划计算更短。 我的意思是,问题不在于节省执行时间或是否使用检查点,而是更多关于催化剂处理时间的节省,通过这些解决(通过检查点或 createDataframe)。我只是想知道 createDataframe 方法是否为当前执行引入了任何风险(例如在执行程序失败的情况下无法重新处理 RDD/从缓存中重新读取它),猜测这不是问题。 @BiS :了解您正在尝试做的事情。只有一件事,如果你有足够的资源并且无论如何你都要缓存,那么你也不需要做createDataFrame
。如果数据集被缓存,血统将不是问题,因为它将使用 InMemoryTableScan
InMemoryRelation
计划,
嗯,根据我的经验,它确实有所作为,缓存以某种方式保持了完整的计划,即使它在某些时候解析为 InMemoryTableScan(我在 Spark UI 中看到),我保存像 2- 3 分钟的执行时间(10-11 分钟)通过强制截断计划(与仅缓存相比)。如果计划在计算后匹配,则执行 AFAIK Spark 缓存“解析”(我猜它可能在物理计划部分已解决),我已经看到计划不使用缓存 DF 的情况,因为 Catalyst 优化改变了它们(不确定那是否是一个错误,但它也迫使我进行截断)。
@BiS:很有趣。将在此调试更多内容。【参考方案2】:
我认为sparkSession.createDataFrame(df.rdd, df.schema)
会影响spark的容错性。
但checkpoint()
会将RDD保存在hdfs或s3中,因此如果发生故障,它将从最后一个检查点数据中恢复。
如果是createDataFrame()
,它只会破坏谱系图。
【讨论】:
谢谢!根据我的经验,spark 似乎能够在崩溃时重建那些。我认为驱动程序以某种方式保留了这些 InMemoryTables 的血统,以防它们必须重新购买。不是 100% 确定,但从来没有遇到过任何问题(我的大部分程序都是完全可重复的)。以上是关于Spark createDataFrame(df.rdd, df.schema) vs checkPoint 打破血统的主要内容,如果未能解决你的问题,请参考以下文章