[SPARK] Spark Streaming fails to rename the in-progress eventlog when the application is killed via YARN
When a Spark Streaming application running in YARN cluster mode is killed through YARN (yarn application -kill), its event log is not renamed during stop (the .inprogress suffix is never removed), so the Spark History Server cannot read the corresponding log.
Analysis shows the cause: when the kill is issued in YARN cluster mode, the NodeManager sends TERM and KILL only 250 ms apart, i.e. 250 ms after TERM (kill -15 pid) it follows up with KILL (kill -9 pid). TERM triggers the JVM shutdown hook registered by StreamingContext, which afterwards leads to SparkContext's hook, but the StreamingContext hook may itself wait for up to about 12 s. In ReceiverTracker's stop path:
// Wait for the Spark job that runs the receivers to be over
// That is, for the receivers to quit gracefully.
receiverJobExitLatch.await(10, TimeUnit.SECONDS)
and in JobScheduler's stop path:
jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
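The waits above run inside a JVM shutdown hook: StreamingContext registers one when it starts, and it is this hook that TERM triggers before SparkContext's own hook gets a chance to run. A condensed sketch of that path (the method and config names follow the Spark source rather than this write-up, and only the relevant fragment of the class is shown):
// Registered in StreamingContext.start(): a TERM (kill -15) therefore ends up in stopOnShutdown().
shutdownHookRef = ShutdownHookManager.addShutdownHook(
  StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
private def stopOnShutdown(): Unit = {
  // Graceful stop is opt-in; either way stop() has to get through ReceiverTracker and
  // JobScheduler (the waits quoted above) before SparkContext's hook can rename the eventlog.
  val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
  stop(stopSparkContext = false, stopGracefully = stopGracefully)
}
With the default 250 ms delay, SIGKILL arrives while this hook is still blocked in those waits.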
As a result, kill -9 is executed before the streaming application has fully stopped: the process exits prematurely, the eventlog is never renamed, and the streaming job may even lose data.
The workaround is therefore to raise yarn.nodemanager.sleep-delay-before-sigkill.ms (default 250 ms) to something above 12 s. The NodeManager's container-kill code shows how this delay is used:
// kill process
if (processId != null) {
  String user = container.getUser();
  LOG.debug("Sending signal to pid " + processId
      + " as user " + user
      + " for container " + containerIdStr);

  // Send TERM first when a delayed SIGKILL is configured; otherwise KILL immediately.
  final Signal signal = sleepDelayBeforeSigKill > 0
      ? Signal.TERM
      : Signal.KILL;

  boolean result = exec.signalContainer(new ContainerSignalContext.Builder()
      .setContainer(container)
      .setUser(user)
      .setPid(processId)
      .setSignal(signal)
      .build());

  LOG.debug("Sent signal " + signal + " to pid " + processId
      + " as user " + user
      + " for container " + containerIdStr
      + ", result=" + (result ? "success" : "failed"));

  // After sleepDelayBeforeSigKill ms, DelayedProcessKiller sends SIGKILL to the same pid.
  if (sleepDelayBeforeSigKill > 0) {
    new DelayedProcessKiller(container, user,
        processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
  }
}
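For example, a yarn-site.xml entry on the NodeManagers along these lines (15 s is only an illustrative value comfortably above the ~12 s of waits; NodeManagers must be restarted to pick it up):
<property>
  <name>yarn.nodemanager.sleep-delay-before-sigkill.ms</name>
  <value>15000</value>
</property>
This only widens the TERM-to-KILL window; the driver still has to finish its shutdown hooks within that window for the eventlog to be renamed.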