[YARN] NodeManager因为ContainerMetric导致OOM

现象描述

之前hadoop2.7.1的集群,经常运行一段时间后会触发OOM,导致上面的map需要重跑,能想到的一种方案是调整GC参数,利用GC回收器对内存进行回收,另外一种情况则感觉代码可能处理有问题。

关键日志:

2016-07-05 09:35:40,907 WARN org.apache.hadoop.ipc.Client: Unexpected error reading responses on connection Thread[IPC Client (2069725879) connection to rmhost:8031 from yarn,5,main]
java.lang.OutOfMemoryError: Java heap space
        at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:120)
        at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:68)

后来调整GC参数,追踪到底哪里出了问题,以下是参数参考

YARN_NODEMANAGER_OPTS="-Xmx2g -Xms2g -Xmn1g -XX:PermSize=128M -XX:MaxPermSize=128M -XX:+DisableExplicitGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data1/yarn-logs/nm_dump.log -Dcom.sun.management.jmxremote -Xloggc:/data1/yarn-logs/nm_gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:ErrorFile=/data1/yarn-logs/nm_err_pid"

后来经过分析,确实是代码处理有问题

containerMetric占用了大部分内存

后来社区有patch可以修复HADOOP-13362

解决方案

  • 1.关掉ContainerMetric,默认是开启的(yarn.nodemanager.container-metrics.enable默认true
    改为false)
  • 2.打patch

参考工具

IBM分析工具

[Hadoop问题] IOException: Got error, status message , ack with firstBadLink as xxx.xx.xx.xx:50010

关键日志:

2016-08-24T02:00:32.362 08:00] [ERROR] hadoop.hdfs.DFSClient.closeAllFilesBeingWritten(DFSClient.java 980) [Thread-0] : Failed to close inode 22083674
java.io.IOException: Got error, status message , ack with firstBadLink as xxx.xxx.xxx.xxx:50010
at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:140)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1334)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1237)
at org.apache.hadoop.hdfs.DFSOutputStream

原因当时DN超过了每个磁盘预留的限制:

[2016-08-24T02:00:31.494+08:00] [INFO] server.datanode.DataNode.writeBlock(DataXceiver.java 837) [DataXceiver for client DFSClient_NONMAPREDUCE_103164656_1 at /172.16.183.26:29414 [Receiving block BP-964157621-172.16.172.17-1470393241028:blk_1094132102_20397583]] : opWriteBlock BP-964157621-172.16.172.17-1470393241028:blk_1094132102_20397583 received exception org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space: The volume with the most available space (=123444224 B) is less than the block size (=134217728 B).

原因是balance的速度不够迅速,有一种的解决方案,但没测过,想法是每个盘可写最多不超出128M(预留为100G),即第一个满了可以写,但不能超过一个block的大小,这样目测可以避免这样的问题。

[HADOOP] 慢任务排查问题集合

1.某些任务因为一个task变慢导致整个job变慢

场景:

之前遇到有些任务的map执行很慢,然后发现在执行任务时读取某些文件变慢,但就是不知道慢在哪,这时我们可以在那台机器,打开debug日志

export HADOOP_ROOT_LOGGER=DEBUG,console

然后用hdfs dfs -get /path/to/yourFIle就可以详细的看到他是链接到哪台DN导致响应缓慢,然后就可以登陆机器排查改DN的网络是否流量过高,机器负载等相关信息

解决方案

把该DN临时停止,读取数据时连接到其他副本的DN

[Linux] 简单了解Linux的Page Cache

在Linux下,Page Cache可以加速访问磁盘上的文件,这是因为,当它第一次读或写数据到类似硬盘一样的存储介质时,Linux会存储这些数据到闲置的内存区域,它充当于一个缓存的角色,如果以后读取数据,他就可以迅速的在缓存中读取,这篇文章将会简单的让你了解有关Page Cache的一些信息。

内存使用

在Linux下,有多少内存被Page Cache使用,可以用free -m查看Cached那一列,例如:

➜  free -m
             total       used       free     shared    buffers     cached
Mem:          7684       7437        246        230        322        734
-/+ buffers/cache:       6380       1303
Swap:          255        255          0

数据写入

如果数据被写入,他首先会写到Page Cache,并把其作为一个脏页(dirty page)进行管理,“dirty”意味着数据存储在Page Cache中,但需要先写入到底层的存储设备,这些脏页的内容会被周期性的转移(系统调用sync或fsync)到底层的存储设备上。

