[基础操作] 上线节点和下线节点步骤参考

一、上线节点

1、将新增节点追加到/etc/hosts中,同步到集群所有节点

2、将新节点生成新的机架信息更新到rack.data中,并同步到集群所有节点

3、将新的节点加入到$HADOOP_CONF_DIR/slaves,$HADOOP_CONF_DIR/hosts/datanode_hosts,$HADOOP_CONF_DIR/hosts/exclude_mapred_hosts,$HADOOP_CONF_DIR/hosts/mapred_hosts中,同步到集群所有节点。

4、刷新NN,RM

刷新NN
hdfs dfsadmin -refreshNodes
刷新RM
yarn rmadmin -refreshNodes

5、启动DN,并继续保持balance,直到达到一定(建议40%)存储左右的容量

Tips1: 因为在机器刚加进集群时,如果该节点启动计算,将会耗费大量的网络带宽,影响在上面跑的Task,从而影响任务,而任务在写数据时,如果来自DN节点的请求,会先在本地写一份数据,再写远程节点,因此不会影响原有任务,当新节点数据达到一定量时,可以启动NM,换句话说,相当于提高了任务的本地化率,降低影响任务的风险.

Tips2: 在Balance同时,我们可以适当的增加某些目录的副本数一定时间后,恢复副本数量,这样可以加快Balance的效率,让节点能够较快的达到平衡状态。

hadoop-daemon.sh start datanode

6、将节点从$HADOOP_CONF_DIR/hosts/exclude_mapred_hosts去掉,刷新yarn rmadmin -refreshNodes,在启动NM

yarn-daemon.sh start nodemanager

二、下线节点

1、将下线节点加入到$HADOOP_CONF_DIR/hosts/exclude_datanode_hosts,$HADOOP_CONF_DIR/hosts/exclude_mapred_hosts中,同步到所有节点,刷新

刷新NN

hdfs dfsadmin -refreshNodes

刷新RM

yarn rmadmin -refreshNodes

2、直到页面Decommissioned: NUM完成,或者 Number of Under-Replicated Blocks变得较小,几乎没变化时,即可完成下线节点

3、节点下线完成后,把节点从slaves,datanode_hosts,exclude_datanode_hosts,exclude_mapred_hosts,mapred_hosts,机架信息中删除对应的机器信息。

其实也就是选择适合的方式把集群的变动变到最小,这里只做参考

[YARN] 基于ZKRMStateStore的Yarn的HA机制分析

前面已经说过HDFS的HA的相关机制简单了解NameNode的ZKFC机制,所以我们接着上面的说,YARN的HA切换由EmbeddedElectorService类控制,和ZKFailoverController的ElectorCallbacks一样,实现了ActiveStandbyElectorCallback接口,他们的区别是fenceOldActive方法的实现

  private Stat fenceOldActive() throws InterruptedException, KeeperException {
    .......
    if (Arrays.equals(data, appData)) {
      LOG.info("But old node has our own data, so don't need to fence it.");
    } else {
      appClient.fenceOldActive(data);
    }
    return stat;
  }

ZKFailoverController为,先GracefulFence,不行则进行真正的fence

  private synchronized void fenceOldActive(byte[] data) {
    HAServiceTarget target = dataToTarget(data);

    try {
      doFence(target);
    } catch (Throwable t) {
      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t)));
      Throwables.propagate(t);
    }
  }

而Yarn的HA则为

 @Override
  public void fenceOldActive(byte[] oldActiveData) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Request to fence old active being ignored, " +
          "as embedded leader election doesn't support fencing");
    }
  }

NameNode通过rpc或ssh kill的防止脑裂,而ZKRMStateStore是怎么在防止脑裂的呢?

在ZKRMStateStore中,大部分的操作都会在实际操作之前创建RM_ZK_FENCING_LOCK的文件,操作完成之后则删除对应的文件,这些操作是事务性的,这样意味着同时只有一个client去写rmstore目录,当有两个rm同时写,创建RM_ZK_FENCING_LOCK时则会抛出异常,同时rm则会捕获异常,并将自己的状态转化为standby的状态。

