2020年六月月 发布的文章

[Hadoop] 浅谈Hadoop Distcp工具的InputFormat

在集群迁移或者数据跨集群同步的过程中,必要少不了数据拷贝的动作,在同一个集群内,跨NameSpace的数据拷贝,你可以使用distcp,你也可以自己实现类似facebook提供的fastcopy的拷贝(社区好像没提供),那么在使用distcp工具的过程中,其中的一些参数到底影响了什么,他是一个怎样的原理,今天就和大家简单的分享下。

我们在命令行执行hadoop distcp命令回车,就会看到他所支持的很多参数,其中在命令行拷贝策略(-strategy)选项中,有两个参数可选参数:dynamic,uniformsize。在默认情况下使用的是uniformsize,含义是distcp的每个map会相对均衡去复制数据量大小的文件。

我们通过查看源码容易可以看出,除了命令行选项之外,distcp还能默认的去加载distcp-default.xml,我们可以放置到$HADOOP_CONF_DIR下,我们可以配置相对常用的参数到这个文件中。

  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
    Configuration config = new Configuration(configuration);
    config.addResource(DISTCP_DEFAULT_XML);
    setConf(config);
    this.inputOptions = inputOptions;
    this.metaFolder   = createMetaFolderPath();
  }

除此之外,我们还从默认的配置当中,看到了两个参数:

<property>
<name>distcp.dynamic.strategy.impl</name>
<value>org.apache.hadoop.tools.mapred.lib.DynamicInputFormat</value>
<description>Implementation of dynamic input format</description>
</property>

<property>
<name>distcp.static.strategy.impl</name>
<value>org.apache.hadoop.tools.mapred.UniformSizeInputFormat</value>
<description>Implementation of static input format</description>
</property>

这个就是上述说的两种命令行策略的参数模式。通过命名可以很容易可以看出,其实这就是两个InputFormat的实现类,distcp任务(其实也就是MR任务),通过配置命令行或者参数指定使用不同的inputFormat生成不同的splits,从而实现不同的拷贝文件的逻辑。

然而,既然有两个选项,那他们的区别在哪呢?我们可以简单的看看下图的一个整体结构

DynamicInputFormat

对于DynamicInputFormat来说,有几个重要的相关的类:DynamicRecordReader,DynamicInputChunk,DynamicInputChunkContext。

对于distcp任务,会先生成一个copy-listing文件,该文件包含复制文件的列表等信息,DynamicInputFormat的getSplits方法就是将这些切分为不同chunk,然后分配到不同的task中。

在切分copy-listing文件到不同的chunk当中,其中有几个变量,numMaps和numRecords得到splitRatio的比例,也就是算出平均每个map处理多少个chunk,然后通过总的records数量,算出每个chunk中有多少条records

static int getSplitRatio(int nMaps, int nRecords, Configuration conf) {
int maxChunksIdeal = getMaxChunksIdeal(conf);
int minRecordsPerChunk = getMinRecordsPerChunk(conf);
int splitRatio = getSplitRatio(conf);

if (nMaps == 1) {
LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
return 1;
}

if (nMaps > maxChunksIdeal)
return splitRatio;

int nPickups = (int)Math.ceil((float)maxChunksIdeal/nMaps);
int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));

return nRecordsPerChunk < minRecordsPerChunk ?
splitRatio : nPickups;
}

最终会将所有的record放到不同的chunk中,在hdfs上会在对应目录行程对应的文件类似fileList.seq.chunk.0000x:

drwx------ - hadoop supergroup 0 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248
drwx------ - hadoop supergroup 0 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir
-rw-r--r-- 1 hadoop supergroup 1504 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/fileList.seq.chunk.00002
-rw-r--r-- 1 hadoop supergroup 1486 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/fileList.seq.chunk.00003
-rw-r--r-- 1 hadoop supergroup 1646 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/task_1526024399954_0017_m_000000
-rw-r--r-- 1 hadoop supergroup 1524 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/task_1526024399954_0017_m_000001
-rw-r--r-- 1 hadoop supergroup 6686 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/fileList.seq
-rw-r--r-- 1 hadoop supergroup 5906 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/fileList.seq_sorted

然后map通过DynamicRecordReader去读取数据的时候就会将对应的chunk文件修改为task(task_1526024399954_0017_m_000000)名字,所以通过上面的文件夹输入可以看出,这时有两个map正在对数据进行拷贝,执行速度快的map会继续读取未被领取的chunk进行拷贝,这就让速度快的map可以对更多的数据进行拷贝。

UniformSizeInputFormat

对于默认的UniformSizeInputFormat,他的实现方式比DynamicInputFormat简单,原理其实就是得到总的totalSizeBytes,然后除以map数量得到平均每个map处理多少数据,然后当文件的大小加起来大于nBytesPerSplit的时候,就形成一个split,这样是希望每个map处理的数据差距不会太大。

带宽控制

