[YARN] Hadoop2.7.1 Linux Container和CGroup配置笔记

在新版本的hadoop中,配置linux container已经不需要在每台机器上新建用户了,让使用变得更加灵活,之前我用的是2.2版本,详细的看官网,2.7.1的配置在这里,具体配置就不多说了

secure container

对于配置的权限,还是会有要求,container-executor.cfg文件需要root权限,和其父目录等向上拥有者为root,所以,可以把改文件放置到/etc/目录下满足其要求,同时方便配置同步管理,然而,container-executor的默认读取路径是$HADOOP_HOME/etc/hadoop/目录下,所以我们需要传入指定路径去重新编译container-executor,该文件权限可以设置为6050。

mvn package -Pdist,native -DskipTests -Dtar -Dcontainer-executor.conf.dir=/etc

编译完成后,就可以检查是否成功了

strings container-executor | grep etc

关于Cgroup

要开启该功能,在新机器中要确保安装开启和内核支持该功能

在yarn配置了CGroup的模式下,它是通过cpu.cfs_period_us,cpu.cfs_quota_us,cpu.shares来对container进行CPU的资源控制。

举个例子:

假设我们设置yarn.nodemanager.resource.percentage-physical-cpu-limit参数为80和yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage参数为true(假设测试虚拟机的核数为4),在该模式下所有container能使用的核数为

yarnProcessors = nodeCpuPercentage * numProcessors = 0.8 * 4 = 3.2

CPU shares为

cpuShares = CPU_DEFAULT_WEIGHT * containerVCores = 1024 * 1

其中CPU_DEFAULT_WEIGHT=1024代表一个CPU时间

即containers能使用的核数量为3.2个,然后通过公式:

containerCPU = (containerVCores * yarnProcessors) / nodeVCores

注: nodeVCore为配置NM的核数为4,containerVCores为应用为每个container申请的核数

即按照上面的公式得到containerCPU为1*3.2除以4=0.8,然后根据quotaUS = periodUS * containerCPU,即1000*1000*0.8得到800000,即cpu.cfs_quota_us的值,即该进程使用的CPU不会超过80%,当应用申请的container核数为2时,得到shares为2048,意味着其将会获得比1024多2倍的CPU时间,由此类推。

参考资料

用 cgroups 管理 cpu 资源

[YARN] Yarn下Mapreduce的内存参数理解

这篇文章算是给自己重新缕清MR下内存参数的含义

Container是什么?

Container就是一个yarn的java进程,在Mapreduce中的AM,MapTask,ReduceTask都作为Container在Yarn的框架上执行,你可以在RM的网页上看到Container的状态

基础

Yarn的ResourceManger(简称RM)通过逻辑上的队列分配内存,CPU等资源给application,默认情况下RM允许最大AM申请Container资源为8192MB(“yarn.scheduler.maximum-allocation-mb“),默认情况下的最小分配资源为1024M(“yarn.scheduler.minimum-allocation-mb“),AM只能以增量(”yarn.scheduler.minimum-allocation-mb“)和不会超过(“yarn.scheduler.maximum-allocation-mb“)的值去向RM申请资源,AM负责将(“mapreduce.map.memory.mb“)和(“mapreduce.reduce.memory.mb“)的值规整到能被(“yarn.scheduler.minimum-allocation-mb“)整除,RM会拒绝申请内存超过8192MB和不能被1024MB整除的资源请求。

相关参数

YARN

  • yarn.scheduler.minimum-allocation-mb
  • yarn.scheduler.maximum-allocation-mb
  • yarn.nodemanager.vmem-pmem-ratio
  • yarn.nodemanager.resource.memory.mb

MapReduce

Map Memory

  • mapreduce.map.java.opts
  • mapreduce.map.memory.mb

Reduce Memory

  • mapreduce.reduce.java.opts
  • mapreduce.reduce.memory.mb

Copy_of_Yarn_mem_params

从上面的图可以看出map,reduce,AM container的JVM,“JVM”矩形代表服务进程,“Max heap”,“Max virtual”矩形代表NodeManager对JVM进程的最大内存和虚拟内存的限制。

以map container内存分配(“mapreduce.map.memory.mb“)设置为1536为例,AM将会为container向RM请求2048mb的内存资源,因为最小分配单位(“yarn.scheduler.minimum-allocation-mb“)被设置为1024,这是一种逻辑上的分配,这个值被NodeManager用来监控改进程内存资源的使用率,如果map Task堆的使用率超过了2048MB,NM将会把这个task给杀掉,JVM进程堆的大小被设置为1024(“mapreduce.map.java.opts=-Xmx1024m“)适合在逻辑分配为2048MB中,同样reduce container(“mapreduce.reduce.memory.mb“)设置为3072也是.

