[Spark] Spark的History关联对应的executor的日志

场景在于,因为很多用户想在Spark的history上看到对应的executor日志,其实这个并不是Spark的什么新功能,他们不需要用yarn application -logs下载所有的日志,因为我们之前一直忽略这个,没配置,后来配置上就可以了。

除了要开启yarn的日志收集功能之外

<property>
     <name>yarn.log-aggregation-enable</name>
     <value>true</value>
</property>

还要在yarn-site.xml增加参数,并且重启NodeManager即可,当我们访问时,就会帮我们重定向到日志服务器上,找到对应的日志

<property>
      <name>yarn.log.server.url</name>
      <value>http://HistoryServerAddress:19888/jobhistory/logs</value>
</property>

然后点击对应的日志链接就可以链接过去了

记录一下以免忘记。

[YARN] FairScheduler的资源分配机制分析(一)

以下是队列资源分配的简单架构

调度器的container资源分配

通过简图可以看出,container是分配是随着每一次nodeupdate而进行资源分配的,在每一次尝试调度container之前,首先会检查改节点是否曾经有预留的app(预留指的是该node曾经因为资源不足,为了提高本地性原因,当节点有资源更新时,优先的把这个节点的资源分配给这个app的策略)

// 类名FairScheduler.java
//  Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations

FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
  Priority reservedPriority = node.getReservedContainer().getReservedPriority();
  FSQueue queue = reservedAppSchedulable.getQueue();
// 之前有预留,不满足条件则释放
  if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
      || !fitsInMaxShare(queue,
      node.getReservedContainer().getReservedResource())) {
    // Don't hold the reservation if app can no longer use it
    LOG.info("Releasing reservation that cannot be satisfied for application "
        + reservedAppSchedulable.getApplicationAttemptId()
        + " on node " + node);
    reservedAppSchedulable.unreserve(reservedPriority, node);
    reservedAppSchedulable = null;
  } else {
    // Reservation exists; try to fulfill the reservation
    if (LOG.isDebugEnabled()) {
      LOG.debug("Trying to fulfill reservation for application "
          + reservedAppSchedulable.getApplicationAttemptId()
          + " on node: " + node);
    }
   // 满足条件,则分配
    node.getReservedAppSchedulable().assignReservedContainer(node);
  }
}

总的来说:

  • 1、如果该app在该节点没有container的需求,则释放预留container
  • 2、如果该app在该节点还有container的需求,但队列的MaxShare无法满足,则释放该节点预留的container
  • 3、如果该app这个优先级的资源的请求已经为0,释放预留的container

否则检查这个node的可用资源和需求的资源是否满足,如果不满足,则预留,满足则分配

如果该节点没有app预留,则把该node从RootQueue进行分配下去,其中还有分配参数assignMultiple和maxAssign限制该node分配的container数量。

因为我们通过图可以知道,其实资源分配可以看成三层结构,ParetnQueue到LeafQueue,再到FSAppAttempt

那我们提出一个问题,当资源分配,如何选取最适合的队列?如何选取最适合的App?

在每次进行资源分配,都会采取排序的操作

Collections.sort(childQueues, policy.getComparator());

app进行排序

Collections.sort(runnableApps, comparator);

找出最适合的子队列和最适合的app, 对于采取fair算法策略的队列来说,其比较实现在FairShareComparator的compare方法中,对于每个可排序的Schedulable,包括叶子队列,FSAppAttempt,第一步都会计算两个Schedulable的minShare,即通过配置的最小资源要求和每个Schedulable的demand进行比较的最小值

  Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
      s1.getMinShare(), s1.getDemand());
  Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
      s2.getMinShare(), s2.getDemand());

然后通过当前的资源使用和minshare进行比较等等,详细可以自行查看源码

  boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s1.getResourceUsage(), minShare1);
  boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s2.getResourceUsage(), minShare2);

以上可以总结为,如果Schedulable都低于他们的minShare,则通过他们低于minShare的比例的多少来比较,例如,如果一个job A拥有8个task(minShare 10即比例为80%),job B拥有50个task(minShare 100即比例为50%),则job B将会有更高的优先级在下次资源分配时获取资源。

如果Schedulable在他们的minShare之上,则通过比较他们的(runningTasks/weight),如果所有的权重都相等,则slot资源需求少的job优先获得资源,另外,如果一个job拥有更高的权重则拥有机会获得多slot。

