This AIP is part of AIP-73, which aims to expand data awareness within Airflow.

Rationale

A data asset is, fundamentally, a collection of logically related data. This can be one of (but not limited to):

  • A table in a relational database
  • An persisted ML model
  • An embedded dashboard or report
  • A bunch of tables grouped by a naming convention sharing a near-identical structure
  • A directory containing CSV files

From a programmatic standpoint, we will define assets as entities within a function that are generated when that function is called and executed. Assets can be created as one-off events, but more commonly are updated somewhat periodically, either indefinitely or until a set point of time. Each write can either:

  • Replace data in its entirety (for example by deleting and recreating a table)
  • Incrementally append data to an existing asset (for example by inserting or upserting DB records into an existing table or by creating a new file in a directory), or
  • Publish a new iteration of the asset (for example by publishing a new ML model version or creating and sending a new iteration of regulatory report).

While it’s not necessary for a pipeline to understand this distinction, similar to how version control systems consider diffs, it is important for pipelines to understand what each task execution changes within an asset in order for Airflow to highlight impacts on other dependent assets and recreating those writes retroactively (i.e backfilling, discussed later).

Proposed Changes

Rename Datasets to Assets

Taking inspiration from other tooling like Great Expectations, Atlan and Dagster, we propose to rename Datasets to Assets, and potentially introduce subtypes. The term “asset” is more generalizable, and can be used to represent other data products like serialized ML models.

This opens the door to us adding different types of assets beyond datasets, like ML models. ML practitioners use datasets AND serialized models as part of their ML pipelines, and while there is nothing that is stopping ML practitioners from representing both as “datasets” in Airflow today, it is uncomfortable to refer to models as datasets when they are distinct concept, and makes it hard to distinguish between tasks updating ML models and tasks updating datasets in the UI and in the code. We can also add specific attributes for different asset types. For example, most datasets are semi-structured and in a tabular format, so it is reasonable to assume that datasets have schemas with column names and types.

In user-facing interfaces, the term Dataset will be collectively changed to Asset. The current Datasets view in the web UI, for example, will be renamed to the Assets view.

The Asset Class

Similar to how Dataset works, an asset can be declared by instantiating an Asset object. One difference from Dataset is that Asset will have an additional name argument. If given, this must be unique in the Airflow deployment, and is used to represent the asset in user-facing interfaces, e.g. the web UI. Both name and uri can be given, but if one of them is omitted, its value should be inferred from the other. At least one of the arguments must be provided; they cannot both be absent. For convenience, the uri argument will also accept an ObjectStoragePath to be automatically coerced into a URI string.

The Asset class will also have a group attribute that can be used to identify multiple assets are of a kind. This is just a string and can be given anything the user sees appropriate. Airflow only uses it toe categorize assets in user-facing interfaces. In the web UI’s Assets view, say, we can group assets together by this, so it’s easier to visually parse the graph.

In ML use cases, for example, a user may choose to assign assets to be in the model (something trained against data) or dataset (things to train models against, and/or receive from feeding data through a model) group. Something like:

my_model_1 = Asset(name="my_model_1", group="model")
past_info_1 = Asset(name="past_info_1", group="dataset")

@task(inlets=[my_model_1, past_info_1], outlets=[Asset(name="prediction_1")])
def predict_1():
# Use the model and available data to make a prediction...

Say there’s a bunch of these tasks, we can then “sort” in the Datasets view by type, and put models and datasets in different groups, to more easily find what we want. Something like:

Asset Subclasses

Various subclasses will be implemented to provide additional arguments for structured information. This also sets the group attribute on the asset automatically. For example, we should have subclasses Dataset and Model for the ML use case mentioned above, and they’d automatically be put in groups dataset and model, respectively. So instead of using Asset manually, you can just do

my_model_1 = Model(name="my_model_1")
past_info_1 = Dataset(name="past_info_1")

If you wondering—yes, we are introducing the Dataset class back under a different semantic. This is sort of intentional; with this addition, existing DAGs can be rewritten in the most minimal way possible (just one line to change the import) to continue to function, albeit in a slightly different semantic. This is especially possible since almost everything in assets is optional anyway (if you’ve given a URI, which you must in 2.x datasets in the first place). The user may wish to “fix” the DAG to have the correct semantic if they so wish, but if they don’t (and I imagine many won’t, especially in corporate use cases), that’s totally OK too.

Another thing subclasses can do is to colour-code the assets. This is similar, but fundamentally very different, from operator colours—which was not that useful if you ask for my honest opinion—in that the group colour identifies what a thing logically is (useful!), instead of how the thing is implemented (not very useful).

Compatibility and Migration

People using the existing Dataset class are expected to rewrite usages to use the new Asset class. If the goal is to rewrite as little code as possible, they may choose to use the new Dataset class instead, in which case only the import line needs changing. The new classes will likely reside in airflow.assets instead of airflow.datasets (Airflow 2 import path). This should be easily covered by an automated tool.

If we are to have a trasitional Airflow 2.11, we can consider aliasing airflow.assets to assist migration. Users can change the import path while still on Airflow 2, and migrate to Airflow 3 without needing to change any code.

Scope Definition

This AIP will be considered done when the proposed rename is made, and additional arguments added. The web UI should be adjusted to accomodate the type-grouping feature. The Asset class should be designed in a way that users can freely create subclasses of it, and expect the correct thing to happen regarding the type parameter in the UI.

A few subclasses may be added with the initial implementation that represent cases we feel are common enough to illustrate how they can be used, but they are not considered a requirement for the AIP to be done.

Future Work

Asset Annotations

An additional benefit of using subclasses is we can add additional parameters to a specific asset. The parameters can provide annotations with a structure common to the asset type, as an alternative to the free-form extra dict. This improves how information about the asset can be shown in the UI; for example, since ML datasets are almost always tabular data, we can allow specifying the columns in the subclass:

aggregated_sales = Dataset("aggregated_sales", columns=["sales"])

This information can be used in the UI to show (a sliced view of) the actual table like a Pandas DataFrame in Jupyter Notebooks.

This is considered follow-up since its implementation is strictly incremental on asset subclasses once the foundation is laid out. It is perfectly fine for us to release the subclasses first to the public, and only add structured annotation fields later, after those subclasses see real-world use and we get feedback on what kind of data people tend to put in them.

5 Comments

  1. Consider the renaming is a breaking change, will proxy classes for Dataset be created as alias to Asset in order to minimize need for rework for existing DAGs? Or would you prefer to "break" existing dataset based DAGs?

    1. Yeah I think we should do something like that. Simply making the class a proxy seems lame considering we only have one chance in ten years to remove things, so we’ll probably change the semantic just a bit so existing code still work (and can stay unchanged if the user feels that’s good enough) but with a slightly different semantic. We’re thinking instead of a proxy it might be a subclass to Asset and also implicitly set type="dataset".

      1. Do you think we need to enforce a different import path? Would it be a technical limitation if a Proxy is still existing under the old name mapping to the new hierarchy? I would see a benefit that zero re-write would be needed.

        1. Yeah, tbh an extra import path for us to maintain is a 0 cost one. We can keep it so DAG Authors don't need to change their dag files

          1. airflow.datasets will not exist after this AIP (renamed to airflow.assets) otherwise, so maintenance is not exactly zero. It is very minimal though, especially if we do it in airflow/__init__.py instead of creating an otherwise empty package just for compatibility.