Status

Motivation

FLIP-294 [1] added support for a catalog modification (metadata) listener. This FLIP adds a customized job lineage listener.

Public Interfaces

JobStatusChangedListener 

Flink creates events and notifies JobStatusChangedListener when the status of a job changes. There are two types of job status events for the listener: JobCreatedEvent and JobExecutionStatusEvent. JobCreatedEvent is fired when a job is created; it carries the job lineage, so the listener can create lineages for sources and sinks. JobExecutionStatusEvent carries the old and new job statuses at runtime, so the listener can, for example, delete the lineages when the job reaches a terminal state.

/**
 * When job is created or its status is changed, Flink will generate job event and notify job status changed listener.
 */
@PublicEvolving
public interface JobStatusChangedListener {
    /* Event will be fired when job status is changed. */
    public void onEvent(JobStatusChangedEvent event);
}

/** Basic job status event. */
@PublicEvolving
public interface JobStatusChangedEvent {
    JobID jobId();
    String jobName();
}

/** Job created event with job lineage. */
@PublicEvolving
public interface JobCreatedEvent extends JobStatusChangedEvent {
    /* Lineage for the current job. */
    LineageGraph lineageGraph();

    /* Runtime execution mode for the job, STREAMING/BATCH/AUTOMATIC. */
    RuntimeExecutionMode executionMode();
}

/** Job status changed event for runtime. */
@PublicEvolving
public interface JobExecutionStatusEvent extends JobStatusChangedEvent {
    JobStatus oldStatus();
    JobStatus newStatus();
    @Nullable Throwable exception();
}

/** Factory for job status changed listener. */
@PublicEvolving
public interface JobStatusChangedListenerFactory {
    JobStatusChangedListener createListener(Context context);

    @PublicEvolving
    public interface Context {
        /*
         * Configuration for the factory to create listener, users can add customized options to flink and get them here to create the listener. For
         * example, users can add rest address for datahub to the configuration, and get it when they need to create http client for the listener.
         */
        Configuration getConfiguration();
        ClassLoader getUserClassLoader();

        /*
         * Get an Executor pool for the listener to run async operations that can potentially be IO-heavy. `JobMaster` will provide an independent executor
         * for io operations and it won't block the main-thread. All tasks submitted to the executor will be executed in parallel, and when the job ends,
         * previously submitted tasks will be executed, but no new tasks will be accepted.
         */
        Executor getIOExecutor();
    }
}
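The following is a minimal sketch of how a listener might use the IO executor from the factory context so that reporting never blocks the caller. The nested interfaces are stand-ins that only mirror the shapes proposed above, and ReportingListener is a hypothetical name; a real listener would issue an HTTP call where this one appends to a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;

/** Stand-in sketch; the nested types only mirror the shapes proposed above. */
class AsyncListenerSketch {
    interface JobStatusChangedEvent { String jobName(); }
    interface JobCreatedEvent extends JobStatusChangedEvent {}
    interface JobStatusChangedListener { void onEvent(JobStatusChangedEvent event); }

    /** Hypothetical listener that hands every report to the IO executor. */
    static final class ReportingListener implements JobStatusChangedListener {
        private final Executor ioExecutor;
        final List<String> reported = Collections.synchronizedList(new ArrayList<>());

        ReportingListener(Executor ioExecutor) {
            this.ioExecutor = ioExecutor;
        }

        @Override
        public void onEvent(JobStatusChangedEvent event) {
            String kind = event instanceof JobCreatedEvent ? "created" : "status-changed";
            // Only cheap work happens on the caller's thread; the report itself
            // (an HTTP call in a real listener) runs on the IO executor.
            ioExecutor.execute(() -> reported.add(kind + ":" + event.jobName()));
        }
    }

    public static void main(String[] args) {
        // A same-thread executor keeps the demo deterministic; Flink would pass a pool.
        ReportingListener listener = new ReportingListener(Runnable::run);
        JobCreatedEvent created = () -> "wordcount";
        listener.onEvent(created);
        System.out.println(listener.reported);
    }
}
```

Passing the executor through the constructor keeps the listener testable with a same-thread executor, as done in `main`.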

Job lineage is divided into two layers: the first layer is a global abstraction for all Flink jobs and connectors, and the second layer defines the lineages for Table/SQL and DataStream independently, based on the first one.

/**
 * Job lineage is built according to StreamGraph. Users can get sources, sinks and relationships from lineage.
 */
@PublicEvolving
public interface LineageGraph {
    /* Source lineage vertex list. */
    List<SourceLineageVertex> sources();

    /* Sink lineage vertex list. */
    List<LineageVertex> sinks();

    /* lineage edges from sources to sinks. */
    List<LineageEdge> relations();
}

/** Lineage vertex represents the connectors in lineage graph, including source and sink. */
@PublicEvolving
public interface LineageVertex {
    /* List of input (for source) or output (for sink) datasets interacted with by the connector */
    List<LineageDataset> datasets();
}

/** Lineage dataset represents the source or sink in the job. */
@PublicEvolving
public interface LineageDataset {
    /* Name for this particular dataset. */
    String name();
    /* Unique name for this dataset's storage, for example, url for jdbc connector and location for lakehouse connector. */
    String namespace();
    /* Facets for the lineage vertex to describe the particular information of dataset, such as schema and config. */
    Map<String, LineageDatasetFacet> facets();
}

/** Facet interface for dataset. */
@PublicEvolving
public interface LineageDatasetFacet {
    /** Name for the facet which will be used as key in facets of LineageDataset. */
    String name();
}

/** Builtin config facet for dataset. */
@PublicEvolving
public interface DatasetConfigFacet extends LineageDatasetFacet {
    Map<String, String> config();
}

/** Field for schema in dataset. */
public interface DatasetSchemaField<T> {
    /** The name of the field. */
    String name();
    /** The type of the field. */
    T type();
}

/** Builtin schema facet for dataset. */
@PublicEvolving
public interface DatasetSchemaFacet extends LineageDatasetFacet {
    <T> Map<String, DatasetSchemaField<T>> fields();
}

/** Lineage vertex for source which has boundedness. */
@PublicEvolving
public interface SourceLineageVertex extends LineageVertex {
    /**
     * The boundedness for the source connector, users can get boundedness for each sources in
     * the lineage and determine the job execution mode with RuntimeExecutionMode.
     */
    Boundedness boundedness();
}

/** Lineage edge from sources to sink. */
@PublicEvolving
public interface LineageEdge {
    LineageVertex source();
    LineageVertex sink();
}
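To make the graph model concrete, here is a small sketch of how a listener might walk the edges of a lineage graph and expand each one into dataset-to-dataset pairs. Dataset, Vertex and Edge are simplified stand-ins for LineageDataset, LineageVertex and LineageEdge, and the namespaces used in `main` are made-up sample values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Stand-in sketch of traversing a lineage graph; types only mirror the proposal. */
class LineageWalkSketch {
    static final class Dataset {
        final String namespace;
        final String name;
        Dataset(String namespace, String name) {
            this.namespace = namespace;
            this.name = name;
        }
    }

    static final class Vertex {
        final List<Dataset> datasets;
        Vertex(Dataset... datasets) {
            this.datasets = Arrays.asList(datasets);
        }
    }

    static final class Edge {
        final Vertex source;
        final Vertex sink;
        Edge(Vertex source, Vertex sink) {
            this.source = source;
            this.sink = sink;
        }
    }

    /** Expands every edge into "sourceNamespace/name -> sinkNamespace/name" lines. */
    static List<String> describe(List<Edge> relations) {
        List<String> lines = new ArrayList<>();
        for (Edge edge : relations) {
            for (Dataset in : edge.source.datasets) {
                for (Dataset out : edge.sink.datasets) {
                    lines.add(in.namespace + "/" + in.name + " -> " + out.namespace + "/" + out.name);
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        Vertex kafka = new Vertex(new Dataset("kafka://broker:9092", "orders"));
        Vertex jdbc = new Vertex(new Dataset("jdbc:mysql://host:3306/db", "orders_copy"));
        describe(Collections.singletonList(new Edge(kafka, jdbc))).forEach(System.out::println);
    }
}
```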

Job lineage for Table/SQL job

For Table/SQL jobs, Flink creates table lineages from the source and sink tables. Table lineages contain column lineages, which describe the dependencies between source and sink columns. Flink derives these lineages from the job plan, so no user action is required. CatalogContext in TableLineageDataset is defined in FLIP-294 [1] and identifies the context of a physical table in different catalogs.

/** Basic table lineage dataset which has catalog context and table in it. */
public interface TableLineageDataset extends LineageDataset {
    /* The catalog context of the table lineage vertex. */
    public CatalogContext catalogContext();

    /* The table of the lineage vertex. */
    public CatalogBaseTable table();

    /* Database name and table name for the table lineage vertex. */
    public ObjectPath objectPath();
}

/** Schema field for sql/table jobs which has `LogicalType` for field type. */
public interface TableSchemaField extends DatasetSchemaField<LogicalType> {
}

/** Source lineage vertex for table. */
@PublicEvolving
public interface TableSourceLineageVertex extends SourceLineageVertex {
}

/** Sink lineage vertex for table. */
@PublicEvolving
public interface TableSinkLineageVertex extends TableLineageVertex {
    /* Modify type, INSERT/UPDATE/DELETE statement, listener can identify different sink by the type to determine whether the sink should be added to the lineage. */
    ModifyType modifyType();
}

/* Table lineage edges from source table to sink table. */
@PublicEvolving
public interface TableLineageEdge extends LineageEdge {
    /* Table column lineage edges from source columns to sink columns. */
    List<TableColumnLineageEdge> columnEdges();
}

/* Column lineage from source table columns to each sink table column, one sink column may be aggregated by multiple tables and columns. */
@PublicEvolving
public interface TableColumnLineageEdge {
    /** The dataset for source dataset. */
    LineageDataset source();

    /**
     * Columns from one source table of {@link LineageEdge} to the sink column. Each sink column may be computed from multiple columns
     * from source, for example, avg operator from two columns in the source.
     */
    List<String> sourceColumns();

    /* Sink table column. */
    String sinkColumn();
}
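As an illustration of the column lineage model, the sketch below groups column edges by sink column, so that a sink column computed from several source tables lists all of its inputs. ColumnEdge is a simplified stand-in for TableColumnLineageEdge in which the source dataset is reduced to a table name; the table and column names are sample values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ColumnLineageSketch {
    /** Stand-in for TableColumnLineageEdge, with the source dataset reduced to its name. */
    static final class ColumnEdge {
        final String sourceTable;
        final List<String> sourceColumns;
        final String sinkColumn;

        ColumnEdge(String sourceTable, List<String> sourceColumns, String sinkColumn) {
            this.sourceTable = sourceTable;
            this.sourceColumns = sourceColumns;
            this.sinkColumn = sinkColumn;
        }
    }

    /** Groups "table.column" inputs by the sink column they feed, in encounter order. */
    static Map<String, List<String>> inputsBySinkColumn(List<ColumnEdge> edges) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (ColumnEdge edge : edges) {
            List<String> inputs = result.computeIfAbsent(edge.sinkColumn, k -> new ArrayList<>());
            for (String column : edge.sourceColumns) {
                inputs.add(edge.sourceTable + "." + column);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // One sink column "total" computed from columns of two source tables.
        List<ColumnEdge> edges = Arrays.asList(
                new ColumnEdge("orders", Arrays.asList("price", "qty"), "total"),
                new ColumnEdge("taxes", Arrays.asList("tax"), "total"));
        System.out.println(inputsBySinkColumn(edges));
    }
}
```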

/** The existing `ModifyType` should be marked as `PublicEvolving` and users can get it from table sink lineage vertex. */
@PublicEvolving
public enum ModifyType {
}

Job lineage for DataStream job

The data structures of connectors in DataStream jobs are much more complex than in Table/SQL jobs: they may be tables, user-defined POJO classes, or even vectors for ML jobs. Currently sources and sinks can be added to a DataStream job as follows.

// Add sources in DataStream job
// 1. Add source with InputFormat
DataStreamSource env.createInput(InputFormat inputFormat)
// 2. Add source with SourceFunction
DataStreamSource addSource(SourceFunction function)
// 3. Add source with Source
DataStreamSource env.fromSource(Source source)

// Add sinks in DataStream job
// 1. Add sink with SinkFunction
DataStreamSink addSink(SinkFunction sinkFunction)
// 2. Add sink with Sink
DataStreamSink<T> sinkTo(Sink<T, ?, ?, ?> sink)

We add LineageVertexProvider for DataStream so that users can create lineage vertices for their customized source and sink connectors. We will implement LineageVertexProvider for builtin sources and sinks such as KafkaSource, HiveSource and FlinkKafkaProducerBase.

/**
 * Create lineage vertex for source and sink in DataStream. If the source and sink connectors in datastream job implement this interface, 
 * flink will get lineage vertex and create all-to-all lineages between sources and sinks by default.
 */
@PublicEvolving
public interface LineageVertexProvider {
     LineageVertex getLineageVertex();
}
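A sketch of the opt-in mechanism described above: connectors that implement the provider interface contribute a vertex, and all others are skipped. The nested interfaces are simplified stand-ins (a vertex is reduced to a list of dataset names), and TopicSource is a hypothetical connector used only for the demo; this is not the actual Flink lookup code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Stand-in sketch; the nested interfaces only mirror the shapes proposed above. */
class VertexProviderSketch {
    interface LineageVertex { List<String> datasetNames(); } // simplified to names
    interface LineageVertexProvider { LineageVertex getLineageVertex(); }

    /** Hypothetical connector that exposes lineage. */
    static final class TopicSource implements LineageVertexProvider {
        private final String topic;

        TopicSource(String topic) {
            this.topic = topic;
        }

        @Override
        public LineageVertex getLineageVertex() {
            return () -> Arrays.asList(topic);
        }
    }

    /** Collects vertices from connectors that opted in; others are skipped. */
    static List<LineageVertex> collectVertices(List<Object> connectors) {
        List<LineageVertex> vertices = new ArrayList<>();
        for (Object connector : connectors) {
            if (connector instanceof LineageVertexProvider) {
                vertices.add(((LineageVertexProvider) connector).getLineageVertex());
            }
        }
        return vertices;
    }

    public static void main(String[] args) {
        List<Object> connectors = Arrays.asList(new TopicSource("orders"), "a connector without lineage");
        for (LineageVertex vertex : collectVertices(connectors)) {
            System.out.println(vertex.datasetNames());
        }
    }
}
```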

When sources and sinks implement LineageVertexProvider, Flink gets the lineage vertices from them and creates lineage edges between them automatically. However, a DataStream job does not expose the relationships between specific sources and sinks. For example, consider one DataStream job that:

1. Reads data from kafka topic 1, topic 2 and topic 3

2. Writes data to kafka topic 4 and topic 5.

Unlike in a SQL job, we cannot analyze the relationship between topics 1, 2, 3 and topics 4, 5, so we add addLineageEdges to StreamExecutionEnvironment to supply this information. If users do not add lineage edges for a DataStream job, Flink will create relations between every source and every sink.

@Public
public class StreamExecutionEnvironment {
    /**
     * Users can add exclusive lineage for each source and sink by this method, for example, add lineage from different sources to each sink connector.
     * <ul>
     *   <li> For datastream job, flink cannot analyze which sources generate data to the sink because the lineages are generated in the logic of user operators.
     *        Users can add the lineages between sources and sinks in this method themselves.
     *   <li> For table or SQL job, flink can analyze the lineages from sources to sinks automatically, but users can also add customized lineage in this method.
     * </ul>
     * <p>
     * NOTICE: The lineage graph added in this way has the highest priority. If users add customized lineage here, it will overwrite the lineage graph
     * generated automatically by the job, including for table and SQL jobs. The lifecycle of lineages in {@link StreamExecutionEnvironment} is consistent with that
     * of transformations, and when transformations are cleaned up, the lineages are cleaned up too.
     */
    StreamExecutionEnvironment addLineageEdges(LineageEdge... edges);
}
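The precedence rule above (explicit edges win, otherwise every source is connected to every sink) can be sketched as follows. Sources, sinks and edges are reduced to plain strings, and resolveEdges is a hypothetical helper name, not a Flink method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class LineageEdgeResolutionSketch {
    /** Returns "source->sink" pairs: explicit edges win, otherwise every source feeds every sink. */
    static List<String> resolveEdges(List<String> sources, List<String> sinks, List<String> explicitEdges) {
        if (!explicitEdges.isEmpty()) {
            // User-provided edges have the highest priority and replace the default.
            return new ArrayList<>(explicitEdges);
        }
        List<String> edges = new ArrayList<>();
        for (String source : sources) {
            for (String sink : sinks) {
                edges.add(source + "->" + sink);
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        List<String> sources = Arrays.asList("topic1", "topic2", "topic3");
        List<String> sinks = Arrays.asList("topic4", "topic5");
        // No explicit edges: all-to-all relations are generated.
        System.out.println(resolveEdges(sources, sinks, Collections.<String>emptyList()));
        // Explicit edges: only the user-provided relations remain.
        System.out.println(resolveEdges(sources, sinks, Arrays.asList("topic1->topic4", "topic2->topic5")));
    }
}
```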

Config Customized Listener

Users should add their listeners to the classpath of the client and the Flink cluster, and configure the listener factories with the following option.

# Config for job status changed listeners.
lineage.job-status-changed.listeners: {job status changed listener factory1},{job status changed listener factory2}
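One plausible way to turn the comma-separated option value into factory instances is reflective loading, sketched below. This is an assumption for illustration only: the FLIP does not specify the loading mechanism, ListenerFactory is a stand-in for JobStatusChangedListenerFactory, and LoggingFactory is a hypothetical implementation.

```java
import java.util.ArrayList;
import java.util.List;

class ListenerFactoryLoadingSketch {
    /** Stand-in for JobStatusChangedListenerFactory. */
    interface ListenerFactory { String describe(); }

    /** Hypothetical factory used only for this demo. */
    public static final class LoggingFactory implements ListenerFactory {
        public LoggingFactory() {}

        @Override
        public String describe() {
            return "logging";
        }
    }

    /** Splits the option value on commas and instantiates each class via its no-arg constructor. */
    static List<ListenerFactory> loadFactories(String optionValue, ClassLoader classLoader) {
        List<ListenerFactory> factories = new ArrayList<>();
        for (String className : optionValue.split(",")) {
            String trimmed = className.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            try {
                Class<?> clazz = Class.forName(trimmed, true, classLoader);
                factories.add((ListenerFactory) clazz.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot create listener factory " + trimmed, e);
            }
        }
        return factories;
    }

    public static void main(String[] args) {
        String option = LoggingFactory.class.getName();
        ClassLoader loader = ListenerFactoryLoadingSketch.class.getClassLoader();
        for (ListenerFactory factory : loadFactories(option, loader)) {
            System.out.println(factory.describe());
        }
    }
}
```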

Proposed Changes

Use case of job lineage

1. User customized lineage for DataStream

Users can implement customized source and sink lineages for datastream job, for example, KafkaVectorLineageVertex for kafka source and sink as follows.

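/** User defined vector source and sink lineage vertex. */
public class KafkaVectorLineageVertex implements LineageVertex {
    private final int capacity;
    private final String valueType;

    public KafkaVectorLineageVertex(int capacity, String valueType) {
        this.capacity = capacity;
        this.valueType = valueType;
    }

    /* The capacity of source lineage. */
    public int capacity() { return capacity; }

    /* The value type in the vector. */
    public String valueType() { return valueType; }

    @Override
    public List<LineageDataset> datasets() { ... }
}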

// Create kafka source class with lineage vertex
public class KafkaVectorSource extends KafkaSource implements LineageVertexProvider {
    int capacity;
    String valueType;

    @Override
    public LineageVertex getLineageVertex() {
        return new KafkaVectorLineageVertex(capacity, valueType);
    }
}

// Create kafka sink class with lineage vertex
public class KafkaVectorSink extends FlinkKafkaProducerBase implements LineageVertexProvider {
    int capacity;
    String valueType;

    @Override
    public LineageVertex getLineageVertex() {
        return new KafkaVectorLineageVertex(capacity, valueType);
    }
}
 
/* User can use vector source/sink lineages in datastream job. */
StreamExecutionEnvironment env = ...;
KafkaVectorSource source = new KafkaVectorSource();
KafkaVectorSink sink = new KafkaVectorSink();
env.fromSource(source)
    .map(...).keyBy(..).reduce(..)...
    // KafkaVectorSink extends FlinkKafkaProducerBase, a SinkFunction, so it is attached with addSink.
    .addSink(sink);
env.addLineageEdges(/* Build lineage edge from source and sink vertex */);
env.execute();

After that, users can cast the source and sink lineage entities to vector lineage vertex, get capacity and value type in the customized listeners.

2. Connectors identified by connector identifier

Users may create different tables on the same storage, such as the same Kafka topic. Suppose there is one Kafka topic, two Paimon tables and one MySQL table. Users create these tables and submit three Flink SQL jobs as follows.

a) Create Kafka and Paimon tables for Flink SQL job

-- Create a table my_kafka_table1 for kafka topic 'kafka_topic'
CREATE TABLE my_kafka_table1 (
  val1 STRING,
  val2 STRING,
  val3 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_topic',
  'properties.bootstrap.servers' = 'kafka_bootstrap_server',
  'properties.group.id' = 'kafka_group',
  'format' = 'json'
);

-- Create a Paimon catalog and table for warehouse 'paimon_path1'
CREATE CATALOG paimon_catalog WITH (
    'type'='paimon',
    'warehouse'='paimon_path1'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (...);

-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table1 WHERE ...;

b) Create another Kafka table and Paimon table for Flink SQL job

-- Create another table my_kafka_table2 for kafka topic 'kafka_topic' which is same as my_kafka_table1 above
CREATE TABLE my_kafka_table2 (
  val1 STRING,
  val2 STRING,
  val3 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_topic',
  'properties.bootstrap.servers' = 'kafka_bootstrap_server',
  'properties.group.id' = 'kafka_group',
  'format' = 'json'
);

-- Create a Paimon catalog with the same name 'paimon_catalog' for different warehouse 'paimon_path2'
CREATE CATALOG paimon_catalog WITH (
    'type'='paimon',
    'warehouse'='paimon_path2'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (...);

-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table2 WHERE ...;

c) Create Mysql table for Flink SQL job

-- Create two catalogs for warehouse 'paimon_path1' and 'paimon_path2', there are two different tables 'my_paimon_table'
CREATE CATALOG paimon_catalog1 WITH (
    'type'='paimon',
    'warehouse'='paimon_path1'
);
CREATE CATALOG paimon_catalog2 WITH (
    'type'='paimon',
    'warehouse'='paimon_path2'
);

-- Create mysql table
CREATE TABLE mysql_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://HOST:PORT/my_database',
    'table-name' = 'my_mysql_table',
    'username' = '***',
    'password' = '***'
);

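-- Insert data to mysql table from two paimon tables
-- `default` is a reserved keyword in Flink SQL, so the database name is quoted with backticks
INSERT INTO mysql_table 
    SELECT ... FROM paimon_catalog1.`default`.my_paimon_table
        JOIN paimon_catalog2.`default`.my_paimon_table
    ON ... WHERE ...;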

d) Implement datahub and paimon listener for flink lineage.

// Datahub lineage listener factory.
public class DatahubLineageListenerFactory implements JobStatusChangedListenerFactory {
    private static final String DATAHUB_REST_ADDRESS = "datahub.rest.url";

    @Override
    public JobStatusChangedListener createListener(Context context) {
        Map<String, String> config = context.getConfiguration().toMap();
        String url = checkNotNull(config.get(DATAHUB_REST_ADDRESS));
        return new DatahubLineageListener(url);
    }
}

// Datahub lineage listener for paimon tables.
public class DatahubLineageListener implements JobStatusChangedListener {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubLineageListener.class);
    private final String url;

    public DatahubLineageListener(String url) {
        this.url = url;
    }

    @Override
    public void onEvent(JobStatusChangedEvent event) {
        if (event instanceof JobCreatedEvent) {
            JobCreatedEvent createdEvent = (JobCreatedEvent) event;
            LineageGraph jobLineage = createdEvent.lineageGraph();
            for (LineageEdge relation : jobLineage.relations()) {
                LineageVertex source = relation.source();
                LineageVertex sink = relation.sink();
                checkArgument(
                        sink instanceof TableLineageVertex,
                        String.format(
                                "Only support table sink lineage vertex: %s",
                                sink.getClass().getName()));
                TableLineageVertex tableSink = (TableLineageVertex) sink;
                String sinkPhysicalPath = getPhysicalPath(tableSink);
                ObjectIdentifier sinkIdentifier = tableSink.identifier();

                checkArgument(
                        source instanceof TableLineageVertex,
                        String.format(
                                "Only support table source lineage vertex: %s",
                                source.getClass().getName()));
                TableLineageVertex tableSource = (TableLineageVertex) source;

                // Get physical table path for paimon catalog.
                String sourcePhysicalPath = getPhysicalPath(tableSource);
                ObjectIdentifier sourceIdentifier = tableSource.identifier();
                createRelation(
                            createdEvent.jobId(),
                            createdEvent.jobName(),
                            sourcePhysicalPath,
                            sourceIdentifier,
                            sinkPhysicalPath,
                            sinkIdentifier);
            }
        } else if (event instanceof JobExecutionStatusEvent) {
            JobExecutionStatusEvent executionStatusEvent = (JobExecutionStatusEvent) event;
            // Delete the lineage once the job has reached a globally terminal state.
            if (executionStatusEvent.newStatus().isGloballyTerminalState()) {
                deleteRelation(executionStatusEvent.jobId(), executionStatusEvent.jobName(), executionStatusEvent.exception());
            }
        } else {
            LOG.error(
                    "Receive unsupported job status changed event: {}",
                    event.getClass().getName());
        }
    }

    private String getPhysicalPath(TableLineageVertex lineageVertex) {
        // Assumes the vertex carries exactly one table dataset.
        CatalogContext sinkCatalogContext =
                ((TableLineageDataset) lineageVertex.datasets().get(0)).catalogContext();
        String sinkCatalogIdentifier = sinkCatalogContext
                .getFactoryIdentifier()
                .orElseThrow(() -> new UnsupportedOperationException(
                        "Only support catalog table connector."));
        checkArgument(
                sinkCatalogIdentifier.equals("paimon"),
                "Only support paimon connector for lineage");
        return checkNotNull(sinkCatalogContext
                .getConfiguration()
                .toMap()
                .get("path"));
    }

    private void createRelation(
            JobID jobId,
            String jobName,
            String sourcePhysicalPath,
            ObjectIdentifier sourceIdentifier,
            String sinkPhysicalPath,
            ObjectIdentifier sinkIdentifier) {
        // Create dataset from physical path and identifier for source and sink, then create relation from source to sink for given job
    }

    private void deleteRelation(JobID jobId, String jobName, @Nullable Throwable exception) {
        // Delete relation created by given job
    }
}
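The reason the listener resolves physical paths instead of relying on Flink table names can be shown with a small sketch: keying datasets by connector identifier plus physical location merges my_kafka_table1 and my_kafka_table2 into one dataset, while the two identically named Paimon tables stay separate. TableRef and the "connector:location" key format are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PhysicalDatasetKeySketch {
    /** A Flink table name plus the connector identifier and physical location it points to. */
    static final class TableRef {
        final String flinkName;
        final String connector;
        final String location;

        TableRef(String flinkName, String connector, String location) {
            this.flinkName = flinkName;
            this.connector = connector;
            this.location = location;
        }
    }

    /** Groups Flink table names by "connector:location", ignoring catalog and table names. */
    static Map<String, List<String>> groupByPhysicalKey(List<TableRef> tables) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (TableRef table : tables) {
            String key = table.connector + ":" + table.location;
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(table.flinkName);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<TableRef> tables = Arrays.asList(
                new TableRef("my_kafka_table1", "kafka", "kafka_topic"),
                new TableRef("my_kafka_table2", "kafka", "kafka_topic"),
                new TableRef("paimon_catalog.default.my_paimon_table", "paimon", "paimon_path1"),
                new TableRef("paimon_catalog.default.my_paimon_table", "paimon", "paimon_path2"));
        groupByPhysicalKey(tables).forEach((key, names) -> System.out.println(key + " <- " + names));
    }
}
```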

After completing the above operations, we have one Kafka topic, two Paimon tables and one MySQL table, each identified by its connector identifier. These tables are associated through the Flink jobs, and users can report the tables and their relationships to an external system such as DataHub [2].

Changes for JobStatusChangedListener

Build JobLogicalPlan in StreamGraph

Flink creates a Planner for SQL and Table jobs which contains exec nodes; the planner is then converted to Transformation and StreamGraph. DataStream jobs are similar: Flink creates the DataStream and converts it to Transformation and StreamGraph. In both cases the conversion path is Planner or DataStream, then Transformation, then StreamGraph.

StreamGraph already has a graph structure, so JobLogicalPlan can be created from it easily.

For Table and SQL jobs, Flink translates source and sink exec nodes in the planner to Transformation in ExecNodeBase.translateToPlanInternal. The source and sink nodes hold the resolved table, so Flink can create source and sink lineages from the table and save them in the source/sink transformations.

For DataStream jobs, DataStreamSource sets the source lineage on the source transformation in the setLineage method, and DataStreamSink does the same for the sink.

Finally, source and sink nodes are translated from transformations and added to StreamGraph in SourceTransformationTranslator.translateInternal and SinkTransformationTranslator.translateInternal, where their lineage information can be added to StreamGraph too.

Create and notify listeners in RestClusterClient

RestClusterClient can read the option lineage.job-status-changed.listeners from Configuration and create JobStatusChangedListener. RestClusterClient.submitJob is used in AbstractSessionClusterExecutor to submit a job, which converts the Pipeline to a JobGraph. AbstractSessionClusterExecutor can set the StreamGraph in RestClusterClient before calling submitJob, so RestClusterClient can get the StreamGraph and notify the listener before the JobGraph is submitted.

/* Set the pipeline on the client before it submits the job graph. */
public class AbstractSessionClusterExecutor {
    public CompletableFuture<JobClient> execute(
            @Nonnull final Pipeline pipeline,
            @Nonnull final Configuration configuration,
            @Nonnull final ClassLoader userCodeClassloader)
            throws Exception {
        ....

        final ClusterClientProvider<ClusterID> clusterClientProvider =
                    clusterDescriptor.retrieve(clusterID);
        ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();

        // Set pipeline to the cluster client before submit job graph
        clusterClient.setPipeline(pipeline); 

        return clusterClient
                    .submitJob(jobGraph)
                    ....  
    }
}

/* Create the job created event and notify listeners before the job graph is submitted. */
public class RestClusterClient {
    private final List<JobStatusChangedListener> listeners;
    private Pipeline pipeline;

    @Override
    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
        // Create event and notify listeners before the job graph is submitted.
        JobCreatedEvent event = createEventFromPipeline(pipeline);
        if (event != null) {
            listeners.forEach(listener -> listener.onEvent(event));
        }
        
        ....;
    }
}

Job status changed in job manager

JobManager can create JobStatusChangedListener in DefaultExecutionGraph according to the option lineage.job-status-changed.listeners in the job configuration. Currently JobManager calls DefaultExecutionGraph.transitionState when the job status changes, so JobStatusChangedListener can be notified in that method as follows.

/* Notify job status changed listeners when the job status changes. */
public class DefaultExecutionGraph {
    private final List<JobStatusChangedListener> executionListeners;

    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
        ....;
        notifyJobStatusHooks(newState, error);
        // notify job execution listeners
        notifyJobStatusChangedListeners(current, newState, error);
        ....;
    }

    private void notifyJobStatusChangedListeners(JobStatus current, JobStatus newState, Throwable error) {
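        // createJobExecutionStatusEvent is a hypothetical helper that wraps the old status, new status and error.
        JobExecutionStatusEvent event = createJobExecutionStatusEvent(current, newState, error);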
        executionListeners.forEach(listener -> listener.onEvent(event));
    }
}
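The notification flow above, combined with a listener that reacts only to globally terminal states, can be sketched in isolation as follows. JobStatus here is a reduced stand-in for Flink's enum that models only the flag needed, and Graph is a toy replacement for DefaultExecutionGraph; the stale-transition check is an assumption about how transitions are guarded.

```java
import java.util.ArrayList;
import java.util.List;

class StatusNotificationSketch {
    /** Reduced stand-in for Flink's JobStatus; only the flag needed here is modelled. */
    enum JobStatus {
        CREATED(false), RUNNING(false), FAILING(false), FAILED(true), CANCELED(true), FINISHED(true);

        private final boolean globallyTerminal;

        JobStatus(boolean globallyTerminal) {
            this.globallyTerminal = globallyTerminal;
        }

        boolean isGloballyTerminalState() {
            return globallyTerminal;
        }
    }

    interface StatusListener { void onStatusChanged(JobStatus oldStatus, JobStatus newStatus); }

    /** Toy replacement for the execution graph: tracks one state and notifies listeners. */
    static final class Graph {
        private final List<StatusListener> listeners = new ArrayList<>();
        private JobStatus state = JobStatus.CREATED;

        void register(StatusListener listener) {
            listeners.add(listener);
        }

        boolean transitionState(JobStatus current, JobStatus newState) {
            if (state != current) {
                return false; // stale transition, nobody is notified
            }
            state = newState;
            for (StatusListener listener : listeners) {
                listener.onStatusChanged(current, newState);
            }
            return true;
        }
    }

    public static void main(String[] args) {
        Graph graph = new Graph();
        graph.register((oldStatus, newStatus) -> {
            // Check the NEW status: lineage is removed once the job has terminated.
            if (newStatus.isGloballyTerminalState()) {
                System.out.println("delete lineage after " + oldStatus + " -> " + newStatus);
            }
        });
        graph.transitionState(JobStatus.CREATED, JobStatus.RUNNING);
        graph.transitionState(JobStatus.RUNNING, JobStatus.FINISHED);
    }
}
```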

Rejected Alternatives

Report lineage in JobListener

Flink already has JobListener, which lets users implement custom listeners and obtain a JobClient. We could add lineage information to JobListener and let users report it from there. However, JobListener was added in FLINK-14992 [2] for Zeppelin and notebooks to manage jobs, whereas the lineage information of a Flink job is static and the listener does not need a JobClient. So we reject this proposal and create JobStatusChangedListener for lineage.

Pull job status in JobListener

We could pull the job status in JobListener. As discussed in the previous thread [3], we do not want our services to poll for job status either.

Submit job with lineage and Report it in JobMaster

In this FLIP the JobStatusChangedListener lives in both the client and JobMaster, which report lineage information and job status independently. Another proposal is to put the lineage information into JobGraph and then report it in JobMaster. Considering that the lineage information may be relatively large and affect JobGraph, we will report separately in the first phase and consider merging in the future.

Plan For The Future

  1. We add column lineages to the job lineage, but Flink does not produce them at present. We would like to implement them in the next FLIP.
  2. This FLIP only supports scan sources; lookup join sources should be supported later.
  3. Add a job vertex listener for batch mode, covering the scheduling and execution status of vertices, the execution status of subtasks, etc.
  4. Currently the client notifies the listener to report job lineage; in the future it should be reported in JobManager and supported in the REST API.


[1] FLIP-294: Support Customized Catalog Modification Listener

[2] https://issues.apache.org/jira/browse/FLINK-14992

[3] https://lists.apache.org/thread/ttsc6155bzrtkh8wkkxg2dj689jtvh4t