解决完选取子队列和app之后,接下来FSAppAttempt自身处理资源的问题了,也就是主要考虑这个FSAppAttempt的计算时的本地性问题

为了提升本地性,对于每个优先级,都会尝试的优先分配本地节点,然后再是机架,off-switch的请求通常会被延迟调度。

分配的策略简要为

从appSchedulingInfo中获取该节点或机架,该优先级对应的资源请求

ResourceRequest rackLocalRequest = getResourceRequest(priority,
    node.getRackName());
ResourceRequest localRequest = getResourceRequest(priority,
    node.getNodeName());

然后满足条件的情况下尝试分配node local的节点,如果满足分配条件则分配成功,如果container的请求满足队列最大的MaxShare,则预留该节点给这个FSAppAttempt

if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
    && localRequest != null && localRequest.getNumContainers() != 0) {
  return assignContainer(node, localRequest,
      NodeType.NODE_LOCAL, reserved);
} 

因为我们经常想优先的调度node-local的container, 其次再为rack-local和off-switch的container去保证最大可能的本地性,为了达到这个目标,我们首先对给定的优先级分配node-local,如果我们在很长的一段时间内没有成功调度,则放松本地性的阀值。

如上所述,通过判断上一次的NodeType类型去判断使用nodeLocalityThreshold( yarn.scheduler.fair.locality.threshold.node默认-1.0f ) 或 rackLocalityThreshold( yarn.scheduler.fair.locality.threshold.rack默认-1.0f ),从而去决定是否改变本地性级别。

if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
  // 满足调度机会条件,则将NODE_LOCAL级别降为RACK_LOCAL级别
  if (allowed.equals(NodeType.NODE_LOCAL)) {
    allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
    resetSchedulingOpportunities(priority);
  }
  // 满足调度机会条件,则将RACK_LOCAL级别降为OFF_SWITCH级别
  else if (allowed.equals(NodeType.RACK_LOCAL)) {
    allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
    resetSchedulingOpportunities(priority);
  }
}
// 否则本地性等级不变
return allowedLocalityLevel.get(priority);

Application所需要的demand如何计算?

在FSAppAttempt中,其维护着每一个Application在Fair Scheduler的相关调度信息,包括Application所需要的demand,它的fairShare和allowedLocalityLevel等其他相关信息

然而在上文也有提到,每个FSAppAttempt或queue都会用它的minshare和demand比较,而demand是怎么来的呢?

在FSAppAttempt中,每一次AppMaster申请资源都会更新appSchedulingInfo里面的requests对象

在FairScheduler中则有线程定时的去调度UpdateThread线程,去重新更新每个队列,FSAppAttempt所需demand和重新计算FairShare等

每一层的demand等于min(下一层demand,最大资源分配),FSAppAttempt的demand则为当前使用的资源大小+未分配的资源大小

  @Override
  public void updateDemand() {
    demand = Resources.createResource(0);
    // Demand is current consumption plus outstanding requests
    Resources.addTo(demand, getCurrentConsumption());

    // Add up outstanding resource requests
    synchronized (this) {
      for (Priority p : getPriorities()) {
        for (ResourceRequest r : getResourceRequests(p).values()) {
          Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
          Resources.addTo(demand, total);
        }
      }
    }
  }

[YARN] FairScheduler设置AM的vcore无法生效

在集群上了Linux Container之后,如果我们想增加AM在NM上获取时间片的能力,我们可以在配置文件中找到以下参数

<property> 
<name>yarn.app.mapreduce.am.resource.cpu-vcores</name> 
<value>15</value> 
</property>

该参数描述的是给AM多少个虚拟核,但是查看RM日志确发现,不管怎么设置AM的核数量都无法生效

[2016-10-27T16:36:37.280 08:00] [INFO] resourcemanager.scheduler.SchedulerNode.allocateContainer(SchedulerNode.java 153) [ResourceManager Event Processor] : Assigned container container_1477059529836_336635_01_000001 of capacity <memory:2048, vCores:1>

原因是对于FairScheduler默认的策略说,每次都会对AM的提交请求进行规整,而且只会考虑mem的情况

SchedulerUtils.normalizeRequest(amReq, scheduler.getResourceCalculator(), scheduler.getClusterResource(), scheduler.getMinimumResourceCapability(), scheduler.getMaximumResourceCapability(), scheduler.getMinimumResourceCapability());

