分类 spark 下的文章

[SPARK] Spark Streaming在进行YARN kill的时候无法对eventlog的inprogress进行rename

Spark Streaming程序在cluster模式下,进行yarn kill操作时,对导致eventlog无法在stop的时候进行rename,导致spark history无法读取对应的日志

经过分析,原因时在进行kill操作时,YARN的cluster模式下,进行TERM和KILL两个信号会有250ms的时间差,意味着TERM(kill -15 pid)进行了250ms会进行KILL(kill -9 pid)的操作,TERM操作会触发Spark StreamingContext的jvm hook,之后会调用sparkContext的hook,但ss的hook会可能会等待上12s的时间

// Wait for the Spark job that runs the receivers to be over
// That is, for the receivers to quit gracefully.
receiverJobExitLatch.await(10, TimeUnit.SECONDS)

还有在JobScheduler上

jobExecutor.awaitTermination(2, TimeUnit.SECONDS)

因此,在streaming未完全停止之前,就执行了kill -9的操作,导致程序过早退出,有可能还会导致streaming程序丢数。

因此可以配置yarn.nodemanager.sleep-delay-before-sigkill.ms(默认250ms)增加,至少为12s之上

// kill process
      if (processId != null) {
        String user = container.getUser();
        LOG.debug("Sending signal to pid " + processId
            + " as user " + user
            + " for container " + containerIdStr);

        final Signal signal = sleepDelayBeforeSigKill > 0
          ? Signal.TERM
          : Signal.KILL;

        boolean result = exec.signalContainer(new ContainerSignalContext.Builder()
                  .setContainer(container)
                  .setUser(user)
                  .setPid(processId)
                  .setSignal(signal)
                  .build()

          );

        LOG.debug("Sent signal " + signal + " to pid " + processId
          + " as user " + user
          + " for container " + containerIdStr
          + ", result=" + (result? "success" : "failed"));

        if (sleepDelayBeforeSigKill > 0) {
          new DelayedProcessKiller(container, user,
              processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
        }

[Spark] 简单了解Spark2.0的内存管理

spark在1.6的时候就已经默认把内存管理变为UnifiedMemoryManager,如果需要用回StaticMemoryManager,可以通过设置spark.memory.useLegacyMode为true,就可以用回原来的模式,下图为默认时,不同区域的内存使用的划分

  • 1.ReservedMemory:预留内存,默认为300M,如果系统内存SystemMemory小于1.5倍的ReservedMemory就会报错
  • 2.UsableMemory:可用内存,计算方式为SystemMemory-ReservedMemory
  • 3.MaxMemory:最大可以使用的内存,等于usableMemory x memoryFraction(spark.memory.fraction=0.6)
  • 4.HeapStorageMemory:存储内存等于maxMemory x storageFraction(spark.memory.storageFraction默认0.5)
  • 5 .HeapExecutionMemory:等于maxMemory – HeapStorageMemory

UnifiedMemoryManager

通过上面的简图内存划分为不同的区域,而MemoryManager主要用来管理execution和storage他们之间是如何共享内存,他有两个实现类UnifiedMemoryManager和StaticMemoryManager,接下来会重点查看UnifiedMemoryManager是如何进行内存的分配的。

MemoryManager作为抽象类,它在初始化的时候出事会初始化ON_HEAP,OFF_HEAP的两个StorageMemoryPool和ExecutionMemoryPool对象,用来管理不同模式下的内存区域。

StorageMemoryPool主要是用来记录storage的一个可调整内存池的大小,而ExecutionMemoryPool相对它复杂,因为它需要保证每个task能够合理的共享内存,如果这里有N个task,它会确保每个task在spill之前会有至少1/2N的内存,且最多不能超过1/N,因为N是动态变化的,当task的数量改变的时候,我们会需要重新计算1/2N和1/N。

从ExecutionMemoryPool获取分配需要调用acquireMemory方法,首先第一步会先判断这个task是否属于Active task,如果不属于,则把他放到memoryForTask这个map的数据结构当中,value则记录该task当前使用了多少内存。

val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)

并在每次循环获取足够acquireMemory的numBytes之前,都会尝试的去回收storage从execution借去的内存

当获取到的内存不能够满足required大小的需求时,它就会阻塞等待,直到内存满足

    // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
      // if we can't give it this much now, wait for other tasks to free up memory
      // (this happens if older tasks allocated lots of memory before N grew)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }

TaskMemoryManager

UnifiedMemoryManager是对内存的统一管理,而TaskMemoryManager则是管理每个独立Task的内存分配,TaskMemoryManager通过MemoryManager的acquireExecutionMemory接口进行内存申请,如果不能满足,则从consumers中挑选可以spill,进行内存释放,什么是consumer,其实我们可以用过日志打印直观的看出改Task使用了哪些consumer.

log4j.logger.org.apache.spark.memory.TaskMemoryManager=DEBUG

然后就可以清楚的在日志里面看到了

17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 5.2 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 10.3 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 21.4 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 47.7 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9

对于consumer来说,他有不同的实现类,ShuffleExternalSorter就是其中一个consumer的实现类,当我们在ShuffleExternalSorter插入一条record时他就会调用acquireNewPageIfNecessary,尝试的从TaskMemoryManager获取一个MemoryBlock(即一个page)同时,会将这个page记录到自己的pageTable中,并得到对应的pageCursor偏移量。

Spark用MemoryLocation记录和追踪在off-heap或on-heap内存地址,在off-heap模式,内存可以直接通过64-bit长度的地址进行寻址,在In-heap模式中,内存由该对象的引用和64-bit的offset去寻址,MemoryBlock在MemoryLocation的基础上增加了pageNumber和对应数据的length。

简单的函数调用关系为:consumer.allocatePage -> TaskMemoryManager.allocatePage -> MemoryManager的MemoryAllocator.allocate

在HeapMemoryAllocator中,我们可以看到,我们会通过内存对齐产生一个long类型的数组,并通过这个数组构成一个MemoryBlock

long[] array = new long[(int) ((size + 7) / 8)];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);

