Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This FLIP proposes a pluggable interface that allows users to implement custom failure enrichers using Flink’s generic plugin framework.
Failure enrichers hold custom user logic and may also emit pre-defined labels (string key-value pairs) that are exposed via the JobMaster's REST interface.
This enables use-cases like: assigning particular types to failures (e.g., User or System), emitting custom metrics per type (e.g., application or platform), exposing failures to downstream consumers (e.g., notification systems).
Public interfaces
The public facing changes this FLIP introduces include:
A config option for FailureEnrichers.
Key: jobmanager.failure-enrichers
Default: empty
Description: An optional list of failure enricher names. If empty, NO failure enrichers will be started. If configured, only enrichers whose name matches any of the names in the list will be started.
Example: jobmanager.failure-enrichers = org.apache.flink.test.plugin.jar.failure.TypeFailureEnricher
Introducing FailureEnricher/Context and FailureEnricherFactory interfaces for defining and loading (using the plugin manager) custom Enricher implementations respectively.
/**
 * Failure Enricher enabling custom logic and attaching metadata in the form of labels to each type
 * of failure as tracked in the job manager.
 */
@Experimental
public interface FailureEnricher {

    /**
     * Method to list all the label Keys the enricher can associate with Values in case of a failure
     * {@code processFailure}. Note that Keys must be unique and properly defined per enricher
     * implementation, otherwise they will be ignored.
     *
     * @return the unique label Keys of the FailureEnricher
     */
    Set<String> getOutputKeys();

    /**
     * Method to handle a failure as part of the enricher and optionally return a map of KV pairs
     * (labels). Note that Values should only be associated with Keys from {@code getOutputKeys}
     * method, otherwise they will be ignored.
     *
     * @param cause the exception that caused this failure
     * @param context the context that includes extra information (e.g., if it was a global failure)
     * @return map of KV pairs (labels) associated with the failure
     */
    CompletableFuture<Map<String, String>> processFailure(
            final Throwable cause, final Context context);

    /**
     * An interface used by the {@link FailureEnricher}. Context includes an executor pool for the
     * enrichers to run heavy operations, the Classloader used for code gen, and other metadata.
     */
    @Experimental
    interface Context {

        /** Type of failure. */
        enum FailureType {
            /* happened in the scheduler context */
            GLOBAL,
            /* happened in the task manager context */
            LOCAL,
            /* caused by task manager disconnection/HB timeout */
            DISCONNECT
        }

        /**
         * Return the type of the failure e.g., global failure that happened in the scheduler
         * context.
         *
         * @return FailureType
         */
        FailureType getFailureType();

        /**
         * Get the user {@link ClassLoader} used for code generation, UDF loading and other
         * operations requiring reflections on user code.
         *
         * @return the user ClassLoader
         */
        ClassLoader getUserClassLoader();

        /**
         * Get an Executor pool for the Enrichers to run async operations that can potentially be
         * IO-heavy.
         *
         * @return the Executor pool
         */
        Executor getIOExecutor();
    }
}
/** Factory class for creating {@link FailureEnricher}. */
@Experimental
public interface FailureEnricherFactory extends Plugin {

    /**
     * Construct a FailureEnricher.
     *
     * @param conf configuration for this failure enricher
     * @param jobId the ID of the job
     * @param jobName the name of the job
     * @param metricGroup the metric group of the JobMaster
     * @return the FailureEnricher
     */
    FailureEnricher createFailureEnricher(
            Configuration conf, JobID jobId, String jobName, MetricGroup metricGroup);
}
Runtime REST API snapshot update: job exceptions now include the labels emitted by failure enricher implementations.
GET /jobs/:jobid/exceptions
RESPONSE BODY:
{
  ...
  "exceptionHistory": {
    "type": "object",
    "id": "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:JobExceptionHistory",
    "properties": {
      "entries": {
        "type": "array",
        "items": {
          "type": "object",
          "id": "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:RootExceptionInfo",
          "properties": {
            "exceptionName": { "type": "string" },
            "stacktrace": { "type": "string" },
            "timestamp": { "type": "integer" },
            "taskName": { "type": "string" },
            "location": { "type": "string" },
            "taskManagerId": { "type": "string" },
            "labels": {
              "type": "object",
              "additionalProperties": { "type": "string" }
            }
            ...
          }
        }
      }
    }
    ...
  }
}
Proposed Changes
Overview
Currently, all local/disconnect streaming task failures, whether from the Adaptive or the Default scheduler, go through the JobMaster and eventually trigger the ExecutionFailureHandler.
The handler returns a FailureHandlingResult containing the cause of the failure (as a Throwable) and the vertices to restart if the failure is recoverable.
FailureHandlingResults are then stored as a snapshot and exposed as part of the ExceptionHistory through the JobMaster (UI and REST) as well as the HistoryServer when enabled.
In a similar manner, global failures (failures that happened in the scheduler context) go through the GlobalFailureHandler, create FailureHandlingResults, and end up stored as part of a snapshot.
This FLIP proposes the addition of a pluggable interface that will allow users to add custom logic and optionally enrich such failures with custom metadata labels.
In short, every Enricher:
- Is triggered on every global/local/disconnect failure
- Receives a Throwable cause and an immutable Context
- Performs asynchronous execution (separate IoExecutor) to avoid blocking the main thread for RPCs
- Is completely independent from other Enrichers
- Emits failure labels/tags for its unique, pre-defined keys (defined at startup time – see getOutputKeys() above)
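The asynchronous-execution point above can be illustrated with plain JDK futures. This is only a minimal sketch under stated assumptions: `AsyncEnrichmentSketch`, `enrichThenHandle`, and `classify` are hypothetical names that are not part of this FLIP, and the two executors stand in for the IO executor exposed by the Context and the JobMaster's main thread executor.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch, not Flink code: enrichment runs off the main thread. */
final class AsyncEnrichmentSketch {

    /**
     * Computes labels on ioExecutor (where slow or IO-heavy work is acceptable) and
     * only then hops to mainThread to handle the failure together with its labels.
     */
    static CompletableFuture<String> enrichThenHandle(
            Throwable cause, Executor ioExecutor, Executor mainThread) {
        return CompletableFuture
                .supplyAsync(() -> Map.of("type", classify(cause)), ioExecutor)
                .thenApplyAsync(
                        labels -> Thread.currentThread().getName()
                                + " handled failure with labels " + labels,
                        mainThread);
    }

    /** Toy classification used for illustration only. */
    static String classify(Throwable cause) {
        return cause instanceof ArithmeticException ? "USER" : "UNKNOWN";
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        try {
            System.out.println(
                    enrichThenHandle(new ArithmeticException("/ by zero"), io, main).join());
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }
}
```

The caller receives a future immediately, so the thread that reported the failure is never blocked while the enricher runs.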
Implementation
Flink core
We first introduce FailureEnricher/Context and FailureEnricherFactory as described in the Public Interfaces section above.
Flink runtime
Then, as part of the runtime, we provide a default implementation of the Context that is passed down to the FailureEnrichers. It holds an executor pool for the enrichers to run heavy operations, the user ClassLoader, and the type of failure.
/** The default implementation of {@link Context} class. */
public class DefaultFailureEnricherContext implements FailureEnricher.Context {

    private final Executor ioExecutor;
    private final ClassLoader userClassLoader;
    private final FailureType failureType;

    private DefaultFailureEnricherContext(
            FailureType failureType, Executor ioExecutor, ClassLoader classLoader) {
        this.failureType = failureType;
        this.ioExecutor = checkNotNull(ioExecutor);
        this.userClassLoader = classLoader;
    }

    @Override
    public FailureType getFailureType() {
        return failureType;
    }

    @Override
    public ClassLoader getUserClassLoader() {
        return this.userClassLoader;
    }

    @Override
    public Executor getIOExecutor() {
        return ioExecutor;
    }

    /** Factory method returning a Local failure Context for the given params. */
    public static Context forLocalFailure(Executor ioExecutor, ClassLoader classLoader) {
        return new DefaultFailureEnricherContext(FailureType.LOCAL, ioExecutor, classLoader);
    }

    /** Factory method returning a Global failure Context for the given params. */
    public static Context forGlobalFailure(Executor ioExecutor, ClassLoader classLoader) {
        return new DefaultFailureEnricherContext(FailureType.GLOBAL, ioExecutor, classLoader);
    }

    /** Factory method returning a Disconnection failure Context for the given params. */
    public static Context forDisconnectFailure(Executor ioExecutor, ClassLoader classLoader) {
        return new DefaultFailureEnricherContext(FailureType.DISCONNECT, ioExecutor, classLoader);
    }
}
We also extend the JobMaster to hold a Set of loaded Enrichers that are initialized using the FailureEnricherFactory. Keep in mind that each FailureEnricher can emit labels for specific keys that must be unique.
Thus as part of initialization we run a validation step making sure these keys don't overlap – if they do we log an error message and remove the Enricher from the returned Set.
/** Utils class for loading and running pluggable failure enrichers. */
public class FailureEnricherUtils {

    public static Set<FailureEnricher> getFailureEnrichers(
            final Configuration configuration,
            final JobID jobId,
            final String jobName,
            final MetricGroup metricGroup,
            final PluginManager pluginManager) {
        Set<String> includedEnrichers = getIncludedFailureEnrichers(configuration);
        // When empty, NO enrichers will be started.
        if (includedEnrichers.isEmpty()) {
            return Collections.emptySet();
        }
        final Iterator<FailureEnricherFactory> factoryIterator =
                pluginManager.load(FailureEnricherFactory.class);
        final Set<FailureEnricher> failureEnrichers = new HashSet<>();
        while (factoryIterator.hasNext()) {
            try {
                final FailureEnricherFactory failureEnricherFactory = factoryIterator.next();
                final FailureEnricher failureEnricher =
                        failureEnricherFactory.createFailureEnricher(
                                configuration, jobId, jobName, metricGroup);
                if (includedEnrichers.contains(failureEnricher.getClass().getName())) {
                    failureEnrichers.add(failureEnricher);
                    LOG.debug(
                            "Found failure enricher {} at {} ",
                            failureEnricherFactory.getClass().getName(),
                            new File(
                                            failureEnricher
                                                    .getClass()
                                                    .getProtectionDomain()
                                                    .getCodeSource()
                                                    .getLocation()
                                                    .toURI())
                                    .getCanonicalPath());
                } else {
                    LOG.info(
                            "Excluding failure enricher {}, not configured in enricher list ({}).",
                            failureEnricherFactory.getClass().getName(),
                            includedEnrichers);
                }
            } catch (Exception e) {
                LOG.warn("Error while loading failure enricher factory.", e);
            }
        }
        return filterInvalidEnrichers(failureEnrichers);
    }
    ...
}
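The key-overlap validation described above could look roughly like the following. This is a sketch, not the body of filterInvalidEnrichers, which the FLIP does not show. `EnricherKeyValidation` and `filterOverlapping` are hypothetical names, enrichers are reduced to a name plus their declared output keys, and the rule that the later enricher (in iteration order) is the one dropped is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, not Flink code: drops enrichers whose output keys collide. */
final class EnricherKeyValidation {

    /**
     * @param outputKeysByEnricher enricher name mapped to the keys it declares
     *     (a stand-in for calling getOutputKeys() on each loaded enricher)
     * @return names of the enrichers that are kept, in iteration order
     */
    static List<String> filterOverlapping(Map<String, Set<String>> outputKeysByEnricher) {
        final Set<String> claimedKeys = new HashSet<>();
        final List<String> kept = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : outputKeysByEnricher.entrySet()) {
            final Set<String> keys = entry.getValue();
            // Assumption: the first enricher to claim a key wins; later ones are dropped.
            if (!Collections.disjoint(claimedKeys, keys)) {
                System.err.println(
                        "Dropping enricher " + entry.getKey() + ": output keys overlap " + keys);
                continue;
            }
            claimedKeys.addAll(keys);
            kept.add(entry.getKey());
        }
        return kept;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the outcome is deterministic.
        Map<String, Set<String>> declared = new LinkedHashMap<>();
        declared.put("TypeFailureEnricher", Set.of("type"));
        declared.put("OtherTypeEnricher", Set.of("type", "owner"));
        declared.put("TeamEnricher", Set.of("team"));
        System.out.println(filterOverlapping(declared));
    }
}
```

An enricher that declares no keys, such as a pure metrics enricher, never collides with anything and is always kept under this rule.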
Enrichers are actually triggered as part of JobMaster#updateTaskExecutionState for local failures, as part of JobMaster#disconnectTaskManager for disconnections, and as part of DefaultScheduler#handleGlobalFailure for global ones.
This makes the implementation generic enough to work with all Flink schedulers and easier to guard from race conditions e.g., concurrent processing of task failures.
@Override
public CompletableFuture<Acknowledge> updateTaskExecutionState(
        final TaskExecutionState taskExecutionState) {
    checkNotNull(taskExecutionState, "taskExecutionState");
    if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
        return labelFailure(taskExecutionState)
                .thenApplyAsync(
                        taskStateWithLabels -> {
                            try {
                                return doUpdateTaskExecutionState(taskStateWithLabels);
                            } catch (FlinkException e) {
                                throw new CompletionException(e);
                            }
                        },
                        getMainThreadExecutor());
    }
    ...
}

/**
 * Creates a new task execution state copy of the original but with the provided labels
 * (currently used for Failures)
 */
public TaskExecutionState withLabels(Map<String, String> failureLabels) {..}
The labels characterizing the failure are collected using a ConjunctFuture, which also validates the Enrichers' output keys at runtime. They are then propagated as part of the ErrorInfo within a FailedExecution of a FailureHandlingResult, eventually making their way to ExceptionsInfo and ExceptionHistoryEntry.
public ErrorInfo(@Nonnull Throwable exception, long timestamp, @Nullable Map<String, String> labels) {...}

@JsonCreator
public ExceptionInfo(
        @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
        @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String stacktrace,
        @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
        @JsonProperty(FIELD_NAME_EXCEPTION_LABELS) @Nullable Map<String, String> labels,
        @JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
        @JsonProperty(FIELD_NAME_LOCATION) @Nullable String location,
        @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) @Nullable String taskManagerId) {
    ...
}
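To make the label collection step more concrete, here is a minimal sketch of merging the results of several enrichers while discarding labels whose keys were not declared up front. It uses `CompletableFuture.thenCombine` from the JDK as a stand-in for Flink's ConjunctFuture. The nested `Enricher` interface is a reduced stand-in for FailureEnricher (no Context), and `LabelCollection`, `labelFailure`, and `fixed` are hypothetical names used for illustration only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch, not Flink code: merge enricher labels with runtime key validation. */
final class LabelCollection {

    /** Reduced stand-in for the FLIP's FailureEnricher interface. */
    interface Enricher {
        Set<String> getOutputKeys();

        CompletableFuture<Map<String, String>> processFailure(Throwable cause);
    }

    /** Test helper: an enricher that always returns the given labels. */
    static Enricher fixed(Set<String> keys, Map<String, String> labels) {
        return new Enricher() {
            @Override
            public Set<String> getOutputKeys() {
                return keys;
            }

            @Override
            public CompletableFuture<Map<String, String>> processFailure(Throwable cause) {
                return CompletableFuture.supplyAsync(() -> labels);
            }
        };
    }

    /** Runs all enrichers and merges their labels, ignoring any undeclared key. */
    static CompletableFuture<Map<String, String>> labelFailure(
            Throwable cause, List<Enricher> enrichers) {
        CompletableFuture<Map<String, String>> merged =
                CompletableFuture.completedFuture(new HashMap<>());
        for (Enricher enricher : enrichers) {
            CompletableFuture<Map<String, String>> validated =
                    enricher.processFailure(cause)
                            .thenApply(
                                    labels -> {
                                        Map<String, String> valid = new HashMap<>();
                                        labels.forEach(
                                                (key, value) -> {
                                                    // Runtime check against the declared keys.
                                                    if (enricher.getOutputKeys().contains(key)) {
                                                        valid.put(key, value);
                                                    }
                                                });
                                        return valid;
                                    });
            merged =
                    merged.thenCombine(
                            validated,
                            (all, valid) -> {
                                Map<String, String> out = new HashMap<>(all);
                                out.putAll(valid);
                                return out;
                            });
        }
        return merged;
    }
}
```

Because output keys are validated to be unique across enrichers at startup, the merge order does not affect the result in this sketch.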
Example implementations of the FailureEnrichers may include:
a simple TypeFailureEnricher, attempting to categorize failures as USER or SYSTEM based on the class of the Throwable (e.g., an OutOfMemoryError maps to SYSTEM while an ArithmeticException maps to USER):
/**
 * Type implementation of {@link FailureEnricher} that aims to categorize failures to USER or SYSTEM
 * based on the class of the failure.
 */
public class TypeFailureEnricher implements FailureEnricher {

    private static final String typeKey = "type";
    private static final Set<String> labelKeys =
            Stream.of(typeKey).collect(Collectors.toCollection(HashSet::new));

    @Override
    public Set<String> getOutputKeys() {
        return labelKeys;
    }

    @Override
    public CompletableFuture<Map<String, String>> processFailure(
            final Throwable cause, final Context context) {
        return CompletableFuture.supplyAsync(
                () -> {
                    final Map<String, String> labels = new HashMap<>();
                    if (cause == null) {
                        return labels;
                    }
                    if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(cause)) {
                        labels.put(typeKey, "SYSTEM");
                    } else if (ExceptionUtils.findThrowable(cause, ArithmeticException.class)
                            .isPresent()) {
                        labels.put(typeKey, "USER");
                    }
                    if (!labels.containsKey(typeKey)) {
                        labels.put(typeKey, "UNKNOWN");
                    }
                    return labels;
                },
                context.getIOExecutor());
    }
}
a CountingFailureEnricher counting the number of job failures using its own counter:
/**
 * Counting implementation of {@link FailureEnricher} that records the count of job failures.
 * Counter includes failures that ignore restarts thus may be larger than numRestarts.
 */
public class CountingFailureEnricher implements FailureEnricher {

    private final Counter failureCount;

    @Override
    public Set<String> getOutputKeys() {
        return Collections.emptySet();
    }

    public CountingFailureEnricher(final MetricGroup metricGroup) {
        this.failureCount = metricGroup.counter(MetricNames.NUM_JOB_FAILURES);
    }

    @Override
    public CompletableFuture<Map<String, String>> processFailure(
            final Throwable cause, final Context context) {
        failureCount.inc();
        return CompletableFuture.completedFuture(Collections.emptyMap());
    }
}
Flink runtime-web
Finally, inspired by FLINK-6042, we are going to extend the history of previous job failures to include the FailureEnricher labels.
Labels will be part of a separate column in the Job Web dashboard – ready for direct consumption from the UI.
Compatibility, Deprecation, and Migration Plan
Nothing backwards incompatible.
Test Plan
The change will be covered with unit and integration tests.
Rejected Alternatives
- Synchronous Enricher execution (could block the JM's main thread, making the JM unavailable)
- Mutable Enricher Context
Credit
This effort was heavily influenced by Flink Exception Classifier for Downtime Cause Classification and FLINK-20833, while the UI changes were inspired by FLINK-6042.