分类 yarn 下的文章

[HADOOP 问题] NodeManager OOM挂掉问题解决

在更换JDK1.6_25到JDK1.7_45后,集群出现频繁死掉NM,出现结果为如下:

2015-08-12 16:35:06,662 FATAL org.apache.hadoop.yarn.YarnUncaughtExceptionHandler: Thread Thread[process reaper,10,system] threw an Error. Shutting down now...
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.lang.UNIXProcess$ProcessPipeInputStream.drainInputStream(UNIXProcess.java:267)
at java.lang.UNIXProcess$ProcessPipeInputStream.processExited(UNIXProcess.java:280)
at java.lang.UNIXProcess.processExited(UNIXProcess.java:187)
at java.lang.UNIXProcess$3.run(UNIXProcess.java:175)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

和类似的

2015-08-12 16:37:56,893 FATAL org.apache.hadoop.yarn.YarnUncaughtExceptionHandler: Thread Thread[process reaper,10,system] threw an Error. Shutting down now...
java.lang.OutOfMemoryError: Java heap space
at java.lang.UNIXProcess$ProcessPipeInputStream.drainInputStream(UNIXProcess.java:267)
at java.lang.UNIXProcess$ProcessPipeInputStream.processExited(UNIXProcess.java:280)
at java.lang.UNIXProcess.processExited(UNIXProcess.java:187)
at java.lang.UNIXProcess$3.run(UNIXProcess.java:175)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

在google搜索关键字hadoop UNIXProcess drainInputStream,找到关于JDK7的一些bug,在NM负载高的情况下,出现OOM问题。 详情请看HADOOP-10146

和一些相关解释:

JDK-8027348

JDK-8024521

后来更换JDK1.7_67则没出现OOM的问题

[YARN] MRAppMaster心跳原理

最近集群遇到一个问题,就是集群在跑任务的时候,AM会超时10min而被KILL,但任务重跑则成功,问题是随机的出现的,所以初步怀疑是因为AM心跳汇报出现问题或则RM因为繁忙hang住,AM因为某些机制导致等待10min不汇报心跳,所以我们还是先了解,AM是如何向RM汇报心跳的。

在MRAppMaster中,ContainerAllocatorRouter负责向RM申请资源(发送心跳)

RMAM

RMContainerAllocator其最终父类是RMCommunicator,它实现了RMHeartbeatHandler接口

public interface RMHeartbeatHandler {
  long getLastHeartbeatTime(); // 获取上一次心跳的时间
  void runOnNextHeartbeat(Runnable callback); // 回调注册到callback队列的callback函数
}

每一次心跳回来,都会执行一次注册在heartbeatCallbacks中的回调函数:

allocatorThread = new Thread(new Runnable() {
      @Override
      public void run() {
        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
              ......
              heartbeat();            
              lastHeartbeatTime = context.getClock().getTime();// 记录上一次心跳时间
              executeHeartbeatCallbacks(); // 执行回调函数
              ....
});

RMCommunicator类中:

private void executeHeartbeatCallbacks() {
    Runnable callback = null;
    while ((callback = heartbeatCallbacks.poll()) != null) {
      callback.run();
    }
  }

在RMCommunicator启动时,首先会向RM注册,把自己的host和port告诉RM,然后在启动一条线程(startAllocatorThread)定期的调用RMContainerAllocator中实现的heartbeat方法(向RM申请资源,定期汇报信息,告诉RM自己还活着)。

AM初始化同时也会初始化RMCommunicator:

protected void serviceStart() throws Exception {
  scheduler= createSchedulerProxy(); // 获取RM的代理
  register(); // 注册
  startAllocatorThread(); // 心跳线程
....
}

AM的ContainerAllocatorRouter事件处理流程如下图:

RMALLO

注册流程:

调用RMCommunicator远程调用ApplicationMasterService的registerApplicationMaster方法,设置维护responseId,然后把它加入AMLivelinessMonitor中,并使用map记录时间,用来监控AM是否因为长时间没有心跳而超时,如果AM长时间没有心跳信息更新,RM就会通知NodeManager把AM移除。

心跳线程:

在发送心跳的过程中,即也是获取资源的过程

@Override
  protected synchronized void heartbeat() throws Exception {
    scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
    List<Container> allocatedContainers = getResources();// 重要的方法
    if (allocatedContainers.size() > 0) {
      scheduledRequests.assign(allocatedContainers);
    }
   ......
  }

获取资源的过程:

private List<Container> getResources() throws Exception {
     ...
     response = makeRemoteRequest(); // 和RM进行交互
     ...
     // 优先处理RM发送过来的命令
     if (response.getAMCommand() != null) {
         switch(response.getAMCommand()) {
                case AM_RESYNC:
                case AM_SHUTDOWN:
                     eventHandler.handle(new JobEvent(this.getJob().getID(),
                                     JobEventType.JOB_AM_REBOOT));
                     throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
                             this.getContext().getApplicationID());
                default:
                     ....
      }
     // 等等一系列处理
}
}

