以下是队列资源分配的简单架构

调度器的container资源分配
通过简图可以看出,container是分配是随着每一次nodeupdate而进行资源分配的,在每一次尝试调度container之前,首先会检查改节点是否曾经有预留的app(预留指的是该node曾经因为资源不足,为了提高本地性原因,当节点有资源更新时,优先的把这个节点的资源分配给这个app的策略)
// 类名FairScheduler.java
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
Priority reservedPriority = node.getReservedContainer().getReservedPriority();
FSQueue queue = reservedAppSchedulable.getQueue();
// 之前有预留,不满足条件则释放
if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
|| !fitsInMaxShare(queue,
node.getReservedContainer().getReservedResource())) {
// Don't hold the reservation if app can no longer use it
LOG.info("Releasing reservation that cannot be satisfied for application "
+ reservedAppSchedulable.getApplicationAttemptId()
+ " on node " + node);
reservedAppSchedulable.unreserve(reservedPriority, node);
reservedAppSchedulable = null;
} else {
// Reservation exists; try to fulfill the reservation
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to fulfill reservation for application "
+ reservedAppSchedulable.getApplicationAttemptId()
+ " on node: " + node);
}
// 满足条件,则分配
node.getReservedAppSchedulable().assignReservedContainer(node);
}
}
总的来说:
- 1、如果该app在该节点没有container的需求,则释放预留container
- 2、如果该app在该节点还有container的需求,但队列的MaxShare无法满足,则释放该节点预留的container
- 3、如果该app这个优先级的资源的请求已经为0,释放预留的container
否则检查这个node的可用资源和需求的资源是否满足,如果不满足,则预留,满足则分配
如果该节点没有app预留,则把该node从RootQueue进行分配下去,其中还有分配参数assignMultiple和maxAssign限制该node分配的container数量。
因为我们通过图可以知道,其实资源分配可以看成三层结构,ParetnQueue到LeafQueue,再到FSAppAttempt
那我们提出一个问题,当资源分配,如何选取最适合的队列?如何选取最适合的App?
在每次进行资源分配,都会采取排序的操作
Collections.sort(childQueues, policy.getComparator());
app进行排序
Collections.sort(runnableApps, comparator);
找出最适合的子队列和最适合的app, 对于采取fair算法策略的队列来说,其比较实现在FairShareComparator的compare方法中,对于每个可排序的Schedulable,包括叶子队列,FSAppAttempt,第一步都会计算两个Schedulable的minShare,即通过配置的最小资源要求和每个Schedulable的demand进行比较的最小值
Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
s1.getMinShare(), s1.getDemand());
Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
s2.getMinShare(), s2.getDemand());
然后通过当前的资源使用和minshare进行比较等等,详细可以自行查看源码
boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
s1.getResourceUsage(), minShare1);
boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
s2.getResourceUsage(), minShare2);
以上可以总结为,如果Schedulable都低于他们的minShare,则通过他们低于minShare的比例的多少来比较,例如,如果一个job A拥有8个task(minShare 10即比例为80%),job B拥有50个task(minShare 100即比例为50%),则job B将会有更高的优先级在下次资源分配时获取资源。
如果Schedulable在他们的minShare之上,则通过比较他们的(runningTasks/weight),如果所有的权重都相等,则slot资源需求少的job优先获得资源,另外,如果一个job拥有更高的权重则拥有机会获得多slot。
解决完选取子队列和app之后,接下来FSAppAttempt自身处理资源的问题了,也就是主要考虑这个FSAppAttempt的计算时的本地性问题
为了提升本地性,对于每个优先级,都会尝试的优先分配本地节点,然后再是机架,off-switch的请求通常会被延迟调度。
分配的策略简要为
从appSchedulingInfo中获取该节点或机架,该优先级对应的资源请求
ResourceRequest rackLocalRequest = getResourceRequest(priority,
node.getRackName());
ResourceRequest localRequest = getResourceRequest(priority,
node.getNodeName());
然后满足条件的情况下尝试分配node local的节点,如果满足分配条件则分配成功,如果container的请求满足队列最大的MaxShare,则预留该节点给这个FSAppAttempt
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& localRequest != null && localRequest.getNumContainers() != 0) {
return assignContainer(node, localRequest,
NodeType.NODE_LOCAL, reserved);
}
因为我们经常想优先的调度node-local的container, 其次再为rack-local和off-switch的container去保证最大可能的本地性,为了达到这个目标,我们首先对给定的优先级分配node-local,如果我们在很长的一段时间内没有成功调度,则放松本地性的阀值。
如上所述,通过判断上一次的NodeType类型去判断使用nodeLocalityThreshold( yarn.scheduler.fair.locality.threshold.node默认-1.0f ) 或 rackLocalityThreshold( yarn.scheduler.fair.locality.threshold.rack默认-1.0f ),从而去决定是否改变本地性级别。
if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
// 满足调度机会条件,则将NODE_LOCAL级别降为RACK_LOCAL级别
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(priority);
}
// 满足调度机会条件,则将RACK_LOCAL级别降为OFF_SWITCH级别
else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(priority);
}
}
// 否则本地性等级不变
return allowedLocalityLevel.get(priority);
Application所需要的demand如何计算?
在FSAppAttempt中,其维护着每一个Application在Fair Scheduler的相关调度信息,包括Application所需要的demand,它的fairShare和allowedLocalityLevel等其他相关信息
然而在上文也有提到,每个FSAppAttempt或queue都会用它的minshare和demand比较,而demand是怎么来的呢?
在FSAppAttempt中,每一次AppMaster申请资源都会更新appSchedulingInfo里面的requests对象

在FairScheduler中则有线程定时的去调度UpdateThread线程,去重新更新每个队列,FSAppAttempt所需demand和重新计算FairShare等

每一层的demand等于min(下一层demand,最大资源分配),FSAppAttempt的demand则为当前使用的资源大小+未分配的资源大小
@Override
public void updateDemand() {
demand = Resources.createResource(0);
// Demand is current consumption plus outstanding requests
Resources.addTo(demand, getCurrentConsumption());
// Add up outstanding resource requests
synchronized (this) {
for (Priority p : getPriorities()) {
for (ResourceRequest r : getResourceRequests(p).values()) {
Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
Resources.addTo(demand, total);
}
}
}
}