private synchronized void doDeleteMultiWithRetries( final List<Op> opList) throws Exception { 
  final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2); 
  execOpList.add(createFencingNodePathOp); execOpList.addAll(opList); 
  execOpList.add(deleteFencingNodePathOp); 
  new ZKAction<Void>() { 
    @Override 
    public Void run() throws KeeperException, InterruptedException { 
      setHasDeleteNodeOp(true); 
      zkClient.multi(execOpList);
       return null;
   } }.runWithRetries(); }

举一个例子,异常会被store.notifyStoreOperationFailed(e)处理

public void transition(RMStateStore store, RMStateStoreEvent event) {
    ......
      try {
        LOG.info("Storing RMDelegationToken and SequenceNumber");
        store.storeRMDelegationTokenState(
            dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
      } catch (Exception e) {
        LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
            e);
        store.notifyStoreOperationFailed(e);
      }
    }

这里就进行context相关的关闭,转化为standby的状态

  /**
   * This method is called to notify the ResourceManager that the store
   * operation has failed.
   * @param failureCause the exception due to which the operation failed
   */
  protected void notifyStoreOperationFailed(Exception failureCause) {
    if (failureCause instanceof StoreFencedException) {
      updateFencedState();
      Thread standByTransitionThread =
          new Thread(new StandByTransitionThread());
      standByTransitionThread.setName("StandByTransitionThread Handler");
      standByTransitionThread.start();
    } else {
      rmDispatcher.getEventHandler().handle(
        new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause));
    }
  }

除了防止同时写的情况发生,ZKRMStateStore还在切换的时候对ZKRMStateStore的存储目录进行权限的设置,只允许自己读写,其他用户只有读的权限,我们可以通过zk命令去看到这样的权限设置

[zk: localhost:2181(CONNECTED) 27] ls /yarn-test/rmstore/ZKRMStateRoot    
[AMRMTokenSecretManagerRoot, RMAppRoot, EpochNode, RMVersionNode, RMDTSecretManagerRoot]

[zk: localhost:2181(CONNECTED) 28] getAcl /yarn-test/rmstore/ZKRMStateRoot
'world,'anyone
: rwa
'digest,'xxx-xxxx-xxx15.hadoop.xxx.com:0vfG9l2cyt85oF5/H01oip5KEGU=
: cd

参考资料

[RM HA3] Zookeeper在RM HA的应用

[Spark] Spark的History关联对应的executor的日志

场景在于,因为很多用户想在Spark的history上看到对应的executor日志,其实这个并不是Spark的什么新功能,他们不需要用yarn application -logs下载所有的日志,因为我们之前一直忽略这个,没配置,后来配置上就可以了。

除了要开启yarn的日志收集功能之外

<property>
     <name>yarn.log-aggregation-enable</name>
     <value>true</value>
</property>

还要在yarn-site.xml增加参数,并且重启NodeManager即可,当我们访问时,就会帮我们重定向到日志服务器上,找到对应的日志

<property>
      <name>yarn.log.server.url</name>
      <value>http://HistoryServerAddress:19888/jobhistory/logs</value>
</property>

然后点击对应的日志链接就可以链接过去了

记录一下以免忘记。

[YARN] FairScheduler的资源分配机制分析(一)

以下是队列资源分配的简单架构

调度器的container资源分配

通过简图可以看出,container是分配是随着每一次nodeupdate而进行资源分配的,在每一次尝试调度container之前,首先会检查改节点是否曾经有预留的app(预留指的是该node曾经因为资源不足,为了提高本地性原因,当节点有资源更新时,优先的把这个节点的资源分配给这个app的策略)

// 类名FairScheduler.java
//  Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations

FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
  Priority reservedPriority = node.getReservedContainer().getReservedPriority();
  FSQueue queue = reservedAppSchedulable.getQueue();
// 之前有预留,不满足条件则释放
  if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
      || !fitsInMaxShare(queue,
      node.getReservedContainer().getReservedResource())) {
    // Don't hold the reservation if app can no longer use it
    LOG.info("Releasing reservation that cannot be satisfied for application "
        + reservedAppSchedulable.getApplicationAttemptId()
        + " on node " + node);
    reservedAppSchedulable.unreserve(reservedPriority, node);
    reservedAppSchedulable = null;
  } else {
    // Reservation exists; try to fulfill the reservation
    if (LOG.isDebugEnabled()) {
      LOG.debug("Trying to fulfill reservation for application "
          + reservedAppSchedulable.getApplicationAttemptId()
          + " on node: " + node);
    }
   // 满足条件,则分配
    node.getReservedAppSchedulable().assignReservedContainer(node);
  }
}

