如何检查 RDD

Posted

技术标签:

【中文标题】如何检查 RDD【英文标题】:How to checkpoint RDD 【发布时间】:2019-11-14 11:55:39 【问题描述】:

火花 2.4.0


rdd = rdd.cache()
print(rdd.getStorageLevel())

内存序列化 1x 复制

sc.setCheckpointDir("/tmp/checkpoints")
rdd.checkpoint()

对 rdd 的操作

rdd.count()

25066

检查是否有检查点:

rdd.isCheckpointed()

错误

print(rdd.getCheckpointFile())

错误

【问题讨论】:

执行isLocallyCheckpointed()会返回什么? @Bala 其返回 False scala 代码对你有用吗?如果是这样,请添加该标签。我会写一个例子 this 能回答你的疑问吗? 基本上,你只需要checkpointed_rdd = rdd.checkpoint() 你的checkpointed_rdd.isCheckpointed() 应该返回true 【参考方案1】:

我在 spark 2.4.4 (EMR 5.28) 及其工作上进行了测试。

我可能是通过 EMR 的权限或配置问题,正如我之前在 spark 2.4.3 上尝试过的那样,我在 2.4.4 发行说明中没有看到任何关于检查点的问题。

df = spark.range(1, 7, 2)
df.show()

rdd = df.rdd
rdd = rdd.cache()
print("Storage Level - ".format(rdd.getStorageLevel()))

print("Is Checkpointed - ".format(rdd.isCheckpointed()))
print("Checkpoint File - ".format(rdd.getCheckpointFile()))


# Setting HDFS directory
sc.setCheckpointDir("/tmp/checkpoint_dir/")
rdd.checkpoint()

print("Is Checkpointed - ".format(rdd.isCheckpointed()))
print("Checkpoint File - ".format(rdd.getCheckpointFile()))

# Calling an action
print("count - ".format(rdd.count()))

print("Is Checkpointed - ".format(rdd.isCheckpointed()))
print("Checkpoint File - ".format(rdd.getCheckpointFile()))

输出:

+---+
| id|
+---+
|  1|
|  3|
|  5|
+---+

Storage Level - Memory Serialized 1x Replicated
Is Checkpointed - False
Checkpoint File - None
Is Checkpointed - False
Checkpoint File - None
count - 3
Is Checkpointed - True
Checkpoint File - hdfs://ip-xx-xx-xx-xx.ec2.internal:8020/tmp/checkpoint_dir/5d3bf642-cc17-4ffa-be10-51c58b8f5fcf/rdd-9

【讨论】:

【参考方案2】:

我使用 Spark 2.4.2 在独立集群中进行了测试。 Checkpoint 也在那里工作。

    spark.sparkContext.setCheckpointDir("temp/")
    val textFile=spark.sparkContext.textFile("test1.txt")
    println("textFile.isCheckpointed = " + textFile.isCheckpointed)
    textFile.checkpoint()
    println("textFile.count() = " + textFile.count())
    println("textFile.isCheckpointed = " + textFile.isCheckpointed)

结果

textFile.isCheckpointed = false
textFile.count() = 8
textFile.isCheckpointed = true

【讨论】:

以上是关于如何检查 RDD的主要内容,如果未能解决你的问题,请参考以下文章

如何使用火花流检查 rdd 是不是为空?

如何在不产生 .rdd 成本的情况下检查 Spark DataFrame 的分区数

Spark的RDD检查点实现分析

Spark基础学习笔记21:RDD检查点与共享变量

RDD缓存与检查点

火花检查点比缓存更快吗?