分类 hadoop 下的文章

[Hadoop] 浅谈Hadoop Distcp工具的InputFormat

在集群迁移或者数据跨集群同步的过程中,必要少不了数据拷贝的动作,在同一个集群内,跨NameSpace的数据拷贝,你可以使用distcp,你也可以自己实现类似facebook提供的fastcopy的拷贝(社区好像没提供),那么在使用distcp工具的过程中,其中的一些参数到底影响了什么,他是一个怎样的原理,今天就和大家简单的分享下。

我们在命令行执行hadoop distcp命令回车,就会看到他所支持的很多参数,其中在命令行拷贝策略(-strategy)选项中,有两个参数可选参数:dynamic,uniformsize。在默认情况下使用的是uniformsize,含义是distcp的每个map会相对均衡去复制数据量大小的文件。

我们通过查看源码容易可以看出,除了命令行选项之外,distcp还能默认的去加载distcp-default.xml,我们可以放置到$HADOOP_CONF_DIR下,我们可以配置相对常用的参数到这个文件中。

  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
    Configuration config = new Configuration(configuration);
    config.addResource(DISTCP_DEFAULT_XML);
    setConf(config);
    this.inputOptions = inputOptions;
    this.metaFolder   = createMetaFolderPath();
  }

除此之外,我们还从默认的配置当中,看到了两个参数:

<property>
<name>distcp.dynamic.strategy.impl</name>
<value>org.apache.hadoop.tools.mapred.lib.DynamicInputFormat</value>
<description>Implementation of dynamic input format</description>
</property>

<property>
<name>distcp.static.strategy.impl</name>
<value>org.apache.hadoop.tools.mapred.UniformSizeInputFormat</value>
<description>Implementation of static input format</description>
</property>

这个就是上述说的两种命令行策略的参数模式。通过命名可以很容易可以看出,其实这就是两个InputFormat的实现类,distcp任务(其实也就是MR任务),通过配置命令行或者参数指定使用不同的inputFormat生成不同的splits,从而实现不同的拷贝文件的逻辑。

然而,既然有两个选项,那他们的区别在哪呢?我们可以简单的看看下图的一个整体结构

DynamicInputFormat

对于DynamicInputFormat来说,有几个重要的相关的类:DynamicRecordReader,DynamicInputChunk,DynamicInputChunkContext。

对于distcp任务,会先生成一个copy-listing文件,该文件包含复制文件的列表等信息,DynamicInputFormat的getSplits方法就是将这些切分为不同chunk,然后分配到不同的task中。

在切分copy-listing文件到不同的chunk当中,其中有几个变量,numMaps和numRecords得到splitRatio的比例,也就是算出平均每个map处理多少个chunk,然后通过总的records数量,算出每个chunk中有多少条records

static int getSplitRatio(int nMaps, int nRecords, Configuration conf) {
int maxChunksIdeal = getMaxChunksIdeal(conf);
int minRecordsPerChunk = getMinRecordsPerChunk(conf);
int splitRatio = getSplitRatio(conf);

if (nMaps == 1) {
LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
return 1;
}

if (nMaps > maxChunksIdeal)
return splitRatio;

int nPickups = (int)Math.ceil((float)maxChunksIdeal/nMaps);
int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));

return nRecordsPerChunk < minRecordsPerChunk ?
splitRatio : nPickups;
}

最终会将所有的record放到不同的chunk中,在hdfs上会在对应目录行程对应的文件类似fileList.seq.chunk.0000x:

drwx------ - hadoop supergroup 0 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248
drwx------ - hadoop supergroup 0 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir
-rw-r--r-- 1 hadoop supergroup 1504 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/fileList.seq.chunk.00002
-rw-r--r-- 1 hadoop supergroup 1486 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/fileList.seq.chunk.00003
-rw-r--r-- 1 hadoop supergroup 1646 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/task_1526024399954_0017_m_000000
-rw-r--r-- 1 hadoop supergroup 1524 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/task_1526024399954_0017_m_000001
-rw-r--r-- 1 hadoop supergroup 6686 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/fileList.seq
-rw-r--r-- 1 hadoop supergroup 5906 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/fileList.seq_sorted

然后map通过DynamicRecordReader去读取数据的时候就会将对应的chunk文件修改为task(task_1526024399954_0017_m_000000)名字,所以通过上面的文件夹输入可以看出,这时有两个map正在对数据进行拷贝,执行速度快的map会继续读取未被领取的chunk进行拷贝,这就让速度快的map可以对更多的数据进行拷贝。

UniformSizeInputFormat