构建请求:

protected AllocateResponse makeRemoteRequest() throws IOException {
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);
    AllocateResponse allocateResponse;
    allocateResponse = scheduler.allocate(allocateRequest); // RPC调用ApplicationMasterService的allocate方法
    .....
}

每一次心跳的调用都会刷新AMLivelinessMonitor的时间,代表AM还活着

而且我们通过代码可以看出,资源请求被封装为一个ask,即一个ResourceRequest的ArrayList的资源列表 例如:

priority:20 host:host9 capability:<memory:2048, vCores:1>
priority:20 host:host2 capability:<memory:2048, vCores:1>
priority:20 host:host10 capability:<memory:2048, vCores:1>
priority:20 host:/rack/rack3203 capability:<memory:2048, vCores:1>
priority:20 host:/rack/rack3202 capability:<memory:2048, vCores:1>
priority:20 host:* capability:<memory:2048, vCores:1>

然而,ask是如何被构造的呢?

RMContainerAllocator中的addMap,addReduce,assign方法中对ask的数据内容进行了修改

addContainerReq --> addResourceRequest --> addResourceRequestToAsk;

通过在代码自己添加日志可以看出,资源会被分为local,rack,和any级别去申请资源

最终变为一个ask list发送到RM上:

 ask Capability:<memory:2048, vCores:1> ResourceName:* NumContainers:384 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3201 NumContainers:227 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3202 NumContainers:231 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3203 NumContainers:152 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3204 NumContainers:158 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:host1 NumContainers:46 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:host5 NumContainers:52 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:host6 NumContainers:38 Priority:20 RelaxLocality:true

类似日志为:

getResources() for application_1438330253091_0004: ask=29 release= 0 newContainers=0 finishedContainers=0 resourcelimit=<memory:0, vCores:0> knownNMs=24

总结:

除了了解心跳之外,还学习了许多Map和Reduce的分配机制,收获良多。

[YARN]YARN的AsyncDispatcher原理

YARN采用了基于事件驱动的并发模型,该模型能极大的提高应用程序并发性,在RM中,几乎所有的事件都通过AsyncDispatcher进行事件的派发.

其基本架构图如下:

AsyncDispatcher

从基本的架构图可以简单的看出,该模型还需要几个基本的要素,那就是事件(Event),事件类型(EventType)和处理事件对应的处理器(Handler).

在HADOOP中,事件被定义如下:

public interface Event<TYPE extends Enum<TYPE>> {

  TYPE getType();
  long getTimestamp();
  String toString();
}

事件类型(EventType)则是简单的枚举类

主要功能定义事件有哪几种类型:

public enum NodesListManagerEventType {
  NODE_USABLE,
  NODE_UNUSABLE
}

处理事件的接口

主要功能处理相应的事件

public interface EventHandler<T extends Event> {
  void handle(T event);
}

Dispatcher通过不同的事件类型(EventType)找到相应的handler对事件(event)进行处理.

对于AsyncDispatcher来说,它实现了Dispatcher接口:

public interface Dispatcher {
  EventHandler getEventHandler();
  void register(Class<? extends Enum> eventType, EventHandler handler);
}

其中有两个基本的方法,registergetEventHandler

register在AsyncDispatcher使用之前就需要先注册eventType和对应的EventHandler,而getEventHandler方法主要则是把事件(event)放入eventQueue中.

接下来在ResourceManager举个简单的例子:

在RM初始化自身基本服务的时候,会把相应的事件类型(EventType)和事件处理器(EventHandler),先注册在AsyncDispatcher上,以便于派发器在事件(event)到来时做出相应的处理.

RM的部分代码:

// Register event handler for RmNodes
this.rmDispatcher.register(RMNodeEventType.class,
    new NodeEventDispatcher(this.rmContext)); 

其实注册也就是把相应的类型和处理器放到一个HashMap

因为是资源管理方面的服务,所以我们进入ResourceTrackerService类中,找到nodesListManager这个实例,通过代码可以知道nodesListManager是用来管理节点是否可用,并作出相应的处理

// 2. Check if it's a valid (i.e. not excluded) node
    if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
      String message =
          "Disallowed NodeManager nodeId: " + nodeId + " hostname: "
              + rmNode.getNodeAddress();
      LOG.info(message);
      shutDown.setDiagnosticsMessage(message);
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
      return shutDown;
    }

