Status

State: Completed
Discussion Thread: https://lists.apache.org/thread/vnjv32zv3fqqp92t5nqf0qqy03xphwm8
Created:
In Release: 2.4.0

Background and motivation

Airflow has had excellent support for task orchestration including distributed task execution across nodes using the Celery and Kubernetes executors. With the core capabilities of task scheduling, task execution, task dependency management, and task retries, Airflow's handling of task execution is both scalable and reliable. 

However, data handling within Airflow tasks is relatively simplistic. Airflow currently depends on a feature called XCom for passing data between tasks. This data is persisted in the Airflow metadata database in the course of DAG execution and was originally intended for passing small data elements between tasks within the same DAG. As part of the Airflow 1.10.12 release, this feature was extended to support custom XCom data handlers, enabling XCom data to be persisted in external data stores such as S3 and Google Cloud Storage, and thereby allowing large data sets to be passed between tasks with XCom as the core abstraction. Further, as part of the Airflow 2.0 release, users can now pass data between tasks within a DAG natively through the TaskFlow API.

However, there are several use cases which require a greater "data awareness" within Airflow for more intuitive data pipeline authoring and maintenance. An example is detailed below: 

  • It is very common in enterprises to have a dataset "published" by a core team and then used by other data teams. An example of this is "Sales order data", which is published by the Finance team. This dataset can then be used by other teams, such as the Product team for product segmentation analysis.
  • Currently with Airflow, the dependency between the data pipeline maintained and run by the Finance team to create this "Sales order data" and the data pipeline maintained by the Product team can only be expressed as task dependencies within the same DAG.
  • This leads to composite DAGs with a large number of tasks. Additionally, the underlying data dependency is not visible; only the task dependencies are.
  • What is needed is a clear separation of the data pipelines belonging to the Finance and Product teams, with an explicit data dependency between them, so that the Product data pipeline can be triggered as and when the Sales order data is updated.

This should be cleanly handled by Airflow and is the core motivation for this proposal. 

Goals

Based on the above, the goal of this AIP is three-fold:

  1. Enable a data dependency mechanism which can be used across DAGs, 
  2. Enable the triggering of DAGs based on these dataset updates, and
  3. Enable explicit, visible relationships between DAGs based on data dependencies. 

Approach

The high-level approach outlined below is intended to be contained within Airflow without requiring additional infrastructure components, since it needs to work in the vast majority of enterprise and cloud deployments without needing infrastructure changes. 

There are definitely areas which could be extended using additional infrastructure components, but those have been deferred in this proposal, since they can be built using this foundation in later stages. 

The core approach includes the following key elements:

Introduction of a "Dataset" concept

An explicit notion of a Dataset to serve as a reference for data dependencies. This is necessary to support varied scopes of availability, and especially to allow unambiguous dataset references to be used globally across DAGs and task runs.

A Dataset can be thought of as an "opaque pointer" – or more specifically, Airflow is aware of it only at a high level, and (in this AIP) it doesn't understand anything about what the dataset actually is, where it lives, or what data it contains. In other words, as far as Airflow is concerned, a Dataset is just a name.

A Dataset is defined by a URI-parsable string. It does not need to be an absolute URL, nor even contain a scheme. The airflow: scheme is, however, reserved for future use.

Some "common" examples of the dataset URIs:

  • my-dataset  - a simple dataset of unknown type/location.
  • s3://my-connection@bucket/order_data -  using a specific connection ID for credentials
  • s3://bucket/order_data - using the default connection ID for this type. (Note: hooks don't currently have a concept of default connection IDs – that's a property of operators, so this information would be part of the new Dataset feature being added to providers.)
  • azure-data-lake://container-name/path - an example of a connection type that uses underscores (the connection type is azure_data_lake)
  • file:///path/to/file/or/folder - a "special case" for just storing files locally. This is a useful/key component of the "testing out Airflow on a laptop" experience for a new user
  • snowflake://db/table  - Referring to a table in a Snowflake database

(Strictly speaking, the scheme part of the URI doesn't have to match the form used for Connections, but that would be confusing, so we will use the same prefix here.)

The URIs above are examples; this is not intended to be a complete list.
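
For illustration, such URIs would simply be wrapped in Dataset objects. The following is a minimal sketch using the example values above; to Airflow these are just names:

from airflow import Dataset

# Airflow does not inspect or access the underlying data; each Dataset is just a URI-parsable name.
unknown = Dataset("my-dataset")
order_data = Dataset("s3://my-connection@bucket/order_data")
local_files = Dataset("file:///path/to/file/or/folder")
snowflake_table = Dataset("snowflake://db/table")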

Data driven DAG scheduling

As outlined in the background and motivation section, it is critical for certain data pipelines to know that a source Dataset has changed and only then be run. 

A change in the source Dataset should trigger follow-on DAGs. Today, this is typically accomplished by having a FileSensor wait for the arrival of a file in a known location.

Having an explicit Dataset created in an earlier DAG and then explicitly referenced as a dependency in subsequent DAGs enables this behavior directly within Airflow, rather than requiring the DAG author to write code within the DAG to poll for the file arrival and so on.

User Interface

Having explicit Dataset and data dependency definitions within Tasks also enables a better visual representation in the Airflow UI, leading to a clearly visible dependency path across DAGs based on data propagation, rather than one that is implicit and only discoverable by walking through the code.

The approach of using URIs for datasets does lead to the possibility of typos causing hard to debug issues (such as "why isn't my DAG ever triggering?"). We can't reasonably make an unknown dataset be an error, as it's possible that the DAG that defines that dataset just hasn't been parsed yet, so at most this would be a warning.

We will show such a warning ("This DAG depends on dataset X, which Airflow hasn't seen a producer for", etc.) in two places: one on the DAG detail pages themselves, and the second on the new Dataset dependency view, where we would highlight Datasets that have consumers but no producers.

When is a dataset updated?

Since Airflow is not responsible for updating the dataset (Airflow is data-aware, but still data agnostic), and we are deliberately not implementing polling for changes in this AIP (see the future work section), Airflow only marks a Dataset as updated when a producing task finishes successfully.

If a task might only conditionally update a dataset, that task should finish with another state, e.g. SKIPPED.
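
For illustration, a conditionally-producing task might look roughly like the sketch below; the "is there new data?" check and the actual write are placeholders, and finishing as SKIPPED means Airflow does not mark the dataset as updated:

import random

from airflow import Dataset
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException

dataset = Dataset("s3://s3_default@some_bucket/order_data")


@task(outlets=[dataset])
def maybe_produce():
    # Placeholder for a real "is there new data?" check.
    new_data_available = random.choice([True, False])
    if not new_data_available:
        # Ending in SKIPPED (rather than SUCCESS) means the outlet dataset is NOT marked
        # as updated, so DAGs scheduled on it will not be triggered by this run.
        raise AirflowSkipException("No new data; dataset not updated")
    ...  # otherwise, actually produce/update the dataset here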

When is a DagRun created?

A DAG defines its dataset dependencies through the schedule parameter, which takes a list of Dataset objects. Once all of the datasets have been updated at least once, a DagRun will be created. The database will then start tracking new updates again in preparation for the next DAG run.
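
For example (a sketch with illustrative dataset URIs), the following DAG would only get a DagRun created once both order_data and customer_data have been updated at least once:

from datetime import datetime

from airflow import Dataset
from airflow.decorators import dag, task

order_data = Dataset("s3://bucket/order_data")
customer_data = Dataset("s3://bucket/customer_data")


@dag(schedule=[order_data, customer_data], start_date=datetime(2021, 1, 1), catchup=False)
def joint_report():

    @task
    def build_report():
        ...

    build_report()


joint_report()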

Example DAG code

Taskflow Example

from datetime import datetime

import pandas as pd

from airflow import Dataset
from airflow.decorators import dag, task


dataset = Dataset("s3://s3_default@some_bucket/order_data")


@dag(schedule=None, start_date=datetime(2021, 1, 1), catchup=False)
def my_dag():

    @task(outlets=[dataset])
    def producer():
        # What this task actually does doesn't matter to Airflow: the simple act of running to
        # SUCCESS means the dataset is updated, and downstream DAGs will get triggered.
        ...

    producer()


my_dag()


# In practice the producer and consumer would typically live in separate DAG files,
# each declaring the same Dataset URI.
@dag(schedule=[dataset], start_date=datetime(2021, 1, 1), catchup=False)
def consuming_dag():

    @task
    def consumer(uri):
        # How the data is actually read is up to the task; reading the URI with pandas here
        # is purely illustrative.
        df = pd.read_csv(uri)
        print(f"Dataset had {len(df)} rows")

    consumer(uri=dataset.uri)


consuming_dag()

Classic operator example

It is also possible to "annotate" an operator to say that it updates/produces a dataset, even if Airflow itself isn't responsible for moving the data around:

from datetime import datetime

from airflow import DAG, Dataset
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

with DAG(
        dag_id='example_databricks_operator',
        start_date=datetime(2021, 1, 1),
        catchup=False,
) as dag:
    new_cluster = {
        'spark_version': '2.1.0-db3-scala2.11',
        'node_type_id': 'r3.xlarge',
        'aws_attributes': {
            'availability': 'ON_DEMAND'
        },
        'num_workers': 8,
    }

    notebook_task_params = {
        'new_cluster': new_cluster,
        'notebook_task': {
            'notebook_path': '/Users/airflow@example.com/PrepareData',
        },
    }
    notebook_task = DatabricksSubmitRunOperator(
        task_id='notebook_task',
        json=notebook_task_params,
        # This isn't required for the task to run, but it is required for Airflow to
        # know that this task produces a given dataset.
        outlets=[Dataset("s3://s3_default@bucket/order_data")],
    )

Adding the "outlets" section to an Operator is currently required for triggering of downstream datasets, but see the future work section for a bit that would this optional for scheduling (but still required for accurate dataset view in the UI)

We "reuse" the (half-formed) inlets  and outlets  attributes from the existing Lineage interface.

DB Changes

In order to have "data-aware scheduling" we will create a new table in the database so we can look up the list of DAGs that are scheduled/triggered based on a given dataset URI. We have chosen a new table rather than adding a column to the existing "dag" or "serialized_dag" tables because we want to allow a DAG to depend on multiple datasets, so we need a many-to-many mapping that is easily and efficiently searchable by the scheduler.

The tables we will likely add are: datasets (uri, last_updated, producing_dag_id) and dag_dataset_dependency (uri, dag_id).
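
As a rough sketch only (the column names come from the proposal; the model names, column types, and keys here are assumptions), these tables might look something like this in SQLAlchemy:

from sqlalchemy import Column, DateTime, ForeignKey, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class DatasetModel(Base):
    # One row per dataset URI Airflow has seen, with the time it was last updated.
    __tablename__ = "datasets"

    uri = Column(String(3000), primary_key=True)
    last_updated = Column(DateTime)
    producing_dag_id = Column(String(250))


class DagDatasetDependency(Base):
    # Many-to-many mapping between datasets and the DAGs scheduled on them.
    __tablename__ = "dag_dataset_dependency"

    uri = Column(String(3000), ForeignKey("datasets.uri"), primary_key=True)
    dag_id = Column(String(250), primary_key=True)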

See the Future Work section for more detail about a richer async/poll-based approach. We have stayed with a simple approach for now as it gets us quickly to the core goal of data-aware scheduling.

Future work

This AIP builds the groundwork for making Airflow data aware and is purposefully tight in scope. Here are some of the ideas we have already thought about (and stripped out of this AIP) for follow-on work.

Lifecycle management and automatic garbage collection of old Datasets

An explicit problem with XCom today is cleanup of old data. Especially as Datasets grow larger, automatic cleanup of old data is necessary for a well-run data system. However, it is worth pointing out that this cleanup is optional, since there are many situations where Datasets may be kept around for an extended time to enable auditing.

To solve this problem, we would like to introduce the concept of an "Availability Scope" for a Dataset, i.e. whether it is available:

  • Within a specific DagRun, or
  • Globally across all DAGs in the Airflow deployment

Like any data handling within a programming language, this element has obvious implications. The analogy is that of a variable defined within a class or within a method inside a class or on the heap and passed by reference across all classes and methods within the process. 

This is purely an analogy and should not be mapped literally to an Airflow DAG or an Airflow Task Group. Since Tasks within Airflow can run on different nodes and will always run as different processes, all data references in Airflow need to be done at an inter-process level, but from the perspective of reference passing and lifecycle management, the analogy is useful.

With explicit definitions around scope of availability of data as detailed here and explicit references to Datasets, it is now possible to use standard computer science techniques around garbage collection for deleting obsolete data objects automatically.

Ideally, this clean-up would be done automatically by Airflow. To solve this, we would like to introduce the notion of Setup and Teardown tasks within Airflow. Specifically, an example of a Teardown task would be a "Garbage Collector" which would be responsible for cleanup of temporary Datasets. However, this would be run on the Workers as a "non-user-visible task", as opposed to the SLA callbacks, which are currently run in the Scheduler / DAG Processor space. This will, however, be the subject of a follow-on AIP, as getting Workers (and Executors) to understand non-Task work is non-trivial and is not required for this core proposal.

Dataset Names

Instead of using the URI mechanism for Dataset references, we considered giving Dataset names and hiding the URIs for better abstraction and changes over time, but decided to defer it. Future work could be to integrate with something like a (Hive) metastore, so that we can give Datasets names without having to implement our own store.

Polling of datasets for changes

The "data based scheduling" currently only works when the same Airflow instance is responsible for both the producing and consuming ends. One future enhancement would be to add the ability to "poll" for changes to the dataset providers. Ideally this would be done using asyncio, so this might involve "extending"/re-purposing the Triggerer process and existing async capabilities in Airflow. That would be its own large AIP opening possibilities for even more “Event Based Scheduling”, hence why we didn’t include it in this one.

Schedule on Dataset change and timetable

Someone might want a DAG to run both on a regular schedule and whenever an input dataset changes. This may or may not make it into this AIP, depending on how difficult it is to implement.

Data Dependency Trigger Rules for multiple input DataSets

In a similar manner to how a Task has trigger_rules for changing when it can run based on different possible states of upstream tasks, we would like to add different "modes" for scheduling a DAG when it has multiple input datasets. The two modes that come to mind off the bat are "all changed" (to only trigger a run once all input datasets have been updated) and "any changed" (to trigger a run when any one dataset has changed). The default behaviour, and the only behaviour implemented in this AIP, would be "all changed".

Dataset based SLAs

The core Dataset concept is in place as part of this AIP, but there isn't yet any support for enhanced SLAs based on Datasets. This is a foundational AIP and the support for SLAs specifically around data timeliness will be the focus of a follow-on AIP.