What happens when a snapshot fails
Date: 2022-07-26
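I ran into a problem at work related to snapshot exceptions, so I am summarizing it here. The snapshot-related flow is traced below.
When AbstractUdfStreamOperator.snapshotState is called, it actually calls StreamingFunctionUtils.snapshotFunctionState: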
    public static void snapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {
        Preconditions.checkNotNull(context);
        Preconditions.checkNotNull(backend);
        while (true) {
            if (trySnapshotFunctionState(context, backend, userFunction)) {
                break;
            }
            // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }
    private static boolean trySnapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {
        // call the snapshotState method of the checkpointed function
        if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).snapshotState(context);
            return true;
        }
        ......
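The unwrap-and-retry loop above can be sketched in isolation. This is a minimal sketch, not Flink's code: `Function`, `Checkpointed`, and `Wrapper` are hypothetical stand-ins for Flink's `Function`, `CheckpointedFunction`, and `WrappingFunction`, the two methods are merged into one that returns a boolean for easier inspection, and the branches elided in the excerpt are not modeled. It shows that the first snapshottable function found gets called, and that an exception thrown by it is not caught at this level.

```java
import java.util.ArrayList;
import java.util.List;

class SnapshotUnwrapSketch {
    /** Hypothetical stand-in for Flink's Function marker interface. */
    interface Function {}

    /** Hypothetical stand-in for CheckpointedFunction. */
    interface Checkpointed extends Function {
        void snapshotState(List<String> log) throws Exception;
    }

    /** Hypothetical stand-in for WrappingFunction. */
    static final class Wrapper implements Function {
        final Function wrapped;

        Wrapper(Function wrapped) {
            this.wrapped = wrapped;
        }
    }

    /** Unwraps layer by layer until a function can be snapshotted; returns true if one was. */
    static boolean snapshotFunctionState(Function fn, List<String> log) throws Exception {
        while (true) {
            if (fn instanceof Checkpointed) {
                // An exception thrown here is not caught; it propagates to the caller.
                ((Checkpointed) fn).snapshotState(log);
                return true;
            }
            if (fn instanceof Wrapper) {
                fn = ((Wrapper) fn).wrapped; // unwrap and try again
            } else {
                return false; // nothing to snapshot
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Function ok = new Wrapper(new Wrapper((Checkpointed) l -> l.add("snapshotted")));
        Function failing = new Wrapper((Checkpointed) l -> {
            throw new Exception("disk full");
        });
        try {
            System.out.println(snapshotFunctionState(ok, log) + " " + log); // true [snapshotted]
            snapshotFunctionState(failing, log);
        } catch (Exception e) {
            System.out.println("propagated: " + e.getMessage()); // propagated: disk full
        }
    }
}
```

The second call in `main` is the case this article is about: the user function's snapshotState throws, and nothing in the loop handles it.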
When the user-defined snapshotState method throws an exception, the exception propagates all the way up to the Task.triggerCheckpointBarrier method:
    public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {
        // In practice this is the StreamTask: the Task class delegates the checkpoint to a more specific class,
        // and StreamTask in turn delegates to more specific classes, down to the user code
        // e.g. source -> flatMap
        // invokable is effectively the operator chain
        final AbstractInvokable invokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
        if (executionState == ExecutionState.RUNNING && invokable != null) {
            // build a local closure
            final String taskName = taskNameWithSubtask;
            final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // set safety net from the task's context for checkpointing thread
                    LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
                    try {
                        // invokable is in fact the StreamTask: the Task class delegates the checkpoint to a more
                        // specific class, and StreamTask in turn delegates to more specific classes, down to the user code
                        // only exceptions from performing the checkpoint
                        // when the checkpoint throws, ExecutionState transitions to FAILED, which leads to a restart
                        boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                        if (!success) {
                            checkpointResponder.declineCheckpoint(
                                getJobID(), getExecutionId(), checkpointID,
                                new CheckpointDeclineTaskNotReadyException(taskName));
                        }
                    } catch (Throwable t) {
                        if (getExecutionState() == ExecutionState.RUNNING) {
                            failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                taskNameWithSubtask, t));
                        } else {
                            LOG.debug("Encountered error while triggering checkpoint {} for " +
                                "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                        }
                    } finally {
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                    }
                }
            };
            executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
        } else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
        }
    }
The key part of this method is the catch block:
    if (getExecutionState() == ExecutionState.RUNNING) {
        failExternally(new Exception(
            "Error while triggering checkpoint " + checkpointID + " for " +
            taskNameWithSubtask, t));
    } else {
        LOG.debug("Encountered error while triggering checkpoint {} for " +
            "{} ({}) while being not in state running.", checkpointID,
            taskNameWithSubtask, executionId, t);
    }
failExternally in turn calls
    cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
Looking at the details:
    private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
        while (true) {
            ExecutionState current = executionState;
            // if the task is already canceled (or canceling) or finished or failed,
            // then we need not do anything
            if (current.isTerminal() || current == ExecutionState.CANCELING) {
                LOG.info("Task {} is already in state {}", taskNameWithSubtask, current);
                return;
            }
            if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
                if (transitionState(current, targetState, cause)) {
                    // if we manage this state transition, then the invokable gets never called
                    // we need not call cancel on it
                    this.failureCause = cause;
                    return;
                }
            } else if (current == ExecutionState.RUNNING) {
                if (transitionState(ExecutionState.RUNNING, targetState, cause)) {
                    // we are canceling / failing out of the running state
                    // we need to cancel the invokable
                    // copy reference to guard against concurrent null-ing out the reference
                    final AbstractInvokable invokable = this.invokable;
                    if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
                        this.failureCause = cause;
                        LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
                        // because the canceling may block on user code, we cancel from a separate thread
                        // we do not reuse the async call handler, because that one may be blocked, in which
                        // case the canceling could not continue
                        // The canceller calls cancel and interrupts the executing thread once
                        Runnable canceler = new TaskCanceler(
                            LOG,
                            invokable,
                            executingThread,
                            taskNameWithSubtask,
                            producedPartitions,
                            inputGates);
                        Thread cancelThread = new Thread(
                            executingThread.getThreadGroup(),
                            canceler,
                            String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId));
                        cancelThread.setDaemon(true);
                        cancelThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
                        cancelThread.start();
                        // the periodic interrupting thread - a different thread than the canceller, in case
                        // the application code does blocking stuff in its cancellation paths.
                        if (invokable.shouldInterruptOnCancel()) {
                            Runnable interrupter = new TaskInterrupter(
                                LOG,
                                invokable,
                                executingThread,
                                taskNameWithSubtask,
                                taskCancellationInterval);
                            Thread interruptingThread = new Thread(
                                executingThread.getThreadGroup(),
                                interrupter,
                                String.format("Canceler/Interrupts for %s (%s).", taskNameWithSubtask, executionId));
                            interruptingThread.setDaemon(true);
                            interruptingThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
                            interruptingThread.start();
                        }
                        // if a cancellation timeout is set, the watchdog thread kills the process
                        // if graceful cancellation does not succeed
                        if (taskCancellationTimeout > 0) {
                            Runnable cancelWatchdog = new TaskCancelerWatchDog(
                                executingThread,
                                taskManagerActions,
                                taskCancellationTimeout,
                                LOG);
                            Thread watchDogThread = new Thread(
                                executingThread.getThreadGroup(),
                                cancelWatchdog,
                                String.format("Cancellation Watchdog for %s (%s).",
                                    taskNameWithSubtask, executionId));
                            watchDogThread.setDaemon(true);
                            watchDogThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
                            watchDogThread.start();
                        }
                    }
                    return;
                }
            } else {
                throw new IllegalStateException(String.format("Unexpected state: %s of task %s (%s).",
                    current, taskNameWithSubtask, executionId));
            }
        }
    }
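In short, this method transitions ExecutionState to FAILED and then performs a series of cancellation steps. Because ExecutionState becomes FAILED, Flink's restart mechanism is triggered; if no restart mechanism is configured, the job simply fails.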
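The task-side part of this path can be condensed into a small, self-contained sketch. It is an illustration under stated assumptions, not Flink's implementation: `CheckpointFailureSketch`, its `State` enum, and the `Invokable` interface are hypothetical, the state transition is modeled with `AtomicReference.compareAndSet`, the canceler threads are replaced by a counter, and restart handling is not modeled at all.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class CheckpointFailureSketch {
    /** Hypothetical, reduced version of the task's execution states. */
    enum State { RUNNING, CANCELING, FAILED }

    /** Hypothetical stand-in for the task code that performs the checkpoint. */
    interface Invokable {
        boolean triggerCheckpoint(long checkpointId) throws Exception;
    }

    final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    final AtomicBoolean invokableCanceled = new AtomicBoolean(false);
    volatile Throwable failureCause;
    int cancelCalls = 0; // counts how often cancellation was started

    /** Mirrors the catch block: a checkpoint exception fails the task only while it is RUNNING. */
    void triggerCheckpoint(Invokable invokable, long checkpointId) {
        try {
            invokable.triggerCheckpoint(checkpointId);
        } catch (Throwable t) {
            if (state.get() == State.RUNNING) {
                failExternally(new Exception("Error while triggering checkpoint " + checkpointId, t));
            }
        }
    }

    /** Moves RUNNING -> FAILED at most once and starts cancellation at most once. */
    void failExternally(Throwable cause) {
        if (state.compareAndSet(State.RUNNING, State.FAILED)
                && invokableCanceled.compareAndSet(false, true)) {
            failureCause = cause;
            cancelCalls++; // the excerpt above starts the canceler threads at this point
        }
    }

    public static void main(String[] args) {
        CheckpointFailureSketch task = new CheckpointFailureSketch();
        task.triggerCheckpoint(id -> {
            throw new IllegalStateException("snapshot failed");
        }, 7L);
        System.out.println(task.state.get());                          // FAILED
        System.out.println(task.failureCause.getCause().getMessage()); // snapshot failed
        task.failExternally(new Exception("late failure"));            // no effect: already FAILED
        System.out.println(task.cancelCalls);                          // 1
    }
}
```

The two compare-and-set guards make a second failure report a no-op, which corresponds to the early return for terminal states and the invokableHasBeenCanceled check in the excerpt.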