当一个mapreduce job完成时,你将会看到一系列的计数器被打印出来,下面的三个计数器展示了多少物理内存和虚拟内存被分配

Physical memory (bytes) snapshot=21850116096
Virtual memory (bytes) snapshot=40047247360
Total committed heap usage (bytes)=22630105088

虚拟内存

默认的(“yarn.nodemanager.vmem-pmem-ratio“)设置为2.1,意味则map container或者reduce container分配的虚拟内存超过2.1倍的(“mapreduce.reduce.memory.mb“)或(“mapreduce.map.memory.mb“)就会被NM给KILL掉,如果 (“mapreduce.map.memory.mb”) 被设置为1536那么总的虚拟内存为2.1*1536=3225.6MB

当container的内存超出要求的,log将会打印一下信息

Current usage: 2.1gb of 2.0gb physical memory used; 1.6gb of 3.15gb virtual memory used. Killing container.

mapreduce.map.java.opts和mapreduce.map.memory.mb

大概了解完以上的参数之后,mapreduce.map.java.opts和mapreduce.map.memory.mb参数之间,有什么联系呢?

通过上面的分析,我们知道如果一个yarn的container超除了heap设置的大小,这个task将会失败,我们可以根据哪种类型的container失败去相应增大mapreduce.{map|reduce}.memory.mb去解决问题。 但同时带来的问题是集群并行跑的container的数量少了,所以适当的调整内存参数对集群的利用率的提升尤为重要。

因为在yarn container这种模式下,JVM进程跑在container中,mapreduce.{map|reduce}.java.opts能够通过Xmx设置JVM最大的heap的使用,一般设置为0.75倍的memory.mb,因为需要为java code,非JVM内存使用等预留些空间

补充一下

对于FairScheduler来说(其他我也没看),存在着一个增量参数

  /** Increment request grant-able by the RM scheduler. 
   * These properties are looked up in the yarn-site.xml  */
  public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_MB =
    YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-mb";
  public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB = 1024;

对于线上2560MB最小分配内存,客户端的内存为2048,incrementMemory为1024,通过其计算算法得出值,demo如下

/**
 * Created by shangwen on 15-9-14.
 */
public class TestCeil {
    public static void main(String[] args) {
        int clientMemoryReq = 2048;
        int minAllowMermory = 2560;
        int incrementResource = 1024;
        System.out.println(roundUp(Math.max(clientMemoryReq,minAllowMermory),incrementResource));
        // output 3072
    }

    public static int divideAndCeil(int a, int b) {
        if (b == 0) {
            return 0;
        }
        return (a + (b - 1)) / b;
    }

    public static int roundUp(int a, int b) {
        System.out.println("divideAndCeil:" + divideAndCeil(a, b));
        return divideAndCeil(a, b) * b;
    }
}

得出的结果为3072MB,即对于map来说,则会分配3G内存,即使你在客户端写的是2G,所以你可以看到以下日志:

Container [pid=35691,containerID=container_1441194300243_383809_01_000181] is running beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical memory used; 5.4 GB of 9.3 GB virtual memory used.

对于56G内存的NM来说,如果全部跑map则56/3大约跑18个container

假设修改最小分配为默认的1024,则分配的内存为2G,即大约可以跑56/2约28个container。

通过上述的描述,大概就对其参数有个比较综合的了解了。

参考资料

Mapreduce YARN Memory Parameters

[Linux] 使用noatime属性优化文件系统读取性能

当文件被创建,修改和访问时,Linux系统会记录这些时间信息,当访问足够频繁将会是很大的开销,因为每次访问都会记录时间,所以 我们今天使用bonnie++来简单测试我们修改noatime给我们带来的性能提升有多少,我们先下载最新版本的bonnie++

# tar xf bonnie++-1.97.tgz
# cd bonnie++-1.97.1
# make

编译好之后就可以使用了

注:测试数据最好为内存的2倍

所以在没修改noatime之前,我们先测试文件系统的性能

./bonnie++ -s 31896 -d /export/ -u root -q >> file.csv

运行结果如下:

Version  1.97       ------Sequential Output------ --Sequential Input- --Random-
Concurrency   1     -Per Chr- --Block-- -Rewrite- -Per Chr- --Block-- --Seeks--
Machine        Size K/sec %CP K/sec %CP K/sec %CP K/sec %CP K/sec %CP  /sec %CP
localhost    31896M   458  99 189663  52 82909  21  2487  98 214994  26 823.4  56
Latency             32591us     566ms     705ms   11924us     252ms     122ms
Version  1.97       ------Sequential Create------ --------Random Create--------
localhost           -Create-- --Read--- -Delete-- -Create-- --Read--- -Delete--
              files  /sec %CP  /sec %CP  /sec %CP  /sec %CP  /sec %CP  /sec %CP
                 16 16300  79 +++++ +++ +++++ +++ 14745  74 +++++ +++ 18007  32