下面的例子将会展示创建10M的文件,它首先将会写入到Page Cache中,相应的脏页将会增加,直到他把数据写到磁盘上,在这种情况下我们将会使用sync命令

➜  ~ cat /proc/meminfo | grep Dirty
Dirty:               160 kB
➜  ~ dd if=/dev/zero of=testfile.txt bs=1M count=10
记录了10+0 的读入
记录了10+0 的写出
10485760字节(10 MB)已复制,0.0157552 秒,666 MB/秒
➜  ~ cat /proc/meminfo | grep Dirty                
Dirty:             10480 kB
➜  ~ sync                                          
➜  ~ cat /proc/meminfo | grep Dirty
Dirty:               124 kB

数据读取

文件的写入操作不仅仅只有写入,也有读的操作,例如,当你读取同一个40M的文件两次,第二次读取将会变快,因为文件的block直接来自Page Cache的内存而不会去读取磁盘

➜  dumpdir free -m
             total       used       free     shared    buffers     cached
Mem:          7684       7491        192        243        328        667
-/+ buffers/cache:       6495       1188
Swap:          255        255          0
➜  dumpdir vim dump.dat
➜  dumpdir free -m
             total       used       free     shared    buffers     cached
Mem:          7684       7515        168        243        271        691
-/+ buffers/cache:       6552       1132
Swap:          255        255          0

如果Linux的application使用的可用内存不能够满足,长时间未被使用的Page Cache将会被删除

[YARN] Reduce一直被kill分析

现象描述:

之前遇到有些任务在队列满时,reduce一直被kill,任务运行了很久,导致进入了死循环的状态,以下是原因分析

我们先看任务:

2016-05-31 19:29:46,382 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from NEW to UNASSIGNED

到分配

2016-05-31 19:39:50,999 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Assigned container container_e08_1464351197737_140388_01_001657 to attempt_1464351197737_140388_m_000836_0
2016-05-31 19:39:51,002 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from UNASSIGNED to ASSIGNED

启动:

2016-05-31 19:39:51,002 INFO [ContainerLauncher #216] org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl: Processing the event EventType: CONTAINER_REMOTE_LAUNCH for container container_e08_1464351197737_140388_01_001657 taskAttempt attempt_1464351197737_140388_m_000836_0

开始跑:

2016-05-31 19:39:51,014 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from ASSIGNED to RUNNING

完成清理相关信息,此时container已经退出,只是AM保存这个Task在这个机器的状态是success的:

2016-05-31 19:40:00,511 INFO [IPC Server handler 20 on 34613] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Done acknowledgement from attempt_1464351197737_140388_m_000836_0
2016-05-31 19:40:00,512 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from RUNNING to SUCCESS_CONTAINER_CLEANUP
2016-05-31 19:40:00,518 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from SUCCESS_CONTAINER_CLEANUP to SUCCEED

因为该节点是不可用的状态,打印了日志,原理是,如果该节点不可用则把该节点的TaskAttemp给kill了

2016-05-31 21:09:35,528 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl: TaskAttempt killed because it ran on unusable node BJM6-Decare-14057.hadoop.jd.local:50086. AttemptId:attempt_1464351197737_140388_m_000836_0

但是该Task是SUCCESS状态:

2016-05-31 21:09:35,529 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from SUCCEEDED to KILLED

总的来说这个task完成了他自身的所有流程,程序也跑完了,就是因为NM汇报它自身为不健康节点导致AM把这个节点的所有Task给KILL了所以有SUCCEEDED to KILLED

每个Task完成都会触发AttemptSucceededTransition,会发送事件到AM中,触发TaskCompletedTransition,然后job.completedTaskCount++就进行相加了,也就是说每一个task完成都会被相加,直到所有的map跑完才执行reduce

private static class TaskCompletedTransition implements
    MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {

  @Override
  public JobStateInternal transition(JobImpl job, JobEvent event) {
    job.completedTaskCount++;
    LOG.info("Num completed Tasks: " + job.completedTaskCount);
    JobTaskEvent taskEvent = (JobTaskEvent) event;
    Task task = job.tasks.get(taskEvent.getTaskID());
    if (taskEvent.getState() == TaskState.SUCCEEDED) {
      taskSucceeded(job, task);
    } else if (taskEvent.getState() == TaskState.FAILED) {
      taskFailed(job, task);
    } else if (taskEvent.getState() == TaskState.KILLED) {
      taskKilled(job, task);
    }

    return checkJobAfterTaskCompletion(job);
  }

所有有了以下日志:

2016-05-31 19:45:09,275 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Before Scheduling: PendingReds:100 ScheduledMaps:0 ScheduledReds:0 AssignedMaps:2 AssignedReds:0 CompletedMaps:1885 CompletedReds:0 ContAlloc:1899 ContRel:9 HostLocal:1572 RackLocal:291
2016-05-31 19:45:09,279 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Recalculating schedule, headroom=&lt;memory:-4096, vCores:763&gt;
2016-05-31 19:45:09,279 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Reduce slow start threshold reached. Scheduling reduces.

因为在Reduce跑着的过程中,发现map所在的机器的节点出现了不健康,所以AM把Task的状态SUCCESS转换为KILLED事件,进行Task的重新调度:

private static class MapTaskRescheduledTransition implements
    SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    //succeeded map task is restarted back
    job.completedTaskCount--;
    job.succeededMapTaskCount--;
  }
}

