Articles published in January 2017

[YARN] A JDK bug that prevented the RM from allocating Containers

Recently, because of NodeManager OOMs, we decided to disable ContainerMetric and restart the NMs in batches: decommission a batch first, then bring it back online. Afterwards we noticed that cluster jobs were getting slower and slower, and cluster utilization kept dropping.

The chart below shows the cluster's available memory, which kept increasing

We observed the following symptoms:

  • 1. The event queue backlog was severe, peaking at over 5 million events
  • 2. The per-second Container allocation rate dropped from thousands to a few dozen
  • 3. After NM registration, some nodes took 20+ seconds for the NEW->RUNNING state transition, so FINISHED_CONTAINERS_PULLED_BY_AM events fell outside the state machine's valid transitions and exceptions were thrown

We then found a similar issue in the community, YARN-4741, but the discussion there does not explain why, when scheduling events came down, the NMs still could not get containers allocated

Each time a node went through addNode, no allocation happened on it afterwards

[2017-01-17T18:29:31.769+08:00] [INFO] resourcemanager.rmnode.RMNodeImpl.handle(RMNodeImpl.java 424) [AsyncDispatcher event handler] : xxx:50086 Node Transitioned from NEW to RUNNING
[2017-01-17T18:29:44.191+08:00] [INFO] scheduler.fair.FairScheduler.addNode(FairScheduler.java 899) [ResourceManager Event Processor] : Added node xxx:50086 cluster capacity: <memory:226007040, vCores:67872>
[2017-01-17T18:31:43.538+08:00] [INFO] hadoop.util.HostsFileReader.readFileToSetWithFileInputStream(HostsFileReader.java 88) [IPC Server handler 0 on 8033] : Adding xxx to the list of included hosts from xxx/hosts/mapred_hosts

Moreover, the average NodeUpdate time was low

So we made some hypotheses about why the NMs were not getting allocations:

  • 1. The NM's state was wrong and the RM considered it full, so allocation was skipped for insufficient resources
  • 2. The NM was reserved by another application due to a state error, so nothing was allocated
  • 3. The queue's state was wrong and the RM considered the queue full, so nothing was allocated

After repeated investigation, none of these explained the problem. We eventually reproduced it on a test cluster by repeatedly decommissioning and recommissioning nodes

We then noticed that the continuous-scheduling operation count was no longer increasing

Tracing the logs on that node showed that the FairSchedulerContinuousScheduling thread had exited because of the TimSort bug

[2017-01-17T15:52:12.791+08:00] [ERROR] hadoop.yarn.YarnUncaughtExceptionHandler.uncaughtException(YarnUncaughtExceptionHandler.java 68) [FairSchedulerContinuousScheduling] : Thread Thread[FairSchedulerContinuousScheduling,5,main] threw an Exception.
java.lang.IllegalArgumentException: Comparison method violates its general contract!
        at java.util.TimSort.mergeLo(TimSort.java:747)
        at java.util.TimSort.mergeAt(TimSort.java:483)
        at java.util.TimSort.mergeCollapse(TimSort.java:410)
        at java.util.TimSort.sort(TimSort.java:214)
        at java.util.TimSort.sort(TimSort.java:173)
        at java.util.Arrays.sort(Arrays.java:659)
        at java.util.Collections.sort(Collections.java:217)
        at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.continuousSchedulingAttempt(FairScheduler.java:1058)
        at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler$ContinuousSchedulingThread.run(FairScheduler.java:292)

The root cause: we had continuousSchedulingEnabled turned on. With that setting, when a decommissioned node re-registers, completedContainers.isEmpty() is necessarily true on heartbeat, so nodeUpdate never attempts scheduling on it; scheduling is left entirely to the continuous-scheduling thread, which had died. Newly added nodes therefore never entered scheduling, and cluster utilization kept falling.

if (continuousSchedulingEnabled) {
  // Rely on the continuous-scheduling thread; only schedule on heartbeat
  // when the NM reports completed containers
  if (!completedContainers.isEmpty()) {
    attemptScheduling(node);
  }
} else {
  attemptScheduling(node);
}

The underlying trigger is a compare function that is not strict enough to satisfy TimSort's Comparator contract. As a workaround, the following JVM flag can be added at startup

-Djava.util.Arrays.useLegacyMergeSort=true
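TimSort only throws this error when the supplied Comparator actually breaks its contract. As an illustration (this is not the YARN node comparator itself, just a classic broken comparator), integer subtraction violates the antisymmetry requirement when it overflows:

```java
import java.util.Comparator;

public class BrokenComparator {
    // Subtraction-based comparator: overflows for large-magnitude values,
    // violating sgn(compare(a, b)) == -sgn(compare(b, a))
    static final Comparator<Integer> BROKEN = (a, b) -> a - b;

    public static void main(String[] args) {
        // Both calls report "less than" because 0 - Integer.MIN_VALUE
        // overflows back to Integer.MIN_VALUE (negative). With enough
        // elements, TimSort can detect this mid-merge and throw
        // "Comparison method violates its general contract!"
        System.out.println(BROKEN.compare(Integer.MIN_VALUE, 0));
        System.out.println(BROKEN.compare(0, Integer.MIN_VALUE));
    }
}
```

The legacy merge sort tolerates such comparators silently, which is why the flag above hides the exception; the real fix is a contract-respecting comparator.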

Reference: TimSort in Java 7

Summary

  1. When the cluster slows down, first check queue resource utilization; a clear difference from the previous day's usage indicates a problem
  2. Watch whether the per-second container allocation rate is within its normal range
  3. Monitor the average nodeUpdate time and the continuous-scheduling count

[Spark] A quick look at Spark 2.0 memory management

Since 1.6, Spark has used UnifiedMemoryManager by default. To go back to StaticMemoryManager, set spark.memory.useLegacyMode to true. The diagram below shows how the different memory regions are divided under the default manager

  • 1. ReservedMemory: reserved memory, 300 MB by default; if SystemMemory is less than 1.5 × ReservedMemory, startup fails with an error
  • 2. UsableMemory: usable memory, computed as SystemMemory - ReservedMemory
  • 3. MaxMemory: the maximum usable memory, equal to UsableMemory × memoryFraction (spark.memory.fraction = 0.6)
  • 4. HeapStorageMemory: storage memory, equal to MaxMemory × storageFraction (spark.memory.storageFraction, default 0.5)
  • 5. HeapExecutionMemory: equal to MaxMemory - HeapStorageMemory
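The formulas above can be worked through for a concrete heap size. A small sketch, assuming a 4 GB heap and the default fractions (the method names mirror the list above; they are not Spark's actual field names):

```java
public class MemoryRegions {
    static final long RESERVED = 300L * 1024 * 1024;  // 300 MB reserved memory
    static final double MEMORY_FRACTION = 0.6;        // spark.memory.fraction
    static final double STORAGE_FRACTION = 0.5;       // spark.memory.storageFraction

    static long maxMemory(long systemMemory) {
        // Startup check: systemMemory must be at least 1.5 * ReservedMemory
        if (systemMemory < 1.5 * RESERVED) {
            throw new IllegalArgumentException("System memory too small");
        }
        long usableMemory = systemMemory - RESERVED;
        return (long) (usableMemory * MEMORY_FRACTION);
    }

    static long heapStorageMemory(long systemMemory) {
        return (long) (maxMemory(systemMemory) * STORAGE_FRACTION);
    }

    static long heapExecutionMemory(long systemMemory) {
        return maxMemory(systemMemory) - heapStorageMemory(systemMemory);
    }

    public static void main(String[] args) {
        long heap = 4L * 1024 * 1024 * 1024; // 4 GB
        System.out.println("maxMemory MB = " + maxMemory(heap) / (1024 * 1024));
        System.out.println("storage   MB = " + heapStorageMemory(heap) / (1024 * 1024));
        System.out.println("execution MB = " + heapExecutionMemory(heap) / (1024 * 1024));
    }
}
```

For a 4 GB heap this gives roughly 2.2 GB of MaxMemory, split evenly between storage and execution by default.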

UnifiedMemoryManager

The simplified diagram above divides memory into regions. MemoryManager is mainly responsible for how execution and storage share memory; it has two implementations, UnifiedMemoryManager and StaticMemoryManager. Below we focus on how UnifiedMemoryManager allocates memory.

MemoryManager is an abstract class. On construction it initializes a StorageMemoryPool and an ExecutionMemoryPool for each of ON_HEAP and OFF_HEAP, to manage the memory regions in the different modes.

StorageMemoryPool mainly tracks the size of an adjustable storage memory pool. ExecutionMemoryPool is more complex, because it must make tasks share memory fairly: with N active tasks, it guarantees each task at least 1/(2N) of the pool before it spills, and caps each task at 1/N. Since N changes dynamically as tasks come and go, 1/(2N) and 1/N have to be recomputed whenever the task count changes.

To allocate from ExecutionMemoryPool you call acquireMemory. The first step checks whether the task is already an active task; if not, it is added to the memoryForTask map, whose value records how much memory that task is currently using.

val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)

On each loop iteration, before trying to obtain the numBytes requested by acquireMemory, it tries to reclaim the memory that storage borrowed from execution

When the memory obtained cannot satisfy the required amount, the task blocks and waits until enough memory is available

    // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
    // if we can't give it this much now, wait for other tasks to free up memory
    // (this happens if older tasks allocated lots of memory before N grew)
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
      logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
      lock.wait()
    } else {
      memoryForTask(taskAttemptId) += toGrant
      return toGrant
    }
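The 1/(2N) floor and 1/N cap can be sketched as plain arithmetic. This is a simplified model of ExecutionMemoryPool's grant-and-block decision, not Spark's actual code; the names are illustrative:

```java
public class FairGrant {
    // How many bytes to grant a task, given the pool size, the bytes it
    // asked for, its current usage, and the number of active tasks N
    static long toGrant(long poolSize, long numBytes, long curMem, int numActiveTasks) {
        long maxMemoryPerTask = poolSize / numActiveTasks;       // 1/N cap
        // Never let a task exceed its 1/N share of the pool
        return Math.max(0, Math.min(numBytes, maxMemoryPerTask - curMem));
    }

    // A task blocks instead of accepting a partial grant that would still
    // leave it below its 1/(2N) floor
    static boolean shouldBlock(long poolSize, long numBytes, long grant,
                               long curMem, int numActiveTasks) {
        long minMemoryPerTask = poolSize / (2L * numActiveTasks); // 1/(2N) floor
        return grant < numBytes && curMem + grant < minMemoryPerTask;
    }
}
```

With a 1000-byte pool and 4 active tasks, a task asking for 400 bytes is capped at 250 (its 1/N share); if it could only be granted 100, it would block, since 100 is below the 125-byte floor.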

TaskMemoryManager

UnifiedMemoryManager manages memory globally, while TaskMemoryManager manages the allocations of each individual task. TaskMemoryManager requests memory through MemoryManager's acquireExecutionMemory interface; if the request cannot be satisfied, it picks consumers that can spill and frees memory from them. What is a consumer? We can see directly which consumers a task used by enabling debug logging:

log4j.logger.org.apache.spark.memory.TaskMemoryManager=DEBUG

Then you can see it clearly in the logs

17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 5.2 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 10.3 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 21.4 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 47.7 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9

Consumers have several implementations, and ShuffleExternalSorter is one of them. When we insert a record into ShuffleExternalSorter, it calls acquireNewPageIfNecessary, which tries to obtain a MemoryBlock (i.e. a page) from TaskMemoryManager; at the same time, the page is recorded in its own pageTable and the corresponding pageCursor offset is obtained.

Spark uses MemoryLocation to record and track off-heap or on-heap memory addresses. In off-heap mode, memory is addressed directly by a 64-bit address; in on-heap mode, it is addressed by an object reference plus a 64-bit offset. MemoryBlock extends MemoryLocation with a pageNumber and the length of the data.

The simplified call chain is: consumer.allocatePage -> TaskMemoryManager.allocatePage -> the MemoryManager's MemoryAllocator.allocate

In HeapMemoryAllocator we can see that a long[] array is created with the size rounded up for alignment, and a MemoryBlock is built from that array

long[] array = new long[(int) ((size + 7) / 8)];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);

After obtaining the MemoryBlock, data is written into memory at the pageCursor offset (the object header length can be obtained via Unsafe's arrayBaseOffset; on my machine it is 16)

import java.lang.reflect.Field;

import sun.misc.Unsafe;

/**
 * Created by tangshangwen on 17-1-13.
 */
public class TestUnsafe {
    public static void main(String[] args) {
        Unsafe unsafe = null;
        try {
            // theUnsafe is a private static field; grab it via reflection
            Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
            unsafeField.setAccessible(true);
            unsafe = (Unsafe) unsafeField.get(null);
        } catch (Throwable cause) {
            // leave unsafe as null
        }
        // Prints the base offset (header size) of a long[], 16 on my JVM
        System.out.println(unsafe.arrayBaseOffset(long[].class));
    }
}

and copyMemory copies the data to the corresponding memory location; after each write, pageCursor is advanced by the data length to find the position for the next write
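To make the base-offset-plus-cursor addressing concrete, here is a small sketch of my own (not Spark code) that writes two longs into a long[] "page" through Unsafe, advancing the cursor by the record length each time, and then reads the data back through ordinary array indexing:

```java
import java.lang.reflect.Field;

import sun.misc.Unsafe;

public class PageWriteDemo {
    static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    static long[] demoPage() {
        Unsafe unsafe = getUnsafe();
        long[] page = new long[8];                            // a tiny "page"
        long pageCursor = unsafe.arrayBaseOffset(long[].class); // skip the object header
        unsafe.putLong(page, pageCursor, 42L);                // first record
        pageCursor += 8;                                      // advance by record length
        unsafe.putLong(page, pageCursor, 43L);                // second record
        return page;
    }

    public static void main(String[] args) {
        long[] page = demoPage();
        // The Unsafe writes are visible as ordinary array elements
        System.out.println(page[0] + " " + page[1]); // 42 43
    }
}
```

This is exactly the on-heap addressing scheme described above: object reference plus 64-bit offset.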

Summary

Reading and understanding the code deepened my knowledge of Spark's memory management

References

Spark Tungsten in-heap / off-heap 内存管理机制

[Hadoop] A small bug in NameNode sshFence

A few days ago, during an abnormal failover, the cluster produced the following warning logs

[2017-01-10T01:42:37.234+08:00] [WARN] hadoop.ha.SshFenceByTcpPort.pump(StreamPumper.java 88) [nc -z xxx-xxx-17224.hadoop.xxx.com 8021 via ssh: StreamPumper for STDERR] : nc -z xxx-xxx-17224.hadoop.xxx.com 8021 via ssh: nc: invalid option -- 'z'
[2017-01-10T01:42:37.235+08:00] [WARN] hadoop.ha.SshFenceByTcpPort.pump(StreamPumper.java 88) [nc -z xxx-xxx-17224.hadoop.xxx.com 8021 via ssh: StreamPumper for STDERR] : nc -z xxx-xxx-17224.hadoop.xxx.com8021 via ssh: Ncat: Try `--help' or man(1) ncat for more information, usage options and help. QUITTING.

We know that when tryGracefulFence fails, it proceeds to fence the NN process. Let's look at the code

private boolean doFence(Session session, InetSocketAddress serviceAddr)
      throws JSchException {
    int port = serviceAddr.getPort();
    try {
      // This log line did appear, so skip past it
      LOG.info("Looking for process running on port " + port);
      int rc = execCommand(session,
          "PATH=$PATH:/sbin:/usr/sbin fuser -v -k -n tcp " + port);
      // This log line did not appear, so the command must have returned rc == 1
      if (rc == 0) {
        LOG.info("Successfully killed process that was " +
            "listening on port " + port);
        // exit code 0 indicates the process was successfully killed.
        return true;
      } else if (rc == 1) {
        // exit code 1 indicates either that the process was not running
        // or that fuser didn't have root privileges in order to find it
        // (eg running as a different user)
        LOG.info(
            "Indeterminate response from trying to kill service. " +
            "Verifying whether it is running using nc...");
        // Checking whether the port is still open with this command then
        // failed with rc == 2: not a normal rc == 1, but a failure to run
        // the command itself
        rc = execCommand(session, "nc -z " + serviceAddr.getHostName() +
            " " + serviceAddr.getPort());
        if (rc == 0) {
          // the service is still listening - we are unable to fence
          LOG.warn("Unable to fence - it is running but we cannot kill it");
          return false;
        } else {
          LOG.info("Verified that the service is down.");
          return true;          
        }
      } else {
        // other 
      }
      LOG.info("rc: " + rc);
      return rc == 0;
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while trying to fence via ssh", e);
      return false;
    } catch (IOException e) {
      LOG.warn("Unknown failure while trying to fence via ssh", e);
      return false;
    }
  }

Running this command on CentOS 7:

[XXX@XXX-XXX-17223 ~]$ nc -z
nc: invalid option -- 'z'
Ncat: Try `--help' or man(1) ncat for more information, usage options and help. QUITTING.
[XX@XXX-XXX-17223 ~]$ echo $?       
2

This means a non-zero return value does not prove the fence succeeded: depending on the operating system's nc version, the command below can return a non-zero value (2) simply because nc failed to run at all

rc = execCommand(session, "nc -z " + serviceAddr.getHostName() +
            " " + serviceAddr.getPort());

A stricter criterion would be needed: only rc == 1 actually verifies the service is down; any other non-zero value cannot be taken as a successful fence either

➜  ~ nc -z 127.0.0.1 3200
➜  ~ echo $?             
0
➜  ~ nc -z 127.0.0.3 3200
➜  ~ echo $?             
1
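A more defensive version of the verification step would treat only rc == 1 as proof the service is down, and anything else (including nc failing to run at all) as indeterminate. A sketch of that idea, which is my own illustration and not the patch attached to the JIRAs mentioned below:

```java
public class FenceCheck {
    // nc -z exit codes: 0 = port still listening, 1 = connection refused
    // (service down). Anything else means nc itself failed to run, e.g.
    // exit 2 for "invalid option -- 'z'" with Ncat, so the port state
    // is unknown and fencing must not be reported as successful.
    static boolean verifiedDown(int rc) {
        return rc == 1;
    }

    static boolean stillListening(int rc) {
        return rc == 0;
    }
}
```

With this criterion, the Ncat failure in our logs (exit code 2) would have been reported as indeterminate instead of "Verified that the service is down."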

This code issue has already been raised in the community and a fix has been proposed, but it has not been merged; see HDFS-3618 and HDFS-11308 for details

[Spark] A brief analysis of Spark's RPC framework

Spark now uses Netty as its RPC framework. Both RpcEndpointRef and RpcEndpoint are created through RpcEnv; creating an RpcEnv ultimately calls the create method of NettyRpcEnvFactory, where the clientMode argument decides whether a TransportServer is started.

Below is a simplified diagram of NettyRpcEnv

Besides the Dispatcher, the important classes include Inbox, Outbox, RpcEndpoint, and RpcEndpointRef

The role of RpcEndpointRef

Remote communication generally needs a client and a server, and RpcEndpointRef plays the client role. It is created via asyncSetupEndpointRefByURI on the RpcEnv implementation (NettyRpcEnv). In NettyRpcEndpointRef we can see that it really only needs the RpcEndpoint's ip, port, and name. When the program calls ask or send, NettyRpcEnv wraps the message, together with the address information the ref provides, into a RequestMessage for processing; here `this` refers to a NettyRpcEndpointRef

  override def send(message: Any): Unit = {
    require(message != null, "Message is null")
    nettyEnv.send(RequestMessage(nettyEnv.address, this, message))
  }

Before sending the RequestMessage, the program checks whether the destination address is local. If it is not, the message is wrapped as an OutboxMessage and placed into an Outbox; if it is local, the Dispatcher puts the message into the inbox of the corresponding EndpointData

  private[netty] def send(message: RequestMessage): Unit = {
    val remoteAddr = message.receiver.address
    if (remoteAddr == address) {
      // Message to a local RPC endpoint.
      try {
        dispatcher.postOneWayMessage(message)
      } catch {
        case e: RpcEnvStoppedException => logWarning(e.getMessage)
      }
    } else {
      // Message to a remote RPC endpoint.
      postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message)))
    }
  }

The roles of Inbox and Outbox

In short, the Inbox stores messages addressed to a local RpcEndpoint, while the Outbox holds messages bound for a remote host

When the program calls targetOutbox.send(message), the message is first added to the Outbox's internal messages list, then sent to the corresponding RpcEndpoint through the supplied TransportClient

  def send(message: OutboxMessage): Unit = {
    val dropped = synchronized {
      if (stopped) {
        true
      } else {
        messages.add(message)
        false
      }
    }
    if (dropped) {
      message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
    } else {
      drainOutbox()
    }
  }

The role of RpcEndpoint

As mentioned, with the client in place, the server side is the RpcEndpoint. CoarseGrainedExecutorBackend, for example, is an RpcEndpoint and implements the corresponding callbacks (receive, onStart, and so on). When an RpcEndpoint starts, it must call setupEndpoint on the RpcEnv, i.e. register itself with the Dispatcher, so that the Dispatcher can route messages to the right RpcEndpoint

env.rpcEnv.setupEndpoint("Executor",
  new CoarseGrainedExecutorBackend(env.rpcEnv, driverUrl, executorId,
    hostname, cores, userClassPath, env))

Below is the registration logic in the Dispatcher, which maintains several important data structures: endpoints, endpointRefs, and receivers

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data)  // for the OnStart message
  }
  endpointRef
}

As mentioned earlier, if clientMode is false, a TransportServer is started to listen on the host's port. NettyRpcHandler deserializes the incoming bytes into a RequestMessage, looks up the corresponding EndpointData via message.receiver.name, and puts the message into that endpoint's inbox

  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      val data = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        data.inbox.post(message)
        receivers.offer(data)
        None
      }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
  }

The corresponding EndpointData is then placed into receivers, and threads from a configured thread pool consume EndpointData from receivers, invoking the endpoint's receive (and related) implementations to handle the different message types

/** Message loop used for dispatching messages. */
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            val data = receivers.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }
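The MessageLoop above is a standard blocking-queue consumer with a poison-pill shutdown: the pill is put back so every loop thread eventually sees it and exits. The same pattern in a minimal, self-contained Java sketch (names are mine, not Spark's):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MessageLoopDemo {
    // Sentinel that tells a loop thread to exit (compared by identity)
    static final String POISON_PILL = "POISON_PILL";

    // Drain the queue, processing items until the poison pill is seen;
    // the pill is re-offered so other loop threads also see it
    static List<String> runLoop(BlockingQueue<String> receivers) {
        List<String> processed = new ArrayList<>();
        try {
            while (true) {
                String data = receivers.take();
                if (data == POISON_PILL) {
                    receivers.offer(POISON_PILL);
                    return processed;
                }
                processed.add(data); // stands in for data.inbox.process(...)
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // exit, like the catch in Spark
            return processed;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("msg-1");
        queue.offer("msg-2");
        queue.offer(POISON_PILL);
        System.out.println(runLoop(queue)); // [msg-1, msg-2]
        // The pill was put back, so a second loop exits immediately
        System.out.println(runLoop(queue)); // []
    }
}
```

Putting the pill back is the key design choice: with a pool of identical loop threads, a single sentinel enqueued once is enough to stop all of them.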

That is roughly the whole flow; walking through it deepens one's understanding of both the framework design and Spark itself.

References

Spark RPC通信层设计原理分析