总的来说:

  • 1、如果该app在该节点没有container的需求,则释放预留container
  • 2、如果该app在该节点还有container的需求,但队列的MaxShare无法满足,则释放该节点预留的container
  • 3、如果该app这个优先级的资源的请求已经为0,释放预留的container

否则检查这个node的可用资源和需求的资源是否满足,如果不满足,则预留,满足则分配

如果该节点没有app预留,则把该node从RootQueue进行分配下去,其中还有分配参数assignMultiple和maxAssign限制该node分配的container数量。

因为我们通过图可以知道,其实资源分配可以看成三层结构,ParetnQueue到LeafQueue,再到FSAppAttempt

那我们提出一个问题,当资源分配,如何选取最适合的队列?如何选取最适合的App?

在每次进行资源分配,都会采取排序的操作

Collections.sort(childQueues, policy.getComparator());

app进行排序

Collections.sort(runnableApps, comparator);

找出最适合的子队列和最适合的app, 对于采取fair算法策略的队列来说,其比较实现在FairShareComparator的compare方法中,对于每个可排序的Schedulable,包括叶子队列,FSAppAttempt,第一步都会计算两个Schedulable的minShare,即通过配置的最小资源要求和每个Schedulable的demand进行比较的最小值

  Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
      s1.getMinShare(), s1.getDemand());
  Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
      s2.getMinShare(), s2.getDemand());

然后通过当前的资源使用和minshare进行比较等等,详细可以自行查看源码

  boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s1.getResourceUsage(), minShare1);
  boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s2.getResourceUsage(), minShare2);

以上可以总结为,如果Schedulable都低于他们的minShare,则通过他们低于minShare的比例的多少来比较,例如,如果一个job A拥有8个task(minShare 10即比例为80%),job B拥有50个task(minShare 100即比例为50%),则job B将会有更高的优先级在下次资源分配时获取资源。

如果Schedulable在他们的minShare之上,则通过比较他们的(runningTasks/weight),如果所有的权重都相等,则slot资源需求少的job优先获得资源,另外,如果一个job拥有更高的权重则拥有机会获得多slot。

解决完选取子队列和app之后,接下来FSAppAttempt自身处理资源的问题了,也就是主要考虑这个FSAppAttempt的计算时的本地性问题

为了提升本地性,对于每个优先级,都会尝试的优先分配本地节点,然后再是机架,off-switch的请求通常会被延迟调度。

分配的策略简要为

从appSchedulingInfo中获取该节点或机架,该优先级对应的资源请求

ResourceRequest rackLocalRequest = getResourceRequest(priority,
    node.getRackName());
ResourceRequest localRequest = getResourceRequest(priority,
    node.getNodeName());

然后满足条件的情况下尝试分配node local的节点,如果满足分配条件则分配成功,如果container的请求满足队列最大的MaxShare,则预留该节点给这个FSAppAttempt

if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
    && localRequest != null && localRequest.getNumContainers() != 0) {
  return assignContainer(node, localRequest,
      NodeType.NODE_LOCAL, reserved);
} 

因为我们经常想优先的调度node-local的container, 其次再为rack-local和off-switch的container去保证最大可能的本地性,为了达到这个目标,我们首先对给定的优先级分配node-local,如果我们在很长的一段时间内没有成功调度,则放松本地性的阀值。

如上所述,通过判断上一次的NodeType类型去判断使用nodeLocalityThreshold( yarn.scheduler.fair.locality.threshold.node默认-1.0f ) 或 rackLocalityThreshold( yarn.scheduler.fair.locality.threshold.rack默认-1.0f ),从而去决定是否改变本地性级别。

if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
  // 满足调度机会条件,则将NODE_LOCAL级别降为RACK_LOCAL级别
  if (allowed.equals(NodeType.NODE_LOCAL)) {
    allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
    resetSchedulingOpportunities(priority);
  }
  // 满足调度机会条件,则将RACK_LOCAL级别降为OFF_SWITCH级别
  else if (allowed.equals(NodeType.RACK_LOCAL)) {
    allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
    resetSchedulingOpportunities(priority);
  }
}
// 否则本地性等级不变
return allowedLocalityLevel.get(priority);