带宽控制主要实现在ThrottledInputStream当中,他在hadoop除了在distcp之外,也用在了NameNode之间的FSImage传输等场景上的使用,原理就是,他继承了原有的InputStream并在每次读取的时候进行每秒获取字节的速率检查(throttle),如果超过,则进行sleep:

  /**
   * Read bytes starting from the specified position. This requires rawStream is
   * an instance of {@link PositionedReadable}.
   */
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    if (!(rawStream instanceof PositionedReadable)) {
      throw new UnsupportedOperationException(
          "positioned read is not supported by the internal stream");
    }
    throttle();
    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
        offset, length);
    if (readLen != -1) {
      bytesRead += readLen;
    }
    return readLen;
  }

  private void throttle() throws IOException {
    while (getBytesPerSec() > maxBytesPerSec) {
      try {
        Thread.sleep(SLEEP_DURATION_MS);
        totalSleepTime += SLEEP_DURATION_MS;
      } catch (InterruptedException e) {
        throw new IOException("Thread aborted", e);
      }
    }
  }

  /**
   * Getter for the read-rate from this stream, since creation.
   * Calculated as bytesRead/elapsedTimeSinceStart.
   * @return Read rate, in bytes/sec.
   */
  public long getBytesPerSec() {
    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
    if (elapsed == 0) {
      return bytesRead;
    } else {
      return bytesRead / elapsed;
    }
  }

总结:

除了本文说的参数之外,我们平时在数据拷贝的过程中,我们还可以综合的通过控制map的数量,控制带宽阈值去减少这个过程对线上系统的影响,其中还有update参数等等,我们可以按照自身的业务需求去调整自身的参数,从而达到一个相对最优的数据拷贝效果。

[Hadoop] Standby NameNode性能问题排查

一、 背景描述

在集群重启了之后,运维人员在测试的时候发现集群进程负载很高,有时候进行ls会有缓慢的情况,并且 我们通过top查看到集群的SNN进程cpu消耗100%

二、 问题定位以及排查方案

查询系统占用CPU100%消耗线程很简单,用top -H -p 命令即可看到每条线程的占用,该线程的线程ID是十进制的

我们通过jstack对SNN的线程栈进行定位,将46274转化为十六进制得到b4c2的线程号,得到当时线程的栈调用,发现当时调用的线程信息,发现这个耗时主要发生在Edit log tailer线程中

然后我们查看源码,我们每次进行loadEdits的时候,都会进行countforQuota的统计,那么我们知道,NN会定时去加载Edits,那么是多久的时间呢?

我们查看EditLogTailerThread线程,查看他的时间间隔,知道我们60s执行一次EditlogTailer, 然后做一次updateCountForQuota的操作,会耗费很多时间,那么调优的时候,要么增加一定的时间间隔,降低CPU的消耗,但是会影响loadEdis的及时性,要么优化代码结构,加速计算,那么我们查找下方案

所以经过查找,社区上也有人遇到类似问题,并且有一些相关的patch,这里有一些文章

除了Patch之外,还有HDFS-8865也讨论了类似的问题,HDFS-6763其实是基于HDFS-8865的一个改进版,其核心通过构造InitQuotaTask来加速计算的过程,从而达到优化的效果,详细的学习建议参考ISSUE上面的详细讨论和方案的设计。

参考

https://issues.apache.org/jira/browse/HDFS-8865

https://issues.apache.org/jira/browse/HDFS-6763

https://tech.meituan.com/namenode_restart_optimization.html

[Flink] 记 Flink 1.10.0 JDBC Connectors重连失效分析

背景

为了检验Flink版本的可靠性,我们需要对一些常见的异常进行模拟,例如模拟常见的异常:网络链接被重置的情况,这次我们模拟PG的写入可靠场景

场景模拟

我们通过JPS命令找到对应的进程号,然后通过

netstat -anp | grep <pid>

找到对应client的端口,然后进行tcpkill

tcpkill -1 -i eth0 port 47296

我们观察到,taskmanager上面的日志一直在重试一段日志

2020-03-20 21:16:18.246 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
    at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
    at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
    at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
...

问题分析

从日志看来,jdbc-upsert-output-format-thread-1线程在重试一次之后执行成功了,但是数据库却没有看到更新的数据,我们通过查看源码可以看出,JDBC的Connectors为了提升性能,会将数据一批的写入到数据库, 即定期的执行flush方法,我们可以详细的看下重试的逻辑

public synchronized void flush() throws Exception {
    checkFlushException();

    for (int i = 1; i <= maxRetryTimes; i++) {
        try {
            jdbcWriter.executeBatch();
            batchCount = 0;
            break;
        } catch (SQLException e) {
            LOG.error("JDBC executeBatch error, retry times = {}", i, e);
            if (i >= maxRetryTimes) {
                throw e;
            }
            Thread.sleep(1000 * i);
        }
    }
}

从代码看,好像没什么问题,那么为什么重试的成功会导致数据没有写入数据库呢?因为可以重现,这个就好定位,我们可以通过remote debug抓取调用栈的信息,然后进行调试,这里就不详细介绍了,我们可以发现他的执行流程是这样的, 当链接发生重置,在executeBatch在执行之前并没有感知到链接已经被关闭的问题,从而在执行链接检查是否关闭之前对batchStatements进行了clear,然后异常抛出来之后,在上层执行重试,这时候Statements已经被清空,导致被认为执行了一个空的语句,被认为执行成功,而上层flink也认为执行成功,导致数据被丢弃