对应的数值开始减1,然后在心跳线程中重新申请资源preemptReducesIfNeeded:

if (recalculateReduceSchedule) {
  preemptReducesIfNeeded();
  scheduleReduces(
      getJob().getTotalMaps(), completedMaps,
      scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
      assignedRequests.maps.size(), assignedRequests.reduces.size(),
      mapResourceRequest, reduceResourceRequest,
      pendingReduces.size(), 
      maxReduceRampupLimit, reduceSlowStart);
  recalculateReduceSchedule = false;
}

如果map的数量大于0,集群资源不满足则kill reduce

@Private
@VisibleForTesting
void preemptReducesIfNeeded() {
  if (reduceResourceRequest.equals(Resources.none())) {
    return; // no reduces
  }
  //check if reduces have taken over the whole cluster and there are 
  //unassigned maps
  if (scheduledRequests.maps.size() > 0) {
    Resource resourceLimit = getResourceLimit();
    Resource availableResourceForMap =

日志都打印出来没什么好说的了:

2016-05-31 21:12:35,401 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Going to preempt 1 due to lack of space for maps

然后抢到资源:

2016-05-31 21:12:37,408 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Assigned container container_e08_1464351197737_140388_01_002139 to attempt_1464351197737_140388_m_000836_1

开始跑了

总结:

设置slow start阀值为1,map跑完后就开始跑Reduce了,但是过程中有些节点变成了不可用,所以相应的task就会从SUCCESS状态变为KILLED状态进行重新调度,因为所有的Reduce都开始跑队列资源沾满的情况下,开始尝试kill自己的Reduce看能不能抢到资源,抢不到继续KILL,直到抢到运行完map为止,Reduce才能继续执行。

[Linux] abrt-hook-ccpp: Saved core dump of pid, 无法找到ccpp的dump文件

现象描述:

DN打日志正常,但进程却挂掉了,查看/var/log/messages日志中,当时出现abrt-hook-ccpp的日志:

 Jun 21 07:05:06 XXX-XXX-8152 abrt-hook-ccpp: Saved core dump of pid 10066 (/software/servers/jdk1.7.0_67/bin/java) to /var/tmp/abrt/ccpp-2016-06-21-07:04:59-10066 (912011264 bytes)
Jun 21 07:05:07 XXX-XXX-8152 abrt-server: Executable '/software/servers/jdk1.7.0_67/bin/java' doesn't belong to any package and ProcessUnpackaged is set to 'no'
Jun 21 07:05:07 XXX-XXX-8152 abrt-server: 'post-create' on '/var/tmp/abrt/ccpp-2016-06-21-07:04:59-10066' exited with 1
Jun 21 07:05:07 XXX-XXX-8152 abrt-server: Deleting problem directory '/var/tmp/abrt/ccpp-2016-06-21-07:04:59-10066'

在日志中可以发现无法创建ccpp文件,需要设置

How to enable handling of unpackaged software
Edit /etc/abrt/abrt-action-save-package-data.conf and change ProcessUnpackaged = no to ProcessUnpackaged = yes

# sed -i 's/ProcessUnpackaged = no/ProcessUnpackaged = yes/' /etc/abrt/abrt-action-save-package-data.conf

这时,你可能还需要设置coredumpsize unlimited才能看到超过1000M的文件

参考文章:

CentOS讨论

abrt常见问题

abrt 產生的 core dump 不見了?

[HADOOP问题] DataNode日志中出现Input/output error

场景是这样的,有些用户因为某些原因将几个目录的副本从3降为1(一般情况下是不建议这么做的),后来出现丢块后,在把副本升为2了,后来还是出现文件无法读取这种情况:

****/user/username/fileName-000xx.lzo                                                      
0. BP-1422437282658:blk_1344672165_270983563 len=134217728 repl=2 [host1:50010, host3:50010]
1. BP-1422437282658:blk_1344672816_270984214 len=134217728 repl=1 [host11:50010]
2. BP-1422437282658:blk_1344672898_270984296 len=134217728 repl=2 [host19:50010, host16:50010]

对于NameNode来说,他是感知这个块是正常的,从datanode的日志看出,该datanode一直进行块复制操作,但每次想把这个块复制出去时,却出现了以下异常:

java.io.IOException: Input/output error
at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
at org.apache.hadoop.net.SocketOutputStream.transferToFully(SocketOutputStream.java:223)

然后我们通过locate这个块所在的目录,进行了scp操作,还是出现了Input/output error,意味着这个块是无法完整读取的,所以我们进行了fsck了那个磁盘进行修复,因为磁盘问题导致无法读取,所以对这个并无太大影响,毕竟副本为1是风险很大的,所以那个文件是恢复不了了。

另外一个场景是集群有些datanode因为报block pool BP-1337805335-XXX-1422437282658 is not found,造成在联邦模式下有些NS显示为live节点,有些NS为dead节点,但是block pool在那个data盘是存在的

2016-01-15 21:30:48,225 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in offerService
java.io.IOException: block pool BP-1337805335-XXX-1422437282658 is not found
at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl.getBlockPoolSlice(FsVolumeImpl.java:122)
at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl.getBlockPoolUsed(FsVolumeImpl.java:92)
at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeList.getBlockPoolUsed(FsVolumeList.java:74)

这些节点的数据量比较大,因为死了比较久,而且副本为3的情况下,把一个DataNode停止是不会影响集群的,后来进行了fsck的操作,发现一个盘有问题,并修复了,后来启动datanode,节点也再也没有死了,fsck这个命令请在专业的人指导下面操作把,毕竟数据安全为最重要。

[HADOOP] Unknown error encountered while tailing edits. Shutting down standby NN.

前段时间,standby的NN挂掉了,并且怎么起也起不来,如下日志:

2016-01-03 14:04:19,293 FATAL org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer: Unknown error encountered while tailing edits. Shutting down standby NN.
java.io.IOException: Failed to apply edit log operation ReassignLeaseOp [leaseHolder=DFSClient_NONMAPREDUCE_854707399_1, path=/tmp/jrdw/kafka2hdfs/log_mobile_gateway-21-1443245603647--6536501137915724876, newHolde
r=HDFS_NameNode, opCode=OP_REASSIGN_LEASE, txid=20790808505]: error File is not under construction: /tmp/jrdw/kafka2hdfs/log_mobile_gateway-21-1443245603647--6536501137915724876
       at org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext.editLogLoaderPrompt(MetaRecoveryContext.java:94)
       at org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.loadEditRecords(FSEditLogLoader.java:205)
       at org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.loadFSEdits(FSEditLogLoader.java:112)
       at org.apache.hadoop.hdfs.server.namenode.FSImage.loadEdits(FSImage.java:771)
       at org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer.doTailEdits(EditLogTailer.java:227)
       at org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer$EditLogTailerThread.doWork(EditLogTailer.java:321)
       at org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer$EditLogTailerThread.access$200(EditLogTailer.java:279)
       at org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer$EditLogTailerThread$1.run(EditLogTailer.java:296)
       at org.apache.hadoop.security.SecurityUtil.doAsLoginUserOrFatal(SecurityUtil.java:456)
       at org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer$EditLogTailerThread.run(EditLogTailer.java:292)

刚开始怀疑是不是editlog的下载有问题,后来发现editlog是可以解析出来的,但是不排除editlog的顺序存在bug

我们采取的方案是:

因为每次启动都需要加载editlog,所以为了跳过此操作,我们将Active进入安全模式,并进行saveNameSpace将dump出来的fsimage和txid拷贝到Standby NN上,重启就可以避免加载editlog了。

如果两个NN都挂掉了,那就要做相对麻烦的操作了,而且相对对数据不安全

以下是参考案例:

Namenode异常停止后无法正常启动

[HADOOP] 简单了解NameNode的ZKFC机制

之前在准备中级课程PPT,整理了下HA的基本内容,并且感谢松哥为我们提供了HA不会切的问题,以至于之后刚好出现的NameNode宕机,能够快速解决。

NameNode的HA可以个人认为简单分为共享editLog机制和ZKFC对NameNode状态的控制

在此之前,我先提几个问题:

  • 一般导致NameNode切换的原因
  • ZKFC的作用是什么?如何判断一个NN是否健康
  • NameNode HA是如何实现的?
  • NameNode因为断电导致不能切换的原理,怎样进行恢复

一般导致NameNode切换的原因

随着集群规模的变大和任务量变多,NameNode的压力会越来越大,一些默认参数已经不能满足集群的日常需求,除此之外,异常的Job在短时间内创建和删除大量文件,引起NN节点频繁更新内存的数据结构从而导致RPC的处理时间变长,CallQueue里面的RpcCall堆积,甚至严重的情况下打满CallQueue,导致NameNode响应变慢,甚至无响应,ZKFC的HealthMonitor监控自己的NN异常时,则会断开与ZooKeeper的链接,从而释放锁,另外一个NN上的ZKFC进行抢锁进行Standby到Active状态的切换。这是一般引起的切换的流程。

当然,如果你是手动去切换这也是可以的,当Active主机出现异常时,有时候则需要在必要的时间内进行切换。

ZKFC的作用是什么?如何判断一个NN是否健康

在正常的情况下,ZKFC的HealthMonitor主要是监控NameNode主机上的磁盘还是否可用(空间),我们都知道,NameNode负责维护集群上的元数据信息,当磁盘不可用的时候,NN就该进行切换了。

 /**
   * Return true if disk space is available on at least one of the configured
   * redundant volumes, and all of the configured required volumes.
   * 
   * @return True if the configured amount of disk space is available on at
   *         least one redundant volume and all of the required volumes, false
   *         otherwise.
   */
  public boolean hasAvailableDiskSpace() {
    return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),
        minimumRedundantVolumes);
  }

