Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently Flink has a limited observability of checkpoint and recovery processes.

For checkpointing Flink has a very detailed overview in the Flink WebUI, which works great in many use cases, however it’s problematic if one is operating multiple Flink clusters, or if cluster/JM dies. Additionally there are a couple of metrics (like lastCheckpointDuration or lastCheckpointSize), however those metrics have a couple of issues:

  • They are reported and refreshed periodically, depending on the MetricReporter settings, which doesn’t take into account checkpointing frequency.

    • If checkpointing interval > metric reporting interval, we would be reporting the same values multiple times.

    • If checkpointing interval < metric reporting interval, we would be randomly dropping metrics for some of the checkpoints.

For recovery we are missing even the most basic of the metrics and Flink WebUI support. Also given the fact that recovery is even less frequent compared to checkpoints, adding recovery metrics would have even bigger problems with unnecessary reporting the same values.

In this FLIP I’m proposing to add support for reporting traces/spans (example: Traces) and use this mechanism to report checkpointing and recovery traces. I hope in the future traces will also prove useful in other areas of Flink like job submission, job state changes, ... . Moreover as the API to report traces will be added to the MetricGroup , users will be also able to access this API. 

Related work:

[FLINK-23411] Expose Flink checkpoint details metrics - ASF JIRA
[FLINK-33071] Log checkpoint statistics - ASF Jira

[FLINK-7894] Improve metrics around fine-grained recovery and associated checkpointing behaviors - ASF JIRA

Public Interfaces

/**
 * Span represents something that happened in Flink at certain point of time, that will be reported
 * to a {@link org.apache.flink.traces.reporter.TraceReporter}.
 *
 * <p>Currently we don't support traces with multiple spans. Each span is self-contained and
 * represents things like a checkpoint or recovery.
 */
@Experimental
public interface Span {

    public static SpanBuilder builder(Class<?> classScope, String name) {
        return new SpanBuilder(classScope, name);
    }

    public String getScope();
    
    public String getName();

    public long getStartTsMillis();

    public long getEndTsMillis();

    /**
     * Currently returned values can be of type String, Long or Double, however more types can
     * be added in the future.
     */
    public Map<String, Object> getAttributes();
}
@Experimental
public class SpanBuilder {
    SpanBuilder(Class<?> classScope, String name) {...}
    
    public Span build() {...}
    public SpanBuilder setStartTsMillis(long startTsMillis) {...}
    public SpanBuilder setEndTsMillis(long endTsMillis) {...}
    public SpanBuilder setAttribute(String key, String value) {...}
    public SpanBuilder setAttribute(String key, long value) {...}
    public SpanBuilder setAttribute(String key, double value) {...}
}
**
 * Trace reporters are used to export {@link Span Spans} to an external backend.
 *
 * <p>Reporters are instantiated via a {@link TraceReporterFactory}.
 */
@Experimental
public interface TraceReporter {

    // ------------------------------------------------------------------------
    //  life cycle
    // ------------------------------------------------------------------------

    /**
     * Configures this reporter.
     *
     * <p>If the reporter was instantiated generically and hence parameter-less, this method is the
     * place where the reporter sets it's basic fields based on configuration values. Otherwise,
     * this method will typically be a no-op since resources can be acquired in the constructor.
     *
     * <p>This method is always called first on a newly instantiated reporter.
     *
     * @param config A properties object that contains all parameters set for this reporter.
     */
    void open(MetricConfig config);

    /** Closes this reporter. Should be used to close channels, streams and release resources. */
    void close();

    // ------------------------------------------------------------------------
    //  adding / removing metrics
    // ------------------------------------------------------------------------

    default void notifyOfAddedSpan(Span span) {}
}
/**
 * {@link TraceReporter} factory.
 *
 * <p>Trace reporters that can be instantiated with a factory automatically qualify for being loaded
 * as a plugin, so long as the reporter jar is self-contained (excluding Flink dependencies) and
 * contains a {@code META-INF/services/org.apache.flink.traces.reporter.SpanReporterFactory} file
 * containing the qualified class name of the factory.
 */
@Experimental
public interface TraceReporterFactory {

    /**
     * Creates a new trace reporter.
     *
     * @param properties configured properties for the reporter
     * @return created metric reporter
     */
    TraceReporter createTraceReporter(final Properties properties);
}

And expand:

@Public
public interface MetricGroup {

    @Experimental
    default void addSpan(Span span) {}
    
    (...)
}


Proposed Changes

Reporting traces