对于默认的UniformSizeInputFormat,他的实现方式比DynamicInputFormat简单,原理其实就是得到总的totalSizeBytes,然后除以map数量得到平均每个map处理多少数据,然后当文件的大小加起来大于nBytesPerSplit的时候,就形成一个split,这样是希望每个map处理的数据差距不会太大。

带宽控制

带宽控制主要实现在ThrottledInputStream当中,他在hadoop除了在distcp之外,也用在了NameNode之间的FSImage传输等场景上的使用,原理就是,他继承了原有的InputStream并在每次读取的时候进行每秒获取字节的速率检查(throttle),如果超过,则进行sleep:

  /**
   * Read bytes starting from the specified position. This requires rawStream is
   * an instance of {@link PositionedReadable}.
   */
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    if (!(rawStream instanceof PositionedReadable)) {
      throw new UnsupportedOperationException(
          "positioned read is not supported by the internal stream");
    }
    throttle();
    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
        offset, length);
    if (readLen != -1) {
      bytesRead += readLen;
    }
    return readLen;
  }

  private void throttle() throws IOException {
    while (getBytesPerSec() > maxBytesPerSec) {
      try {
        Thread.sleep(SLEEP_DURATION_MS);
        totalSleepTime += SLEEP_DURATION_MS;
      } catch (InterruptedException e) {
        throw new IOException("Thread aborted", e);
      }
    }
  }

  /**
   * Getter for the read-rate from this stream, since creation.
   * Calculated as bytesRead/elapsedTimeSinceStart.
   * @return Read rate, in bytes/sec.
   */
  public long getBytesPerSec() {
    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
    if (elapsed == 0) {
      return bytesRead;
    } else {
      return bytesRead / elapsed;
    }
  }

总结:

除了本文说的参数之外,我们平时在数据拷贝的过程中,我们还可以综合的通过控制map的数量,控制带宽阈值去减少这个过程对线上系统的影响,其中还有update参数等等,我们可以按照自身的业务需求去调整自身的参数,从而达到一个相对最优的数据拷贝效果。

[Hadoop] Standby NameNode性能问题排查

一、 背景描述

在集群重启了之后,运维人员在测试的时候发现集群进程负载很高,有时候进行ls会有缓慢的情况,并且 我们通过top查看到集群的SNN进程cpu消耗100%

二、 问题定位以及排查方案

查询系统占用CPU100%消耗线程很简单,用top -H -p 命令即可看到每条线程的占用,该线程的线程ID是十进制的

我们通过jstack对SNN的线程栈进行定位,将46274转化为十六进制得到b4c2的线程号,得到当时线程的栈调用,发现当时调用的线程信息,发现这个耗时主要发生在Edit log tailer线程中

然后我们查看源码,我们每次进行loadEdits的时候,都会进行countforQuota的统计,那么我们知道,NN会定时去加载Edits,那么是多久的时间呢?

我们查看EditLogTailerThread线程,查看他的时间间隔,知道我们60s执行一次EditlogTailer, 然后做一次updateCountForQuota的操作,会耗费很多时间,那么调优的时候,要么增加一定的时间间隔,降低CPU的消耗,但是会影响loadEdis的及时性,要么优化代码结构,加速计算,那么我们查找下方案

所以经过查找,社区上也有人遇到类似问题,并且有一些相关的patch,这里有一些文章

除了Patch之外,还有HDFS-8865也讨论了类似的问题,HDFS-6763其实是基于HDFS-8865的一个改进版,其核心通过构造InitQuotaTask来加速计算的过程,从而达到优化的效果,详细的学习建议参考ISSUE上面的详细讨论和方案的设计。

参考

https://issues.apache.org/jira/browse/HDFS-8865

https://issues.apache.org/jira/browse/HDFS-6763

https://tech.meituan.com/namenode_restart_optimization.html

[Hadoop] HDFS调优笔记

本文章来自 hackershell.cn,转载请标注出处

描述

这篇文章主要从一些配置设置相关方面去调优Hadoop集群的笔记,内容来自网上或一些实践经验

1.HDFS审计日志

HDFS审计日志是一个和进程分离的日志文件,默认是没有开启的,开启之后,用户的每个请求都会记录到审计日志当中,通过审计日志可以发现哪些ip,哪些用户对哪些目录做了哪些操作,比如:那些数据在哪些在什么时候删除,和分析哪些Job在密集的对NameNode进行访问,我们自己的版本中对访问记录了job的Id,在新版的HDFS中,新增加了callcontext的功能,也做了类似操作:HDFS-9184 Logging HDFS operation’s caller context into audit logs.

