spark在1.6的时候就已经默认把内存管理变为UnifiedMemoryManager,如果需要用回StaticMemoryManager,可以通过设置spark.memory.useLegacyMode为true,就可以用回原来的模式,下图为默认时,不同区域的内存使用的划分

  • 1.ReservedMemory:预留内存,默认为300M,如果系统内存SystemMemory小于1.5倍的ReservedMemory就会报错
  • 2.UsableMemory:可用内存,计算方式为SystemMemory-ReservedMemory
  • 3.MaxMemory:最大可以使用的内存,等于usableMemory x memoryFraction(spark.memory.fraction=0.6)
  • 4.HeapStorageMemory:存储内存等于maxMemory x storageFraction(spark.memory.storageFraction默认0.5)
  • 5 .HeapExecutionMemory:等于maxMemory – HeapStorageMemory

UnifiedMemoryManager

通过上面的简图内存划分为不同的区域,而MemoryManager主要用来管理execution和storage他们之间是如何共享内存,他有两个实现类UnifiedMemoryManager和StaticMemoryManager,接下来会重点查看UnifiedMemoryManager是如何进行内存的分配的。

MemoryManager作为抽象类,它在初始化的时候出事会初始化ON_HEAP,OFF_HEAP的两个StorageMemoryPool和ExecutionMemoryPool对象,用来管理不同模式下的内存区域。

StorageMemoryPool主要是用来记录storage的一个可调整内存池的大小,而ExecutionMemoryPool相对它复杂,因为它需要保证每个task能够合理的共享内存,如果这里有N个task,它会确保每个task在spill之前会有至少1/2N的内存,且最多不能超过1/N,因为N是动态变化的,当task的数量改变的时候,我们会需要重新计算1/2N和1/N。

从ExecutionMemoryPool获取分配需要调用acquireMemory方法,首先第一步会先判断这个task是否属于Active task,如果不属于,则把他放到memoryForTask这个map的数据结构当中,value则记录该task当前使用了多少内存。

val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)

并在每次循环获取足够acquireMemory的numBytes之前,都会尝试的去回收storage从execution借去的内存

当获取到的内存不能够满足required大小的需求时,它就会阻塞等待,直到内存满足

    // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
      // if we can't give it this much now, wait for other tasks to free up memory
      // (this happens if older tasks allocated lots of memory before N grew)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }

TaskMemoryManager

UnifiedMemoryManager是对内存的统一管理,而TaskMemoryManager则是管理每个独立Task的内存分配,TaskMemoryManager通过MemoryManager的acquireExecutionMemory接口进行内存申请,如果不能满足,则从consumers中挑选可以spill,进行内存释放,什么是consumer,其实我们可以用过日志打印直观的看出改Task使用了哪些consumer.

log4j.logger.org.apache.spark.memory.TaskMemoryManager=DEBUG

然后就可以清楚的在日志里面看到了

17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 5.2 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 10.3 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 21.4 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 47.7 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9

对于consumer来说,他有不同的实现类,ShuffleExternalSorter就是其中一个consumer的实现类,当我们在ShuffleExternalSorter插入一条record时他就会调用acquireNewPageIfNecessary,尝试的从TaskMemoryManager获取一个MemoryBlock(即一个page)同时,会将这个page记录到自己的pageTable中,并得到对应的pageCursor偏移量。

Spark用MemoryLocation记录和追踪在off-heap或on-heap内存地址,在off-heap模式,内存可以直接通过64-bit长度的地址进行寻址,在In-heap模式中,内存由该对象的引用和64-bit的offset去寻址,MemoryBlock在MemoryLocation的基础上增加了pageNumber和对应数据的length。

简单的函数调用关系为:consumer.allocatePage -> TaskMemoryManager.allocatePage -> MemoryManager的MemoryAllocator.allocate

在HeapMemoryAllocator中,我们可以看到,我们会通过内存对齐产生一个long类型的数组,并通过这个数组构成一个MemoryBlock

long[] array = new long[(int) ((size + 7) / 8)];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);

在得到对应的MemoryBlock之后,通过pageCursor偏移量(通过unsafe的arrayBaseOffset获取对象头的长度,我的是16)将数据写入到内存当中

/**
 * Created by tangshangwen on 17-1-13.
 */
public class TestUnsafe {
    public static void main(String[] args) {
        sun.misc.Unsafe unsafe;
        try {
            Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
            unsafeField.setAccessible(true);
            unsafe = (sun.misc.Unsafe) unsafeField.get(null);
        } catch (Throwable cause) {
            unsafe = null;
        }
        System.out.println(unsafe.arrayBaseOffset(long[].class));
    }
}

和copyMemory将数据复制到对应的内存位置当中,并每次对pageCursor进行和数据长度length相加,找到下次数据写入的位置

总结

通过阅读和理解代码,加深了spark对内存管理方面知识的理解

参考资料

Spark Tungsten in-heap / off-heap 内存管理机制