除了可用状态(SERVICE_HEALTHY)之外,还有SERVICE_UNHEALTHY(磁盘空间不可用),SERVICE_NOT_RESPONDING(其他的一些情况)状态,在这两个状态中,它都认为NN是不健康的。

NameNode HA是如何实现的?

我们前面说到,ZKFC是如何判断NN是否健康,接下来当NN处于非健康状态时,NameNode是如何进行切换的呢?

zkfc

在ZKFailoverController这个类中,实行了两个重要的Callbacks函数,一个叫ElectorCallbacks,另一个叫HealthCallbacks,顾名思义就是选举和健康检查用的回调函数,其中还有两个重要的组成部分elector(ActiveStandbyElector)healthMonitor(HealthMonitor),总体的就如上图所示。

ElectorCallbacks:

/**
   * Callbacks from elector
   */
  class ElectorCallbacks implements ActiveStandbyElectorCallback {
    @Override
    public void becomeActive() throws ServiceFailedException {
      ZKFailoverController.this.becomeActive();
    }

    @Override
    public void becomeStandby() {
      ZKFailoverController.this.becomeStandby();
    }
...
}

HealthCallbacks:

 /**
   * Callbacks from HealthMonitor
   */
  class HealthCallbacks implements HealthMonitor.Callback {
    @Override
    public void enteredState(HealthMonitor.State newState) {
      setLastHealthState(newState);
      recheckElectability();
    }
  }

