When a Spark Streaming application running in YARN cluster mode is killed through YARN, the event log cannot be renamed during stop, so the Spark History Server cannot read the corresponding log. (Spark writes the event log with an .inprogress suffix and only renames it to its final name when the context shuts down cleanly.)

Analysis shows the cause: when YARN kills a container in cluster mode, it sends two signals 250 ms apart. TERM (kill -15 pid) goes out first, and if the process is still alive 250 ms later, KILL (kill -9 pid) follows. The TERM signal triggers StreamingContext's JVM shutdown hook, which in turn invokes SparkContext's hook; however, the StreamingContext hook can wait up to roughly 12 seconds. First, in ReceiverTracker:

// Wait for the Spark job that runs the receivers to be over
// That is, for the receivers to quit gracefully.
receiverJobExitLatch.await(10, TimeUnit.SECONDS)

and then in JobScheduler:

jobExecutor.awaitTermination(2, TimeUnit.SECONDS)

These two waits add up to about 12 seconds (10 s + 2 s), far longer than the 250 ms window, so kill -9 arrives before the streaming job has fully stopped. The application exits prematurely, and the streaming job may even lose data.

The remedy is therefore to raise yarn.nodemanager.sleep-delay-before-sigkill.ms (default 250 ms) so that the TERM-to-KILL gap is at least above 12 seconds.
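
For example, the following yarn-site.xml entry sets the gap to 15 seconds (15000 ms is just an illustrative value; anything comfortably above the ~12 s shutdown-hook wait works):

    <property>
      <name>yarn.nodemanager.sleep-delay-before-sigkill.ms</name>
      <value>15000</value>
    </property>

The property is read by the NodeManagers, which are the processes that actually deliver the signals, so they need to be restarted for the change to take effect.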

The corresponding NodeManager code (Hadoop's ContainerLaunch) shows exactly where this delay applies: when the delay is positive, the first signal sent is TERM, and a DelayedProcessKiller is scheduled to deliver KILL after the delay.

      // kill process
      if (processId != null) {
        String user = container.getUser();
        LOG.debug("Sending signal to pid " + processId
            + " as user " + user
            + " for container " + containerIdStr);

        final Signal signal = sleepDelayBeforeSigKill > 0
          ? Signal.TERM
          : Signal.KILL;

        boolean result = exec.signalContainer(new ContainerSignalContext.Builder()
            .setContainer(container)
            .setUser(user)
            .setPid(processId)
            .setSignal(signal)
            .build());

        LOG.debug("Sent signal " + signal + " to pid " + processId
          + " as user " + user
          + " for container " + containerIdStr
          + ", result=" + (result? "success" : "failed"));

        if (sleepDelayBeforeSigKill > 0) {
          new DelayedProcessKiller(container, user,
              processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
        }
      }
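
DelayedProcessKiller itself is just a thread that sleeps for the configured delay and then delivers the hard kill. Simplified from the Hadoop source (field declarations and logging trimmed), its run() looks roughly like this:

        @Override
        public void run() {
          try {
            // wait out yarn.nodemanager.sleep-delay-before-sigkill.ms
            Thread.sleep(delay);
            // then deliver the KILL signal that was passed to the constructor
            containerExecutor.signalContainer(new ContainerSignalContext.Builder()
                .setContainer(container)
                .setUser(user)
                .setPid(pid)
                .setSignal(signal)
                .build());
          } catch (InterruptedException e) {
            // interrupted before the delay elapsed: skip the kill
            Thread.currentThread().interrupt();
          } catch (IOException e) {
            // signalling failed; the NodeManager logs the failure
          }
        }

In other words, whatever value the delay is set to is exactly how long a TERM'd container gets to run its shutdown hooks before SIGKILL arrives.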