前面已经说过HDFS的HA的相关机制简单了解NameNode的ZKFC机制,所以我们接着上面的说,YARN的HA切换由EmbeddedElectorService类控制,和ZKFailoverController的ElectorCallbacks一样,实现了ActiveStandbyElectorCallback接口,他们的区别是fenceOldActive方法的实现

  private Stat fenceOldActive() throws InterruptedException, KeeperException {
    .......
    if (Arrays.equals(data, appData)) {
      LOG.info("But old node has our own data, so don't need to fence it.");
    } else {
      appClient.fenceOldActive(data);
    }
    return stat;
  }

ZKFailoverController为,先GracefulFence,不行则进行真正的fence

  private synchronized void fenceOldActive(byte[] data) {
    HAServiceTarget target = dataToTarget(data);

    try {
      doFence(target);
    } catch (Throwable t) {
      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t)));
      Throwables.propagate(t);
    }
  }

而Yarn的HA则为

 @Override
  public void fenceOldActive(byte[] oldActiveData) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Request to fence old active being ignored, " +
          "as embedded leader election doesn't support fencing");
    }
  }

NameNode通过rpc或ssh kill的防止脑裂,而ZKRMStateStore是怎么在防止脑裂的呢?

在ZKRMStateStore中,大部分的操作都会在实际操作之前创建RM_ZK_FENCING_LOCK的文件,操作完成之后则删除对应的文件,这些操作是事务性的,这样意味着同时只有一个client去写rmstore目录,当有两个rm同时写,创建RM_ZK_FENCING_LOCK时则会抛出异常,同时rm则会捕获异常,并将自己的状态转化为standby的状态。

private synchronized void doDeleteMultiWithRetries( final List<Op> opList) throws Exception { 
  final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2); 
  execOpList.add(createFencingNodePathOp); execOpList.addAll(opList); 
  execOpList.add(deleteFencingNodePathOp); 
  new ZKAction<Void>() { 
    @Override 
    public Void run() throws KeeperException, InterruptedException { 
      setHasDeleteNodeOp(true); 
      zkClient.multi(execOpList);
       return null;
   } }.runWithRetries(); }

举一个例子,异常会被store.notifyStoreOperationFailed(e)处理

public void transition(RMStateStore store, RMStateStoreEvent event) {
    ......
      try {
        LOG.info("Storing RMDelegationToken and SequenceNumber");
        store.storeRMDelegationTokenState(
            dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
      } catch (Exception e) {
        LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
            e);
        store.notifyStoreOperationFailed(e);
      }
    }

这里就进行context相关的关闭,转化为standby的状态

  /**
   * This method is called to notify the ResourceManager that the store
   * operation has failed.
   * @param failureCause the exception due to which the operation failed
   */
  protected void notifyStoreOperationFailed(Exception failureCause) {
    if (failureCause instanceof StoreFencedException) {
      updateFencedState();
      Thread standByTransitionThread =
          new Thread(new StandByTransitionThread());
      standByTransitionThread.setName("StandByTransitionThread Handler");
      standByTransitionThread.start();
    } else {
      rmDispatcher.getEventHandler().handle(
        new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause));
    }
  }

除了防止同时写的情况发生,ZKRMStateStore还在切换的时候对ZKRMStateStore的存储目录进行权限的设置,只允许自己读写,其他用户只有读的权限,我们可以通过zk命令去看到这样的权限设置

[zk: localhost:2181(CONNECTED) 27] ls /yarn-test/rmstore/ZKRMStateRoot    
[AMRMTokenSecretManagerRoot, RMAppRoot, EpochNode, RMVersionNode, RMDTSecretManagerRoot]

[zk: localhost:2181(CONNECTED) 28] getAcl /yarn-test/rmstore/ZKRMStateRoot
'world,'anyone
: rwa
'digest,'xxx-xxxx-xxx15.hadoop.xxx.com:0vfG9l2cyt85oF5/H01oip5KEGU=
: cd

参考资料

[RM HA3] Zookeeper在RM HA的应用