分类 hadoop 下的文章

[译] 理解HDFS恢复过程

HDFS的一个重要的设计需求就是能够确保连续且正确的操作去支持生产环境的部署,其中复杂的地方在于要在存在网络和节点异常的情况下确保写入HDFS文件的正确性。租约恢复、block恢复、pipeline恢复会在什么时候起作用,理解什么时候和为什么这些恢复机制会被调用和他们做了什么,可以帮助用户以及开发人员理解集群出现的异常问题。

背景

在HDFS中,文件被分割为block,文件的访问跟随着multi-reader,single-writer语义,为了满足容错需求,一个块的多个副本存储在不同的datanode中,副本的数量被称为副本因子(replication factor),当一个新的文件block被创建,或者已经存在的文件被打开进行append操作,HDFS的写操作就会创建pipeline到datanode,datanode接受和存储这些备份(副本因子通常决定pipeline中datanode的数量),后续的操作也是通过pipeline(如图)

recover-f1

client的读取操作则是选择拥有块副本的其中一个datanode进行

下面有两个应用的场景突显需要容错的需求。

  • HBase的Region Server(RS)的WAL(Write Ahead Log),它是一个HDFS文件,帮助防止数据丢失,如果一个RS宕机,一个新的将会读取WAL文件重建上一个RS的状态,如果写pipeline在RS死掉之前没有完成操作,在pipeline中的不同datanode可能不会同步,HDFS必须保证所有从WAL读的数据的正确性才能重建正确的RS的状态

  • 当Flume client写到HDFS文件的时候,它必须保证写是连续的,即使一些datanode在pipeline中失败或者停止响应。

租约恢复、block恢复、和pipeline恢复在这种情况下发挥作用。

  • 在client可以写一个HDFS文件之前,它必须维护一个租约(lease),它本质上是一个锁。这确保了single-write语义,client必须要预定的时间间隔内对租约进行renew才能保持写的操作,如果这个租约没有被renew或者拿着这个租约的client死了,租约就会过期,如果这发生了,HDFS将会代表client关闭这个文件并且释放租约,这样其他的client就能够写这个文件了,这个过程被称为租约恢复

  • 当租约恢复发生时,如果文件的最后一个block在写的时候并没有传播到pipeline中的所有DataNode,然后写入不同节点的数据量可能会有所不同,在租约恢复导致文件关闭之前,有必要保证最后一个block的所有replica拥有同样的长度,这个过程被称为block恢复,block恢复只有在租约恢复的过程中被触发,并且文件的最后一个block不在COMPLETE状态的时候租约恢复才能触发block恢复

  • 在写pipeline操作的过程中,一些在pipeline中的datanode可能会出现错误,当它发生时,底层写的操作不能仅是失败,相反的,HDFS将会尝试从这个错误进行恢复去允许pipeline继续和client可以继续去写这个文件,从pipeline错误中的恢复机制被称为pipeline恢复

接下来将会解释更多关于这些的细节

Blocks,Replicas,and Their States

为了区分NameNode上下文中的blocks和DataNode上下文中的blocks,我们把前者称为block,后者称为replica。

一个replica在DataNode上下文中跟随以下状态(org.apache.hadoop.hdfs.server.common.HdfsServerConstants.java):

  • FINALIZED:当一个replica在这个状态,代表写入replica已经完成并且副本中的数据是“frozen”(长度是固定的),除非这个replica被打开进行append操作,所有这个block的finalized状态的replica都有相同的GS(generation stamp),也应该有着相同的数据,一个finalized的replica的GS可能会因为recovery而增加。

  • RBW (Replica Being Written): 这是所有replica在开始写的时候的状态,无论这个文件是被write操作创建还是append操作创建,一个RBW状态的replica总是一个打开的文件的最后一个block,数据在写入到replica的过程中,处于RBW状态的replica中的数据对读的client来说仍然是可见的。

  • RWR (Replica Waiting to be Recovered):如果一个datanode死了或者重启,所有它的RBW状态的replica将会变为RWR状态,一个RWR状态的replica要么变成过期的replica被丢弃,要么参与租约恢复。

  • RUR (Replica Under Recovery):当一个非TEMPORARY状态的replica参与租约恢复时,它将会变为RUR状态。

  • TEMPORARY:一个临时的replica创建目的是为了block的复制(被replication monitor或者集群的balance触发),它和RBW状态的replica类似,除了它的数据对所有读的client是不可见的,如果block复制出现错误,TEMPORARY状态replica将会被删除。

