这篇文章主要介绍如何在已有的Hadoop RPC框架上,自定义新的方法实现和NameNode的交互。

在此之前,我们需要准备:

  • hadoop的源码
  • protobuf 2.5版本
  • JDK

hadoop 2.x版本中采用了Protocol Buffer (简称protobuf)作为序列化和反序列化的工具,所以我们在修改源码时需要按照相应规则编写message来实现数据的传输。

什么是protobuf?

protobuf是Google 公司内部的混合语言数据标准,它很适合做数据存储或 RPC 数据交换格式。是一种可用于通讯协议、数据存储等领域,并且和语言无关、平台无关、可扩展的序列化结构数据格式。 简单说来 Protobuf 的主要优点就是:简单,快。

安装protobuf和编译hadoop的过程网上的资料很多,我就直接跳过了,我们可以通过Idea导入hadoop的Maven项目,方便对源码的修改

1.修改proto文件,定义message和service

假设我们现在要实现的是一个检查某个文件或文件夹权限是否符合755,并对客户端返回boolean值。 这是一个属于Client和NameNode交互的一个方法,所以我们在Idea中ctrl+shift+N快速的找到ClientNamenodeProtocol.proto,添加对应的message(结构化数据被称为message)

message CheckPermissionRequestProto {
    required string src = 1;
}
message CheckPermissionResponseProto {
    required bool checkPerm = 1;
}

我们在这个文件中会看到除了string、bool类型的前面会有三种消息成员的规则,他们的含义分别是:

  • required:这个域在消息中必须刚好有1个
  • optional:这个域在消息中可以有0或1个
  • repeated:这个域在消息中可以有从多个,包括0个

在文件中找到service,并添加方法checkPermission方法

service ClientNamenodeProtocol { 
 ...... 
rpc checkPermission(CheckPermissionRequestProto) returns(CheckPermissionResponseProto);
}

接下来编译,编译之后你可以在ClientNamenodeProtocolProtos类(编译后生成)的接口ClientNamenodeProtocol,看到新增加的方法了。

2.ClientNamenodeProtocolPB文件

这个接口是client用来和NameNode进行交互的,它继承了ClientNamenodeProtocol接口,即新生成的接口也在其中,这里不用做修改

3.ClientNamenodeProtocolTranslatorPB类

这个类是将对ClientProtocol中方法的调用转化为RPC调用Namenode的服务,并将调用参数转化为PB的类型。 所以,我们需要在ClientProtocol增加checkPermission方法,并在这个类中进行Override

在ClientProtocol中增加

  @Idempotent
  public boolean checkPermission(String src)
       throws AccessControlException, FileNotFoundException,
       UnresolvedPathException, IOException;

在ClientNamenodeProtocolTranslatorPB类中

  @Override
  public boolean checkPermission(String src) throws AccessControlException,
            FileNotFoundException, UnresolvedPathException, IOException {
     CheckPermissionRequestProto req = CheckPermissionRequestProto.newBuilder()
            .setSrc(src).build();
     try {
         return rpcProxy.checkPermission(null,req).getCheckPerm();
     } catch (ServiceException e) {
         throw ProtobufHelper.getRemoteException(e);
     }
  }

注意:要把CheckPermissionRequestProto进行import,否则编译你懂的。

相应的我们也需要在NameNodeRpcServer类中Override该方法,因为NamenodeProtocols继承了ClientProtocol,这类负责处理所有到达NN的RPC call,他负责将请求转化为NN的方法的调用,因此也可以看出它们的实现分层是很清晰的。

  @Override // ClientProtocol
  public boolean checkPermission(String src)
      throws AccessControlException, FileNotFoundException, UnresolvedPathException, IOException {
      return namesystem.checkPermission(src);
  }

4.ClientNamenodeProtocolServerSideTranslatorPB类

该类是server端用来将ClientNamenodeProtocolTranslatorPB生成的PB格式的数据转化为本地调用的数据类型,所以增加改方法

  @Override
  public CheckPermissionResponseProto checkPermission(RpcController controller,
                                                      CheckPermissionRequestProto req)
           throws ServiceException {
      try {
          boolean result = server.checkPermission(req.getSrc());
          return CheckPermissionResponseProto.newBuilder().setCheckPerm(result).build(); //将结果返回
      } catch (IOException e) {
          throw new ServiceException(e);
      }

  }

这里的server就是NameNodeRpcServer,相当于调用NameNodeRpcServer的checkPermission方法,并在NameNodeRpcServer中调用FSNamesystem完成最后的逻辑

Namesystem类增加方法

  boolean checkPermission(String src) throws IOException {  
      readLock();
      try {
          HdfsFileStatus fileStatus = getFileInfo(src,false); //这个方法我有做过修改,你们可能不一样
          FsPermission fsPermission = new FsPermission((short)0755);
          if(fileStatus != null && fsPermission.equals(fileStatus.getPermission())) {
              return true;
          }
      } finally {
          readUnlock();
      }
      return false;
  }

接下来我就举个例子怎么调用,这只是部分代码,详细的自己看看源码吧。

  proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
      ClientProtocol.class, nnFallbackToSimpleAuth);
  this.namenode = proxyInfo.getProxy();
  namenode.checkPermission(src);

5.最后一步就是编译了

希望通过这个例子能够加深对hadoop实现的理解

参考资料

Google Protocol Buffer 的使用和原理