在得到对应的MemoryBlock之后,通过pageCursor偏移量(通过unsafe的arrayBaseOffset获取对象头的长度,我的是16)将数据写入到内存当中

/**
 * Created by tangshangwen on 17-1-13.
 */
public class TestUnsafe {
    public static void main(String[] args) {
        sun.misc.Unsafe unsafe;
        try {
            Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
            unsafeField.setAccessible(true);
            unsafe = (sun.misc.Unsafe) unsafeField.get(null);
        } catch (Throwable cause) {
            unsafe = null;
        }
        System.out.println(unsafe.arrayBaseOffset(long[].class));
    }
}

和copyMemory将数据复制到对应的内存位置当中,并每次对pageCursor进行和数据长度length相加,找到下次数据写入的位置

总结

通过阅读和理解代码,加深了spark对内存管理方面知识的理解

参考资料

Spark Tungsten in-heap / off-heap 内存管理机制

[Spark] 简单分析Spark的RPC通信框架

在Spark中,已经采用了Netty作为RPC的通信框架,其通信的RpcEndpointRef和RpcEndpoint都是通过RpcEnv进行创建,在创建RpcEnv时,最终会调用NettyRpcEnvFactory中的create方法,并通过传入的clientMode去决定是否启动TransportServer。

以下是NettyRpcEnv的简图

除了Dispatcher之外,还有Inbox,Outbox,RpcEndpoint,RpcEndpointRef等几个重要的类

RpcEndpointRef的作用

进行远程通信时,一般都需要一个client一个server,而RpcEndpointRef就相当于一个client的角色,并且通过RpcEnv的实现类(NettyRpcEnv)的asyncSetupEndpointRefByURI进行创建在NettyRpcEndpointRef中我们可以看到,其实他只是需要RpcEndpoint对应的ip,port和RpcEndpoint name,然后程序在调用ask或者send方法发送信息时,NettyRpcEnv会根据他所提供的地址信息封装成RequestMessage进行处理,这里这个this指的是一个NettyRpcEndpointRef

  override def send(message: Any): Unit = {
    require(message != null, "Message is null")
    nettyEnv.send(RequestMessage(nettyEnv.address, this, message))
  }