一个block在NameNode可能会跟随下列状态(see enum BlockUCState in org.apache.hadoop.hdfs.server.common.HdfsServerConstants.java):

  • UNDER_CONSTRUCTION: 这个状态是在开始写的时候。一个UNDER_CONSTRUCTION状态的是打开的文件的最后一个block; 他的长度和GS是不可变的,它的数据对reader是可见的, 一个UNDER_CONSTRUCTION的block在NameNode中 会追踪写的pipeline (合法的RBW replicas的位置), 和定位它RWR的replicas的位置.

  • UNDER_RECOVERY: 如果client的租约过期,它对应文件的最后一个block处于 UNDER_CONSTRUCTION状态,当block恢复开始时它将会变成UNDER_RECOVERY状态。

  • COMMITTED: COMMITTED 意味着一个block的数据和GS不在是可变的 (除非它被重新打开进行append操作),还有在DataNode中不足最小副本数则会汇报相同GS和长度的FINALIZED replicas,为了服务读的请求,一个COMMITTED的block必须跟踪RBW replica的位置、GS和FINALIZED replicas的长度。当client向NameNode请求新增一个block到这个文件,或者关闭文件时,一个UNDER_CONSTRUCTION状态的block就会变为COMMITTED状态,如果最后或倒数第二个block在COMMITTED状态,这个文件不能被关闭,client必须进行重试。

  • COMPLETE: 当NameNode看最小副本数和FINALIZED的replicas相匹配,一个COMMITTED block 变为 COMPLETE。 一个文件可以被关闭只有所有他的block状态变为COMPLETE状态。一个block可能会被强制的变为COMPLETE状态,即使他没能达到最小副本的数量, 例如, 当一个client请求新的block,并且上一个block还没变为COMPLETE。

DataNode会存储replica的状态在磁盘上,但NameNode它不会存储block的状态在磁盘上,当NameNode重启,他则会把之前打开的文件的最后一个block的状态变为UNDER_CONSTRUCTION状态,其他block则变为COMPLETE状态。

简单的replica和block状态转换图如下

recover-f2

recover-f3

Generation Stamp

GS对于每个block来说是一个单调递增的8位数,它被维护在NameNode中. block和replica的GS (设计规范: HDFS Append and Truncates) 被用于以下目的:

检测一个block的陈旧replica(s): 也就是说, 当这个replica GS比block GS要老,这是可能发生的, 例如, 一个append操作以某种方法跳过了这个replica. datanode检测过期的replica(s),当datanode死了很长时间且重新加入到这个cluster中.

如果下面的情况发生,则需要新的GS:

  • 一个新的文件被创建
  • client打开一个已经存在的文件进行append或truncate
  • client在写data到DataNode(s)的过程中出现错误,并且请求一个新的GS
  • NameNode初始化文件的租约机制

Lease Recovery and Block Recovery

Lease Manager

NameNode中的lease manager管理着租约,NameNode追踪每个打开文件进行写的client。租约被NameNode上的lease manager管理。NameNode追踪每个打开文件进行写的client。client对其打开的每个文件枚举进行租约更新是没必要的,相反的,可以定期的发送请求到NameNode一次进行更新所有的租约。

每一个NameNode管理着单一的HDFS namespace,且对应的都有一个lease manager管理着所有client的租约,联邦集群虽然有多个namespace,它们都会有自己的lease manager。

lease manager维护着一个soft limit(1 minute)和一个hard limit(1 hour)的过期时间(这些限制当前是不可配置的),并且所有的租约在lease manager中进行维护,他们有相同的soft limit和hard limit。在soft limit过期之前,client拿着这个文件特有的写访问租约,如果soft limit过期并且client并没有重新更新租约或者关闭了这个文件(文件关闭时这个文件的租约将会被释放),其他client就能强制的接管这个租约,如果硬链接过期并且client没有重新更新这个租约,HDFS将会假定client已经退出并且自动关闭,从而恢复这个租约。

事实是这个文件的租约被一个client拿着是不会阻止其他client读这个文件的,这个文件可能当前会有很多client正在在读,即使有一个client正在写入。

lease manager支持的操作包括:

为一个client增加一个租约和路径(如果这个client已经有了一个租约,则增加这个路径到这个租约。否则,创建一个新的租约,并添加路径到租约里面) 移除client的租约和路径(如果这是租约的最后一个路径,则移除这个租约) 检查soft / hard limit是否过期,和对给定的client进行renew租约。 lease manager有一个监控线程周期行的检查租约是否超过了hard limit,如果过期,则触发这个文件的租约恢复。