Latency             10929us     478us     521us     493us     134us     374us

接下来我们修改挂载的/export,重新测试一遍

# vim /etc/fstab
UUID=d41182b5-5092-4f2f-88a3-be619feef512 /export                 ext4    defaults,noatime        1 2

设置立即生效

mount -o remount /export

执行命令:

./bonnie++ -s 31896 -d /export/ -u root -q >> file.csv

运行结果为:

Version  1.97       ------Sequential Output------ --Sequential Input- --Random-
Concurrency   1     -Per Chr- --Block-- -Rewrite- -Per Chr- --Block-- --Seeks--
Machine        Size K/sec %CP K/sec %CP K/sec %CP K/sec %CP K/sec %CP  /sec %CP
localhost    31896M   497  99 171760  35 93152  21  2276  97 240294  28 755.6  45
Latency             18716us     661ms     539ms   29368us     263ms   79468us
Version  1.97       ------Sequential Create------ --------Random Create--------
localhost           -Create-- --Read--- -Delete-- -Create-- --Read--- -Delete--
              files  /sec %CP  /sec %CP  /sec %CP  /sec %CP  /sec %CP  /sec %CP
                 16 18605  93 +++++ +++ +++++ +++ 20520  96 +++++ +++ +++++ +++
Latency              1186us     379us    1297us    1288us     127us    1443us

可能这样的结果不直观,我们可以

cat file.csv | ./bon_csv2html > result.html

网页打开为:

noatime_test1

可以看出214MBps提升到了240MBps,虽然这只是一次测试,但是理论上来说还是会有性能上的提升,在整体的集群环境下,还是有益提升集群性能的。

参考资料:

测试工具Bonnie++的使用

[HADOOP 问题] NodeManager OOM挂掉问题解决

在更换JDK1.6_25到JDK1.7_45后,集群出现频繁死掉NM,出现结果为如下:

2015-08-12 16:35:06,662 FATAL org.apache.hadoop.yarn.YarnUncaughtExceptionHandler: Thread Thread[process reaper,10,system] threw an Error. Shutting down now...
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.lang.UNIXProcess$ProcessPipeInputStream.drainInputStream(UNIXProcess.java:267)
at java.lang.UNIXProcess$ProcessPipeInputStream.processExited(UNIXProcess.java:280)
at java.lang.UNIXProcess.processExited(UNIXProcess.java:187)
at java.lang.UNIXProcess$3.run(UNIXProcess.java:175)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

和类似的

2015-08-12 16:37:56,893 FATAL org.apache.hadoop.yarn.YarnUncaughtExceptionHandler: Thread Thread[process reaper,10,system] threw an Error. Shutting down now...
java.lang.OutOfMemoryError: Java heap space
at java.lang.UNIXProcess$ProcessPipeInputStream.drainInputStream(UNIXProcess.java:267)
at java.lang.UNIXProcess$ProcessPipeInputStream.processExited(UNIXProcess.java:280)
at java.lang.UNIXProcess.processExited(UNIXProcess.java:187)
at java.lang.UNIXProcess$3.run(UNIXProcess.java:175)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

在google搜索关键字hadoop UNIXProcess drainInputStream,找到关于JDK7的一些bug,在NM负载高的情况下,出现OOM问题。 详情请看HADOOP-10146

和一些相关解释:

JDK-8027348

JDK-8024521

后来更换JDK1.7_67则没出现OOM的问题

[YARN] MRAppMaster心跳原理

最近集群遇到一个问题,就是集群在跑任务的时候,AM会超时10min而被KILL,但任务重跑则成功,问题是随机的出现的,所以初步怀疑是因为AM心跳汇报出现问题或则RM因为繁忙hang住,AM因为某些机制导致等待10min不汇报心跳,所以我们还是先了解,AM是如何向RM汇报心跳的。

在MRAppMaster中,ContainerAllocatorRouter负责向RM申请资源(发送心跳)

RMAM

RMContainerAllocator其最终父类是RMCommunicator,它实现了RMHeartbeatHandler接口

public interface RMHeartbeatHandler {
  long getLastHeartbeatTime(); // 获取上一次心跳的时间
  void runOnNextHeartbeat(Runnable callback); // 回调注册到callback队列的callback函数
}

每一次心跳回来,都会执行一次注册在heartbeatCallbacks中的回调函数:

allocatorThread = new Thread(new Runnable() {
      @Override
      public void run() {
        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
              ......
              heartbeat();            
              lastHeartbeatTime = context.getClock().getTime();// 记录上一次心跳时间
              executeHeartbeatCallbacks(); // 执行回调函数
              ....
});

RMCommunicator类中:

private void executeHeartbeatCallbacks() {
    Runnable callback = null;
    while ((callback = heartbeatCallbacks.poll()) != null) {
      callback.run();
    }
  }

在RMCommunicator启动时,首先会向RM注册,把自己的host和port告诉RM,然后在启动一条线程(startAllocatorThread)定期的调用RMContainerAllocator中实现的heartbeat方法(向RM申请资源,定期汇报信息,告诉RM自己还活着)。

AM初始化同时也会初始化RMCommunicator:

protected void serviceStart() throws Exception {
  scheduler= createSchedulerProxy(); // 获取RM的代理
  register(); // 注册
  startAllocatorThread(); // 心跳线程
....
}

AM的ContainerAllocatorRouter事件处理流程如下图:

RMALLO

注册流程:

调用RMCommunicator远程调用ApplicationMasterService的registerApplicationMaster方法,设置维护responseId,然后把它加入AMLivelinessMonitor中,并使用map记录时间,用来监控AM是否因为长时间没有心跳而超时,如果AM长时间没有心跳信息更新,RM就会通知NodeManager把AM移除。

心跳线程:

在发送心跳的过程中,即也是获取资源的过程

@Override
  protected synchronized void heartbeat() throws Exception {
    scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
    List<Container> allocatedContainers = getResources();// 重要的方法
    if (allocatedContainers.size() > 0) {
      scheduledRequests.assign(allocatedContainers);
    }
   ......
  }

获取资源的过程:

private List<Container> getResources() throws Exception {
     ...
     response = makeRemoteRequest(); // 和RM进行交互
     ...
     // 优先处理RM发送过来的命令
     if (response.getAMCommand() != null) {
         switch(response.getAMCommand()) {
                case AM_RESYNC:
                case AM_SHUTDOWN:
                     eventHandler.handle(new JobEvent(this.getJob().getID(),
                                     JobEventType.JOB_AM_REBOOT));
                     throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
                             this.getContext().getApplicationID());
                default:
                     ....
      }
     // 等等一系列处理
}
}

构建请求:

protected AllocateResponse makeRemoteRequest() throws IOException {
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);
    AllocateResponse allocateResponse;
    allocateResponse = scheduler.allocate(allocateRequest); // RPC调用ApplicationMasterService的allocate方法
    .....
}

每一次心跳的调用都会刷新AMLivelinessMonitor的时间,代表AM还活着

而且我们通过代码可以看出,资源请求被封装为一个ask,即一个ResourceRequest的ArrayList的资源列表 例如:

priority:20 host:host9 capability:<memory:2048, vCores:1>
priority:20 host:host2 capability:<memory:2048, vCores:1>
priority:20 host:host10 capability:<memory:2048, vCores:1>
priority:20 host:/rack/rack3203 capability:<memory:2048, vCores:1>
priority:20 host:/rack/rack3202 capability:<memory:2048, vCores:1>
priority:20 host:* capability:<memory:2048, vCores:1>

然而,ask是如何被构造的呢?

RMContainerAllocator中的addMap,addReduce,assign方法中对ask的数据内容进行了修改

addContainerReq --> addResourceRequest --> addResourceRequestToAsk;

通过在代码自己添加日志可以看出,资源会被分为local,rack,和any级别去申请资源

最终变为一个ask list发送到RM上:

 ask Capability:<memory:2048, vCores:1> ResourceName:* NumContainers:384 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3201 NumContainers:227 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3202 NumContainers:231 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3203 NumContainers:152 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3204 NumContainers:158 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:host1 NumContainers:46 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:host5 NumContainers:52 Priority:20 RelaxLocality:true
 ask Capability:<memory:2048, vCores:1> ResourceName:host6 NumContainers:38 Priority:20 RelaxLocality:true

类似日志为:

getResources() for application_1438330253091_0004: ask=29 release= 0 newContainers=0 finishedContainers=0 resourcelimit=<memory:0, vCores:0> knownNMs=24

总结:

除了了解心跳之外,还学习了许多Map和Reduce的分配机制,收获良多。

[Linux] ubuntu安装zsh

据说zsh还不错,还在体验中

在ubuntu下需要安装:

sudo apt-get install zsh git

然后进行下载:

wget --no-check-certificate https://github.com/robbyrussell/oh-my-zsh/raw/master/tools/install.sh -O - | sh