PgPreparedStatement.executeBatch
    -> PgStatement.executeBatch()
      -> internalExecuteBatch()
         -> batchStatements.clear();
            -> connection.getAutoCommit()
           -> PgConnection.checkClosed
                    throw new PSQLException(GT.tr("This connection has been closed.")

解决方案

这里 FLINK-16281 解决问题的思路是上层增加一个CacheRows的数据结构,当执行executeBatch成功时,才对CacheRows进行清除,否则再次重试的时候,需要将CacheRows上的数据更新到数据库中,但这个issue不能解决链接被关闭的问题,所以,在发生异常的时候,可以在catch异常的时候对链接的情况进行判断,如果链接被关闭,则尝试重连, 从而增加Flink在网络异常的可靠性,可以参考这里 FLINK-16708

注意:在Flink 1.10.1中对这个模块进行了重构,代码的实现可能和这里有较大差异,但是总的来说,测试规则是一样的,这个 issue 会更新为最新版本,如果在新版本上重连机制还是不生效,最后,祝大家玩的开心~

[JVM] 如何通过jcmd画火焰图

火焰图是我们日常的比较好用的一个性能分析工具,通过火焰图我们可以分析进程在哪些方法耗时较长,通过分析热点代码调用,找出可以优化的性能瓶颈

jcmd能做什么

在JDK 1.7的版本之后,新增了一个JVM相关的诊断工具jcmd,它可以用来进行JVM堆、线程信息的导出,进行采样分析,这里只简单的介绍它的功能,具体深入的使用方式可以查看文档

如何使用,执行jcmd -h

  PerfCounter.print display the counters exposed by this process
  -f  read and execute commands from the file
  -l  list JVM processes on the local machine
  -h  this help

查看本机的JVM进程jcmd -l

54769 org.apache.hadoop.hdfs.server.datanode.DataNode
54870 org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
54665 org.apache.hadoop.hdfs.server.namenode.NameNode
54987 org.apache.hadoop.yarn.server.resourcemanager.ResourceManager
55325 sun.tools.jcmd.JCmd -l
55070 org.apache.hadoop.yarn.server.nodemanager.NodeManager

查看制定进程的性能统计jcmd 54665 PerfCounter.print

54665:
java.ci.totalTime=4754838518
java.cls.loadedClasses=4867
java.cls.sharedLoadedClasses=0
java.cls.sharedUnloadedClasses=0
java.cls.unloadedClasses=0
...

列出当前进行可以执行的操作jcmd 54665 help

54665:
The following commands are available:
JFR.stop
JFR.start
JFR.dump
JFR.check
VM.native_memory
VM.check_commercial_features
VM.unlock_commercial_features
ManagementAgent.stop
ManagementAgent.start_local
ManagementAgent.start
GC.rotate_log
Thread.print
GC.class_stats
GC.class_histogram
GC.heap_dump
GC.run_finalization
GC.run
VM.uptime
VM.flags
VM.system_properties
VM.command_line
VM.version
help

查看对应的命令怎么使用jcmd 54665 help VM.unlock_commercial_features

54665:
VM.unlock_commercial_features
Unlock commercial features

Impact: Low: No impact

Permission: java.lang.management.ManagementPermission(control)

Syntax: VM.unlock_commercial_features

对线程进行dump操作jcmd 54665 Thread.print

54665:
2020-06-03 20:45:23
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode):

"Attach Listener" #49 daemon prio=9 os_prio=31 tid=0x00007f9b5673b000 nid=0xa50b waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CacheReplicationMonitor(1004132509)" #48 prio=5 os_prio=31 tid=0x00007f9b58b7d800 nid=0x8d03 waiting on condition [0x000070000a0de000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
...

如何通过jcmd画火焰图

通过上面的简单的jcmd的介绍,我们已经可以简单的使用jcmd了,那么我们如何通过jcmd工具去生成我们想要分析进程的火焰图呢?

第一步解锁jdk的商业特性

jcmd pid VM.unlock_commercial_features

启动JFR(Java Flight Recorder),收集300s的JVM信息,得到output.jfr文件

jcmd pid JFR.start name=test duration=300s settings=profile.jfc filename=output.jfr

下载FlameGraph项目

git clone https://github.com/brendangregg/FlameGraph.git

下载jfr-flame-graph项目进行构建

git clone https://github.com/chrishantha/jfr-flame-graph.git

生成火焰图

export FLAMEGRAPH_DIR=~/FlameGraph
cd ~/jfr-flame-graph/build/install/jfr-flame-graph/bin
./create_flamegraph.sh -f ~/output.jfr -i > ~/flamegraph.svg

然后用浏览器打开,就可以点击对应的调用栈了

除此之外,其实也有软件Java Mission Control可以直接打开output.jfr文件,例如

点击代码,调用树,进行对应的分析

参考文章

  1. JVM性能调优工具之jcmd

  2. GC日志分析网站:gceasy.io