HDFS client会通过org.apache.hadoop.hdfs.LeaseRenewer.LeaseRenewer类renews它自己的租约。

(注意: 一个HDFS client只会关联一个NameNode; 请看它的构造器 org.apache.hadoop.hdfs.DFSClient)。 如果同一个应用想要访问联邦集群的不同NS的不同的文件,需要为每一个NameNode创建client。

Lease Recovery Process

租约的恢复过程被NameNode触发用来对给定的client进行恢复租约,通过监控线程检查到达hard limit的有效期,或者soft limit过期时,其他客户端尝试接管租约,它检查每个相同的client打开的文件,如果这个文件的最后一个block不在COMPLETE状态中,则执行块恢复,并且关闭这个文件,一个文件的block恢复只有在恢复这个文件的租约的时候被触发。

下面是给定文件f 租约恢复算法的过程,当client死了,该过程同样应用于每个client打开文件进行写的操作。

  • 获取包含文件f最后一个block的DataNode
  • 分配一个DataNode作为主要的DataNode p
  • p 在NameNode中维护一个新的GS
  • p 从其他DataNode获取这个block的信息
  • p 计算这个block的最小长度
  • p 更新DataNodes, 拥有合法的GS, 即新的GS和最小的block长度。
  • p 确认NameNode更新的结果.
  • NameNode更新BlockInfo
  • NameNode 移除文件 f 的租约(其他写的操作现在可以维护这个文件f的租约,进行写入操作)
  • NameNode 提交这些改变到edit log.

步骤3到步骤7是block恢复部分的算法,如果一个文件需要block恢复,NameNode将会挑选这个文件的的最后一个block的副本的DataNode作为主datanode,并告诉这个DataNode协调其他DataNode进行block恢复工作,那些DataNode完成后将会汇报给NameNode,NameNode会定期更新这个block的状态,移除它的租约,并提交这些改变到edit log中。

总结

租约恢复,块恢复,和pipeline恢复对HDFS容错至关重要,他们可以确保存在网络故障和节点故障的情况下保证HDFS的持久性和一致性。

参考资料

Understanding HDFS Recovery Processes

[HADOOP问题] 常见问题解决

问题1:在程序的日志中看到,在reduce阶段出现了异常:Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out ,程序里需要打开文件,系统默认为1024,也可以通过ulimit -a查看

编辑文件/etc/security/limits.conf 在文件后面添加:

# End of file
*       soft       nofile  102400
*       hard       nofile  409600

遇到这种错误网上也有不同的可能解决方法和解释,你们可以自己找找。

问题2: yarn日志页面出现异常:

Java HotSpot(TM) 64-Bit Server VM warning: Insufficient space for shared memory file:
   /tmp/hsperfdata_hdp/6676
Try using the -Djava.io.tmpdir= option to select an alternate temp location

原因是根目录空间不足,解决问题的办法之一是清理根目录下不必要的文件,解决办法二就是,上面也就有提示了,不多说了。

/tmp/hsperfdata_username 目录的作用是什么呢?

jvm运行时在linux下默认在/tmp下生成上面的目录,目录下存放pid文件,和一些jvm进程信息,jmap、jstack等工具会读取该目录下的pid文件获取链接信息

问题3: 当任务不能跑满集群的时,为什么集群的节点会出现几个节点跑满容器,而其他节点则非常空闲?

原因是集群调度器默认处于批处理模式下,一个心跳会尽可能的分配任务,心跳先到达则会优先领取任务,我们可以通过参数yarn.scheduler.fair.max.assign参数设置为1,就可以大致的均衡任务到不同的节点

更新时间2015-07-18

[原]如何利用hadoop RPC框架实现和NameNode的交互

这篇文章主要介绍如何在已有的Hadoop RPC框架上,自定义新的方法实现和NameNode的交互。

在此之前,我们需要准备:

  • hadoop的源码
  • protobuf 2.5版本
  • JDK

hadoop 2.x版本中采用了Protocol Buffer (简称protobuf)作为序列化和反序列化的工具,所以我们在修改源码时需要按照相应规则编写message来实现数据的传输。

什么是protobuf?

protobuf是Google 公司内部的混合语言数据标准,它很适合做数据存储或 RPC 数据交换格式。是一种可用于通讯协议、数据存储等领域,并且和语言无关、平台无关、可扩展的序列化结构数据格式。 简单说来 Protobuf 的主要优点就是:简单,快。

安装protobuf和编译hadoop的过程网上的资料很多,我就直接跳过了,我们可以通过Idea导入hadoop的Maven项目,方便对源码的修改