稍等片刻你就可以替换你的默认bash为zsh:

chsh -s /bin/zsh

重启就可以看到终端截面:

AsyncDispatcher

提示还不错,正在摸索中…

[YARN]YARN的AsyncDispatcher原理

YARN采用了基于事件驱动的并发模型,该模型能极大的提高应用程序并发性,在RM中,几乎所有的事件都通过AsyncDispatcher进行事件的派发.

其基本架构图如下:

AsyncDispatcher

从基本的架构图可以简单的看出,该模型还需要几个基本的要素,那就是事件(Event),事件类型(EventType)和处理事件对应的处理器(Handler).

在HADOOP中,事件被定义如下:

public interface Event<TYPE extends Enum<TYPE>> {

  TYPE getType();
  long getTimestamp();
  String toString();
}

事件类型(EventType)则是简单的枚举类

主要功能定义事件有哪几种类型:

public enum NodesListManagerEventType {
  NODE_USABLE,
  NODE_UNUSABLE
}

处理事件的接口

主要功能处理相应的事件

public interface EventHandler<T extends Event> {
  void handle(T event);
}

Dispatcher通过不同的事件类型(EventType)找到相应的handler对事件(event)进行处理.

对于AsyncDispatcher来说,它实现了Dispatcher接口:

public interface Dispatcher {
  EventHandler getEventHandler();
  void register(Class<? extends Enum> eventType, EventHandler handler);
}

其中有两个基本的方法,registergetEventHandler

register在AsyncDispatcher使用之前就需要先注册eventType和对应的EventHandler,而getEventHandler方法主要则是把事件(event)放入eventQueue中.

接下来在ResourceManager举个简单的例子:

在RM初始化自身基本服务的时候,会把相应的事件类型(EventType)和事件处理器(EventHandler),先注册在AsyncDispatcher上,以便于派发器在事件(event)到来时做出相应的处理.

RM的部分代码:

// Register event handler for RmNodes
this.rmDispatcher.register(RMNodeEventType.class,
    new NodeEventDispatcher(this.rmContext)); 

其实注册也就是把相应的类型和处理器放到一个HashMap

因为是资源管理方面的服务,所以我们进入ResourceTrackerService类中,找到nodesListManager这个实例,通过代码可以知道nodesListManager是用来管理节点是否可用,并作出相应的处理

// 2. Check if it's a valid (i.e. not excluded) node
    if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
      String message =
          "Disallowed NodeManager nodeId: " + nodeId + " hostname: "
              + rmNode.getNodeAddress();
      LOG.info(message);
      shutDown.setDiagnosticsMessage(message);
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
      return shutDown;
    }

从代码可以看出,如果节点是非法的,则从Dispatcher获取Handler,并构造一个RMNodeEventType.DECOMMISSION类型的事件,这个RMNodeEvent将会被放到eventQueue中

class GenericEventHandler implements EventHandler<Event> {
    public void handle(Event event) {
      try {
        eventQueue.put(event);
      } catch (InterruptedException e) {

      }
    };
  }

最后由dispatch进行通过传入的RMNodeEventType找到相应的NodeEventDispatcher(即EventHandler),并调用handle进行处理.

总结:
通过了解AsyncDispatcher可以提升自己理解Yarn的工作流程,加深对Yarn的设计实现的了解.

[python笔记] Python简单HttpServer

在工作中经常会遇到需要下载服务器的一些小文件,虽然scrt的sz可以满足的基本的功能,但在某些情况下,你可以选择在网页上下载和浏览文件系统上的一些文件,这时你就可以快捷使用:

 python -m SimpleHTTPServer

这样你就可以通过浏览器访问http://yourIp:8000/该目录的内容了。

[译] 理解HDFS恢复过程

HDFS的一个重要的设计需求就是能够确保连续且正确的操作去支持生产环境的部署,其中复杂的地方在于要在存在网络和节点异常的情况下确保写入HDFS文件的正确性。租约恢复、block恢复、pipeline恢复会在什么时候起作用,理解什么时候和为什么这些恢复机制会被调用和他们做了什么,可以帮助用户以及开发人员理解集群出现的异常问题。

背景

在HDFS中,文件被分割为block,文件的访问跟随着multi-reader,single-writer语义,为了满足容错需求,一个块的多个副本存储在不同的datanode中,副本的数量被称为副本因子(replication factor),当一个新的文件block被创建,或者已经存在的文件被打开进行append操作,HDFS的写操作就会创建pipeline到datanode,datanode接受和存储这些备份(副本因子通常决定pipeline中datanode的数量),后续的操作也是通过pipeline(如图)

recover-f1

client的读取操作则是选择拥有块副本的其中一个datanode进行

