现象描述:

之前遇到有些任务在队列满时,reduce一直被kill,任务运行了很久,导致进入了死循环的状态,以下是原因分析

我们先看任务:

2016-05-31 19:29:46,382 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from NEW to UNASSIGNED

到分配

2016-05-31 19:39:50,999 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Assigned container container_e08_1464351197737_140388_01_001657 to attempt_1464351197737_140388_m_000836_0
2016-05-31 19:39:51,002 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from UNASSIGNED to ASSIGNED

启动:

2016-05-31 19:39:51,002 INFO [ContainerLauncher #216] org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl: Processing the event EventType: CONTAINER_REMOTE_LAUNCH for container container_e08_1464351197737_140388_01_001657 taskAttempt attempt_1464351197737_140388_m_000836_0

开始跑:

2016-05-31 19:39:51,014 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from ASSIGNED to RUNNING

完成清理相关信息,此时container已经退出,只是AM保存这个Task在这个机器的状态是success的:

2016-05-31 19:40:00,511 INFO [IPC Server handler 20 on 34613] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Done acknowledgement from attempt_1464351197737_140388_m_000836_0
2016-05-31 19:40:00,512 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from RUNNING to SUCCESS_CONTAINER_CLEANUP
2016-05-31 19:40:00,518 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from SUCCESS_CONTAINER_CLEANUP to SUCCEED

因为该节点是不可用的状态,打印了日志,原理是,如果该节点不可用则把该节点的TaskAttemp给kill了

2016-05-31 21:09:35,528 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl: TaskAttempt killed because it ran on unusable node BJM6-Decare-14057.hadoop.jd.local:50086. AttemptId:attempt_1464351197737_140388_m_000836_0

但是该Task是SUCCESS状态:

2016-05-31 21:09:35,529 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from SUCCEEDED to KILLED

总的来说这个task完成了他自身的所有流程,程序也跑完了,就是因为NM汇报它自身为不健康节点导致AM把这个节点的所有Task给KILL了所以有SUCCEEDED to KILLED

每个Task完成都会触发AttemptSucceededTransition,会发送事件到AM中,触发TaskCompletedTransition,然后job.completedTaskCount++就进行相加了,也就是说每一个task完成都会被相加,直到所有的map跑完才执行reduce

private static class TaskCompletedTransition implements
    MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {

  @Override
  public JobStateInternal transition(JobImpl job, JobEvent event) {
    job.completedTaskCount++;
    LOG.info("Num completed Tasks: " + job.completedTaskCount);
    JobTaskEvent taskEvent = (JobTaskEvent) event;
    Task task = job.tasks.get(taskEvent.getTaskID());
    if (taskEvent.getState() == TaskState.SUCCEEDED) {
      taskSucceeded(job, task);
    } else if (taskEvent.getState() == TaskState.FAILED) {
      taskFailed(job, task);
    } else if (taskEvent.getState() == TaskState.KILLED) {
      taskKilled(job, task);
    }

    return checkJobAfterTaskCompletion(job);
  }

所有有了以下日志:

2016-05-31 19:45:09,275 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Before Scheduling: PendingReds:100 ScheduledMaps:0 ScheduledReds:0 AssignedMaps:2 AssignedReds:0 CompletedMaps:1885 CompletedReds:0 ContAlloc:1899 ContRel:9 HostLocal:1572 RackLocal:291
2016-05-31 19:45:09,279 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Recalculating schedule, headroom=&lt;memory:-4096, vCores:763&gt;
2016-05-31 19:45:09,279 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Reduce slow start threshold reached. Scheduling reduces.

因为在Reduce跑着的过程中,发现map所在的机器的节点出现了不健康,所以AM把Task的状态SUCCESS转换为KILLED事件,进行Task的重新调度:

private static class MapTaskRescheduledTransition implements
    SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    //succeeded map task is restarted back
    job.completedTaskCount--;
    job.succeededMapTaskCount--;
  }
}

对应的数值开始减1,然后在心跳线程中重新申请资源preemptReducesIfNeeded:

if (recalculateReduceSchedule) {
  preemptReducesIfNeeded();
  scheduleReduces(
      getJob().getTotalMaps(), completedMaps,
      scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
      assignedRequests.maps.size(), assignedRequests.reduces.size(),
      mapResourceRequest, reduceResourceRequest,
      pendingReduces.size(), 
      maxReduceRampupLimit, reduceSlowStart);
  recalculateReduceSchedule = false;
}

如果map的数量大于0,集群资源不满足则kill reduce

@Private
@VisibleForTesting
void preemptReducesIfNeeded() {
  if (reduceResourceRequest.equals(Resources.none())) {
    return; // no reduces
  }
  //check if reduces have taken over the whole cluster and there are 
  //unassigned maps
  if (scheduledRequests.maps.size() > 0) {
    Resource resourceLimit = getResourceLimit();
    Resource availableResourceForMap =

日志都打印出来没什么好说的了:

2016-05-31 21:12:35,401 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Going to preempt 1 due to lack of space for maps

然后抢到资源:

2016-05-31 21:12:37,408 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Assigned container container_e08_1464351197737_140388_01_002139 to attempt_1464351197737_140388_m_000836_1

开始跑了

总结:

设置slow start阀值为1,map跑完后就开始跑Reduce了,但是过程中有些节点变成了不可用,所以相应的task就会从SUCCESS状态变为KILLED状态进行重新调度,因为所有的Reduce都开始跑队列资源沾满的情况下,开始尝试kill自己的Reduce看能不能抢到资源,抢不到继续KILL,直到抢到运行完map为止,Reduce才能继续执行。