揭秘Spark_checkpoint
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了揭秘Spark_checkpoint相关的知识,希望对你有一定的参考价值。
参考技术A checkpoint是什么
(1)、Spark 在生产环境下经常会面临transformation的RDD非常多(例如一个Job中包含1万个RDD)或者具体transformation的RDD本身计算特别复杂或者耗时(例如计算时长超过1个小时),这个时候就要考虑对计算结果数据持久化保存;
(2)、Spark是擅长多步骤迭代的,同时擅长基于Job的复用,这个时候如果能够对曾经计算的过程产生的数据进行复用,就可以极大的提升效率;
(3)、如果采用persist把数据放在内存中,虽然是快速的,但是也是最不可靠的;如果把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏,系统管理员可能清空磁盘。
(4)、Checkpoint的产生就是为了相对而言更加可靠的持久化数据,在Checkpoint的时候可以指定把数据放在本地,并且是多副本的方式,但是在生产环境下是放在HDFS上,这就天然的借助了HDFS高容错、高可靠的特征来完成了最大化的可靠的持久化数据的方式;
假如进行一个1万个算子操作,在9000个算子的时候persist,数据还是有可能丢失的,但是如果checkpoint,数据丢失的概率几乎为0。
checkpoint原理机制
1.当RDD使用cache机制从内存中读取数据,如果数据没有读到,会使用checkpoint机制读取数据。此时如果没有checkpoint机制,那么就需要找到父RDD重新计算数据了,因此checkpoint是个很重要的容错机制。checkpoint就是对于一个RDD chain(链)如果后面需要反复使用某些中间结果RDD,可能因为一些故障导致该中间数据丢失,那么就可以针对该RDD启动checkpoint机制,使用checkpoint首先需要调用sparkContext的setCheckpoint方法,设置一个容错文件系统目录,比如hdfs,然后对RDD调用checkpoint方法。之后在RDD所处的job运行结束后,会启动一个单独的job来将checkpoint过的数据写入之前设置的文件系统持久化,进行高可用。所以后面的计算在使用该RDD时,如果数据丢失了,但是还是可以从它的checkpoint中读取数据,不需要重新计算。
2.persist或者cache与checkpoint的区别在于,前者持久化只是将数据保存在BlockManager中但是其lineage是不变的,但是后者checkpoint执行完后,rdd已经没有依赖RDD,只有一个checkpointRDD,checkpoint之后,RDD的lineage就改变了。persist或者cache持久化的数据丢失的可能性更大,因为可能磁盘或内存被清理,但是checkpoint的数据通常保存到hdfs上,放在了高容错文件系统。
问题:哪些 RDD 需要 cache?
会被重复使用的(但不能太大)。
问题:用户怎么设定哪些 RDD 要 cache?
因为用户只与 driver program 打交道,因此只能用 rdd.cache() 去 cache 用户能看到的 RDD。所谓能看到指的是调用 transformation() 后生成的 RDD,而某些在 transformation() 中 Spark 自己生成的 RDD 是不能被用户直接 cache 的,比如 reduceByKey() 中会生成的 ShuffledRDD、MapPartitionsRDD 是不能被用户直接 cache 的。
运算时间很长或运算量太大才能得到的 RDD,computing chain 过长或依赖其他 RDD 很多的 RDD。 实际上,将 ShuffleMapTask 的输出结果存放到本地磁盘也算是 checkpoint,只不过这个 checkpoint 的主要目的是去 partition 输出数据。
问题:什么时候 checkpoint?
cache 机制是每计算出一个要 cache 的 partition 就直接将其 cache 到内存了。但 checkpoint 没有使用这种第一次计算得到就存储的方法,而是等到 job 结束后另外启动专门的 job 去完成 checkpoint 。 也就是说需要 checkpoint 的 RDD 会被计算两次。因此,在使用 rdd.checkpoint() 的时候,建议加上 rdd.cache(), 这样第二次运行的 job 就不用再去计算该 rdd 了,直接读取 cache 写磁盘。其实 Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 这样的方法,相当于 cache 到磁盘上,这样可以做到 rdd 第一次被计算得到时就存储到磁盘上,但这个 persist 和 checkpoint 有很多不同,之后会讨论。
RDD 需要经过 [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ] 这几个阶段才能被 checkpoint。
Initialized: 首先 driver program 需要使用 rdd.checkpoint() 去设定哪些 rdd 需要 checkpoint,设定后,该 rdd 就接受 RDDCheckpointData 管理。用户还要设定 checkpoint 的存储路径,一般在 HDFS 上。
marked for checkpointing: 初始化后,RDDCheckpointData 会将 rdd 标记为 MarkedForCheckpoint。
checkpointing in progress: 每个 job 运行结束后会调用 finalRdd.doCheckpoint(),finalRdd 会顺着 computing chain 回溯扫描,碰到要 checkpoint 的 RDD 就将其标记为 CheckpointingInProgress,然后将写磁盘(比如写 HDFS)需要的配置文件(如 core-site.xml 等)broadcast 到其他 worker 节点上的 blockManager。完成以后,启动一个 job 来完成 checkpoint(使用 rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf)) )。
checkpointed: job 完成 checkpoint 后,将该 rdd 的 dependency 全部清掉,并设定该 rdd 状态为 checkpointed。然后, 为该 rdd 强加一个依赖,设置该 rdd 的 parent rdd 为 CheckpointRDD, 该 CheckpointRDD 负责以后读取在文件系统上的 checkpoint 文件,生成该 rdd 的 partition。
有意思的是我在 driver program 里 checkpoint 了两个 rdd,结果只有一个(下面的 result)被 checkpoint 成功,pairs2 没有被 checkpoint,也不知道是 bug 还是故意只 checkpoint 下游的 RDD:
checkPoint是一种容错机制,当我们的程序需要很多transformation操作的时候,如果我们担心中间某些关键的后面会反复几次使用的RDD,可能会因为节点的故障,导致持久化数据的丢失,那么就可以针对该RDD额外启动checkpoint机制, 实现容错和高可用。
首先要调用 SparkContext的setCheckPointDir()方法,设置一个容错的文件系统的目录,比如HDFS; 然后对RDD调用checkpoint()方法。之后,在RDD所处的job运行结束之后,会启动一个单独的job,来将checkPoint的RDD的数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。
此时就算在后面使用RDD时,它的持久化的数据,不小心丢失了,但是还是可以从它的checkpoint文件中直接读取其数据,从而不需要重新计算。 (CacheManager)
答:首先使用SparkContext.setCheckpointDir() ,设置checkpoint的目录,然后使用RDD.checkpoin进行checkpoint。
剖析,当我们使用了checkpoint之后,发生的一系列操作:
1、 对RDD调用了checkpoint()方法之后,它就接受RDDCheckpointData对象的管理。
2、 RDDCheckpointData对象,会负责将调用了checkpoint()方法的RDD的状态,设置为MarkedForCheckpoint。
3、 在RDD所在的那个job运行结束之后,会调用job中,最后一个RDD的doCheckPoint()方法,该方法沿着finalRDD的lineage向上查找,标记为MarkedForCheckpoint的RDD,并将其标记为CheckpointingInProgress。
4、 然后启动一个单独的job,来将lineage中,标记为CheckpointingInProgress的RDD,进行checkpoint的操作,也就是将这个RDD写入到Sparkcontext.setCheckpointDir()方法设置的文件系统中。
答:最主要的区别:
在于持久化,只是将数据保存在BlockManager中,但是rdd的lineage没有发生改变。
但是checkpoint执行完以后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,也就是说,checkpoint之后,rdd的lineage就改变了。
其次,持久化的数据丢失的可能性更大,无论是磁盘、或者是内存,都有可能丢失;但是checkpoint的数据,通常是保存在容错、高可用的文件系统中的,比如HDFS,依赖于这种高容错的文件西永,所以checkpoint的数据丢失可能性非常低。
答:如果一个RDD没有缓存,而且还设置了checkpoint,这样的操作是很悲剧的,细想,本来当前RDD的这个job都执行结束了,但是由于中间的rdd没有持久化,那么checkpoint job想要将rdd 的数据写入到外部文件系统中的话,还得从rdd之前所有的rdd,全部重新计算一次,然后才能计算出rdd的数据,再将其checkpoint到外部的文件系统中,
所以我们通常建议,对要checkpoint的rdd使用persisit(StorageLevel_DISK_OMLY),该rdd计算之后,就直接将其持久化到磁盘上去,然后后面进行checkpoint操作是,直接从磁盘上读取rdd的数据,并checkpoint到外部文件系统即可,不需要重新计算一次rdd。
四、Docker网络揭秘
参考技术A Docker 之所以功能这么强大,其实就是充分利用了Linux Kernel的特性:NameSpace、CGroups、UnionFileSystem。通过这些特性实现了资源隔离、限制与分层等。本文这次就来揭晓Docker中的容器是如何做到网络互通的。两台机器如果要实现通信,其实就是通过底层的网卡进行数据传输,每个网卡都有一个唯一的MAC地址,网卡又会绑定一个ip地址,只要两台机器的网络可以互通,那么这两台机器就可以进行通信。
想要实现通信,就得有两个同一网段的网卡,两个网卡必须是可以 ping 通的。
Docker在安装成功后,会在宿主机创建一个docker0网卡,这个网卡就是负责容器与宿主机之间通信的桥梁。
通过Docker创建一个容器之后,会在宿主机再创建一个网卡,也就是上面的 veth3543ea3@if7 ,容器内也会创建一个网卡。
一般成对的网卡,网卡组件名称后面的数字是连续的,比如宿主机的 @if7 和容器内的 @if8 ,正是这成对的网卡,才实现了容器与宿主机之间的通信。这其实是利用 Linux Kernel 的特性 NameSpace 实现的网卡隔离,不同NameSpace下的网卡是独立的,就像Java程序中的 package 一样。
从上面的例子中看到容器与宿主机之间的通信好像并不是通过docker0网卡实现的?
其实这只是单容器的状态,可能看不出docker0的作用。用图来表示一下单容器的网卡通信情况。
通过docker生成的 eth0 和 veth 两个网卡实现同一网段内的通信,而这个网卡又是桥接在docker0网卡上的。
再看下有多个容器的情况。
两个容器之间可以互相通信的原因就是因为docker0的存在,因为它们的网卡都是桥接在docker0上,所以也就有了和另一个容器通信的桥。
我们来验证一下是不是这样!
这种网络连接方法我们称为Bridge,这也是docker中默认的网络模式。可以通过命令查看docker中的网络模式:
通过 docker network ls 命令查看到,docker提供了3种网络模式,brige模式我们已经知道了,那 host 和 none 又是什么意思呢?不妨来验证一下:
这种模式只会创建一个本地的环路网卡,无法与其他容器或宿主机进行通信。
在创建自己的network之前先来解释一下为什么要创建新的network。
我们用一个例子来演示一下不同容器之间的通信。
容器之间通过 ip 是可以正常访问的,但是有没有这种情况:如果一个容器出问题了,我们重启之后它的ip变了,那是不是其他用到这个容器的项目配置是不是都得改。
有没有可能直接通过容器名称来访问呢?来验证一下:
发现并不能 ping 通,但是可以使用别的手段来达到这个目的。
通过上面这种方式就可以做到以容器名来 ping 通其他容器,其实它就跟windows系统中在 hosts 文件里加了个映射是一样的。
可以看到创建自定义的 network 自动帮我们实现了这个功能,而且使用自定义的 network 也方便管理,不同业务类型的容器可以指定不同的 network。
不同的 network ip网段也不一样,这样也可以增加单机中可以创建的容器的数量。
在创建一个容器的时候,一般都需要讲容器需要暴露的端口映射到物理主机的相同端口或其他端口,因为在外网环境下是不方便直接连接到容器的,所以需要通过映射端口的方式,让外网访问宿主机的映射端口来访问容器。
如果想建立多个容器,势必需要端口映射,以满足不同容器使用相同端口的情况。
以上这些内容都是在单容器进行操作,容器之间通信也只是通过 docker0 实现的桥接模式。
如果要实现多机之间的docker通信,其实还是通过网卡,只不过需要其他的技术来实现了。
本章节就不在演示,到后面的章节再来分析!
以上是关于揭秘Spark_checkpoint的主要内容,如果未能解决你的问题,请参考以下文章
(转)写文章 以太坊(Ethereum)创世揭秘 以太坊(Ethereum)创世揭秘