下面有两个应用的场景突显需要容错的需求。

  • HBase的Region Server(RS)的WAL(Write Ahead Log),它是一个HDFS文件,帮助防止数据丢失,如果一个RS宕机,一个新的将会读取WAL文件重建上一个RS的状态,如果写pipeline在RS死掉之前没有完成操作,在pipeline中的不同datanode可能不会同步,HDFS必须保证所有从WAL读的数据的正确性才能重建正确的RS的状态

  • 当Flume client写到HDFS文件的时候,它必须保证写是连续的,即使一些datanode在pipeline中失败或者停止响应。

租约恢复、block恢复、和pipeline恢复在这种情况下发挥作用。

  • 在client可以写一个HDFS文件之前,它必须维护一个租约(lease),它本质上是一个锁。这确保了single-write语义,client必须要预定的时间间隔内对租约进行renew才能保持写的操作,如果这个租约没有被renew或者拿着这个租约的client死了,租约就会过期,如果这发生了,HDFS将会代表client关闭这个文件并且释放租约,这样其他的client就能够写这个文件了,这个过程被称为租约恢复

  • 当租约恢复发生时,如果文件的最后一个block在写的时候并没有传播到pipeline中的所有DataNode,然后写入不同节点的数据量可能会有所不同,在租约恢复导致文件关闭之前,有必要保证最后一个block的所有replica拥有同样的长度,这个过程被称为block恢复,block恢复只有在租约恢复的过程中被触发,并且文件的最后一个block不在COMPLETE状态的时候租约恢复才能触发block恢复

  • 在写pipeline操作的过程中,一些在pipeline中的datanode可能会出现错误,当它发生时,底层写的操作不能仅是失败,相反的,HDFS将会尝试从这个错误进行恢复去允许pipeline继续和client可以继续去写这个文件,从pipeline错误中的恢复机制被称为pipeline恢复

接下来将会解释更多关于这些的细节

Blocks,Replicas,and Their States

为了区分NameNode上下文中的blocks和DataNode上下文中的blocks,我们把前者称为block,后者称为replica。

一个replica在DataNode上下文中跟随以下状态(org.apache.hadoop.hdfs.server.common.HdfsServerConstants.java):

  • FINALIZED:当一个replica在这个状态,代表写入replica已经完成并且副本中的数据是“frozen”(长度是固定的),除非这个replica被打开进行append操作,所有这个block的finalized状态的replica都有相同的GS(generation stamp),也应该有着相同的数据,一个finalized的replica的GS可能会因为recovery而增加。

  • RBW (Replica Being Written): 这是所有replica在开始写的时候的状态,无论这个文件是被write操作创建还是append操作创建,一个RBW状态的replica总是一个打开的文件的最后一个block,数据在写入到replica的过程中,处于RBW状态的replica中的数据对读的client来说仍然是可见的。

  • RWR (Replica Waiting to be Recovered):如果一个datanode死了或者重启,所有它的RBW状态的replica将会变为RWR状态,一个RWR状态的replica要么变成过期的replica被丢弃,要么参与租约恢复。

  • RUR (Replica Under Recovery):当一个非TEMPORARY状态的replica参与租约恢复时,它将会变为RUR状态。

  • TEMPORARY:一个临时的replica创建目的是为了block的复制(被replication monitor或者集群的balance触发),它和RBW状态的replica类似,除了它的数据对所有读的client是不可见的,如果block复制出现错误,TEMPORARY状态replica将会被删除。

一个block在NameNode可能会跟随下列状态(see enum BlockUCState in org.apache.hadoop.hdfs.server.common.HdfsServerConstants.java):

  • UNDER_CONSTRUCTION: 这个状态是在开始写的时候。一个UNDER_CONSTRUCTION状态的是打开的文件的最后一个block; 他的长度和GS是不可变的,它的数据对reader是可见的, 一个UNDER_CONSTRUCTION的block在NameNode中 会追踪写的pipeline (合法的RBW replicas的位置), 和定位它RWR的replicas的位置.

  • UNDER_RECOVERY: 如果client的租约过期,它对应文件的最后一个block处于 UNDER_CONSTRUCTION状态,当block恢复开始时它将会变成UNDER_RECOVERY状态。

  • COMMITTED: COMMITTED 意味着一个block的数据和GS不在是可变的 (除非它被重新打开进行append操作),还有在DataNode中不足最小副本数则会汇报相同GS和长度的FINALIZED replicas,为了服务读的请求,一个COMMITTED的block必须跟踪RBW replica的位置、GS和FINALIZED replicas的长度。当client向NameNode请求新增一个block到这个文件,或者关闭文件时,一个UNDER_CONSTRUCTION状态的block就会变为COMMITTED状态,如果最后或倒数第二个block在COMMITTED状态,这个文件不能被关闭,client必须进行重试。

  • COMPLETE: 当NameNode看最小副本数和FINALIZED的replicas相匹配,一个COMMITTED block 变为 COMPLETE。 一个文件可以被关闭只有所有他的block状态变为COMPLETE状态。一个block可能会被强制的变为COMPLETE状态,即使他没能达到最小副本的数量, 例如, 当一个client请求新的block,并且上一个block还没变为COMPLETE。