如何开启,修改Hadoop-env.sh

-Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender}

改为

-Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,RFAAUDIT}

对应的log4j.properties可以新增保存个数

#
# hdfs audit logging
#
hdfs.audit.logger=INFO,NullAppender
hdfs.audit.log.maxfilesize=2560MB
hdfs.audit.log.maxbackupindex=30
log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.RFAAUDIT.File=/data1/hadoop-audit-logs/hdfs-audit.log
log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAAUDIT.layout.ConversionPattern=[%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX}] [%p] %c{3}.%M(%F %L) [%t] : %m%n
log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}

开启异步的审计日志

使用异步的log4j appender可以提升NameNode的性能,尤其是请求量在10000 requests/second,可以设置hdfs-site.xml

<property>
    <name>dfs.namenode.audit.log.async</name>
    <value>true</value>
  </property>

2.开启Service RPC端口

在默认情况下,service RPC端口是没有使用的,client和DataNode汇报,zkfc的健康检查都会公用RPC Server,当client的请求量比较大或者DataNode的汇报量很大,会导致他们之间相互影响,导致访问非常缓慢,开启之后,DN的汇报和健康检查请求都会走Service RPC端口,避免了因为client的大量访问影响,影响服务之间的请求,在HA集群中,可以在hdfs-site.xml中设置

<property>
    <name>dfs.namenode.servicerpc-address.mycluster.nn1</name>
    <value>mynamenode1.example.com:8021</value>
  </property>

  <property>
    <name>dfs.namenode.servicerpc-address.mycluster.nn2</name>
    <value>mynamenode2.example.com:8021</value>
  </property>

开启之后,需要重置zkfc

hdfs zkfc –formatZK

注意:

修改这个端口需要重启集群,请自行评估带来的影响

3.关闭多余的日志

有时候,NameNode上日志打印会严重影响NN的性能,出问题时也会造成没必要的干扰,所以可以修改log4j的文件,对没必要的日志进行日志级别的调整,例如

log4j.logger.BlockStateChange=WARN
log4j.logger.org.apache.hadoop.ipc.Server=WARN

社区上也有很多日志的优化方案

  • HDFS-9434
  • HADOOP-12903
  • HDFS-9941
  • HDFS-9906

4.RPC FairCallQueue

这个是基于上面第二点开启Service RPC继续说的,这是较新版本的Hadoop的新特性,RPC FairCallQueue替换了之前的单一的RPC queue的模式,RPC Server会维护并按照请求的用户进行分组,Handler会按照队列的优先级去消费queue里面的RPC Call,这个功能它可以防止因为某个用户的cleint的大量请求导致NN无法响应,整个集群瘫痪的状态,开启了之后,请求多的用户请求会被降级,这样不会造成多租户下,影响他用户的访问,后续会有文章介绍,相关的JIRA HDFS-10282

如果开启,需要修改core-site.xml

  <property>
    <name>ipc.8020.callqueue.impl</name>
    <value>org.apache.hadoop.ipc.FairCallQueue</value>
  </property>
<property>
  <name>ipc.8020.faircallqueue.decay-scheduler.period-ms</name>
  <value>60000</value>
</property>

注意

不能对DataNode和NN通信的端口进行开启

5.磁盘吞吐量

对于NameNode来说,HDFS NameNode性能也依赖于flush edit logs到磁盘的速度,任何延迟将会导致将会影响RPC的处理线程,并对Hadoop集群造成连锁的性能影响。

你应该使用专用的硬盘时存储edit logs,如果hdfs-site.xml中没有配置,将等于dfs.name.name.dir的值

 <property>
    <name>dfs.namenode.name.dir</name>
    <value>/mnt/disk1,/mnt/disk2</value>
  </property>

对于DN来说,默认的Du,会产生大量的du -sk的操作,会造成集群严重的IO Wait增加,从而导致任务会变得缓慢

负载图

产生大量的DU操作

解决方案是

将同时产生的du操作,加个随机数,随机到集群的不同时间段,并且每天只du一次,这样虽然可能会造成hdfs上显示的使用率会有延时,但基本可以满足要求HADOOP-9884

打patch之后,修改hdfs-site.xml

<property>
  <name>fs.getspaceused.jitterMillis</name>
  <value>3600000</value>
</property>
<property>
  <name>fs.du.interval</name>
  <value>86400000</value>
</property>
<property>
  <name>dfs.datanode.cached-dfsused.check.interval.ms</name>
  <value>14400000</value>
</property>

6.避免读取stale DataNodes

修改hdfs-site.xml