对于HealthMonitor来说,在ZKFC进程启动的时候,就已经将HealthCallbacks注册进去了,HealthMonitor都会定期的检查NameNode是否健康,我们可以通过监控ha.health-monitor.check-interval.ms去设置监控的间隔时间和通过参数ha.health-monitor.rpc-timeout.ms设置timeout时间,当集群变大的时候,需要适当的设置改值,让ZKFC的HealthMonitor没那么“敏感”

ZKFC通过RPC调用监控NN进程,当出现异常时,则进入不同的处理逻辑,以下是简化的代码:

 private void doHealthChecks() throws InterruptedException {
    while (shouldRun) {     
      try {
        status = proxy.getServiceStatus();
        proxy.monitorHealth();
        healthy = true;
      } catch (HealthCheckFailedException e) {
       ...
        enterState(State.SERVICE_UNHEALTHY);
      } catch (Throwable t) {
       ...
        enterState(State.SERVICE_NOT_RESPONDING);
        Thread.sleep(sleepAfterDisconnectMillis);
        return;
      }
      ...
}

回调函数就是这么起作用啦,那么回调函数做了什么呢?总的来说,如果NN健康(SERVICE_HEALTHY)就加入选举,如果不健康就退出选举(SERVICE_UNHEALTHYSERVICE_NOT_RESPONDING

 case SERVICE_UNHEALTHY:
        case SERVICE_NOT_RESPONDING:
          LOG.info("Quitting master election for " + localTarget +
              " and marking that fencing is necessary");
          elector.quitElection(true);
          break;

说到退出选举就关系到elector(ActiveStandbyElector)了,true代表如果NN从Actice变为Standby出现异常是要去fence的,这就是为啥NN会挂掉的原因之一

如何退出选举?就是close zkClient的链接,让ZooKeeper上面的维持的选举锁消失

void terminateConnection() {
    if (zkClient == null) {
      return;
    }
    LOG.debug("Terminating ZK connection for " + this);
    ZooKeeper tempZk = zkClient;
    ...
    try {
      tempZk.close();
    } catch(InterruptedException e) {
      LOG.warn(e);
    }
   ...
  }

对于ActiveStandbyElector来说,他有个WatcherWithClientRef类专门用来监听ZooKeeper上的的znode的事件变化,当事件变化时,就会调用ActiveStandbyElector的processWatchEvent的方法

watcher = new WatcherWithClientRef();
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);

/**
   * Watcher implementation which keeps a reference around to the
   * original ZK connection, and passes it back along with any
   * events.
   */
  private final class WatcherWithClientRef implements Watcher {
...
    @Override
        public void process(WatchedEvent event) {
          hasReceivedEvent.countDown();
          try {
            hasSetZooKeeper.await(zkSessionTimeout, TimeUnit.MILLISECONDS);
            ActiveStandbyElector.this.processWatchEvent(
                zk, event);
          } catch (Throwable t) {
            fatalError(
                "Failed to process watcher event " + event + ": " +
                StringUtils.stringifyException(t));
          }
        }
...
}

在ActiveStandbyElector的processWatchEvent方法中,处理来自不同事件的逻辑重新加入选举或者继续监控znode的变化,当另外一个ZKFC监控到事件变化得时候,就去抢锁,抢锁实质上就是创建znode的过程,而且创建的是CreateMode.EPHEMERAL类型的,所以,当HealthMonitor监控到NN不健康时,就会断开连接,节点就会消失,watcher就会监控到NodeDeleted事件,进行创建节点。

 switch (eventType) {
      case NodeDeleted:
        if (state == State.ACTIVE) {
          enterNeutralMode();
        }
        joinElectionInternal();
        break;
      case NodeDataChanged:
        monitorActiveStatus();
        break;

又因为ActiveStandbyElector实现了StatCallback接口,当节点创建成功时,就会回调processResult方法看是否创建成功,如果创建成功则去检查zkBreadCrumbPath是否存在之前的Active节点,如果存在,则调用RPC让其变为Standby,看能否转变成功,否则则SSH过去fence掉NN进程。,保持Active节点只有一个,并且恢复正常服务

NameNode因为断电导致不能切换的原理,怎样进行恢复

ActiveNN断电,网络异常,负载过高或者机器出现异常无法连接,Standby NN无法转化为Active,使得HA集群无法对外服务,原因是Active NN节点在断电和不能服务的情况下,zknode上保存着ActiveBreadCrumb, ActiveStandbyElectorLock两个Active NN的信息,ActiveStandbyElectorLock由于Active NN出现异常断开,Standby NN去抢锁的时候就会去检查ActiveBreadCrumb是否有上一次的Active NN节点,如果有,就会就会尝试让Active NN变为Standby NN,自己转化为Active NN,但是由于调用出现异常,所以会采用ssh的方式去Fence之前的Active NN,因为机器始终连接不上,所以无法确保old active NN变为Standby NN,自己也无法变为Active NN,所以还是保持Standby状态,避免出现脑裂问题。

解决方案是确定Active关机的情况下重新hdfs zkfc -formatZK就可以了。

总 结

NN GC或者在压力大的情况下可以调整GC算法和增加NameNode节点的线程数,加快NN对请求的处理速度,也可以分离节点的端口dfs.namenode.rpc-address.ns1.nn2dfs.namenode.servicerpc-address.ns1.nn2分离client和datanode节点等服务类型的请求,进行分担压力,也可以适当的调整ZKFC的监控timeout的时间等等

但是遇到异常的job,只能通过别的方式去处理问题了,祷告吧!哈哈

[YARN] 简单了解FairShare是如何进行计算的?

这个我一直处于看了忘,忘了看的过程,所以决定把总结写下。

在此之前,先提两个问题

  • 队列配置的Max Resources加起来超过集群资源的总量时,会有什么影响?
  • 队列使用的资源量不会超过我们设置的Max Resources?

我使用的版本是Apache hadoop 2.2.0,所以下都是基于该版本进行分析的。

在YARN中,我们都是通过队列的方式进行资源管理的,队列中可以建立子队列,队列可以通过配置不同的属性,包括提交权限,资源大小,权重等,对于FS来说,每个队列都被抽象成一个Schedulable,其计算过程就是给定一系列的Schedulable和一系列的slot,计算出他们的加权fairshare(weighted fair share)。

资源公平性计算

如果我们不设置minumum share和maximum share,加权fairshare的R = soltAssigned / weight(默认1),可以看出每个Schedulable都可以获得相同的资源量

在实际中,有些Schedulable可能会出现minShare高于他们设置的maxShare或者maxShare低于他们分配的share值,为了处理这些类似的问题,资源分配需要满足:

  • 如果S.minShare > R*S.weight,则分配S.minShare
  • 如果S.maxShare < R*S.weirht,则分配S.maxShare
  • 其他则分配R*S.weight

主要计算实现:ComputeFairShares

基本原理:

  • 0.首先计算每个schedulable的maxShare的值的和totalResource比较,取最小值
  • 1、rMax=1
  • 2、计算当前的使用量U
  • 3、如果U < TotalResource,则rMax = rMax*2,跳转2
  • 定义left=0,right=rMax
  • 4、mid =(left + right)/ 2
  • 5、计算当前的使用量U
  • 6、如果U<TotalResource,left = mid
  • 7、否则right = mid
  • 8、当前迭代次数<规定迭代次数,跳转4,否则退出
  • 9、通过right值计算每个schedulable的FairShare

如何计算当前资源的使用量U,通过尝试w2rRatio的值,计算出每个schedulable资源的使用量之和,使之接近于资源总量

 private static int resourceUsedWithWeightToResourceRatio(double w2rRatio, Collection<? extends Schedulable> schedulables) {
        int resourceTaken = 0;
        for (Schedulable sched : schedulables) {
            int share = computeShare(sched, w2rRatio);
            resourceTaken += share;
        }
        return resourceTaken;
    }

上述所说的主要方法

private static int computeShare(Schedulable schedulable, double w2rRatio) {
    double share = schedulable.getWeight() * w2rRatio;
    share = Math.max(share, schedulable.getMinShare());
    share = Math.min(share, schedulable.getMaxShare());
    return (int) share;
}

说完如何计算fairshare的基本流程,下面来讨论FairSharePolicy是如何对slot进行分配的

对于FairScheduler的FairSharePolicy机制,它采用的是DefaultResourceCalculator,即只对内存做资源计算,规则为通过比较两个Schedulable的加权fairShare(weighted fair sharing),那些低于minShare的Schedulable将会比那些高于minShare的拥有更高的优先级获取资源。

如果Schedulable都低于他们的minShare,则通过他们低于minShare的比例的多少来比较,例如,如果一个job A拥有8个task(minShare 10即比例为80%),job B拥有50个task(minShare 100即比例为50%),则job B将会有更高的优先级在下次资源分配时获取资源。

如果Schedulable在他们的minShare之上,则通过比较他们的(runningTasks/weight),如果所有的权重都相等,则slot资源需求少的job优先获得资源,另外,如果一个job拥有更高的权重则拥有机会获得多slot。

另外还有一张图:

fair

或许可以参考这里In fair scheduler, Queue should not been assigned more containers when its usedResource had reach the maxResource limit

总结

这篇文档,断断续续写了很久,温故知新,能有让自己能有对FS调度有个较为全面的认识。