以下是队列资源分配的简单架构

调度器的container资源分配

通过简图可以看出,container是分配是随着每一次nodeupdate而进行资源分配的,在每一次尝试调度container之前,首先会检查改节点是否曾经有预留的app(预留指的是该node曾经因为资源不足,为了提高本地性原因,当节点有资源更新时,优先的把这个节点的资源分配给这个app的策略)

// 类名FairScheduler.java
//  Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations

FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
  Priority reservedPriority = node.getReservedContainer().getReservedPriority();
  FSQueue queue = reservedAppSchedulable.getQueue();
// 之前有预留,不满足条件则释放
  if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
      || !fitsInMaxShare(queue,
      node.getReservedContainer().getReservedResource())) {
    // Don't hold the reservation if app can no longer use it
    LOG.info("Releasing reservation that cannot be satisfied for application "
        + reservedAppSchedulable.getApplicationAttemptId()
        + " on node " + node);
    reservedAppSchedulable.unreserve(reservedPriority, node);
    reservedAppSchedulable = null;
  } else {
    // Reservation exists; try to fulfill the reservation
    if (LOG.isDebugEnabled()) {
      LOG.debug("Trying to fulfill reservation for application "
          + reservedAppSchedulable.getApplicationAttemptId()
          + " on node: " + node);
    }
   // 满足条件,则分配
    node.getReservedAppSchedulable().assignReservedContainer(node);
  }
}

总的来说:

  • 1、如果该app在该节点没有container的需求,则释放预留container
  • 2、如果该app在该节点还有container的需求,但队列的MaxShare无法满足,则释放该节点预留的container
  • 3、如果该app这个优先级的资源的请求已经为0,释放预留的container

否则检查这个node的可用资源和需求的资源是否满足,如果不满足,则预留,满足则分配

如果该节点没有app预留,则把该node从RootQueue进行分配下去,其中还有分配参数assignMultiple和maxAssign限制该node分配的container数量。

因为我们通过图可以知道,其实资源分配可以看成三层结构,ParetnQueue到LeafQueue,再到FSAppAttempt

那我们提出一个问题,当资源分配,如何选取最适合的队列?如何选取最适合的App?

在每次进行资源分配,都会采取排序的操作

Collections.sort(childQueues, policy.getComparator());

app进行排序

Collections.sort(runnableApps, comparator);

找出最适合的子队列和最适合的app, 对于采取fair算法策略的队列来说,其比较实现在FairShareComparator的compare方法中,对于每个可排序的Schedulable,包括叶子队列,FSAppAttempt,第一步都会计算两个Schedulable的minShare,即通过配置的最小资源要求和每个Schedulable的demand进行比较的最小值

  Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
      s1.getMinShare(), s1.getDemand());
  Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
      s2.getMinShare(), s2.getDemand());

然后通过当前的资源使用和minshare进行比较等等,详细可以自行查看源码

  boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s1.getResourceUsage(), minShare1);
  boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s2.getResourceUsage(), minShare2);

以上可以总结为,如果Schedulable都低于他们的minShare,则通过他们低于minShare的比例的多少来比较,例如,如果一个job A拥有8个task(minShare 10即比例为80%),job B拥有50个task(minShare 100即比例为50%),则job B将会有更高的优先级在下次资源分配时获取资源。

如果Schedulable在他们的minShare之上,则通过比较他们的(runningTasks/weight),如果所有的权重都相等,则slot资源需求少的job优先获得资源,另外,如果一个job拥有更高的权重则拥有机会获得多slot。

解决完选取子队列和app之后,接下来FSAppAttempt自身处理资源的问题了,也就是主要考虑这个FSAppAttempt的计算时的本地性问题

为了提升本地性,对于每个优先级,都会尝试的优先分配本地节点,然后再是机架,off-switch的请求通常会被延迟调度。

分配的策略简要为

从appSchedulingInfo中获取该节点或机架,该优先级对应的资源请求

ResourceRequest rackLocalRequest = getResourceRequest(priority,
    node.getRackName());
ResourceRequest localRequest = getResourceRequest(priority,
    node.getNodeName());

然后满足条件的情况下尝试分配node local的节点,如果满足分配条件则分配成功,如果container的请求满足队列最大的MaxShare,则预留该节点给这个FSAppAttempt

if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
    && localRequest != null && localRequest.getNumContainers() != 0) {
  return assignContainer(node, localRequest,
      NodeType.NODE_LOCAL, reserved);
} 

因为我们经常想优先的调度node-local的container, 其次再为rack-local和off-switch的container去保证最大可能的本地性,为了达到这个目标,我们首先对给定的优先级分配node-local,如果我们在很长的一段时间内没有成功调度,则放松本地性的阀值。

如上所述,通过判断上一次的NodeType类型去判断使用nodeLocalityThreshold( yarn.scheduler.fair.locality.threshold.node默认-1.0f ) 或 rackLocalityThreshold( yarn.scheduler.fair.locality.threshold.rack默认-1.0f ),从而去决定是否改变本地性级别。

if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
  // 满足调度机会条件,则将NODE_LOCAL级别降为RACK_LOCAL级别
  if (allowed.equals(NodeType.NODE_LOCAL)) {
    allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
    resetSchedulingOpportunities(priority);
  }
  // 满足调度机会条件,则将RACK_LOCAL级别降为OFF_SWITCH级别
  else if (allowed.equals(NodeType.RACK_LOCAL)) {
    allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
    resetSchedulingOpportunities(priority);
  }
}
// 否则本地性等级不变
return allowedLocalityLevel.get(priority);

Application所需要的demand如何计算?

在FSAppAttempt中,其维护着每一个Application在Fair Scheduler的相关调度信息,包括Application所需要的demand,它的fairShare和allowedLocalityLevel等其他相关信息

然而在上文也有提到,每个FSAppAttempt或queue都会用它的minshare和demand比较,而demand是怎么来的呢?

在FSAppAttempt中,每一次AppMaster申请资源都会更新appSchedulingInfo里面的requests对象

在FairScheduler中则有线程定时的去调度UpdateThread线程,去重新更新每个队列,FSAppAttempt所需demand和重新计算FairShare等

每一层的demand等于min(下一层demand,最大资源分配),FSAppAttempt的demand则为当前使用的资源大小+未分配的资源大小

  @Override
  public void updateDemand() {
    demand = Resources.createResource(0);
    // Demand is current consumption plus outstanding requests
    Resources.addTo(demand, getCurrentConsumption());

    // Add up outstanding resource requests
    synchronized (this) {
      for (Priority p : getPriorities()) {
        for (ResourceRequest r : getResourceRequests(p).values()) {
          Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
          Resources.addTo(demand, total);
        }
      }
    }
  }