Status

State: Completed
Discussion Thread: https://lists.apache.org/thread/vnjv32zv3fqqp92t5nqf0qqy03xphwm8
Created: 2022-02-17
In Release: 2.4.0

Background and motivation

Airflow has had excellent support for task orchestration including distributed task execution across nodes using the Celery and Kubernetes executors. With the core capabilities of task scheduling, task execution, task dependency management, and task retries, Airflow's handling of task execution is both scalable and reliable. 

However, data handling with Airflow tasks is relatively simplistic. Airflow currently depends on a feature called XCom for passing data between tasks. This data is persisted in the Airflow metadatabase in the course of DAG execution and was originally intended for small data elements passed between tasks within the same DAG. As part of the Airflow 1.10.12 release, this feature was extended to support custom XCom data handlers, which can persist XCom data in external data stores such as S3 and Google Cloud Storage. This allows large data sets to be passed between tasks with XCom as the core abstraction. Further, since the Airflow 2.0 release, users can pass data between tasks within a DAG implicitly through the TaskFlow API.

However, there are several use cases which require a greater "data awareness" within Airflow for more intuitive data pipeline authoring and maintenance. An example is detailed below: 

  • It is very common in enterprises for a dataset to be "published" by a core team and then used by other data teams. An example of this is "Sales order data", which is published by the Finance team. This dataset can then be used by other teams, such as the Product team for product segmentation analysis.
  • Currently with Airflow, the dependency between the data pipeline that the Finance team maintains and runs to create this "Sales order data" and the data pipeline maintained by the Product team can only be expressed as task dependencies within the same DAG.
  • This leads to composite DAGs with a large number of tasks. Additionally, the underlying data dependency is not visible; only the task dependencies are.
  • What is needed instead is a clear separation between the Finance and Product data pipelines, with an explicit data dependency between them. The Product data pipeline can then be triggered as and when the Sales order data is updated.

This should be cleanly handled by Airflow and is the core motivation for this proposal. 

Goals

Based on the above, the goal of this AIP is three-fold:

  1. Enable a data dependency mechanism which can be used across DAGs, 
  2. Enable the triggering of DAGs based on these dataset updates, and 
  3. Enable explicit, visible relationships between DAGs based on data dependencies. 

Approach

The high-level approach outlined below is intended to be contained within Airflow without requiring additional infrastructure components, since it needs to work in the vast majority of enterprise and cloud deployments without needing infrastructure changes. 

There are definitely areas which could be extended using additional infrastructure components, but those have been deferred in this proposal, since they can be built using this foundation in later stages. 

The core approach includes the following key elements:

Introduction of a "Dataset" concept

An explicit notion of a Dataset serves as a reference for data dependencies. This is necessary to support the varied scopes of availability, and in particular so that unambiguous dataset references can be accessed globally across DAGs and task runs.

A Dataset can be thought of as an "opaque pointer". More specifically, Airflow is aware of it only at a high level, and (in this AIP) it doesn't understand anything about what the dataset actually is, where it lives or what data it contains. In other words, as far as Airflow is concerned, a Dataset is just a name.
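Because the URI is the only thing Airflow knows about a Dataset, two Dataset objects with the same URI refer to the same dataset, even when they are declared in different DAG files. The sketch below shows that identity rule. It uses a hypothetical `DatasetRef` class, which is an illustration and not Airflow's own `Dataset` implementation:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DatasetRef:
    """Hypothetical stand-in for a Dataset: the URI is its entire identity.

    frozen=True makes instances immutable and hashable, and the generated
    __eq__ compares only the uri field, so equal URIs mean the same dataset.
    """

    uri: str


# Declared independently, e.g. in a producer DAG file and in a consumer DAG file.
in_producer_file = DatasetRef("s3://bucket/order_data")
in_consumer_file = DatasetRef("s3://bucket/order_data")

print(in_producer_file == in_consumer_file)  # True
```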

A Dataset is defined by a URI-parsable string. It does not need to be an absolute URL, nor even contain a scheme. The airflow: scheme is, however, reserved for future use.

Some "common" examples of the dataset URIs:

  • my-dataset - a simple dataset of unknown type/location.
  • s3://my-connection@bucket/order_data - using a specific connection ID for credentials.
  • s3://bucket/order_data - using the default connection ID for this type. (Note: hooks don't currently have a concept of default connection IDs; that's a property of operators, so this information would be part of the new Dataset feature being added to providers.)
  • azure-data-lake://container-name/path - an example of a connection type that uses underscores (the connection type is azure_data_lake).
  • file:///path/to/file/or/folder - a "special case" for storing files locally. This is key for new users who are "testing out Airflow on a laptop".
  • snowflake://db/table - referring to a table in a Snowflake database.

(Strictly speaking, the scheme part of the URI doesn't have to match the form used for Connections, but anything else would be confusing, so we will use the same prefix here.)

The URIs above are examples; this is not intended to be a complete list.
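The example URIs can be broken down with Python's standard `urllib.parse.urlsplit`. The following is a hypothetical sketch, not Airflow's own parsing code. Its assumptions are the function name, the returned keys, reading the userinfo part as a connection ID, and mapping hyphens in the scheme to underscores in the connection type (taken from the `azure-data-lake` example).

```python
from urllib.parse import urlsplit

# The "airflow" scheme is reserved for future use, so such URIs are rejected.
RESERVED_SCHEMES = {"airflow"}


def describe_dataset_uri(uri: str) -> dict:
    """Hypothetical helper: split a dataset URI into the parts used in the examples."""
    parts = urlsplit(uri)
    if parts.scheme in RESERVED_SCHEMES:
        raise ValueError(f"the {parts.scheme!r} scheme is reserved: {uri}")
    return {
        # "azure-data-lake" -> connection type "azure_data_lake"; no scheme -> None
        "conn_type": parts.scheme.replace("-", "_") or None,
        # The userinfo before "@" names the connection ID; None means "use the default"
        "conn_id": parts.username,
        # Everything after the scheme and the connection ID
        "location": (parts.hostname or "") + parts.path,
    }


for example in [
    "my-dataset",
    "s3://my-connection@bucket/order_data",
    "s3://bucket/order_data",
    "azure-data-lake://container-name/path",
    "file:///path/to/file/or/folder",
    "snowflake://db/table",
]:
    print(example, "->", describe_dataset_uri(example))
```

Building on the standard URL parser means that the connection ID, the location and the scheme all fall out of a single call, with no custom grammar to maintain.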

Data driven DAG scheduling

As outlined in the background and motivation section, it is critical for certain data pipelines to know that a source Dataset has changed and only then be run. 

Changes in the source Dataset should trigger follow-on DAGs. Today, this is typically accomplished by having a FileSensor wait for the arrival of a file in a known location.

Having an explicit Dataset created in an earlier DAG, and then explicitly referenced as a dependency in subsequent DAGs, enables this behavior directly within Airflow. The DAG author no longer needs to write code within the DAG to poll for the file arrival and so on.

User Interface

Having explicit Dataset and data dependency definitions within Tasks also enables a better visual representation in the Airflow UI. This gives a clearly visible dependency path across DAGs based on data propagation, rather than one that is implicit and only visible through a code walkthrough.

The approach of using URIs for datasets does lead to the possibility of typos causing hard to debug issues (such as "why isn't my DAG ever triggering?"). We can't reasonably make an unknown dataset be an error, as it's possible that the DAG that defines that dataset just hasn't been parsed yet, so at most this would be a warning.

We will show such a warning ("This DAG depends on dataset X that Airflow hasn't seen a producer for", etc.) in two places. The first is the DAG's own detail page. The second is the new Dataset dependency view, where we would highlight Datasets that have consumers but no producers.
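The check behind the second warning comes down to a set difference: it finds every URI that some DAG is scheduled on but that no task lists as an outlet. The sketch below runs over hypothetical in-memory mappings from DAG ID to URIs. In practice, this information would come from the parsed DAGs, and the function name is illustrative only.

```python
from collections import defaultdict


def find_unproduced_datasets(
    producers: dict[str, set[str]], consumers: dict[str, set[str]]
) -> dict[str, set[str]]:
    """Return {dataset URI: consuming DAG IDs} for URIs with consumers but no producer.

    producers maps a DAG ID to the URIs its tasks declare as outlets;
    consumers maps a DAG ID to the URIs in its schedule.
    """
    produced = set().union(*producers.values())
    unproduced = defaultdict(set)
    for dag_id, uris in consumers.items():
        for uri in uris - produced:
            unproduced[uri].add(dag_id)
    return dict(unproduced)


producers = {"finance_sales_orders": {"s3://bucket/order_data"}}
consumers = {"product_segmentation": {"s3://bucket/order_data", "s3://bucket/ordr_data"}}
print(find_unproduced_datasets(producers, consumers))
# {'s3://bucket/ordr_data': {'product_segmentation'}}
```

A typo such as `ordr_data` surfaces here instead of as a DAG that silently never runs. An unknown URI may simply belong to a DAG that hasn't been parsed yet, so the result is only suitable for warnings, not errors.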

When is a dataset updated?

Airflow is not responsible for updating the dataset (Airflow is data-aware, but still data-agnostic), and we are deliberately not implementing polling for changes in this AIP (see the Future work section). Therefore, Airflow only marks a Dataset as updated when a task finishes successfully.

If a task only conditionally updates a dataset, that task should finish in another state, e.g. SKIPPED, whenever it does not update the dataset.
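Here is a small model of this rule, with a hypothetical in-memory registry standing in for the metadata database. Only a task that ends in the success state marks its outlets as updated, so a task that skips itself leaves the dataset untouched.

```python
from dataclasses import dataclass, field


@dataclass
class DatasetUpdateRegistry:
    """Hypothetical in-memory stand-in for the datasets table (URI -> update count)."""

    update_counts: dict[str, int] = field(default_factory=dict)

    def on_task_finished(self, state: str, outlets: list[str]) -> None:
        # Airflow does not inspect the data itself; only the task's final state matters.
        if state != "success":
            return  # "skipped", "failed", ... leave the outlets untouched
        for uri in outlets:
            self.update_counts[uri] = self.update_counts.get(uri, 0) + 1


registry = DatasetUpdateRegistry()
registry.on_task_finished("skipped", ["s3://bucket/order_data"])  # nothing new was written
registry.on_task_finished("success", ["s3://bucket/order_data"])
print(registry.update_counts)  # {'s3://bucket/order_data': 1}
```

In a real Airflow task, the usual way to end in the skipped state is to raise `airflow.exceptions.AirflowSkipException`.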

When is a DagRun created?

A DAG defines its dataset dependencies through the schedule parameter, which takes a list of Dataset objects. Once all of the datasets have been updated at least once, a DagRun will be created. The database then starts tracking new updates again in preparation for the next DagRun.
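The rule "all datasets updated at least once, then reset" can be modelled as a small state machine. The class and method names below are hypothetical, and a real scheduler would keep this state in the database rather than in memory.

```python
class DatasetScheduledDag:
    """Hypothetical model of a DAG whose schedule is a list of dataset URIs."""

    def __init__(self, dag_id: str, schedule: list[str]) -> None:
        self.dag_id = dag_id
        self.required = set(schedule)
        self.updated_since_last_run: set[str] = set()
        self.runs_created = 0

    def on_dataset_updated(self, uri: str) -> bool:
        """Record an update; return True if it causes a DagRun to be created."""
        if uri not in self.required:
            return False  # this DAG does not depend on the dataset
        self.updated_since_last_run.add(uri)  # repeated updates count once
        if self.updated_since_last_run == self.required:
            self.runs_created += 1
            self.updated_since_last_run.clear()  # start tracking for the next run
            return True
        return False


dag = DatasetScheduledDag("product_segmentation", ["s3://bucket/orders", "s3://bucket/customers"])
print(dag.on_dataset_updated("s3://bucket/orders"))     # False: customers not yet updated
print(dag.on_dataset_updated("s3://bucket/orders"))     # False: a second update changes nothing
print(dag.on_dataset_updated("s3://bucket/customers"))  # True: all inputs updated, run created
```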

Example DAG code

Taskflow Example

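from datetime import datetime

import pandas as pd

from airflow.datasets import Dataset
from airflow.decorators import dag, task

# Producer DAG file
dataset = Dataset("s3://s3_default@some_bucket/order_data")


@dag(schedule="@daily", start_date=datetime(2022, 1, 1), catchup=False)
def my_dag():

    @task(outlets=[dataset])
    def producer():
        # What this task actually does doesn't matter to Airflow; the simple act of running
        # to SUCCESS means the dataset is updated, and downstream DAGs will get triggered.
        ...

    producer()


my_dag()


# Consumer DAG file: a Dataset declared with the same URI refers to the same dataset
dataset = Dataset("s3://s3_default@some_bucket/order_data")


@dag(schedule=[dataset], start_date=datetime(2022, 1, 1), catchup=False)
def consuming_dag():

    @task
    def consumer(uri):
        # Airflow only passes the URI along; reading the data is up to the task.
        # (Assumes Parquet data that pandas can read from this location.)
        df = pd.read_parquet(uri)
        print(f"Dataset had {len(df)} rows")

    consumer(uri=dataset.uri)


consuming_dag()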

Classic operator example

It is also possible to "annotate" an operator to say that it updates/produces a dataset, even if Airflow itself isn't responsible for moving the data around.

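from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

with DAG(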
        dag_id='example_databricks_operator',
        start_date=datetime(2021, 1, 1),
        catchup=False,
) as dag:
    new_cluster = {
        'spark_version': '2.1.0-db3-scala2.11',
        'node_type_id': 'r3.xlarge',
        'aws_attributes': {
            'availability': 'ON_DEMAND'
        },
        'num_workers': 8,
    }

    notebook_task_params = {
        'new_cluster': new_cluster,
        'notebook_task': {
            'notebook_path': '/Users/airflow@example.com/PrepareData',
        },
    }
    notebook_task = DatabricksSubmitRunOperator(
        task_id='notebook_task',
        json=notebook_task_params,
        # This isn't required for the task to run, but it is required for Airflow to
        # know that this task produces a given dataset.
        outlets=[Dataset("s3://s3_default@bucket/order_data")],
    )

Adding the "outlets" section to an Operator is currently required for triggering downstream DAGs. The "Polling of datasets for changes" item in the Future work section would make this optional for scheduling, although it would still be required for an accurate dataset view in the UI.

We "reuse" the (half-formed) inlets and outlets attributes from the existing Lineage interface.

DB Changes

To support "data-aware scheduling", we will create a new table in the database so that we can look up the list of DAGs that are scheduled/triggered based on a given dataset URI. We chose a new table over adding a column to the existing "dag" or "serialized_dag" tables because we want to allow a DAG to depend on multiple datasets. That requires a many-to-many list that the scheduler can search easily and efficiently.

The tables we will likely add are: datasets (uri, last_updated, producing_dag_id) and dag_dataset_dependency (uri, dag_id)
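The following sketch makes the lookup concrete by creating the two tables with the standard-library `sqlite3` module. The table and column names come from the text above. The column types, keys and query are assumptions, and this is not necessarily the schema Airflow ships.

```python
import sqlite3

SCHEMA = """
CREATE TABLE datasets (
    uri TEXT PRIMARY KEY,
    last_updated TIMESTAMP,
    producing_dag_id TEXT
);
-- Many-to-many link: a DAG can depend on several datasets and vice versa.
-- No foreign key to datasets: a consumer may reference a URI whose producer
-- has not been parsed yet. The composite primary key, led by uri, also
-- serves the scheduler's lookup by URI.
CREATE TABLE dag_dataset_dependency (
    uri TEXT NOT NULL,
    dag_id TEXT NOT NULL,
    PRIMARY KEY (uri, dag_id)
);
"""


def consuming_dag_ids(conn: sqlite3.Connection, uri: str) -> list[str]:
    """Return the IDs of DAGs scheduled on the given dataset URI."""
    rows = conn.execute(
        "SELECT dag_id FROM dag_dataset_dependency WHERE uri = ? ORDER BY dag_id",
        (uri,),
    )
    return [dag_id for (dag_id,) in rows]


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany(
    "INSERT INTO dag_dataset_dependency (uri, dag_id) VALUES (?, ?)",
    [
        ("s3://bucket/order_data", "product_segmentation"),
        ("s3://bucket/order_data", "revenue_forecast"),
        ("s3://bucket/customers", "product_segmentation"),
    ],
)
print(consuming_dag_ids(conn, "s3://bucket/order_data"))
# ['product_segmentation', 'revenue_forecast']
```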

See the Future work section for more detail about a richer async/poll-based approach. We have kept to a simple approach for now because it quickly gets us to the core goal of data-aware scheduling.

Future work

This AIP builds the groundwork for making Airflow data-aware and is purposefully tight in scope. Here are some of the ideas we have already thought about (and stripped out of this AIP) for follow-on work.

Lifecycle management and automatic garbage collection of old Datasets

An explicit problem with XCom today is cleanup of old data. Especially as Datasets grow larger, automatic cleanup of old data is necessary for a well run data system. However, it is worth pointing out that this cleanup is optional since there are many situations where Datasets may be kept around for extended time to enable auditing. 

To solve this problem, we would like to introduce the concept of "Availability Scope" for a Dataset. Whether it is:

  • Within a specific DagRun, or
  • Globally across all DAGs in the Airflow deployment

Like any data handling within a programming language, this element has obvious implications. The analogy is that of a variable defined within a class or within a method inside a class or on the heap and passed by reference across all classes and methods within the process. 

This is purely an analogy and should not be mapped literally to an Airflow DAG or an Airflow Task Group. Since Tasks within Airflow can run on different nodes and will always run as different processes, all data references in Airflow need to be done at an inter-process level, but from the perspective of reference passing and lifecycle management, the analogy is useful.

With explicit definitions around scope of availability of data as detailed here and explicit references to Datasets, it is now possible to use standard computer science techniques around garbage collection for deleting obsolete data objects automatically.
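The sketch below roughly illustrates how an explicit scope could drive cleanup. When a DagRun finishes, it selects the DagRun-scoped datasets that become eligible for deletion. Globally scoped datasets are never collected automatically, which fits the auditing case above. The scope names, the `ScopedDataset` class and the function are hypothetical, since this AIP does not define them.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ScopedDataset:
    uri: str
    scope: str                    # hypothetical values: "dagrun" or "global"
    run_id: Optional[str] = None  # owning DagRun, only set for "dagrun" scope


def collectable_after_run(datasets: list[ScopedDataset], finished_run_id: str) -> list[str]:
    """URIs of datasets whose availability scope ended with the given DagRun."""
    return [
        d.uri
        for d in datasets
        if d.scope == "dagrun" and d.run_id == finished_run_id
    ]


datasets = [
    ScopedDataset("s3://bucket/tmp/run_1/staging", "dagrun", "run_1"),
    ScopedDataset("s3://bucket/tmp/run_2/staging", "dagrun", "run_2"),
    ScopedDataset("s3://bucket/order_data", "global"),
]
print(collectable_after_run(datasets, "run_1"))  # ['s3://bucket/tmp/run_1/staging']
```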

Ideally, this clean-up would be done automatically by Airflow. To achieve this, we would like to introduce the notion of Setup and Teardown tasks within Airflow. For example, a Teardown task could be a "Garbage Collector" responsible for cleaning up temporary Datasets. This would run on the Workers as a "non-user visible task", unlike the SLA callbacks, which currently run in the Scheduler / DAG Processor space. However, this will be the subject of a follow-on AIP. Getting Workers (and Executors) to understand non-Task work is non-trivial and is not required for this core proposal.

Dataset Names

Instead of using the URI mechanism for Dataset references, we considered giving Dataset names and hiding the URIs for better abstraction and changes over time, but decided to defer it. Future work could be to integrate with something like a (Hive) metastore, so that we can give Datasets names without having to implement our own store.

Polling of datasets for changes

The "data based scheduling" currently only works when the same Airflow instance is responsible for both the producing and consuming ends. One future enhancement would be to add the ability to "poll" for changes to the dataset providers. Ideally this would be done using asyncio, so this might involve "extending"/re-purposing the Triggerer process and existing async capabilities in Airflow. That would be its own large AIP opening possibilities for even more “Event Based Scheduling”, hence why we didn’t include it in this one.

Schedule on Dataset change and timetable

Someone might want a DAG to run both on a regular schedule and whenever an input dataset changes. This may or may not be in this AIP, depending on how difficult it is to implement.

Data Dependency Trigger Rules for multiple input DataSets

A Task has trigger_rules that change when it can run based on the possible states of its upstream tasks. In a similar manner, we would like to add different "modes" for scheduling a DAG that has multiple input datasets. The two modes that come to mind off the bat are "all changed", which triggers a run only once all input datasets have been updated, and "any changed", which triggers a run when any one dataset has changed. The default behaviour, and the only behaviour implemented in this AIP, would be "all changed".
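The difference between the two modes is a single set comparison. The sketch below uses hypothetical mode names, and only "all changed" is part of this AIP.

```python
def should_trigger(mode: str, required: set[str], updated_since_last_run: set[str]) -> bool:
    """Decide whether a DAG scheduled on `required` datasets should get a new run."""
    changed = required & updated_since_last_run
    if mode == "all_changed":  # the default, and the only mode in this AIP
        return changed == required
    if mode == "any_changed":  # a possible future mode
        return bool(changed)
    raise ValueError(f"unknown mode: {mode!r}")


inputs = {"s3://bucket/orders", "s3://bucket/customers"}
print(should_trigger("all_changed", inputs, {"s3://bucket/orders"}))  # False
print(should_trigger("any_changed", inputs, {"s3://bucket/orders"}))  # True
```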

Dataset based SLAs

The core Dataset concept is in place as part of this AIP, but there isn't yet any support for enhanced SLAs based on Datasets. This is a foundational AIP and the support for SLAs specifically around data timeliness will be the focus of a follow-on AIP. 


12 Comments

  1. There's a lot of overlap with https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md in the dataset URI scheme – we should unify.

  2. Why isn't the output of the producer used as the definition of a dataset? So it allows multiple outputs

    # Proposed API: airflow.DataFrame and DataFrame.publish() do not exist today;
    # uri, retention and security are placeholders.
    from airflow import dag, DataFrame


    def producer() -> tuple[DataFrame, DataFrame]:
        df = DataFrame({
            "Name": [
                "Braund, Mr. Owen Harris",
                "Allen, Mr. William Henry",
                "Bonnell, Miss. Elizabeth",
            ],
            "Age": [22, 35, 58],
            "Sex": ["male", "male", "female"],
        })
        df.publish(uri, retention, security)

        df2 = DataFrame({"Country": ["Netherlands"]})
        df2.publish(uri, retention, security)

        return df, df2


    This way it ties in much more closely with the sql-decorator / python-api.

    Furthermore, what is entirely missing from this AIP is how this integrates with the scheduler. The scheduler needs to scan for changes. What happens if a change to the dataset is made outside of Airflow?

    Next to that, the DatasetProvider interface seems way too opinionated. It seems much more logical to have a FileSystem abstraction first and then a Data Layer on top of that, like a DataFrame, a Table or an Ordinary File. This allows someone to manipulate outputs in a much more pythonic way.

    Polling across instances also seems to be an outdated concept. If you can publish a dataset, the other Airflow instance can trigger it.

    Retention requirements should be set on the dataset. This would decouple 'setup/teardown' from a DAG and would allow a Garbage Collector to run.

    I suggest splitting this AIP (or having these items addressed in this AIP):

    1. FileSystem abstraction
    2. Data Awareness
    3. Retention and garbage collection
    4. Data Driven Scheduling 



    1. Bolke de Bruin, what do you think about extending data-driven scheduling to event-driven scheduling and treating data as one type of event?

  3. How would we handle a scenario where a task can update a Dataset, but doesn't always do so?

  4. As scheduler will be aware of dataset modifications, it would be great if this AIP contained notification mechanism similar to one added here: https://github.com/apache/airflow/pull/20443 - it would be great for OpenLineage integration, especially if Dataset URIs will be unified.

  5. @dag(schedule_on=dataset)
    can it take a list of datasets?
    1. nvm, it does have more explanation in the next section


  6. This is a great AIP, making the DAGs dataset aware. I am wondering if we can extend this AIP to make DAG as event driven schedulable and we can treat the arrival of datasets as a type of event.


    With the event driven scheduling, we can extend the airflow scheduler a lot and enable more use cases.

  7. dataset = Dataset("s3://s3_default@some_bucket/order_data")

    Can users specify something like ds in the dataset URI? The example shows a quite static dataset URI, but in the real world users seldom overwrite their data at the same URI.
  8. The idea of easing an option to schedule DAGs on an arbitrary event like update of the dataset sounds great!

    I have a few concerns about the name of the concept that is described as a "Dataset" in this AIP. When I think of a Dataset, I imagine a concrete chunk of data that is persisted in a particular place. The way scheduling is described here assumes that it is the update of the data under the URI that schedules a DAG. But as Ping Zhang highlighted, regularly produced data is often immutable; instead, a new dataset is produced every time. This means that teams that would like to work with this scheme will have to reference some "collection of datasets" in their Dataset object rather than a real dataset (e.g. `uri="gs://other-teams-data/datasets"` while the real datasets are named `uri="gs://other-teams-data/datasets/test_YYYY_MM_DD"`), which limits the usefulness of the Dataset's uri parameter. Furthermore, I can imagine situations where no data is stored at all and yet which fit nicely into the idea of data-driven scheduling. For example, it could be a formal approval to start the pipeline, ticket creation, a notification or pretty much any arbitrary event that can be created by a DAG task. In other words, it seems to me that the datasets in this proposal are closer to the concept of an "event" or "observable" than a "dataset", with the URI parameter being just a name.

    If we proceed with the term "dataset", it may be worth including a name, a URI and other metadata separately in a Dataset object. Allowing additional metadata to be passed with every update of the dataset might handle the problem described in the previous paragraph, where a new actual dataset is created for every run instead of an existing one being updated. But then comes the question of how extensible the Dataset object can be: is it going to be another fully customizable Airflow concept, similar to Timetables/Operators, or is it going to be more tightly integrated into the framework and have a more fixed nature?

    Also it would be great to see a draft of what kind of scheduling conditions are supported. If my DAG merges two databases, can I create one that runs when at least 1 of them updated (OR condition)? Or the one that waits for both to be updated (AND condition)?

    1. The way scheduling is described here assumes that it is the update of the data under the URI that schedules a DAG. But as Ping Zhang highlighted, the regularly produced data is often immutable - instead a new dataset is produced every time. This means that the teams that would like to operate with this schema will have to reference some "collection of datasets" in their Dataset object rather than a real dataset (e.g. `uri="gs://other-teams-data/datasets"` while the real datasets are named  `uri="gs://other-teams-data/datasets/test_YYYY_MM_DD"`),

      So the difference here is that I see the data set as being updated when the next data becomes available – i.e. The gs://other-teams-data/orderdata dataset is updated when a DAG writes the new data in to gs://other-teams-data/orderdata/test_YYYY_MM_DD – conceptually the dataset is "order data" not "order data for Week 20".


      Furthermore, I can imagine situations where no data is stored at all and yet which fit nicely into the idea of data-driven scheduling. For example, it could be a formal approval to start the pipeline, ticket creation, a notification or pretty much any arbitrary event that can be created by a DAG task. In other words, it seems to me that the datasets in this proposal are closer to the concept of an "event" or "observable" than a "dataset", with the URI parameter being just a name

      Yes that is useful event driven scheduling, but that is purposefully not what this AIP is about – it is the foundational work for data driven scheduling that we can build lots of richer features on top of. (General event driven is separate, if somewhat related.)

      Also it would be great to see a draft of what kind of scheduling conditions are supported. If my DAG merges two databases, can I create one that runs when at least 1 of them updated (OR condition)? Or the one that waits for both to be updated (AND condition)?

      See "Data Dependency Trigger Rules for multiple input DataSets" section of future work

      1. So the difference here is that I see the data set as being updated when the next data becomes available – i.e. The gs://other-teams-data/orderdata dataset is updated when a DAG writes the new data in to gs://other-teams-data/orderdata/test_YYYY_MM_DD – conceptually the dataset is "order data" not "order data for Week 20".

        I assume that the logic for processing this/these new file(s) would be in the consumer DAG? For instance, if only the data of the latest day (gs://other-teams-data/orderdata/test_YYYY_MM_DD) needs to be analyzed, I assume the consumer DAG will rely on the execution_date/run_id to select the right file to process?

        If so, how do we handle manual runs in the past when some data needs to be fixed? The producer would notify that the dataset was updated, but I do not see a way to tell which data was updated. In this case, it would trigger a new run of the consumer DAGs, but those would not know what to process (when only a subset needs to be processed, such as the daily data mentioned above). Please let me know if I got it wrong.