dfs.namenode.avoid.read.stale.datanode=true
dfs.namenode.avoid.write.stale.datanode=true

7.开启short circuit reads

开启短路读之后,当client读取数据时,如果在改节点,会直接通过文件描述符去读取文件,而不用通过tcp socket的方式

修改hdfs-site.xml

dfs.client.read.shortcircuit=true
dfs.domain.socket.path=/var/lib/hadoop-hdfs/dn_socket

8.关闭操作系统的Transparent Huge Pages (THP)

操作系统默认开启THP,会导致整个Hadoop集群cpu sys态变高,详细步骤可以参考

9.设置系统的vm.swappiness

避免使用交换区

添加vm.swappiness=0到/etc/sysctl.conf重启生效,或者sysctl -w vm.swappiness=0

10.设置系统CPU为performance

设置cpu的scaling governors为performance模式,你可以运行cpufreq-set -r -g performance或者修改/sys/devices/system/cpu/cpu*/cpufreq/scaling_governor文件,并设置为performance

参考文章

Scaling the HDFS NameNode

OS Configurations for Better Hadoop Performance

hadoop DiskSetup

[Hadoop] Hadoop集群一般需要关注的几个重要指标

通用监控指标

对于每个RPC服务应该监控

RpcProcessingTimeAvgTime(PRC处理的平均时间)

通常hdfs在异常任务突发大量访问时,这个参数会突然变得很大,导致其他用户访问hdfs时,会感觉到卡顿,从而影响任务的执行时间

CallQueueLength(RPC Call队列的长度)

如果callqueue队列数值一直处于较高的水平,例如对于NN来说CallQueue的长度等于handler*100,也就是说NN可能收到了大量的请求或者server在处理rpc请求时耗时很长,导致call堆积等

进程JVM监控

MemHeapUsedM(堆内存使用监控)

通过监控改参数可以查看进程的gc时间和gc发生之后释放多少内存和进程的内存使用情况

ThreadsBlocked(线程阻塞数量)

分析当问题发生时进程的线程的阻塞状况

ThreadsWaiting(线程等待数量)

分析当问题发生时进程的线程的等待状况

NameNode监控指标

TotalFiles(总的文件数量)

监控和预警文件数的总量,可以通过其看出是否有任务突然大量写文件和删除大量文件

TotalBlocks(总的block数量)

表示集群的block数量,作用同上

PercentUsed(集群hdfs使用百分比)

监控集群的hdfs的使用情况,使用率不宜太高,因为需要预留磁盘空间给任务计算使用

BlockPoolUsedSpace(集群该namespace的hdfs使用容量大小)

可以监控不同namespace的hdfs的使用情况

Total(集群hdfs总容量大小)

显示集群整体容量情况

Used(集群hdfs已使用的容量大小)

集群hdfs使用情况,可以预警是否需要增加机器和删除无用数据

NumLiveDataNodes(存活的DN数量)

NumDeadDataNodes(丢失的DN数量)

丢失节点,如果过多可能会引起丢块

VolumeFailuresTotal(坏盘的数量)

应该设定阀值,达到一定数量时处理

MissingBlocks(丢失的block数量)

丢失重要的块会引起任务报错

DataNode监控指标

ReadBlockOpAvgTime(读取block的平均时间)

可选的监控选项,如果该机器在某个时段平均时间突然升高,可能网络有打满或磁盘读取速度存在问题

WriteBlockOpAvgTime(写数据块的平均时间)

可选的监控选项

ResouceManager监控指标

NumActiveNMs(NM存活节点数量监控)

NumLostNMs(NM丢失节点数量监控)

有时节点会因为磁盘空间不足等原因导致进程退出,虽然集群具有容错机制,但当丢失节点达到一定数量之后,集群计算资源相当于减少了,所以应当设置合理的阀值报警处理

NumUnhealthyNMs(NM不健康节点数量监控)

通常会因为磁盘问题导致节点不健康

集群应用数量监控

AppsSubmitted(app提交数量)

之前集群有出现过app的id号,生成很慢的情况,可以通过改数值和其他参数去判断提交减少的问题

AppsRunning(app的运行数量)

可以通过改值去对比历史同一时刻的app的运行数量是否差异很大,去判断集群到底是否可能出现问题

AppsPending(app等待数量)

如果该数值很高,或则在某个queue的该数值很高,有可能是因为app所在的队列资源满了,导致app无法获取资源,启动master,如果资源没满,可能的一个原因是app的所在队列无法在rm中有先获取资源,或资源被预留所导致等

AppsCompleted(app完成数量)

应用完成的数量监控

AppsKilled(app被kill的数量)