Application所需要的demand如何计算?

在FSAppAttempt中,其维护着每一个Application在Fair Scheduler的相关调度信息,包括Application所需要的demand,它的fairShare和allowedLocalityLevel等其他相关信息

然而在上文也有提到,每个FSAppAttempt或queue都会用它的minshare和demand比较,而demand是怎么来的呢?

在FSAppAttempt中,每一次AppMaster申请资源都会更新appSchedulingInfo里面的requests对象

在FairScheduler中则有线程定时的去调度UpdateThread线程,去重新更新每个队列,FSAppAttempt所需demand和重新计算FairShare等

每一层的demand等于min(下一层demand,最大资源分配),FSAppAttempt的demand则为当前使用的资源大小+未分配的资源大小

  @Override
  public void updateDemand() {
    demand = Resources.createResource(0);
    // Demand is current consumption plus outstanding requests
    Resources.addTo(demand, getCurrentConsumption());

    // Add up outstanding resource requests
    synchronized (this) {
      for (Priority p : getPriorities()) {
        for (ResourceRequest r : getResourceRequests(p).values()) {
          Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
          Resources.addTo(demand, total);
        }
      }
    }
  }

[YARN] FairScheduler设置AM的vcore无法生效

在集群上了Linux Container之后,如果我们想增加AM在NM上获取时间片的能力,我们可以在配置文件中找到以下参数

<property> 
<name>yarn.app.mapreduce.am.resource.cpu-vcores</name> 
<value>15</value> 
</property>

该参数描述的是给AM多少个虚拟核,但是查看RM日志确发现,不管怎么设置AM的核数量都无法生效

[2016-10-27T16:36:37.280 08:00] [INFO] resourcemanager.scheduler.SchedulerNode.allocateContainer(SchedulerNode.java 153) [ResourceManager Event Processor] : Assigned container container_1477059529836_336635_01_000001 of capacity <memory:2048, vCores:1>

原因是对于FairScheduler默认的策略说,每次都会对AM的提交请求进行规整,而且只会考虑mem的情况

SchedulerUtils.normalizeRequest(amReq, scheduler.getResourceCalculator(), scheduler.getClusterResource(), scheduler.getMinimumResourceCapability(), scheduler.getMaximumResourceCapability(), scheduler.getMinimumResourceCapability());

在DefaultResourceCalculator类中,只对内存进行规整,createResource(memory, (memory > 0) ? 1 : 0),如果内存大于0,则默认变为1

  @Override
  public Resource normalize(Resource r, Resource minimumResource,
      Resource maximumResource, Resource stepFactor) {
    int normalizedMemory = Math.min(
        roundUp(
            Math.max(r.getMemory(), minimumResource.getMemory()),
            stepFactor.getMemory()),
            maximumResource.getMemory());
    return Resources.createResource(normalizedMemory);
  }

所以在默认的DefaultResourceCalculator当中,当你不管怎么设置AM的核数都无法生效

[基础操作] 新增加NS操作参考

0.前期环境准备

  • 硬件检查确认准备的机器的物理内存是和之前的NameNode一样
  • 打通hadp用户nn1到nn5,nn6的ssh
  • 打通nn5,nn6的hadp用户的双方的ssh,两台机器互通,不需要输入密码
  • 在nn5,nn6新建mapred,yarn用户,并将hadp,yarn,mapred用户加入hadoop组
  • 调整linux网络参数和其余NS的NameNode保持一致
  • 把新增节点的host加入到集群的/etc/hosts

1.找到奇数台机器,并安装配置ZooKeeper(如果共享Zookeeper,跳过此步骤)

zkHost1-zkHost5

sh zkServer.sh start

2.找到奇数台机器,安装配置hadoop,并启动JournalNode

jnHost1-jnHost5

hadoop-daemon.sh start journalnode

3.增加配置(假设添加NS3)

修改core-site.xml(大体需要注意)

    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://ns3</value>
    </property>