1.修改proto文件,定义message和service

假设我们现在要实现的是一个检查某个文件或文件夹权限是否符合755,并对客户端返回boolean值。 这是一个属于Client和NameNode交互的一个方法,所以我们在Idea中ctrl+shift+N快速的找到ClientNamenodeProtocol.proto,添加对应的message(结构化数据被称为message)

message CheckPermissionRequestProto {
    required string src = 1;
}
message CheckPermissionResponseProto {
    required bool checkPerm = 1;
}

我们在这个文件中会看到除了string、bool类型的前面会有三种消息成员的规则,他们的含义分别是:

  • required:这个域在消息中必须刚好有1个
  • optional:这个域在消息中可以有0或1个
  • repeated:这个域在消息中可以有从多个,包括0个

在文件中找到service,并添加方法checkPermission方法

service ClientNamenodeProtocol { 
 ...... 
rpc checkPermission(CheckPermissionRequestProto) returns(CheckPermissionResponseProto);
}

接下来编译,编译之后你可以在ClientNamenodeProtocolProtos类(编译后生成)的接口ClientNamenodeProtocol,看到新增加的方法了。

2.ClientNamenodeProtocolPB文件

这个接口是client用来和NameNode进行交互的,它继承了ClientNamenodeProtocol接口,即新生成的接口也在其中,这里不用做修改

3.ClientNamenodeProtocolTranslatorPB类

这个类是将对ClientProtocol中方法的调用转化为RPC调用Namenode的服务,并将调用参数转化为PB的类型。 所以,我们需要在ClientProtocol增加checkPermission方法,并在这个类中进行Override

在ClientProtocol中增加

  @Idempotent
  public boolean checkPermission(String src)
       throws AccessControlException, FileNotFoundException,
       UnresolvedPathException, IOException;

在ClientNamenodeProtocolTranslatorPB类中

  @Override
  public boolean checkPermission(String src) throws AccessControlException,
            FileNotFoundException, UnresolvedPathException, IOException {
     CheckPermissionRequestProto req = CheckPermissionRequestProto.newBuilder()
            .setSrc(src).build();
     try {
         return rpcProxy.checkPermission(null,req).getCheckPerm();
     } catch (ServiceException e) {
         throw ProtobufHelper.getRemoteException(e);
     }
  }

注意:要把CheckPermissionRequestProto进行import,否则编译你懂的。

相应的我们也需要在NameNodeRpcServer类中Override该方法,因为NamenodeProtocols继承了ClientProtocol,这类负责处理所有到达NN的RPC call,他负责将请求转化为NN的方法的调用,因此也可以看出它们的实现分层是很清晰的。

  @Override // ClientProtocol
  public boolean checkPermission(String src)
      throws AccessControlException, FileNotFoundException, UnresolvedPathException, IOException {
      return namesystem.checkPermission(src);
  }

4.ClientNamenodeProtocolServerSideTranslatorPB类

该类是server端用来将ClientNamenodeProtocolTranslatorPB生成的PB格式的数据转化为本地调用的数据类型,所以增加改方法

  @Override
  public CheckPermissionResponseProto checkPermission(RpcController controller,
                                                      CheckPermissionRequestProto req)
           throws ServiceException {
      try {
          boolean result = server.checkPermission(req.getSrc());
          return CheckPermissionResponseProto.newBuilder().setCheckPerm(result).build(); //将结果返回
      } catch (IOException e) {
          throw new ServiceException(e);
      }

  }

这里的server就是NameNodeRpcServer,相当于调用NameNodeRpcServer的checkPermission方法,并在NameNodeRpcServer中调用FSNamesystem完成最后的逻辑

Namesystem类增加方法

  boolean checkPermission(String src) throws IOException {  
      readLock();
      try {
          HdfsFileStatus fileStatus = getFileInfo(src,false); //这个方法我有做过修改,你们可能不一样
          FsPermission fsPermission = new FsPermission((short)0755);
          if(fileStatus != null && fsPermission.equals(fileStatus.getPermission())) {
              return true;
          }
      } finally {
          readUnlock();
      }
      return false;
  }

接下来我就举个例子怎么调用,这只是部分代码,详细的自己看看源码吧。

  proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
      ClientProtocol.class, nnFallbackToSimpleAuth);
  this.namenode = proxyInfo.getProxy();
  namenode.checkPermission(src);

5.最后一步就是编译了

希望通过这个例子能够加深对hadoop实现的理解

参考资料

Google Protocol Buffer 的使用和原理