应用被kill的数量监控

AppsFailed(app失败数量)

如果AppsFailed数量升高,说明集群的存在导致app批量失败的操作

集群资源使用量情况监控

AllocatedMB(已分配的内存大小)

如果集群用户反应任务运行缓慢,应该及时检查队列资源的使用情况和hdfs的响应速度

AllocatedVCores(已分配的核数量)

有时任务分配不上去,有可能是核数已经用完

AllocatedContainers(已分配的Container数量)

已分配的Container数量

AvailableMB(可用的内存大小)

有遇到过在集群反复重启NM后,导致集群计算可用资源错误的bug

AvailableVCores(可能的核数量)

PendingMB(等待分配的内存大小)

PendingVCores(等待分配的核数量)

PendingContainers(等待分配的Container数量)

如果等待分配的Container比日常出现多出很多,应该检查集群是否有问题

ReservedMB(预留的内存大小)

之前遇到因为spark任务申请很大的资源,导致把整个集群的资源都预留的情况,这时应该适当的调整最大的分配Container的内存大小

ReservedVCores(预留的核数量)

同上

ReservedContainers(预留的Container数量)

Container因为资源不足,优先预留节点

集群分配数据监控

AssignContainerCallNumOps(分配Container的次数)

我们可以通过该监控可以知道RM每秒能够分配多少的Container,在高峰期是否可能存在瓶颈,经过社区的patch优化之后,RM的分配Container最大值可以达到4k+

AssignContainerCallAvgTime(分配Container的平均时间)

如果时间突然变大,说明可能遇到分配瓶颈等其他问题

ContinuousScheduleCallNumOps(连续调度次数)

如果该数值没有增加,说明连续调度线程出现问题

ContinuousScheduleCallAvgTime(连续调度平均时间)

连续调度的平均时间

NodeUpdateCallNumOps(NM心跳汇报次数)

NodeUpdateCallAvgTime(心跳汇报处理时间)

rm资源分配是通过每一次NM的心跳进行分配和领取Container的,如果该时间变长,则分配速度可能会存在下降

Linux机器监控

网络带宽情况

通过监控DN的网络情况可以查找,该节点是否在当时是热节点,一般情况下如果在该机器的网络情况已经满了,会影响任务的正常运行速度

机器负载情况

网络丢包情况

机器内存使用情况

总 结

集群监控指标属于逐渐完善的一个过程,以上分享的是一般的重要常用指标,详细的问题分析可能需要通过工具和不同指标配合才能找到问题,希望这篇文章能够给大家带来用处

[Hadoop] NameNode sshFence的一个小bug

前几天集群在发生异常切换的时候,除了了以下警告日志

[2017-01-10T01:42:37.234+08:00] [WARN] hadoop.ha.SshFenceByTcpPort.pump(StreamPumper.java 88) [nc -z xxx-xxx-17224.hadoop.xxx.com 8021 via ssh: StreamPumper for STDERR] : nc -z xxx-xxx-17224.hadoop.xxx.com 8021 via ssh: nc: invalid option -- 'z'
[2017-01-10T01:42:37.235+08:00] [WARN] hadoop.ha.SshFenceByTcpPort.pump(StreamPumper.java 88) [nc -z xxx-xxx-17224.hadoop.xxx.com 8021 via ssh: StreamPumper for STDERR] : nc -z xxx-xxx-17224.hadoop.xxx.com8021 via ssh: Ncat: Try `--help' or man(1) ncat for more information, usage options and help. QUITTING.

我们知道tryGracefulFence不成功之后会去fence的那个进程NN,我们看下代码

private boolean doFence(Session session, InetSocketAddress serviceAddr)
      throws JSchException {
    int port = serviceAddr.getPort();
    try {
      //这段日志已经出现,所以忽略
      LOG.info("Looking for process running on port " + port);
      int rc = execCommand(session,
          "PATH=$PATH:/sbin:/usr/sbin fuser -v -k -n tcp " + port);
      //这段日志没出现,所以代表执行该命令返回值为rc == 1
      if (rc == 0) {
        LOG.info("Successfully killed process that was " +
            "listening on port " + port);
        // exit code 0 indicates the process was successfully killed.
        return true;
      } else if (rc == 1) {
        // exit code 1 indicates either that the process was not running
        // or that fuser didn't have root privileges in order to find it
        // (eg running as a different user)
        LOG.info(
            "Indeterminate response from trying to kill service. " +
            "Verifying whether it is running using nc...");
     //然后通过这个命令去检查端口是否还在的时候,报错了,返回2,并不是执行返回1,而是执行方法有误
        rc = execCommand(session, "nc -z " + serviceAddr.getHostName() +
            " " + serviceAddr.getPort());
        if (rc == 0) {
          // the service is still listening - we are unable to fence
          LOG.warn("Unable to fence - it is running but we cannot kill it");
          return false;
        } else {
          LOG.info("Verified that the service is down.");
          return true;          
        }
      } else {
        // other 
      }
      LOG.info("rc: " + rc);
      return rc == 0;
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while trying to fence via ssh", e);
      return false;
    } catch (IOException e) {
      LOG.warn("Unknown failure while trying to fence via ssh", e);
      return false;
    }
  }

然后在CentOS7执行这个命令时

[XXX@XXX-XXX-17223 ~]$ nc -z
nc: invalid option -- 'z'
Ncat: Try `--help' or man(1) ncat for more information, usage options and help. QUITTING.
[XX@XXX-XXX-17223 ~]$ echo $?       
2