如果和原来的NS不共用ZK,修改填写ZK地址

    <property>
       <name>ha.zookeeper.quorum</name>
       <value>zkHost1:2181,zkHost2:2181,zkHost3:2181,zkHost4:2181,zkHost5:2181</value>
    </property>

修改hdfs-site.xml,增加NS3

    <property>
      <name>dfs.nameservices</name>
      <value>ns1,ns2,ns3</value>
    </property>

增加配置

<property>
  <name>dfs.ha.namenodes.ns3</name>
  <value>nn5,nn6</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.ns3.nn5</name>
  <value>nn5Host:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.ns3.nn6</name>
  <value>nn6Host:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.ns3.nn5</name>
  <value>nn5Host:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.ns3.nn6</name>
  <value>nn6Host:50070</value>
</property>
<property>
    <name>dfs.namenode.servicerpc-address.ns3.nn5</name>
    <value>nn5Host:8021</value>
</property>
<property>
    <name>dfs.namenode.servicerpc-address.ns3.nn6</name>
    <value>nn6Host:8021</value>
</property>
<property>
    <name>dfs.client.failover.proxy.provider.ns3</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

修改JournalNode的上传地址和目录

 <property> 
        <name>dfs.namenode.shared.edits.dir</name> 
        <value>qjournal://jnHost1:8485;jnHost2:8485;jnHost3:8485;jnHost4:8485;jnHost5:8485/ns3</value> 
</property>

4.执行操作

找到ns1的集群cid(例如:CID-338ca6d3-15bb-4941-bb1a-8faa3c3ba79d)

在nn5上指定clusterId执行

hdfs namenode -format -clusterId cid

hadoop-daemon.sh start namenode

在nn6上执行

hdfs namenode -bootstrapStandby

hadoop-daemon.sh start namenode

刷新datanode

cat $HADOOP_CONF_DIR/slaves | xargs -t -i hdfs dfsadmin -refreshNamenodes {}:50020

初始化zkfc目录

hdfs zkfc -formatZK

在两个节点上启动ZKFC

hadoop-daemon.sh start zkfc

滚动重启datanode

[Hadoop] 自定义Hadoop ACL设计

原文已经发表在数据杂货铺,转载请标注原文出处

背 景

随着集群变大,使用用户越来越多,原生的Hadoop权限控制已经不能满足用户的需求,社区也围绕着安全做了很多工作,例如原生支持的Kerberos认证和LDAP,Apache Sentry和Apache Ranger等开源项目也在不同程度上做了很多工作,但是各自都有自己的优缺点,所以按照自身需求选择不同的尤为重要。

JDH ACL设计原理

新版的Hadoop权限设计主要围绕着两方面:用户和组的关系,组和目录之间的关系,并且以组作为权限管理的单位,在新版本的Hadoop开放了INodeAttributeProvider类,允许用户去自定义权限检查的逻辑,Jdh ACL的权限控制就是基于该前提去实现用户级别的权限控制。

新的ACL并没有完全的抛弃原生Hadoop的权限检查,而是在其基础上增加了多组的权限检查规则

即使新版本的Hadoop中支持了像Linux一样的ACL(即支持一个文件可以属于多用户和多组的设置,并且可以设置多种权限,权限的设置需要手动递归,不能自动依赖与上层目录的权限),但是不好的地方在于权限不方便集中的统一管理,而且权限数据会固化到fsimage中等等,所以我们希望找到一种集中管理权限,灵活配置权限的方法,并且以插件的方式实现权限管理的方案,社区中Apache Ranger项目基本符合这种需求,但由于其并不能很好的和公司已有的系统兼容,所以我们参照Ranger的设计思路,按照适合自身的权限需求,设计了自己的一套ACL控制。

像文章开头所说,我们是以组为最小单位做权限控制的,即权限规则的检查也是基于组这个概念,用户是否能够有权限访问这个目录,也就是通过组给关联起来

从上图可以知道,他们之间的映射关系主要可以分为两部分:用户和组的映射关系,组和目录的关系

用户和组的映射关系

对于用户和组的映射关系,hadoop早已经开放接口(GroupMappingServiceProvider),允许开发者自己实现用户和组的映射关系,LdapGroupsMapping也是基于该接口实现用户和组的映射逻辑,我们自己实现了基于文件可配置的用户和组的映射,添加新的映射关系只需要在统一的权限管理界面UGDAP配置好,触发刷新即可