DataNode会存储replica的状态在磁盘上,但NameNode它不会存储block的状态在磁盘上,当NameNode重启,他则会把之前打开的文件的最后一个block的状态变为UNDER_CONSTRUCTION状态,其他block则变为COMPLETE状态。

简单的replica和block状态转换图如下

recover-f2

recover-f3

Generation Stamp

GS对于每个block来说是一个单调递增的8位数,它被维护在NameNode中. block和replica的GS (设计规范: HDFS Append and Truncates) 被用于以下目的:

检测一个block的陈旧replica(s): 也就是说, 当这个replica GS比block GS要老,这是可能发生的, 例如, 一个append操作以某种方法跳过了这个replica. datanode检测过期的replica(s),当datanode死了很长时间且重新加入到这个cluster中.

如果下面的情况发生,则需要新的GS:

  • 一个新的文件被创建
  • client打开一个已经存在的文件进行append或truncate
  • client在写data到DataNode(s)的过程中出现错误,并且请求一个新的GS
  • NameNode初始化文件的租约机制

Lease Recovery and Block Recovery

Lease Manager

NameNode中的lease manager管理着租约,NameNode追踪每个打开文件进行写的client。租约被NameNode上的lease manager管理。NameNode追踪每个打开文件进行写的client。client对其打开的每个文件枚举进行租约更新是没必要的,相反的,可以定期的发送请求到NameNode一次进行更新所有的租约。

每一个NameNode管理着单一的HDFS namespace,且对应的都有一个lease manager管理着所有client的租约,联邦集群虽然有多个namespace,它们都会有自己的lease manager。

lease manager维护着一个soft limit(1 minute)和一个hard limit(1 hour)的过期时间(这些限制当前是不可配置的),并且所有的租约在lease manager中进行维护,他们有相同的soft limit和hard limit。在soft limit过期之前,client拿着这个文件特有的写访问租约,如果soft limit过期并且client并没有重新更新租约或者关闭了这个文件(文件关闭时这个文件的租约将会被释放),其他client就能强制的接管这个租约,如果硬链接过期并且client没有重新更新这个租约,HDFS将会假定client已经退出并且自动关闭,从而恢复这个租约。

事实是这个文件的租约被一个client拿着是不会阻止其他client读这个文件的,这个文件可能当前会有很多client正在在读,即使有一个client正在写入。

lease manager支持的操作包括:

为一个client增加一个租约和路径(如果这个client已经有了一个租约,则增加这个路径到这个租约。否则,创建一个新的租约,并添加路径到租约里面) 移除client的租约和路径(如果这是租约的最后一个路径,则移除这个租约) 检查soft / hard limit是否过期,和对给定的client进行renew租约。 lease manager有一个监控线程周期行的检查租约是否超过了hard limit,如果过期,则触发这个文件的租约恢复。

HDFS client会通过org.apache.hadoop.hdfs.LeaseRenewer.LeaseRenewer类renews它自己的租约。

(注意: 一个HDFS client只会关联一个NameNode; 请看它的构造器 org.apache.hadoop.hdfs.DFSClient)。 如果同一个应用想要访问联邦集群的不同NS的不同的文件,需要为每一个NameNode创建client。

Lease Recovery Process

租约的恢复过程被NameNode触发用来对给定的client进行恢复租约,通过监控线程检查到达hard limit的有效期,或者soft limit过期时,其他客户端尝试接管租约,它检查每个相同的client打开的文件,如果这个文件的最后一个block不在COMPLETE状态中,则执行块恢复,并且关闭这个文件,一个文件的block恢复只有在恢复这个文件的租约的时候被触发。

下面是给定文件f 租约恢复算法的过程,当client死了,该过程同样应用于每个client打开文件进行写的操作。

  • 获取包含文件f最后一个block的DataNode
  • 分配一个DataNode作为主要的DataNode p
  • p 在NameNode中维护一个新的GS
  • p 从其他DataNode获取这个block的信息
  • p 计算这个block的最小长度
  • p 更新DataNodes, 拥有合法的GS, 即新的GS和最小的block长度。
  • p 确认NameNode更新的结果.
  • NameNode更新BlockInfo
  • NameNode 移除文件 f 的租约(其他写的操作现在可以维护这个文件f的租约,进行写入操作)
  • NameNode 提交这些改变到edit log.