意味着这句话返回值不为0,不代表fence成功,因为操作系统的nc版本问题,也有可能不为0,而为2

rc = execCommand(session, "nc -z " + serviceAddr.getHostName() +
            " " + serviceAddr.getPort());

更严格的判断为不为1,也不能判断fence成功

➜  ~ nc -z 127.0.0.1 3200
➜  ~ echo $?             
0
➜  ~ nc -z 127.0.0.3 3200
➜  ~ echo $?             
1

改代码已经在社区被提出了,也有了解决方案,但是没被合并,详情请看HDFS-3618HDFS-11308

[Hadoop] 自定义Hadoop ACL设计

原文已经发表在数据杂货铺,转载请标注原文出处

背 景

随着集群变大,使用用户越来越多,原生的Hadoop权限控制已经不能满足用户的需求,社区也围绕着安全做了很多工作,例如原生支持的Kerberos认证和LDAP,Apache Sentry和Apache Ranger等开源项目也在不同程度上做了很多工作,但是各自都有自己的优缺点,所以按照自身需求选择不同的尤为重要。

JDH ACL设计原理

新版的Hadoop权限设计主要围绕着两方面:用户和组的关系,组和目录之间的关系,并且以组作为权限管理的单位,在新版本的Hadoop开放了INodeAttributeProvider类,允许用户去自定义权限检查的逻辑,Jdh ACL的权限控制就是基于该前提去实现用户级别的权限控制。

新的ACL并没有完全的抛弃原生Hadoop的权限检查,而是在其基础上增加了多组的权限检查规则

即使新版本的Hadoop中支持了像Linux一样的ACL(即支持一个文件可以属于多用户和多组的设置,并且可以设置多种权限,权限的设置需要手动递归,不能自动依赖与上层目录的权限),但是不好的地方在于权限不方便集中的统一管理,而且权限数据会固化到fsimage中等等,所以我们希望找到一种集中管理权限,灵活配置权限的方法,并且以插件的方式实现权限管理的方案,社区中Apache Ranger项目基本符合这种需求,但由于其并不能很好的和公司已有的系统兼容,所以我们参照Ranger的设计思路,按照适合自身的权限需求,设计了自己的一套ACL控制。

像文章开头所说,我们是以组为最小单位做权限控制的,即权限规则的检查也是基于组这个概念,用户是否能够有权限访问这个目录,也就是通过组给关联起来

从上图可以知道,他们之间的映射关系主要可以分为两部分:用户和组的映射关系,组和目录的关系

用户和组的映射关系

对于用户和组的映射关系,hadoop早已经开放接口(GroupMappingServiceProvider),允许开发者自己实现用户和组的映射关系,LdapGroupsMapping也是基于该接口实现用户和组的映射逻辑,我们自己实现了基于文件可配置的用户和组的映射,添加新的映射关系只需要在统一的权限管理界面UGDAP配置好,触发刷新即可

组和目录的关系

组和目录的映射关系和Ranger一样,采用的是JSON的格式形式存储,对于Ranger来说,多个目录对应多个用户和多个组,其中用户和组可以配置不同的权限,其称为一个Policy,而我们设计的Jdh policy为一个目录对应多个组,不同组分别可以有不同的权限,其中添加了组规则是否递归,是否是排除组的一些概念,以适应自身的需求。

JDH ACL主要一些模块的交互简图如下:

模块的交互简图中PolicyRefresher定时的加载来自UGDAP同步过来的权限策略数据,并重新初始化PolicyEngine和更新PolicyRepository,PolicyRepository会按照一定结构初始化PolicyMaps,以加快权限检查的速度。

权限校验逻辑