组和目录的关系

组和目录的映射关系和Ranger一样,采用的是JSON的格式形式存储,对于Ranger来说,多个目录对应多个用户和多个组,其中用户和组可以配置不同的权限,其称为一个Policy,而我们设计的Jdh policy为一个目录对应多个组,不同组分别可以有不同的权限,其中添加了组规则是否递归,是否是排除组的一些概念,以适应自身的需求。

JDH ACL主要一些模块的交互简图如下:

模块的交互简图中PolicyRefresher定时的加载来自UGDAP同步过来的权限策略数据,并重新初始化PolicyEngine和更新PolicyRepository,PolicyRepository会按照一定结构初始化PolicyMaps,以加快权限检查的速度。

权限校验逻辑

对于每一个JDH Policy来说,除了可以配置不同组不同权限之外,还可以配置该目录权限是否递归生效以方便用户权限的配置,另一方面可以配置排除组规则把小部分人加入到排除组中,做到对小部分人关键目录进行隔离。

在每个权限Policy中,都有一个includeMap和excludeMap去存储正常组和排除组规则,权限检查会优先检查excludeMap中的内容,如果符合排除标准,则直接返回,如果排除组检查没有符合排除的标准,则走正常组的权限检查,如果两者都没有符合的标准,则走HDFS原生的权限检查流程。

缓存策略和检查优化

为了尽可能的减少权限路径的查找和路径匹配的时间,我们在PolicyRepository初始化时,对Policy的规则进行了前部分路径的作为查找PolicyMap的key,这样的好处是当一个访问路径/user/bdp/a/b来时,就可以快速的通过key(/user/bdp/a)去找到对应的Policy,如果找不到对应的Policy则默认走Hadoop原生的权限,value则通过目录的深度进行排序,这样也就意味着只要父目录的权限符合要求,就不会检查子目录的权限要求。

对于缓存优化来说,PolicyEngine内部会维护一个固定大小的CacheMap,当进行路径匹配时,可以先从CacheMap里面lookup,看是否已经匹配成功(matchedResourceCache)或不成功的路径(notMatchResourceCache),如果有这直接返回之前的匹配结果,避免了再次匹配目录,如果没有则加入到缓存中,CacheMap通过Key的访问时间去决定这个Entry是否从CacheMap中移除。

[Hadoop] 关于Hadoop的NetworkTopology的遇到的一些问题

背景问题

问题是周末,其他团队HBase推数据变慢了,我也跟着一起找问题,现象是新加的节点的网络流量非常大,后来帮哥发现是原来没有配机架的集群,新机器配了机架,既然加了机架导致这个问题,那我们把机架变为没变机架的时候,是否就可以解决这个问题?后来发现已经加的节点无法从已经加入的机架去除,这就要找根本的问题了。

问题:为什么我更新原有DN的机架信息不生效?

关于机架

我们都知道,Hdfs会根据你配的机架,适当的分配block的分布,AM的也需要相应的机架信息去申请资源,机架信息在DN注册的会执行我们配置的机架感知的脚本,把DN对应到相应的机架当中

  /**
   * Resolve network locations for specified hosts
   *
   * @param names
   * @return Network locations if available, Else returns null
   */
  public List<String> resolveNetworkLocation(List<String> names) {
    // resolve its network location
    List<String> rName = dnsToSwitchMapping.resolve(names);
    return rName;
  }

其中,代码中的resolve方法就是调用了ScriptBasedMapping的resolve中的方法,而ScriptBasedMapping是CachedDNSToSwitchMapping的子类,在调用ScriptBasedMapping的resolve方法之前

  @Override
  public List<String> resolve(List<String> names) {
    // normalize all input names to be in the form of IP addresses
    names = NetUtils.normalizeHostNames(names);

    List <String> result = new ArrayList<String>(names.size());
    if (names.isEmpty()) {
      return result;
    }
//这里就是关键所在
    List<String> uncachedHosts = getUncachedHosts(names);

    // Resolve the uncached hosts
    List<String> resolvedHosts = rawMapping.resolve(uncachedHosts);
    //cache them
    cacheResolvedHosts(uncachedHosts, resolvedHosts);
    //now look up the entire list in the cache
    return getCachedHosts(names);

  }