在DefaultResourceCalculator类中,只对内存进行规整,createResource(memory, (memory > 0) ? 1 : 0),如果内存大于0,则默认变为1

  @Override
  public Resource normalize(Resource r, Resource minimumResource,
      Resource maximumResource, Resource stepFactor) {
    int normalizedMemory = Math.min(
        roundUp(
            Math.max(r.getMemory(), minimumResource.getMemory()),
            stepFactor.getMemory()),
            maximumResource.getMemory());
    return Resources.createResource(normalizedMemory);
  }

所以在默认的DefaultResourceCalculator当中,当你不管怎么设置AM的核数都无法生效

[基础操作] 新增加NS操作参考

0.前期环境准备

  • 硬件检查确认准备的机器的物理内存是和之前的NameNode一样
  • 打通hadp用户nn1到nn5,nn6的ssh
  • 打通nn5,nn6的hadp用户的双方的ssh,两台机器互通,不需要输入密码
  • 在nn5,nn6新建mapred,yarn用户,并将hadp,yarn,mapred用户加入hadoop组
  • 调整linux网络参数和其余NS的NameNode保持一致
  • 把新增节点的host加入到集群的/etc/hosts

1.找到奇数台机器,并安装配置ZooKeeper(如果共享Zookeeper,跳过此步骤)

zkHost1-zkHost5

sh zkServer.sh start

2.找到奇数台机器,安装配置hadoop,并启动JournalNode

jnHost1-jnHost5

hadoop-daemon.sh start journalnode

3.增加配置(假设添加NS3)

修改core-site.xml(大体需要注意)

    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://ns3</value>
    </property>

如果和原来的NS不共用ZK,修改填写ZK地址

    <property>
       <name>ha.zookeeper.quorum</name>
       <value>zkHost1:2181,zkHost2:2181,zkHost3:2181,zkHost4:2181,zkHost5:2181</value>
    </property>

修改hdfs-site.xml,增加NS3

    <property>
      <name>dfs.nameservices</name>
      <value>ns1,ns2,ns3</value>
    </property>

增加配置

<property>
  <name>dfs.ha.namenodes.ns3</name>
  <value>nn5,nn6</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.ns3.nn5</name>
  <value>nn5Host:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.ns3.nn6</name>
  <value>nn6Host:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.ns3.nn5</name>
  <value>nn5Host:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.ns3.nn6</name>
  <value>nn6Host:50070</value>
</property>
<property>
    <name>dfs.namenode.servicerpc-address.ns3.nn5</name>
    <value>nn5Host:8021</value>
</property>
<property>
    <name>dfs.namenode.servicerpc-address.ns3.nn6</name>
    <value>nn6Host:8021</value>
</property>
<property>
    <name>dfs.client.failover.proxy.provider.ns3</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

修改JournalNode的上传地址和目录

 <property> 
        <name>dfs.namenode.shared.edits.dir</name> 
        <value>qjournal://jnHost1:8485;jnHost2:8485;jnHost3:8485;jnHost4:8485;jnHost5:8485/ns3</value> 
</property>

4.执行操作

找到ns1的集群cid(例如:CID-338ca6d3-15bb-4941-bb1a-8faa3c3ba79d)

在nn5上指定clusterId执行

hdfs namenode -format -clusterId cid

hadoop-daemon.sh start namenode

在nn6上执行

hdfs namenode -bootstrapStandby

hadoop-daemon.sh start namenode

刷新datanode

cat $HADOOP_CONF_DIR/slaves | xargs -t -i hdfs dfsadmin -refreshNamenodes {}:50020

初始化zkfc目录

hdfs zkfc -formatZK

在两个节点上启动ZKFC

hadoop-daemon.sh start zkfc

滚动重启datanode

[Hadoop] 自定义Hadoop ACL设计

原文已经发表在数据杂货铺,转载请标注原文出处

背 景

随着集群变大,使用用户越来越多,原生的Hadoop权限控制已经不能满足用户的需求,社区也围绕着安全做了很多工作,例如原生支持的Kerberos认证和LDAP,Apache Sentry和Apache Ranger等开源项目也在不同程度上做了很多工作,但是各自都有自己的优缺点,所以按照自身需求选择不同的尤为重要。

JDH ACL设计原理

