Status

This AIP is part of AIP-73, which aims to expand data awareness within Airflow.

Proposed Changes

We propose extending the taskflow API to include asset-centric primitives to improve the interface for implementing workflows for asset-centric tools. In addition to attaching Asset annotations to a task-based workflow, as described in AIP-74, an asset can be defined directly in a file with a decorated Python function:

# The "uri" argument is optional but strongly encouraged.
# The asset's name defaults to the function name if none is explicitly given.
@asset(uri="s3://aws_conn_id@bucket/raw_bus_trips.parquet")
def raw_bus_trips():
    # Write bus trips data to asset...

The asset is at the same level as a DAG and defined by the function. Unlike @task and @dag, you do not need to call the decorated function in a file to instantiate it. The definition itself creates the asset. Calling the asset as a function results in a parse-time error.

Internally, creating an asset this way would create:

  • A DAG with dag_id raw_bus_trips
  • One single task in the DAG for the declared Python function
  • Asset with name raw_bus_trips (and the specified URI, if given), set as the task’s outlet

The @asset decorator therefore need to accept all arguments on DAG (with obvious exceptions such as dag_id, and things that don’t make sense like default_args) and @task (again with exceptions along similar lines).

An optional uri argument is also accepted as the asset URI (see AIP-74). We do not propose to actually handle writing to and reading from the URI—that is left as a future work at the moment—the user still needs to do that manually in the function.

All assets in an Airflow deployment share one single namespace. This is also the same namespace for DAGs to avoid confusion. Different assets can reference the same URI to support writing to the same data target from two functions—not a best but unfortunately common practice.

A couple of names are reserved and cannot be used as asset names. Declaring an asset with one of those names results in a parse-time error:

  • context references the execution context provided by Airflow
  • self references the “current” asset (see below)

Reference an Asset

An asset can be referenced in another asset’s function by passing its name as a named function argument. 

@asset(uri="snowflake://.../bus_trips")
def aggregated_bus_trips(raw_bus_trips):
    # Write aggregated bus trips to asset...

This is like setting inlet to a task, and does not control when the referencing asset is run. In the above example, aggregated_bus_trips can run regardless of the state of raw_bus_trips. If you want the referencing asset to depend on the referenced, the two must have a scheduling relationship. See Schedules below for details.

It is also possible for an asset to reference itself inside the function. This is useful for e.g. retrieving the declared URI for other uses to avoid code duplication.

@asset(uri="snowflake://.../bus_trips")
def aggregated_bus_trips(self, *, raw_bus_trips):
   conn_uri, _, view_name = self.uri.rpartition("/")
   connection = ibis.connect(conn_uri) # Connect to Snowflake!
  # Write aggregated bus trips to the view using connection...

We can provide a better interface than rpartition to parse the URI, but that’s out of this API’s scope.

Multi-Assets

There are cases where the same function may generate multiple assets, such as when generating training and testing datasets for ML. This isn’t necessary for most situations, but is a valid use case (like an upstream asset needing to be split into two). To accommodate, we will allow users to pass in parameters for multiple assets. Some parameters will be unique per asset and some, like schedules or partitions (see below) will be shared.

@asset.multi(
  outlets=[
      Asset(name="bus_trip_1", uri="s3://../bus_trip_1.parquet"),
      Asset(name="bus_trip_2", uri="s3://../bus_trip_2.parquet"),
  ],
)
def split_trips(aggregated_bus_trips):
  ...

Schedules

The schedule of an asset denotes when the asset is written to. If the writing is done by a process managed by the pipeline, the schedule reflects when the pipeline kicks off the process to do the writing. Similar to today, you will also be able to set time-based schedules and/or schedule assets to be generated whenever upstream assets are generated. 

@asset(..., schedule="@yearly")
def asset1():  # Write happens when a new calendar year is entered.
    ...

@asset(..., schedule=asset1)
def asset2():  # Write happens whenever asset1’s write completes.
    ...
@asset(..., schedule=asset1)
def asset3():  # Runs parallel to asset 2.
    ...

@asset(..., schedule=asset2 | asset3)
def asset4():  # Write happens whenever EITHER asset2 or asset3 finishes.
  ...

The key difference is, since we are decoupling partitioning from scheduling, the schedule parameter no longer controls the interval, i.e. not designed around the logical/execution date. It simply controls when the next round should happen.

It is expected that scheduling of non-asset workflows (DAGs) will also be changed in a similar way to match the behavior for assets. Existing operators must be reviewed to ensure they account for the new scheduling semantic, but we should provide a transition interface to assist rewrites and better allow providers to develop a implementation both compatible to 2 and 3. The timetable protocol will also require new methods to implement the new semantic, but both old and new should be able to both implemented on the same class, with each major version only calling methods its uses.

