Spark内存数据存储(MemoryStore)的实现

Posted 大数据架构师修行之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark内存数据存储(MemoryStore)的实现相关的知识,希望对你有一定的参考价值。

在Spark中根据存储级别可以把块数据保存到磁盘或内存中,同时还可以选择按序列化或非序列化的形式保存。

MemoryStore类实现了一个简单的基于块数据的内存数据库,用来管理需要写入到内存中的块数据。可以按序列化或非序列化的形式存放块数据,存放这两种块数据的数据结构是不同的,但都必须实现MemoryEntry这个接口。也就是说:MemoryStore管理的是以MemoryEntry为父接口的内存对象。

MemoryStore如何管理这些MemoryEntry对象呢?在当前版本,MemoryStore通过一个LinkedHashMap结构来管理内存对象。也就是说,MemoryStore是一个MemoryEntry类型的LinkedHashMap。那么,Spark为什么要选择LinkedHashMap作为内存管理的数据结构呢?这与内存块的淘汰机制有很大的关系,这会在后面的分析中详细说明。

另外,MemoryStore可以把块数据保存到堆外内存或堆内内存中,若内存不够还需要通过内存管理模块来获取内存。

内存数据存储管理结构

MemoryStore通过以MemoryEntry对象为元素的LinkedHashMap来管理块数据。LinkedHashMap是一个有序的HashMap,这样可以按插入顺序来对元素进行管理,此时各个节点构成了一个双向链表。其结构如图1所示。

     图1 MemoryStore的存储结构

在图1中,先后向Hash表中插入了节点1、2、3、4,由于是LinkedHashMap结构,所以会通过一个双向链表把这些节点按插入顺序连接起来,这样就得到了一个双向链表,见图1的红色箭头部分,该链表的结构如下图2:

         图2 按插入顺序组织的双向链表

然而,MemoryStore不仅使用了LinkedHashMap的基本特性,还使用了其按访问顺序的特性。当创建LinkedHashMap时若把参数accessOrder设置为true(默认是true),LinkedHashMap不会改变HashMap的基本结构,但会按访问元素的先后顺序把访问过的元素放到双向链表的末尾。这其实就形成了一个LRU队列(Least Recently Used队列)。这也就是官方所说:缓存数据是不可靠的,当内存不够时,会按LRU算法来淘汰内存块。而LinkedHashMap就是该算法的底层数据结构支持。

若我们先后访问(get)了节点3和2,则图1就变成了图3的结构:

             图3 访问节点3和2后的LinkedHashMap

从图3可以看到:当先后访问了节点3和2后的链表状态,节点3和节点2被添加到了访问链表的尾部。

另外,LinkedHashMap非并发结构,所以在进行其元素的读写操作时,必须加锁。

MemoryEntry结构

MemoryEntry是MemoryStore中的管理的成员结构。它是一个接口,有两种实现:一种是DeserializedMemoryEntry,用来保存非序列化块数据;一种是SerializedMemoryEntry,用来保存序列化块数据。

MemoryEntry的成员变量有三个:块数据的大小,内存的模式(堆内还是堆外),块数据的类标识。MemoryEntry的代码实现如下:

 // 代码位置:org.apache.spark.storage.memory
 
 private sealed trait MemoryEntry[T] {
   // 块数据大小
   def size: Long
   // 内存模式:ON_HEAP(堆内),OFF_HEAP(堆外)
   def memoryMode: MemoryMode
   // 数据的类标识
   def classTag: ClassTag[T]
 }

从以上定义代码可以看出,每个MemoryEntry对象的大小不是固定的,另外,每个MemoryEntry对象可能被保存为堆内或堆外内存模式。

DeserializedMemoryEntry

非序列化的块数据都通过该类来进行封装。要注意的是,非序列化的块数据只能保存在JVM的堆中,也就是说只能使用堆内内存(memoryMode的值为ON_HEAP)。在堆内内存中,该类通过一个数组来保存不同类型的块数据。

该类的实现代码如下:

 private case class DeserializedMemoryEntry[T](
  // 保存块数据的数组
     value: Array[T],
     size: Long,
     classTag: ClassTag[T]) extends MemoryEntry[T] {
   // 非序列化数据只能使用堆内内存存储
   val memoryMode: MemoryMode = MemoryMode.ON_HEAP
 }

SerializedMemoryEntry

序列化的块数据其实是一组字节数据的集合,MemoryStore使用了一个专门的结构:ChunkedByteBuffer来进行保存。序列化块数据可以保存在堆内和堆外内存中,其实现代码如下:

 private case class SerializedMemoryEntry[T](
  // 在这里保存序列化数据
     buffer: ChunkedByteBuffer,
  // 内存模式
     memoryMode: MemoryMode,
  // 数据的类标识
     classTag: ClassTag[T]) extends MemoryEntry[T] {
   // 块数据大小是buffer的大小
   def size: Long = buffer.size
 }

ChunkedByteBuffer

该对象本质上是字节流ByteBuffer的数组,也就是:Array[ByteBuffer]。

内存数据的写入

前面的章节介绍过,MemoryStore可以以序列化和非序列化的形式来写入数据。而这两种数据形式的存储结构也不相同。根据这两种形式MemoryStore提供了不同的写入方式。

序列化数据的写入

序列化数据的写入是通过函数MemoryStore#putBytes来实现的。并需要使用内存管理模块来确保内存大小,若内存不够还需要通过内存管理模块来释放内存。

该函数的声明如下:

 // 代码位置:org.apache.spark.storage.memory.MemoryStore
 def putBytes[T: ClassTag](
     blockId: BlockId,
     size: Long,
     memoryMode: MemoryMode,
     _bytes: () => ChunkedByteBuffer): Boolean = {

该函数的实现步骤如下:

1)判断blockId对应的内存块是否已经在MemoryStore中存在,若存在,则中断执行并抛出异常。