他会检查这个节点是否曾经在Cache中,如果在即使你的机架信息改变了,还是会从Cache中拿第一次的机架信息,所以更改机架信息对已经注册的DN是不生效的。 对于来说,其实它也是有刷新Cache的方法的,但是他只有抛InvalidTopologyException时才会进行重新刷新

  @Override
  public void reloadCachedMappings() {
    cache.clear();
  }

DatanodeManager类里面的reloadCachedMappings

catch (InvalidTopologyException e) {
      // If the network location is invalid, clear the cached mappings
      // so that we have a chance to re-add this DataNode with the
      // correct network location later.
      List<String> invalidNodeNames = new ArrayList<String>(3);
      // clear cache for nodes in IP or Hostname
      invalidNodeNames.add(nodeReg.getIpAddr());
      invalidNodeNames.add(nodeReg.getHostName());
      invalidNodeNames.add(nodeReg.getPeerHostName());
      dnsToSwitchMapping.reloadCachedMappings(invalidNodeNames);
      throw e;
    }

但对于NetworkTopology来说,当你移除这个机架的最后一个节点的时候,他会相应的把这个机架从NetworkTopology中移除

解决方案和结论

结论是机架信息对于新的注册的DN有效,如果你是变更了原有DN的机架信息是不生效的,只会用第一次注册的机架信息

解决方案:

如果要实现机架变更后,DN重新注册刷新机架信息也是可以的,只要自己实现DNSToSwitchMapping不加Cache就行了

[故障恢复] Active NameNode出现异常处理方案

Active NameNode出现异常处理方案

注意:仅供参考

一般来说Active NameNode出现异常可以分为两种:

  • Active NameNode出现异常宕机,导致无法正常切换
  • Active NameNode因为切换导致被killed

情景一:Active NameNode出现异常宕机

状态描述:

ANN断电、网络异常、负载过高或者机器出现异常无法连接,SNN无法转化为Active,使得HA集群无法对外服务。

解决方案:

确认原ANN已经关机或确保原ANN进程已经挂了,才能使用该命令

操作步骤:

  • 1.对Active NN关机
  • 2.确保关机完成,在SNN上执行 hdfs zkfc -formatZK,如果提示输入Y或者N,则输入Y
  • 3.直到原SNN页面确保变为Active
  • 4.启动原ANN的机器
  • 5.启动原ANN上的NameNode和zkfc进程

情景二:Active NameNode因为切换导致被killed

状态描述:

ANN因为网络或异常任务导致无响应,切换过程中导致其中一台NN被fence

解决方案

根据经验,当SNN在启动的时候和块汇报在一起会引起DN丢节点异常,所以为了避免以上问题,采取下列操作

操作步骤:

  • 先停止该节点的zkfc
  • 把挂掉的NN用防火墙关闭8021(DN汇报端口),禁止DN进行汇报,启动NN,让NN只做合并操作(集群小可以忽略)
  • 等待SNN合并完,并到等待块汇报时,打开8021端口,让DN进行块汇报

[YARN] YARN的HA是如何工作的

YARN的HA已经实现了Active和Standby的架构,即一个ResourceManager是Active,另一个ResourceManager处于Standby模式,如果Active出现问题,随时接管Active

Client_Failover

在下面你将会了解到如何配置重启ResourceManager不会丢失任务的状态,如何对ResourceManager进行切换等相关信息。

ResourceManager状态存储

RM会把其内部的状态(applications和他们的attempts,delegation token和version信息)存储到指定的ResourceManagerStateStore中,NM的节点资源状态则通过心跳定期的汇报给RM.

对于状态存储的实现现有三种:

  • 基于内存(主要用来测试)
  • 基于文件系统(Hdfs或本地)
  • 基于Zookeeper实现

对于ZKStore来说,RM的状态会不断的同步到外部的存储实现当中,其存储的目录结构如下:

RMState主要保存三个状态的数据:appState,rmSecretManagerState,amrmTokenSecretManagerState,其中appState由一个ApplicationId作为Key和ApplicationStateData做为value的TreeMap组成,每个ApplicationStateData包含了对应的apptemps的详细信息,当RM进行状态恢复时,会从zk的存储中,读取这些信息,恢复RMState