新版的Hadoop权限设计主要围绕着两方面:用户和组的关系,组和目录之间的关系,并且以组作为权限管理的单位,在新版本的Hadoop开放了INodeAttributeProvider类,允许用户去自定义权限检查的逻辑,Jdh ACL的权限控制就是基于该前提去实现用户级别的权限控制。

新的ACL并没有完全的抛弃原生Hadoop的权限检查,而是在其基础上增加了多组的权限检查规则

即使新版本的Hadoop中支持了像Linux一样的ACL(即支持一个文件可以属于多用户和多组的设置,并且可以设置多种权限,权限的设置需要手动递归,不能自动依赖与上层目录的权限),但是不好的地方在于权限不方便集中的统一管理,而且权限数据会固化到fsimage中等等,所以我们希望找到一种集中管理权限,灵活配置权限的方法,并且以插件的方式实现权限管理的方案,社区中Apache Ranger项目基本符合这种需求,但由于其并不能很好的和公司已有的系统兼容,所以我们参照Ranger的设计思路,按照适合自身的权限需求,设计了自己的一套ACL控制。

像文章开头所说,我们是以组为最小单位做权限控制的,即权限规则的检查也是基于组这个概念,用户是否能够有权限访问这个目录,也就是通过组给关联起来

从上图可以知道,他们之间的映射关系主要可以分为两部分:用户和组的映射关系,组和目录的关系

用户和组的映射关系

对于用户和组的映射关系,hadoop早已经开放接口(GroupMappingServiceProvider),允许开发者自己实现用户和组的映射关系,LdapGroupsMapping也是基于该接口实现用户和组的映射逻辑,我们自己实现了基于文件可配置的用户和组的映射,添加新的映射关系只需要在统一的权限管理界面UGDAP配置好,触发刷新即可

组和目录的关系

组和目录的映射关系和Ranger一样,采用的是JSON的格式形式存储,对于Ranger来说,多个目录对应多个用户和多个组,其中用户和组可以配置不同的权限,其称为一个Policy,而我们设计的Jdh policy为一个目录对应多个组,不同组分别可以有不同的权限,其中添加了组规则是否递归,是否是排除组的一些概念,以适应自身的需求。

JDH ACL主要一些模块的交互简图如下:

模块的交互简图中PolicyRefresher定时的加载来自UGDAP同步过来的权限策略数据,并重新初始化PolicyEngine和更新PolicyRepository,PolicyRepository会按照一定结构初始化PolicyMaps,以加快权限检查的速度。

权限校验逻辑

对于每一个JDH Policy来说,除了可以配置不同组不同权限之外,还可以配置该目录权限是否递归生效以方便用户权限的配置,另一方面可以配置排除组规则把小部分人加入到排除组中,做到对小部分人关键目录进行隔离。

在每个权限Policy中,都有一个includeMap和excludeMap去存储正常组和排除组规则,权限检查会优先检查excludeMap中的内容,如果符合排除标准,则直接返回,如果排除组检查没有符合排除的标准,则走正常组的权限检查,如果两者都没有符合的标准,则走HDFS原生的权限检查流程。

缓存策略和检查优化

为了尽可能的减少权限路径的查找和路径匹配的时间,我们在PolicyRepository初始化时,对Policy的规则进行了前部分路径的作为查找PolicyMap的key,这样的好处是当一个访问路径/user/bdp/a/b来时,就可以快速的通过key(/user/bdp/a)去找到对应的Policy,如果找不到对应的Policy则默认走Hadoop原生的权限,value则通过目录的深度进行排序,这样也就意味着只要父目录的权限符合要求,就不会检查子目录的权限要求。

对于缓存优化来说,PolicyEngine内部会维护一个固定大小的CacheMap,当进行路径匹配时,可以先从CacheMap里面lookup,看是否已经匹配成功(matchedResourceCache)或不成功的路径(notMatchResourceCache),如果有这直接返回之前的匹配结果,避免了再次匹配目录,如果没有则加入到缓存中,CacheMap通过Key的访问时间去决定这个Entry是否从CacheMap中移除。

[Hadoop] 关于Hadoop的NetworkTopology的遇到的一些问题

背景问题