对于每一个JDH Policy来说,除了可以配置不同组不同权限之外,还可以配置该目录权限是否递归生效以方便用户权限的配置,另一方面可以配置排除组规则把小部分人加入到排除组中,做到对小部分人关键目录进行隔离。

在每个权限Policy中,都有一个includeMap和excludeMap去存储正常组和排除组规则,权限检查会优先检查excludeMap中的内容,如果符合排除标准,则直接返回,如果排除组检查没有符合排除的标准,则走正常组的权限检查,如果两者都没有符合的标准,则走HDFS原生的权限检查流程。

缓存策略和检查优化

为了尽可能的减少权限路径的查找和路径匹配的时间,我们在PolicyRepository初始化时,对Policy的规则进行了前部分路径的作为查找PolicyMap的key,这样的好处是当一个访问路径/user/bdp/a/b来时,就可以快速的通过key(/user/bdp/a)去找到对应的Policy,如果找不到对应的Policy则默认走Hadoop原生的权限,value则通过目录的深度进行排序,这样也就意味着只要父目录的权限符合要求,就不会检查子目录的权限要求。

对于缓存优化来说,PolicyEngine内部会维护一个固定大小的CacheMap,当进行路径匹配时,可以先从CacheMap里面lookup,看是否已经匹配成功(matchedResourceCache)或不成功的路径(notMatchResourceCache),如果有这直接返回之前的匹配结果,避免了再次匹配目录,如果没有则加入到缓存中,CacheMap通过Key的访问时间去决定这个Entry是否从CacheMap中移除。

[Hadoop] 关于Hadoop的NetworkTopology的遇到的一些问题

背景问题

问题是周末,其他团队HBase推数据变慢了,我也跟着一起找问题,现象是新加的节点的网络流量非常大,后来帮哥发现是原来没有配机架的集群,新机器配了机架,既然加了机架导致这个问题,那我们把机架变为没变机架的时候,是否就可以解决这个问题?后来发现已经加的节点无法从已经加入的机架去除,这就要找根本的问题了。

问题:为什么我更新原有DN的机架信息不生效?

关于机架

我们都知道,Hdfs会根据你配的机架,适当的分配block的分布,AM的也需要相应的机架信息去申请资源,机架信息在DN注册的会执行我们配置的机架感知的脚本,把DN对应到相应的机架当中

  /**
   * Resolve network locations for specified hosts
   *
   * @param names
   * @return Network locations if available, Else returns null
   */
  public List<String> resolveNetworkLocation(List<String> names) {
    // resolve its network location
    List<String> rName = dnsToSwitchMapping.resolve(names);
    return rName;
  }

其中,代码中的resolve方法就是调用了ScriptBasedMapping的resolve中的方法,而ScriptBasedMapping是CachedDNSToSwitchMapping的子类,在调用ScriptBasedMapping的resolve方法之前

  @Override
  public List<String> resolve(List<String> names) {
    // normalize all input names to be in the form of IP addresses
    names = NetUtils.normalizeHostNames(names);

    List <String> result = new ArrayList<String>(names.size());
    if (names.isEmpty()) {
      return result;
    }
//这里就是关键所在
    List<String> uncachedHosts = getUncachedHosts(names);

    // Resolve the uncached hosts
    List<String> resolvedHosts = rawMapping.resolve(uncachedHosts);
    //cache them
    cacheResolvedHosts(uncachedHosts, resolvedHosts);
    //now look up the entire list in the cache
    return getCachedHosts(names);

  }

他会检查这个节点是否曾经在Cache中,如果在即使你的机架信息改变了,还是会从Cache中拿第一次的机架信息,所以更改机架信息对已经注册的DN是不生效的。 对于来说,其实它也是有刷新Cache的方法的,但是他只有抛InvalidTopologyException时才会进行重新刷新

  @Override
  public void reloadCachedMappings() {
    cache.clear();
  }

DatanodeManager类里面的reloadCachedMappings

catch (InvalidTopologyException e) {
      // If the network location is invalid, clear the cached mappings
      // so that we have a chance to re-add this DataNode with the
      // correct network location later.
      List<String> invalidNodeNames = new ArrayList<String>(3);
      // clear cache for nodes in IP or Hostname
      invalidNodeNames.add(nodeReg.getIpAddr());
      invalidNodeNames.add(nodeReg.getHostName());
      invalidNodeNames.add(nodeReg.getPeerHostName());
      dnsToSwitchMapping.reloadCachedMappings(invalidNodeNames);
      throw e;
    }

但对于NetworkTopology来说,当你移除这个机架的最后一个节点的时候,他会相应的把这个机架从NetworkTopology中移除