Partitions

At a high-level, partitions are “slices” of data assets that can be tracked and computed independently. They typically correspond to separate files or slices of tables, and are especially useful for modeling data that is appended or modified when running incremental load processes. When an asset is partitioned, each materialization of it only updates a part of the target, e.g. appends data newly generated during the time period covered by the partition. 

@asset(
  ...,
  schedule="@hourly",
  partition=PartitionByInternal("@hourly"),  # Each materialization covers an hour.
)
def hourly_data():
    ...

The partition concept and interface is covered by AIP-76.

User Presentation Considerations

Since an @asset declaration still creates a regular DAG and a regular asset, it should be displayed appropriately without additional work in the web UI, assuming AIP-74 implementation. This is arguably not optimal since the DAG view expects more than one task in the DAG, but it should be enough as the initial implementation.

Some terminologies need to change in the web UI, and similarly in API and CLI. Aside from the obvious Dataset references (just need to be renamed to say asset), we also need to change things like airflow dags and airflow tasks. Since it is difficult to come up with a unified name for both classic DAGs/tasks and assets, we propose to create a new airflow assets sub-command to handle just assets. The new airflow assets sub-command will have asset-specific sub-commands, such as materialize, aside from common ones like list. Commands like airflow dags will still be able to access asset-backing DAGs. This should be fine since assets still use the same dag_id namespace.

The same ideas apply for the REST API.

Effects on Terminology

While the @asset syntax still creates a DAG behind the scenes, a user using the decorator would no longer be exposed directly to the DAG construct. We’ve addressed the issue somewhat in the previous section by separating DAGs and assets, but there are terms that cannot be separated, specifically DAG files, DAG folders, and some in-the-work proposals such as AIP-66 DAG bundles. In many cases, we’d also want to have a name for DAGs and assets collectively in discussions since they logically do similar thing in some aspects. One possible name for them is Definitions, so we’d have definition files, definition folders, definition bundles, definition processors, etc.

Compatibility and Migration

Since this is a strictly new syntax that is implemented with existing constructs, there should be no compatibility issues since existing code does not change behaviour at all. A user may always choose to not use the new syntax.

For cases where users do want to migrate existing DAGs to the new syntax, documents and tutorials should be created to walk through the process to convert existing constructs to their equivalents (either functionally or logically). The Airflow documentation should include a comparison between classic DAGs and @asset-based workflows. Other documents, such as walk-throughs, can be created as blog posts or workshop slices, etc.

Better UI display for @asset

As mentioned above, using the existing DAG view to display assets is not optimal, since an asset generally have very few, or even just one task body inside. We should also try to improve the Home view, which currently displays a list of DAGs, and can be difficult to read with a lot of assets—each is a row in the table—with dependencies between them—which cannot be displayed with a table.

One possibility is to improve on the current Datasets view, and make that the Home view instead. The Datasets view can gain more sorting options for users to arrange things (including assets and classic DAGs) in them according to various needs, and access information on execution etc. more easily. We need to keep an eye with AIP-38 and coordinate the efforts to align with this.

Scope Definition

This AIP is considered complete when the @asset decorator is implemented and can fulfill the above features, and accompanying non-code materials are created. Scheduling and partitioning support not related to assets, such as partition support in DAGs, is not a part of this AIP.

Future Work

Attaching tasks to an asset

While having one task per asset covers most use cases, it is sometimes needed to create code blocks for additional work not strictly tied to the asset’s generation. This is especially true for setup and teardown tasks. We should therefore have a way for users to attach additional tasks either before or after the “main” asset generation task. The implementation should be straightforward since an asset function is internally just a DAG, but the interface may require additional design thoughts. This is deferred to a later release since it is strictly an addition after the @asset syntax is implemented.

Automatically read from/write to an asset

As mentioned above, we currently do not provide any special mechnism to read from or write to the location specified by the asset. It is already possible to access asset information inside the function to do the reading and writing. The natural next step is to encapsulate the actual access operation altogether inside the asset, so we can do something like

@asset(...)
def foo(bar):
df = bar.read_dataframe() # Read from asset "bar".
# ... do something ...
return df # Write to asset "foo".

Since reading and writing data requires a lot of abstraction work to properly consider the myriad of cases in the real world, we decide to put this semantic off until later so we can properly discuss the feature requirements.

Model(PostgresLocation(...))
load_from_postgres(type=Model)
PostgresModel(...)