
Discussion thread: https://lists.apache.org/thread/0y2yg1y27sv86rkxg0vpj18h5hgy2ppg
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: FLINK-37426
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink observability supports Metrics and Traces. We propose extending it with support for Events, which can be used to track the most important occurrences in Flink, e.g. completed checkpoints or changes to the job’s state. Storing such information, e.g. in an event log or a database, would make it possible to build a history of those events for audit purposes or to derive important information such as job uptime/downtime or violations of required checkpoint completion times.

Public Interfaces

Many of the introduced interfaces follow the design of the corresponding trace-reporting interfaces from FLIP-384 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces).

Event interface and builder

/**
 * {@link Event} represents an event that happened in Flink for reporting, e.g. a completed
 * checkpoint or a restart.
 */
@Experimental
public interface Event {

    static EventBuilder builder(Class<?> classScope, String name) {
        return new EventBuilder(classScope, name);
    }

    /** The timestamp for when the event happened or was observed, in milliseconds. */
    long getObservedTsMillis();

    /** The name of the event. */
    String getName();

    /** The scope of the event, typically the fully qualified name of the emitting class. */
    String getClassScope();

    /** Textual description of the event. */
    String getBody();

    /** Severity of the event, e.g. DEBUG, INFO, ... */
    String getSeverity();

    /**
     * Additional attributes for the event. Currently, returned values can be of type String, Long
     * or Double, however more types can be added in the future.
     */
    Map<String, Object> getAttributes();
}


/** Builder used to construct {@link Event}. See {@link Event#builder(Class, String)}. */
@Experimental
public class EventBuilder {
    ...
    
    // Constructors
    public EventBuilder(Class<?> classScope, String name) {...}
    public EventBuilder(String classScope, String name) {...}
    public EventBuilder(Class<?> classScope) {...}
    public EventBuilder() {...}

    /** Sets the timestamp for when the event happened or was observed, in milliseconds. */
    public EventBuilder setObservedTsMillis(long observedTsMillis) {...}

    /** Sets the name of the event. */
    public EventBuilder setName(String name) {...}

    /** Sets the scope of the event, typically the fully qualified name of the emitting class. */
    public EventBuilder setClassScope(String classScope) {...}

    /** Sets the textual description of the event. */
    public EventBuilder setBody(String body) {...}

    /** Sets the severity of the event, e.g. DEBUG, INFO, ... */
    public EventBuilder setSeverity(String severity) {...}

    /** Additional attribute to be attached to this {@link Event}. */
    public EventBuilder setAttribute(String key, String value) {...}

    /** Additional attribute to be attached to this {@link Event}. */
    public EventBuilder setAttribute(String key, long value) {...}

    /** Additional attribute to be attached to this {@link Event}. */
    public EventBuilder setAttribute(String key, double value) {...}

    public String getName() {...}

    /** Builds the specified instance. */
    public Event build(Map<String, String> additionalVariables) {...}
    public Event build() {...}
}
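
As a rough illustration of how the builder might be used, the sketch below constructs an event with a few typed attributes. Because the builder's body is elided above, `Event` and `EventBuilder` are redeclared here as simplified stand-ins; the default values, the event name `CheckpointCompleted`, the attribute keys, and the helper `checkpointCompleted` are all assumptions made for the example, not part of the proposal.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the proposed Event interface.
interface Event {
    long getObservedTsMillis();
    String getName();
    String getClassScope();
    String getBody();
    String getSeverity();
    Map<String, Object> getAttributes();
}

// Simplified stand-in for the proposed EventBuilder; the real body is elided in this FLIP.
class EventBuilder {
    private final String classScope;
    private final String name;
    private long observedTsMillis = System.currentTimeMillis(); // assumed default
    private String body = "";                                   // assumed default
    private String severity = "INFO";                           // assumed default
    private final Map<String, Object> attributes = new HashMap<>();

    EventBuilder(Class<?> classScope, String name) {
        this.classScope = classScope.getCanonicalName();
        this.name = name;
    }

    EventBuilder setObservedTsMillis(long ts) { this.observedTsMillis = ts; return this; }
    EventBuilder setBody(String body) { this.body = body; return this; }
    EventBuilder setSeverity(String severity) { this.severity = severity; return this; }
    EventBuilder setAttribute(String key, String value) { attributes.put(key, value); return this; }
    EventBuilder setAttribute(String key, long value) { attributes.put(key, value); return this; }
    EventBuilder setAttribute(String key, double value) { attributes.put(key, value); return this; }

    // Snapshots the builder state so later builder changes do not affect the event.
    Event build() {
        final long ts = observedTsMillis;
        final String b = body;
        final String s = severity;
        final Map<String, Object> attrs =
                Collections.unmodifiableMap(new HashMap<String, Object>(attributes));
        return new Event() {
            public long getObservedTsMillis() { return ts; }
            public String getName() { return name; }
            public String getClassScope() { return classScope; }
            public String getBody() { return b; }
            public String getSeverity() { return s; }
            public Map<String, Object> getAttributes() { return attrs; }
        };
    }
}

class EventBuilderSketch {
    // Hypothetical helper: describes a completed checkpoint as an event.
    static Event checkpointCompleted(long checkpointId, double durationSeconds) {
        return new EventBuilder(EventBuilderSketch.class, "CheckpointCompleted")
                .setSeverity("INFO")
                .setBody("Checkpoint " + checkpointId + " completed")
                .setAttribute("checkpointId", checkpointId)
                .setAttribute("durationSeconds", durationSeconds)
                .build();
    }

    public static void main(String[] args) {
        Event event = checkpointCompleted(42L, 1.5);
        System.out.println(event.getName() + " " + event.getAttributes());
    }
}
```

The three `setAttribute` overloads mirror the attribute types the interface documents (String, Long, Double), so a reporter can rely on the values in `getAttributes()` being one of those types.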

Event reporter interface and newly introduced super interface common to all reporters

@Experimental
public interface EventReporter extends Reporter {
    
    /** Report the given event. */
    void notifyOfAddedEvent(Event event);
}

/** Parent interface to all reporters, defining their common lifecycle. */
@Public
public interface Reporter {

    /** Configures this reporter. */
    void open(MetricConfig config);

    /** Closes this reporter. Should be used to close channels, streams and release resources. */
    void close();
}

Reporter factory interface

/**
 * {@link EventReporter} factory.
 *
 * <p>Event reporters that can be instantiated with a factory automatically qualify for being loaded
 * as a plugin, so long as the reporter jar is self-contained (excluding Flink dependencies) and
 * contains a {@code META-INF/services/org.apache.flink.events.reporter.EventReporterFactory} file
 * containing the qualified class name of the factory.
 */
@Experimental
public interface EventReporterFactory {
    /**
     * Creates a new event reporter.
     *
     * @param properties configured properties for the reporter
     * @return created event reporter
     */
    EventReporter createEventReporter(Properties properties);
}
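
To make the reporter lifecycle more concrete, here is a minimal sketch of a reporter and its factory. It is self-contained, so the interfaces are redeclared as stand-ins: `open` takes a plain `Properties` instead of `MetricConfig`, and `Event` is reduced to three getters. The class names `LineEventReporter` and `LineEventReporterFactory` and the `prefix` property are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Stand-ins for the proposed interfaces, reduced to what this sketch needs.
interface Event {
    String getName();
    String getSeverity();
    String getBody();
}

interface Reporter {
    void open(Properties config); // the proposed interface takes a MetricConfig
    void close();
}

interface EventReporter extends Reporter {
    void notifyOfAddedEvent(Event event);
}

interface EventReporterFactory {
    EventReporter createEventReporter(Properties properties);
}

// Hypothetical reporter that formats each event as one line of text.
class LineEventReporter implements EventReporter {
    private final List<String> lines = new ArrayList<>();
    private String prefix = "";
    private boolean open;

    @Override
    public void open(Properties config) {
        prefix = config.getProperty("prefix", ""); // "prefix" is a made-up option
        open = true;
    }

    @Override
    public void notifyOfAddedEvent(Event event) {
        if (!open) {
            return; // ignore events outside the open/close lifecycle
        }
        String line = prefix + event.getSeverity() + " " + event.getName() + ": " + event.getBody();
        lines.add(line);
        System.out.println(line);
    }

    @Override
    public void close() {
        open = false;
    }

    List<String> getLines() {
        return lines;
    }
}

// Hypothetical factory; a plugin jar would name this class in its service file.
class LineEventReporterFactory implements EventReporterFactory {
    @Override
    public EventReporter createEventReporter(Properties properties) {
        return new LineEventReporter();
    }
}

class LineEventReporterSketch {
    static Event event(final String name, final String severity, final String body) {
        return new Event() {
            public String getName() { return name; }
            public String getSeverity() { return severity; }
            public String getBody() { return body; }
        };
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("prefix", "[flink] ");
        EventReporter reporter = new LineEventReporterFactory().createEventReporter(props);
        reporter.open(props);
        reporter.notifyOfAddedEvent(event("JobStatusChange", "INFO", "RUNNING"));
        reporter.close();
    }
}
```

For such a factory to be discovered as a plugin, the reporter jar would list its qualified class name in `META-INF/services/org.apache.flink.events.reporter.EventReporterFactory`, as described in the Javadoc above.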

Event reporting through newly introduced method in MetricGroup interface

@Public
public interface MetricGroup {

    @Experimental
    default void addEvent(EventBuilder eventBuilder) {}
    
    (...)
}

List of reported events

The following Events will be reported as part of this FLIP:

  • CheckpointEvent (this will be configurable, as checkpoints are already reported as Spans)
  • JobStatusChangeEvent - emitted when the JobStatus changes
  • JobFailureEvent - emitted when a job fails
  • AllSubtasksStatusChangeEvent - emitted when all subtasks reach the RUNNING or FINISHED state

Proposed Changes

Loading and operating event reporters

We introduce a hierarchy similar to Trace reporting (Event+builder, reporter, reporter factory, and exposure through MetricGroup).

We slightly refactor the existing reporter interfaces (trace, metric) by extracting the Reporter parent interface that all reporter interfaces extend. This is a backward compatible change to the public MetricReporter interface.

In general, an EventReporter will work and be loaded the same way as a MetricReporter or SpanReporter: as a separate plugin, with the same style of configuration in flink-conf.yaml.

We will also integrate with the existing metric group mechanisms for filtering events and enriching them with additional scope variables.
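
One way to picture the enrichment step is sketched below: a metric group that overrides `addEvent`, builds the event with its own scope variables via `build(Map<String, String>)`, and hands the result on. This is only a sketch under stated assumptions. The types are simplified stand-ins, `ScopedMetricGroup` and the `job_name` variable are hypothetical, and the merge rule (scope variables become attributes, explicitly set attributes win on key collisions) is assumed rather than specified by this FLIP. Filtering is not shown.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for the proposed Event interface, reduced to two getters.
interface Event {
    String getName();
    Map<String, Object> getAttributes();
}

// Stand-in for the proposed EventBuilder.
class EventBuilder {
    private final String name;
    private final Map<String, Object> attributes = new HashMap<>();

    EventBuilder(String name) {
        this.name = name;
    }

    EventBuilder setAttribute(String key, String value) {
        attributes.put(key, value);
        return this;
    }

    // Assumption: scope variables are merged in as attributes, and attributes
    // set explicitly on the builder win on key collisions.
    Event build(Map<String, String> additionalVariables) {
        final Map<String, Object> merged = new HashMap<String, Object>(additionalVariables);
        merged.putAll(attributes);
        return new Event() {
            public String getName() { return name; }
            public Map<String, Object> getAttributes() { return Collections.unmodifiableMap(merged); }
        };
    }
}

// Stand-in for MetricGroup with the proposed default no-op method.
interface MetricGroup {
    default void addEvent(EventBuilder eventBuilder) {}
}

// Hypothetical group that enriches events with its scope variables.
class ScopedMetricGroup implements MetricGroup {
    private final Map<String, String> scopeVariables;
    private final List<Event> reported = new ArrayList<>(); // stands in for the event reporters

    ScopedMetricGroup(Map<String, String> scopeVariables) {
        this.scopeVariables = scopeVariables;
    }

    @Override
    public void addEvent(EventBuilder eventBuilder) {
        reported.add(eventBuilder.build(scopeVariables));
    }

    List<Event> getReported() {
        return reported;
    }
}

class EventEnrichmentSketch {
    public static void main(String[] args) {
        Map<String, String> vars = new HashMap<>();
        vars.put("job_name", "wordcount");
        ScopedMetricGroup group = new ScopedMetricGroup(vars);
        group.addEvent(new EventBuilder("JobStatusChange").setAttribute("newStatus", "RUNNING"));
        Event event = group.getReported().get(0);
        System.out.println(event.getName() + " " + event.getAttributes());
    }
}
```

Passing an `EventBuilder` rather than a finished `Event` to `addEvent` is what lets the group add its variables before the event is built; groups that keep the default no-op implementation simply drop the event.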

Compatibility, Deprecation, and Migration Plan

There will be no impact on existing users, and no migration is needed.

As mentioned above, we change the public interface MetricReporter through extraction of some methods into a new super interface to avoid code duplication. This change is binary compatible.

Test Plan

This feature is already used and tested inside Confluent. Before it is committed, the final accepted version will be tested inside Confluent again. On top of that, there will be sufficient automated test coverage.

Rejected Alternatives

See the discussion in https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces. At this point, the FLIP essentially follows how other kinds of reporting already work, e.g. reporting for traces.