2)调用memoryManager.acquireStorageMemory(blockId, size, memoryMode)来判断在给定的memoryMode模式下内存是否足够,若不够则需要通过统一内存管理模块来从执行池中借用一部分内存,若执行内存池空闲内存不够,则需要把存储池中一部分内存进行持久化,并释放这部分内存(可以参考《统一内存管理的实现》一文)。

3)创建一个ChunkedByteBuffer对象,需要根据参数给定的匿名函数进行创建。

3)创建一个SerializedMemoryEntry对象,并把bytes作为参数传入。

4)在LinkedHashMap对象entries上加锁,并调用:entries.put(blockId, entry)把数据写入LinkedHashMap中。

非序列化数据的写入

非序列化数据的写入是通过putIteratorAsBytes函数来实现的。该函数的声明如下:

 private[storage] def putIteratorAsBytes[T](
     blockId: BlockId,
     values: Iterator[T],
     classTag: ClassTag[T],
     memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {...}

该函数的实现步骤如下:

1)第1步:获取数据块存储的初始化内存参数的值:spark.storage.unrollMemoryThreshold,默认值是1M。

2)第2步:通过迭代器遍历数据。保存数据块使用的是存储池内存,所以需要检查存储池的空闲内存是否足够,若不够,则会释放存储内存块,直到能容下需要保存的数据块内容。注意,保存非序列化数据时,其保存值是一个迭代器(参考以上代码中参数values的类型,),比如:保存一个数组的数据。当通过迭代器遍历该数组的数据时,每次都可能向存储池申请一些内存。但这一步只申请了空间,并没有存放数据元素。

3)第3步:由于第2步并没有存放数据,所以这一步还要进一步确认空闲内存是否足够,若足够,才会创建一个DeserializedValuesHolder对象;通过该对象可以创建一个MemoryEntry构造器:MemoryEntryBuilder。通过该构造器可以创建一个DeserializedMemoryEntry对象。该对象就是保存到MemoryStore的LinkedHashMap结构中的对象。由于LinkedHashMap不是一个同步队列,所以在向其存放数据时,需要加锁。

4)若获取不到足够的空闲内存,则会保错,并通过日志来记录当前内存的情况,包括:存储内存的大小,目前展开数据块的任务数,和目前已经使用的存储内存。

读取内存数据

读取内存数据的操作主要是通过getValues和getBytes来完成。由于数据是以MemoryEntry结构来进行保存,并且是以LinkedHashMap来进行组织,所以,读取和查找某个内存数据块十分方便,主要的实现代码如下:

     val entry = entries.synchronized { entries.get(blockId) }

此代码返回的是LinkedHashMap中的MemoryEntry,若是序列化数据,则返回:ChunkedByteBuffer对象。若是非序列化数据,则返回一个迭代器:Iterator[T]。

另外,从以上代码可以看出,在读取内存数据块时,需要加锁。

淘汰内存数据

当执行任务或缓存数据空闲内存不足时,可能会释放一部分存储内存,若对应的rdd的存储级别设置了useDisk,则会把内存中的数据持久化到磁盘上。完成以上操作的函数是:MemoryStore#evictBlocksToFreeSpace函数。其原型如下:

   private[spark] def evictBlocksToFreeSpace(
       blockId: Option[BlockId],
       space: Long,
       memoryMode: MemoryMode): Long = {

其中的blockId是数据块的id,每个id都对应一个内存块。释放内存块的逻辑如下:

(1)遍历内存块的队列。这是一个LinkedHashMap,最后一次被访问的内存块节点会放到链表的后面,这样最近没有被访问的内存块就在队列的头部。需要淘汰内存块时,只需要从头部选择一个进行删除即可。这就是LRU内存数据淘汰机制。

(2)检查内存块是否可以被释放。释放内存块需要满足以下条件:

1)内存块的模式必须和参数中memoryMode的值相等;

2)该blockId对应的内存块没有被其他RDD占用,或则不是要替换相同RDD的不同数据块。

(3)若满足以上两个条件,就会释放该内存块。释放内存块的过程如下:

1)确认内存块的写锁已经锁上了;

2)通过blockId的信息检查存储级别是否包含useDisk,若包含则把内存的数据写入到磁盘上。写入磁盘 的过程是通过DiskStore对象来完成的。

(4)从MemoryStore的中删除、释放blockId对应的内存块,并减少MemoryStore的内存数量。删除内存块的操作是在BlockManager#dropFromMemory函数中完成,因为,内存块删除后可能会把数据写入到磁盘上 ,此时此数据块的存储级别就发生了改变,需要通过BlockManager来把数据的最新状态同步给driver端的BlockManager Master。

到此,内存数据块就从MemoryStore中淘汰了,若对应rdd的存储级别包含了useDisk,则数据可能被保存到磁盘上,否则数据就从内存中丢失了。

小结

本文介绍了内存数据管理MemoryStore的实现。通过对该类的分析,我们就明白了在spark中内存数据到底是如何组织和管理的?LRU的淘汰算法是如何实现的?等等问题。

以上是关于Spark内存数据存储(MemoryStore)的实现的主要内容,如果未能解决你的问题,请参考以下文章

内存存储 MemoryStore

Spark存储系统详解

在 Node 中使用 MemoryStore 存储会话数据,类似于 PHP 中的 $_SESSION['data'] = value

具有内存存储的核心数据

为什么Rails的ActiveSupport :: Cache :: MemoryStore不适合大型应用程序?

超大超详细图解,让你掌握Spark memeoryStore内存管理的精髓