TraceReporter would be handled and loaded the same way as MetricReporter is: via a separate plugin, with the same style of configuration in the flink-conf.yaml.

Implementations of TraceReporter

A simple Log4jTraceReporter will be added. This reporter would be logging reported traces to Flink’s Job Manager log.

OpenTelemetry based TraceReporter will be proposed in another FLIP

Reported traces

Recovery

Recovery trace would initially have just a single span per whole job, with sum/max aggregated values from each subtask:

  • Mailbox Start: time between sub task switching to INITIALIZING state and actually main sub task loop (mailbox loop) starting up. This includes constructing OperatorChain and other sub task classes.

  • Read Output: time to recover/read output buffers checkpointed as part of the unaligned checkpoint.

  • Initialize State: time to

    • download state files and load them into the state backend

    • invoke StreamOperator#initializeState and StreamOperator#open in all of the operators in the chain

  • Gate Restore: time to recover and process input buffers checkpointed as part of the unaligned checkpoint.

... {
    (...)
    SpanBuilder span =
            Span.builder(CheckpointStatsTracker.class, "JobInitialization")
                    .setStartTsMillis(jobInitializationMetrics.getStartTs())
                    .setEndTsMillis(jobInitializationMetrics.getEndTs())
                    .setAttribute(
                            "initializationStatus",
                            jobInitializationMetrics.getStatus().name());
    setDurationSpanAttribute(span, jobInitializationMetrics.getMailboxStart());
    setDurationSpanAttribute(span, jobInitializationMetrics.getReadOutput());
    setDurationSpanAttribute(span, jobInitializationMetrics.getInitializeState());
    setDurationSpanAttribute(span, jobInitializationMetrics.getGateRestore());
    if (jobInitializationMetrics.getCheckpointId() != JobInitializationMetrics.UNSET) {
        span.setAttribute("checkpointId", jobInitializationMetrics.getCheckpointId());
    }
    if (jobInitializationMetrics.getStateSize() != JobInitializationMetrics.UNSET) {
        span.setAttribute("fullSize", jobInitializationMetrics.getStateSize());
    }
}

private void setDurationSpanAttribute(
        SpanBuilder span, JobInitializationMetrics.SumMaxDuration duration) {
    span.setAttribute("max" + duration.getName(), duration.getMax());
    span.setAttribute("sum" + duration.getName(), duration.getSum());
}

Checkpointing

Checkpoint is also reported as a single span trace. One span for whole job per each checkpoint. Currently without aggregated subtask metrics (I hope this will be added in the future).

metricGroup.addSpan(
        Span.builder(CheckpointStatsTracker.class, "Checkpoint")
                .setStartTsMillis(checkpointStats.getTriggerTimestamp())
                .setEndTsMillis(checkpointStats.getLatestAckTimestamp())
                .setAttribute("checkpointId", checkpointStats.getCheckpointId())
                .setAttribute("fullSize", checkpointStats.getStateSize())
                .setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize())
                .setAttribute("checkpointStatus", checkpointStats.getStatus().name()));

Immediate follow up work

  1. Add OpenTelemetry based TraceReporter.   (FLIP-385).

  2. Allow state backends to report more detailed recovery stats (FLIP-386).

    1. For example for Incremental RocksDB (RocksDBIncrementalRestoreOperation) we could easily separately measure time to download files from remote storage and time to load those files into the local RocksDB instance. However this would require to change state backend API.

Mid/long term follow up work

  1. Present aggregated subtask checkpoint metrics (Sync Duration, Async Duration, Alignment Duration, Start Delay.

    1. For now, this is omitted for checkpointing and included for recovery, to minimize the scope of this FLIP and because user has already access to those metrics from the Flink WebUI for example after all.

  2. For checkpointing and recovery traces create separate spans for each subtask within the same single checkpoint trace

Compatibility, Deprecation, and Migration Plan

There will be no impact on existing users, there is no need for any migration.

Test Plan

This feature is already used and tested inside Confluent. Before committing the final accepted version would be tested inside Confluent again.

Rejected Alternatives

The idea of exposing the same values via existing metric API has been rejected due to the reasons explained in the motivation.

Reporting Spans through some dedicate to tracing/spans entity (interface instead of MetricGroup) was rejected because:

  • it would give marginal benefits in cleanliness of the interfaces

  • the new interface would have to be propagated everywhere through the code base, including many @Public interfaces

  • with MetricGroup it is possible to leverage existing code base to, for example, add automatically JobID to any spans that are added to JobMetricGroup