从代码可以看出,如果节点是非法的,则从Dispatcher获取Handler,并构造一个RMNodeEventType.DECOMMISSION类型的事件,这个RMNodeEvent将会被放到eventQueue中

class GenericEventHandler implements EventHandler<Event> {
    public void handle(Event event) {
      try {
        eventQueue.put(event);
      } catch (InterruptedException e) {

      }
    };
  }

最后由dispatch进行通过传入的RMNodeEventType找到相应的NodeEventDispatcher(即EventHandler),并调用handle进行处理.

总结:
通过了解AsyncDispatcher可以提升自己理解Yarn的工作流程,加深对Yarn的设计实现的了解.

[YARN] Label Based Scheduling

目标:

  1. 提供一种可靠的机制在特定的节点运行应用程序。
  2. 为队列提供一种可靠的机制在特定的节点上进行资源的调度
  3. 提供一种机制解决队列和application所定义节点在哪运行

定义

  • Node Label – 描述一个节点的标签,每个节点可以拥有多个标签
  • Label expression – 逻辑上的标签组合(使用 && 或 and,|| 或 or,!或 not)
  • Queue label – Label expression 决定哪些节点属于这个队列
  • Application Label – Label expression 决定应用程序容器在哪些节点运行
  • Queue Label Policy – Label Policy 配置在Queue中,其定义规则解决application label和queue label之间的冲突

使用案例

cluster A使用Fair Scheduler去调度application在他的节点上。 cluster 管理员定义多个队列包括嵌套队列去划分资源给不同的组织,所以每个组织都可以共享资源去使用。 除此之外,集群还在不同方面存在异构:

  • 拥有不同类型的硬件(一些节点拥有大内存,多CPU,有些则网络带宽占优势)
  • 数据在DFS中切分到分配到不同的节点/机架上
  • 其他的不同等等

cluster管理员根据差异用标签标记所有的节点:

  • 大内存使用red标记
  • 高CPU的使用blue标记
  • 高网络带宽的使用green标记
  • 节点在机架A标记为-rackA
  • 节点在机架B标记未-rackB

因为被标记为red的节点可以在rackA和rackB cluster管理员可以分配多个标签给它:

Node Regex Label1 Label2 LabelN
rackA_red red rackA
rackA_blue blue rackA
rackB_red red rackB

如果我们希望所有的应用提交到Queue1即运行在“rackA_red”或“rackA_blue”节点上,cluster管理员可以调整label expression即配置Queue1为(red || blue)&& rackA

另外cluster管理员配置Queue Policy去定义调度器的行为,假设提交的application也拥有自己的label expression

  • Queue Lable Expression优先
  • Application Label Expression优先
  • 取它们的交集
  • 取它们的或集

现在,用户B提交一个app到Queue1,但是这个app属于计算密集型的,他希望运行在高cpu的节点上,所以,这个app的lable expression可以为 “blue && rackA”

幸运的是集群管理员设置Queue Policy为“AND”,所以application将会被调度在“rackA_blue”的节点上运行。 假设集群管理员设置Queue Policy为“OR”,结果将会变为:“((red || blue) && rackA) || (blue && rackA)” ,即”(red || blue) && rackA”

设计

Node Labels, Label Expressions, Policies

Node Labels

最初的JIRA建议每个节点指定他们自己拥有的标签,并提供这些信息给ResourceManager. 我们建议集中式的把节点的标签存储到DFS上,所有的YARN daemons都可以访问,修改也不需要同步到整个集群或者客户端

标签可以被指定为下列的格式:

perfnode.* big, fast, physical, scale.* virtual, “slow”, perfnode30 ‘bad’

Label Expressions, Policies

Label expression 是一种逻辑表达式,它可以包含多个标签 (例如:big || virtual && !bad). Application Label expression 可以定义应用想要运行在哪些节点的集合 Queue Label expression 可以定义该队列“希望”应用运行在哪些节点。

Queue Label Policy

每个队列可以配置label expression和application Label Expression相互作用,通过配置Queue Label Policy可以解决Queue label和application label之间的冲突,决定应用在哪些类型节点上运行。

Label Manager

Label Manager是RM的一个新的服务 这个服务的主要作用为:

  • 从DFS加载节点的标签和维护内部节点和节点标签的map
  • 检查文件的变化,并更新内部节点和标签的映射关系
  • 基于 Application Label Expression, Queue Label Expression和Queue Label Policy结合构造新的逻辑表达式

英文不太好,望多多指正

参考资料: YARN-796