背景

为了检验Flink版本的可靠性,我们需要对一些常见的异常进行模拟,例如模拟常见的异常:网络链接被重置的情况,这次我们模拟PG的写入可靠场景

场景模拟

我们通过JPS命令找到对应的进程号,然后通过

netstat -anp | grep <pid>

找到对应client的端口,然后进行tcpkill

tcpkill -1 -i eth0 port 47296

我们观察到,taskmanager上面的日志一直在重试一段日志

2020-03-20 21:16:18.246 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
    at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
    at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
    at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
...

问题分析

从日志看来,jdbc-upsert-output-format-thread-1线程在重试一次之后执行成功了,但是数据库却没有看到更新的数据,我们通过查看源码可以看出,JDBC的Connectors为了提升性能,会将数据一批的写入到数据库, 即定期的执行flush方法,我们可以详细的看下重试的逻辑

public synchronized void flush() throws Exception {
    checkFlushException();

    for (int i = 1; i <= maxRetryTimes; i++) {
        try {
            jdbcWriter.executeBatch();
            batchCount = 0;
            break;
        } catch (SQLException e) {
            LOG.error("JDBC executeBatch error, retry times = {}", i, e);
            if (i >= maxRetryTimes) {
                throw e;
            }
            Thread.sleep(1000 * i);
        }
    }
}

从代码看,好像没什么问题,那么为什么重试的成功会导致数据没有写入数据库呢?因为可以重现,这个就好定位,我们可以通过remote debug抓取调用栈的信息,然后进行调试,这里就不详细介绍了,我们可以发现他的执行流程是这样的, 当链接发生重置,在executeBatch在执行之前并没有感知到链接已经被关闭的问题,从而在执行链接检查是否关闭之前对batchStatements进行了clear,然后异常抛出来之后,在上层执行重试,这时候Statements已经被清空,导致被认为执行了一个空的语句,被认为执行成功,而上层flink也认为执行成功,导致数据被丢弃

PgPreparedStatement.executeBatch
    -> PgStatement.executeBatch()
      -> internalExecuteBatch()
         -> batchStatements.clear();
            -> connection.getAutoCommit()
           -> PgConnection.checkClosed
                    throw new PSQLException(GT.tr("This connection has been closed.")

解决方案

这里 FLINK-16281 解决问题的思路是上层增加一个CacheRows的数据结构,当执行executeBatch成功时,才对CacheRows进行清除,否则再次重试的时候,需要将CacheRows上的数据更新到数据库中,但这个issue不能解决链接被关闭的问题,所以,在发生异常的时候,可以在catch异常的时候对链接的情况进行判断,如果链接被关闭,则尝试重连, 从而增加Flink在网络异常的可靠性,可以参考这里 FLINK-16708

注意:在Flink 1.10.1中对这个模块进行了重构,代码的实现可能和这里有较大差异,但是总的来说,测试规则是一样的,这个 issue 会更新为最新版本,如果在新版本上重连机制还是不生效,最后,祝大家玩的开心~