问题是周末,其他团队HBase推数据变慢了,我也跟着一起找问题,现象是新加的节点的网络流量非常大,后来帮哥发现是原来没有配机架的集群,新机器配了机架,既然加了机架导致这个问题,那我们把机架变为没变机架的时候,是否就可以解决这个问题?后来发现已经加的节点无法从已经加入的机架去除,这就要找根本的问题了。

问题:为什么我更新原有DN的机架信息不生效?

关于机架

我们都知道,Hdfs会根据你配的机架,适当的分配block的分布,AM的也需要相应的机架信息去申请资源,机架信息在DN注册的会执行我们配置的机架感知的脚本,把DN对应到相应的机架当中

  /**
   * Resolve network locations for specified hosts
   *
   * @param names
   * @return Network locations if available, Else returns null
   */
  public List<String> resolveNetworkLocation(List<String> names) {
    // resolve its network location
    List<String> rName = dnsToSwitchMapping.resolve(names);
    return rName;
  }

其中,代码中的resolve方法就是调用了ScriptBasedMapping的resolve中的方法,而ScriptBasedMapping是CachedDNSToSwitchMapping的子类,在调用ScriptBasedMapping的resolve方法之前

  @Override
  public List<String> resolve(List<String> names) {
    // normalize all input names to be in the form of IP addresses
    names = NetUtils.normalizeHostNames(names);

    List <String> result = new ArrayList<String>(names.size());
    if (names.isEmpty()) {
      return result;
    }
//这里就是关键所在
    List<String> uncachedHosts = getUncachedHosts(names);

    // Resolve the uncached hosts
    List<String> resolvedHosts = rawMapping.resolve(uncachedHosts);
    //cache them
    cacheResolvedHosts(uncachedHosts, resolvedHosts);
    //now look up the entire list in the cache
    return getCachedHosts(names);

  }

他会检查这个节点是否曾经在Cache中,如果在即使你的机架信息改变了,还是会从Cache中拿第一次的机架信息,所以更改机架信息对已经注册的DN是不生效的。 对于来说,其实它也是有刷新Cache的方法的,但是他只有抛InvalidTopologyException时才会进行重新刷新

  @Override
  public void reloadCachedMappings() {
    cache.clear();
  }

DatanodeManager类里面的reloadCachedMappings

catch (InvalidTopologyException e) {
      // If the network location is invalid, clear the cached mappings
      // so that we have a chance to re-add this DataNode with the
      // correct network location later.
      List<String> invalidNodeNames = new ArrayList<String>(3);
      // clear cache for nodes in IP or Hostname
      invalidNodeNames.add(nodeReg.getIpAddr());
      invalidNodeNames.add(nodeReg.getHostName());
      invalidNodeNames.add(nodeReg.getPeerHostName());
      dnsToSwitchMapping.reloadCachedMappings(invalidNodeNames);
      throw e;
    }

但对于NetworkTopology来说,当你移除这个机架的最后一个节点的时候,他会相应的把这个机架从NetworkTopology中移除

解决方案和结论

结论是机架信息对于新的注册的DN有效,如果你是变更了原有DN的机架信息是不生效的,只会用第一次注册的机架信息

解决方案:

如果要实现机架变更后,DN重新注册刷新机架信息也是可以的,只要自己实现DNSToSwitchMapping不加Cache就行了

[故障恢复] Active NameNode出现异常处理方案

Active NameNode出现异常处理方案

注意:仅供参考

一般来说Active NameNode出现异常可以分为两种:

  • Active NameNode出现异常宕机,导致无法正常切换
  • Active NameNode因为切换导致被killed

情景一:Active NameNode出现异常宕机

状态描述:

ANN断电、网络异常、负载过高或者机器出现异常无法连接,SNN无法转化为Active,使得HA集群无法对外服务。

解决方案:

确认原ANN已经关机或确保原ANN进程已经挂了,才能使用该命令

操作步骤:

  • 1.对Active NN关机
  • 2.确保关机完成,在SNN上执行 hdfs zkfc -formatZK,如果提示输入Y或者N,则输入Y
  • 3.直到原SNN页面确保变为Active
  • 4.启动原ANN的机器
  • 5.启动原ANN上的NameNode和zkfc进程

情景二:Active NameNode因为切换导致被killed

状态描述:

ANN因为网络或异常任务导致无响应,切换过程中导致其中一台NN被fence

解决方案

根据经验,当SNN在启动的时候和块汇报在一起会引起DN丢节点异常,所以为了避免以上问题,采取下列操作