解决方案和结论

结论是机架信息对于新的注册的DN有效,如果你是变更了原有DN的机架信息是不生效的,只会用第一次注册的机架信息

解决方案:

如果要实现机架变更后,DN重新注册刷新机架信息也是可以的,只要自己实现DNSToSwitchMapping不加Cache就行了

[Hadoop问题] IOException: Got error, status message , ack with firstBadLink as xxx.xx.xx.xx:50010

关键日志:

2016-08-24T02:00:32.362 08:00] [ERROR] hadoop.hdfs.DFSClient.closeAllFilesBeingWritten(DFSClient.java 980) [Thread-0] : Failed to close inode 22083674
java.io.IOException: Got error, status message , ack with firstBadLink as xxx.xxx.xxx.xxx:50010
at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:140)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1334)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1237)
at org.apache.hadoop.hdfs.DFSOutputStream

原因当时DN超过了每个磁盘预留的限制:

[2016-08-24T02:00:31.494+08:00] [INFO] server.datanode.DataNode.writeBlock(DataXceiver.java 837) [DataXceiver for client DFSClient_NONMAPREDUCE_103164656_1 at /172.16.183.26:29414 [Receiving block BP-964157621-172.16.172.17-1470393241028:blk_1094132102_20397583]] : opWriteBlock BP-964157621-172.16.172.17-1470393241028:blk_1094132102_20397583 received exception org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space: The volume with the most available space (=123444224 B) is less than the block size (=134217728 B).

原因是balance的速度不够迅速,有一种的解决方案,但没测过,想法是每个盘可写最多不超出128M(预留为100G),即第一个满了可以写,但不能超过一个block的大小,这样目测可以避免这样的问题。

[HADOOP] 慢任务排查问题集合

1.某些任务因为一个task变慢导致整个job变慢

场景:

之前遇到有些任务的map执行很慢,然后发现在执行任务时读取某些文件变慢,但就是不知道慢在哪,这时我们可以在那台机器,打开debug日志

export HADOOP_ROOT_LOGGER=DEBUG,console

然后用hdfs dfs -get /path/to/yourFIle就可以详细的看到他是链接到哪台DN导致响应缓慢,然后就可以登陆机器排查改DN的网络是否流量过高,机器负载等相关信息

解决方案

把该DN临时停止,读取数据时连接到其他副本的DN

[HADOOP问题] DataNode日志中出现Input/output error

场景是这样的,有些用户因为某些原因将几个目录的副本从3降为1(一般情况下是不建议这么做的),后来出现丢块后,在把副本升为2了,后来还是出现文件无法读取这种情况:

****/user/username/fileName-000xx.lzo                                                      
0. BP-1422437282658:blk_1344672165_270983563 len=134217728 repl=2 [host1:50010, host3:50010]
1. BP-1422437282658:blk_1344672816_270984214 len=134217728 repl=1 [host11:50010]
2. BP-1422437282658:blk_1344672898_270984296 len=134217728 repl=2 [host19:50010, host16:50010]

对于NameNode来说,他是感知这个块是正常的,从datanode的日志看出,该datanode一直进行块复制操作,但每次想把这个块复制出去时,却出现了以下异常:

java.io.IOException: Input/output error
at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
at org.apache.hadoop.net.SocketOutputStream.transferToFully(SocketOutputStream.java:223)

然后我们通过locate这个块所在的目录,进行了scp操作,还是出现了Input/output error,意味着这个块是无法完整读取的,所以我们进行了fsck了那个磁盘进行修复,因为磁盘问题导致无法读取,所以对这个并无太大影响,毕竟副本为1是风险很大的,所以那个文件是恢复不了了。

另外一个场景是集群有些datanode因为报block pool BP-1337805335-XXX-1422437282658 is not found,造成在联邦模式下有些NS显示为live节点,有些NS为dead节点,但是block pool在那个data盘是存在的

2016-01-15 21:30:48,225 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in offerService
java.io.IOException: block pool BP-1337805335-XXX-1422437282658 is not found
at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl.getBlockPoolSlice(FsVolumeImpl.java:122)
at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl.getBlockPoolUsed(FsVolumeImpl.java:92)
at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeList.getBlockPoolUsed(FsVolumeList.java:74)

这些节点的数据量比较大,因为死了比较久,而且副本为3的情况下,把一个DataNode停止是不会影响集群的,后来进行了fsck的操作,发现一个盘有问题,并修复了,后来启动datanode,节点也再也没有死了,fsck这个命令请在专业的人指导下面操作把,毕竟数据安全为最重要。