步骤3到步骤7是block恢复部分的算法,如果一个文件需要block恢复,NameNode将会挑选这个文件的的最后一个block的副本的DataNode作为主datanode,并告诉这个DataNode协调其他DataNode进行block恢复工作,那些DataNode完成后将会汇报给NameNode,NameNode会定期更新这个block的状态,移除它的租约,并提交这些改变到edit log中。

总结

租约恢复,块恢复,和pipeline恢复对HDFS容错至关重要,他们可以确保存在网络故障和节点故障的情况下保证HDFS的持久性和一致性。

参考资料

Understanding HDFS Recovery Processes

[YARN] Label Based Scheduling

目标:

  1. 提供一种可靠的机制在特定的节点运行应用程序。
  2. 为队列提供一种可靠的机制在特定的节点上进行资源的调度
  3. 提供一种机制解决队列和application所定义节点在哪运行

定义

  • Node Label – 描述一个节点的标签,每个节点可以拥有多个标签
  • Label expression – 逻辑上的标签组合(使用 && 或 and,|| 或 or,!或 not)
  • Queue label – Label expression 决定哪些节点属于这个队列
  • Application Label – Label expression 决定应用程序容器在哪些节点运行
  • Queue Label Policy – Label Policy 配置在Queue中,其定义规则解决application label和queue label之间的冲突

使用案例

cluster A使用Fair Scheduler去调度application在他的节点上。 cluster 管理员定义多个队列包括嵌套队列去划分资源给不同的组织,所以每个组织都可以共享资源去使用。 除此之外,集群还在不同方面存在异构:

  • 拥有不同类型的硬件(一些节点拥有大内存,多CPU,有些则网络带宽占优势)
  • 数据在DFS中切分到分配到不同的节点/机架上
  • 其他的不同等等

cluster管理员根据差异用标签标记所有的节点:

  • 大内存使用red标记
  • 高CPU的使用blue标记
  • 高网络带宽的使用green标记
  • 节点在机架A标记为-rackA
  • 节点在机架B标记未-rackB

因为被标记为red的节点可以在rackA和rackB cluster管理员可以分配多个标签给它:

Node Regex Label1 Label2 LabelN
rackA_red red rackA
rackA_blue blue rackA
rackB_red red rackB

如果我们希望所有的应用提交到Queue1即运行在“rackA_red”或“rackA_blue”节点上,cluster管理员可以调整label expression即配置Queue1为(red || blue)&& rackA

另外cluster管理员配置Queue Policy去定义调度器的行为,假设提交的application也拥有自己的label expression

  • Queue Lable Expression优先
  • Application Label Expression优先
  • 取它们的交集
  • 取它们的或集

现在,用户B提交一个app到Queue1,但是这个app属于计算密集型的,他希望运行在高cpu的节点上,所以,这个app的lable expression可以为 “blue && rackA”

幸运的是集群管理员设置Queue Policy为“AND”,所以application将会被调度在“rackA_blue”的节点上运行。 假设集群管理员设置Queue Policy为“OR”,结果将会变为:“((red || blue) && rackA) || (blue && rackA)” ,即”(red || blue) && rackA”

设计

Node Labels, Label Expressions, Policies

Node Labels

最初的JIRA建议每个节点指定他们自己拥有的标签,并提供这些信息给ResourceManager. 我们建议集中式的把节点的标签存储到DFS上,所有的YARN daemons都可以访问,修改也不需要同步到整个集群或者客户端

标签可以被指定为下列的格式:

perfnode.* big, fast, physical, scale.* virtual, “slow”, perfnode30 ‘bad’

Label Expressions, Policies

Label expression 是一种逻辑表达式,它可以包含多个标签 (例如:big || virtual && !bad). Application Label expression 可以定义应用想要运行在哪些节点的集合 Queue Label expression 可以定义该队列“希望”应用运行在哪些节点。

Queue Label Policy

每个队列可以配置label expression和application Label Expression相互作用,通过配置Queue Label Policy可以解决Queue label和application label之间的冲突,决定应用在哪些类型节点上运行。

Label Manager

Label Manager是RM的一个新的服务 这个服务的主要作用为:

  • 从DFS加载节点的标签和维护内部节点和节点标签的map
  • 检查文件的变化,并更新内部节点和标签的映射关系
  • 基于 Application Label Expression, Queue Label Expression和Queue Label Policy结合构造新的逻辑表达式

英文不太好,望多多指正

参考资料: YARN-796