操作步骤:

  • 先停止该节点的zkfc
  • 把挂掉的NN用防火墙关闭8021(DN汇报端口),禁止DN进行汇报,启动NN,让NN只做合并操作(集群小可以忽略)
  • 等待SNN合并完,并到等待块汇报时,打开8021端口,让DN进行块汇报

[YARN] YARN的HA是如何工作的

YARN的HA已经实现了Active和Standby的架构,即一个ResourceManager是Active,另一个ResourceManager处于Standby模式,如果Active出现问题,随时接管Active

Client_Failover

在下面你将会了解到如何配置重启ResourceManager不会丢失任务的状态,如何对ResourceManager进行切换等相关信息。

ResourceManager状态存储

RM会把其内部的状态(applications和他们的attempts,delegation token和version信息)存储到指定的ResourceManagerStateStore中,NM的节点资源状态则通过心跳定期的汇报给RM.

对于状态存储的实现现有三种:

  • 基于内存(主要用来测试)
  • 基于文件系统(Hdfs或本地)
  • 基于Zookeeper实现

对于ZKStore来说,RM的状态会不断的同步到外部的存储实现当中,其存储的目录结构如下:

RMState主要保存三个状态的数据:appState,rmSecretManagerState,amrmTokenSecretManagerState,其中appState由一个ApplicationId作为Key和ApplicationStateData做为value的TreeMap组成,每个ApplicationStateData包含了对应的apptemps的详细信息,当RM进行状态恢复时,会从zk的存储中,读取这些信息,恢复RMState

[YARN] NodeManager因为ContainerMetric导致OOM

现象描述

之前hadoop2.7.1的集群,经常运行一段时间后会触发OOM,导致上面的map需要重跑,能想到的一种方案是调整GC参数,利用GC回收器对内存进行回收,另外一种情况则感觉代码可能处理有问题。

关键日志:

2016-07-05 09:35:40,907 WARN org.apache.hadoop.ipc.Client: Unexpected error reading responses on connection Thread[IPC Client (2069725879) connection to rmhost:8031 from yarn,5,main]
java.lang.OutOfMemoryError: Java heap space
        at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:120)
        at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:68)

后来调整GC参数,追踪到底哪里出了问题,以下是参数参考

YARN_NODEMANAGER_OPTS="-Xmx2g -Xms2g -Xmn1g -XX:PermSize=128M -XX:MaxPermSize=128M -XX:+DisableExplicitGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data1/yarn-logs/nm_dump.log -Dcom.sun.management.jmxremote -Xloggc:/data1/yarn-logs/nm_gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:ErrorFile=/data1/yarn-logs/nm_err_pid"

后来经过分析,确实是代码处理有问题

containerMetric占用了大部分内存

后来社区有patch可以修复HADOOP-13362

解决方案

  • 1.关掉ContainerMetric,默认是开启的(yarn.nodemanager.container-metrics.enable默认true
    改为false)
  • 2.打patch

参考工具

IBM分析工具

[Hadoop问题] IOException: Got error, status message , ack with firstBadLink as xxx.xx.xx.xx:50010

关键日志:

2016-08-24T02:00:32.362 08:00] [ERROR] hadoop.hdfs.DFSClient.closeAllFilesBeingWritten(DFSClient.java 980) [Thread-0] : Failed to close inode 22083674
java.io.IOException: Got error, status message , ack with firstBadLink as xxx.xxx.xxx.xxx:50010
at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:140)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1334)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1237)
at org.apache.hadoop.hdfs.DFSOutputStream

原因当时DN超过了每个磁盘预留的限制:

[2016-08-24T02:00:31.494+08:00] [INFO] server.datanode.DataNode.writeBlock(DataXceiver.java 837) [DataXceiver for client DFSClient_NONMAPREDUCE_103164656_1 at /172.16.183.26:29414 [Receiving block BP-964157621-172.16.172.17-1470393241028:blk_1094132102_20397583]] : opWriteBlock BP-964157621-172.16.172.17-1470393241028:blk_1094132102_20397583 received exception org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space: The volume with the most available space (=123444224 B) is less than the block size (=134217728 B).

原因是balance的速度不够迅速,有一种的解决方案,但没测过,想法是每个盘可写最多不超出128M(预留为100G),即第一个满了可以写,但不能超过一个block的大小,这样目测可以避免这样的问题。