程序在发送RequestMessage之前,会先判断改发送的地址是否是本地的地址,如果不是,则将message封装为OutboxMessage,并放到Outbox当中,如果是本地,则通过Dispatcher把message放到对应EndpointData的inbox里面

  private[netty] def send(message: RequestMessage): Unit = {
    val remoteAddr = message.receiver.address
    if (remoteAddr == address) {
      // Message to a local RPC endpoint.
      try {
        dispatcher.postOneWayMessage(message)
      } catch {
        case e: RpcEnvStoppedException => logWarning(e.getMessage)
      }
    } else {
      // Message to a remote RPC endpoint.
      postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message)))
    }
  }

Inbox,Outbox的作用

简单来说,Inbox主要的作用是存储发送给RpcEndpoint的消息,Outbox就是存放发送到remote host的message的地方

当程序在调用targetOutbox.send(message)时,该message会先放到OutBox内部的messages的list当中,然后通过传入的TransportClient发送到对应的RpcEndpoint

  def send(message: OutboxMessage): Unit = {
    val dropped = synchronized {
      if (stopped) {
        true
      } else {
        messages.add(message)
        false
      }
    }
    if (dropped) {
      message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
    } else {
      drainOutbox()
    }
  }

RpcEndpoint的作用

像刚才说的,客户端有了,服务端也就是RpcEndpoint,就像CoarseGrainedExecutorBackend一样,它也是一个RpcEndpoint,并实现了对应的接口(receive,onStart)等等。 在RpcEndpoint启动时,需要RpcEnv中setupEndpoint,也就是向Dispatcher注册RpcEndpoint,这样dispatcher才能把message分发对应的RpcEndpoint当中

    env.rpcEnv.setupEndpoint("Executor", 
new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

下面的就是Dispatcher中的注册逻辑,里面维护着endpoints,endpointRefs和receivers等几个重要的数据结构

      def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
        val addr = RpcEndpointAddress(nettyEnv.address, name)
        val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
        synchronized {
          if (stopped) {
            throw new IllegalStateException("RpcEnv has been stopped")
          }
          if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
            throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
          }
          val data = endpoints.get(name)
          endpointRefs.put(data.endpoint, data.ref)
          receivers.offer(data)  // for the OnStart message
        }
    endpointRef
}

就想刚才所说的如果我传进去的clientMode为false,就会启动相应的TransportServer监听该主机对应的端口,NettyRpcHandler通过反序列化得到对应的RequestMessage,并通过message的message.receiver.name找到对应EndpointData,并把message放到对应的inbox中

  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      val data = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        data.inbox.post(message)
        receivers.offer(data)
        None
      }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
  }

然后通过把对应的EndpointData放到receivers中,通过设置好的线程池的线程去消费receivers里面的EndpointData,从而调用Endpoint里面的的receive等实现方法进行不同的逻辑处理

/** Message loop used for dispatching messages. */
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            val data = receivers.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

一个流程就大约这样了,通过分析能够加深对框架设计和spark的认识。

参考资料

Spark RPC通信层设计原理分析

[Spark] Spark的History关联对应的executor的日志

场景在于,因为很多用户想在Spark的history上看到对应的executor日志,其实这个并不是Spark的什么新功能,他们不需要用yarn application -logs下载所有的日志,因为我们之前一直忽略这个,没配置,后来配置上就可以了。

除了要开启yarn的日志收集功能之外

<property>
     <name>yarn.log-aggregation-enable</name>
     <value>true</value>
</property>

还要在yarn-site.xml增加参数,并且重启NodeManager即可,当我们访问时,就会帮我们重定向到日志服务器上,找到对应的日志

<property>
      <name>yarn.log.server.url</name>
      <value>http://HistoryServerAddress:19888/jobhistory/logs</value>
</property>

然后点击对应的日志链接就可以链接过去了

记录一下以免忘记。