Status

This AIP is part of AIP-63, which aims to add DAG Versioning to Airflow.

This AIP supersedes AIP-5 and AIP-20.

Motivation

Today, Airflow always executes tasks using the latest DAG code. While in some cases this may be what users want, in others it may be desirable to complete a whole DAG run on a single version of the DAG's code, or, when clearing tasks in older runs, to let those tasks run on the DAG code used at the time of the original run.

We also continuously parse DAG files, as we have no way of knowing whether the DAGs from a given file have changed. This is clearly inefficient and means you either wait a potentially long time for changes to be reflected or needlessly burn resources. More control over parsing behavior is desirable.

Goals:

  • Allow tasks to run using a specific version of DAG code
  • Allow more control of Airflow parsing behavior
  • DAG code can come from many sources

In order to support this, we will introduce the following concepts:

  • DAG Bundle - a collection of DAGs and other files. Think of today's DAG folder.
  • DAG Bundle Manifest - metadata about the DAG bundle.

DAG Bundles

Airflow will support one or more DAG Bundles (a collection of DAGs and other files, like today's DAG folder). They will support versioning, which allows Airflow to control the version of the DAG code used for a given task try. This means that a DAG run could continue running on the same DAG code for the entire run, even if the DAG is changed mid-way through, as the worker can retrieve a specific DAG bundle when running a task.

This will require Airflow to support a different way of retrieving DAGs - it can no longer simply expect to find DAGs on local disk. This will be done in a pluggable way with DAG bundle backends and optional versioning support so the ecosystem can evolve as time goes on.

By default, DAG runs will execute on the same DAG bundle version for the whole run, assuming the DAG bundle backend supports versioning. However, DAGs can opt into the existing behavior of running on the latest DAG code instead.

Any file can be placed in a DAG Bundle, and it will be versioned (if the DAG bundle backend supports versioning, of course). Imagine a YAML config file, for example.

Pluggable DAG bundle backends

DAG bundle backends will allow Airflow to retrieve DAG bundles at a given version at any point in time. Since it’s common that DAGs are not simply contained in a single file, we require a way to version a whole set of files simultaneously. How this is accomplished will depend on the backend. Airflow will only operate on complete DAG bundles - it will never attempt to identify or fetch a subset of a DAG bundle.
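
For illustration only, a backend might expose an interface roughly like the sketch below. The class and method names are hypothetical - the real interface will be defined during implementation.

# Hypothetical sketch of a DAG bundle backend interface - names are
# illustrative only, not the final design.
from abc import ABC, abstractmethod
from typing import Optional


class BaseDagBundleBackend(ABC):
    """A pluggable source of DAG bundles, with optional versioning support."""

    supports_versioning: bool = False

    @abstractmethod
    def get_latest_version(self) -> Optional[str]:
        """Return an identifier for the newest bundle version, or None if unversioned."""

    @abstractmethod
    def fetch(self, version: Optional[str], dest: str) -> None:
        """Materialize the whole bundle (never a subset) at `version` into `dest`."""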

Backends are encouraged to implement local caching in order to reduce the impact on, and reliance on, external systems where possible. The exact mechanism, and whether it falls to the backend or to Airflow itself, will be determined during implementation.

This does introduce the need for potentially large amounts of temporary disk space, but where Airflow places these versioned bundles will be configurable and controlled by the deployment manager as appropriate.

Possible backends

We may not build all of the following backends for Airflow 3.0, and there may be multiple variants for each technology, but this gives you a good idea of what we are thinking and what is possible.

git

Git already versions everything in the repository, so we can simply use a commit hash or tag.

The backend will support both moving branches (e.g. tracking main) and a specific tag/commit that is updated in the DAG bundle backend configuration. This allows for flexibility of deployment strategies.
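
As a rough sketch (assuming the hypothetical interface above and shelling out to the git CLI; class names and configuration fields are illustrative only), a git backend tracking a branch could resolve that branch to a commit hash as the bundle version, and check specific commits out side by side with `git worktree`:

# Hypothetical sketch of a git-based DAG bundle backend - illustrative only.
import subprocess


class GitDagBundleBackend(BaseDagBundleBackend):
    supports_versioning = True

    def __init__(self, clone_path: str, tracking_branch: str = "main"):
        self.clone_path = clone_path          # local clone of the DAG repo
        self.tracking_branch = tracking_branch

    def get_latest_version(self) -> str:
        # The bundle version is simply the commit hash of the tracked branch.
        subprocess.run(["git", "-C", self.clone_path, "fetch", "origin"], check=True)
        result = subprocess.run(
            ["git", "-C", self.clone_path, "rev-parse", f"origin/{self.tracking_branch}"],
            check=True, capture_output=True, text=True,
        )
        return result.stdout.strip()

    def fetch(self, version: str, dest: str) -> None:
        # Check the requested commit out into its own directory so multiple
        # versions can coexist on one machine (a pinned tag/commit works similarly).
        subprocess.run(
            ["git", "-C", self.clone_path, "worktree", "add", "--detach", dest, version],
            check=True,
        )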

Blob storage

Versioning support for blob storage systems varies: some support versioning at the object level, while others support no versioning at all. It is still possible to write a backend that supports versioning, but Airflow will only keep track of a single bundle version, so bridging that gap is up to the backend. For example, you could write a backend that grabs a zipped bundle, giving both the backend and Airflow a single object to version. There are alternative approaches as well (e.g. keeping a "manifest" - not to be confused with the manifest from this AIP - that tracks versions per object), but Airflow itself is shielded from this complexity.
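
As one hedged illustration of the zipped-bundle approach, assuming an S3 bucket with object versioning enabled (class and field names are hypothetical):

# Hypothetical sketch of a blob storage backend that versions a single
# zipped bundle object in S3 - illustrative only.
import io
import zipfile

import boto3


class S3ZipDagBundleBackend(BaseDagBundleBackend):
    supports_versioning = True

    def __init__(self, bucket: str, key: str):
        self.s3 = boto3.client("s3")
        self.bucket = bucket
        self.key = key  # e.g. a single "dags.zip" object holding the whole bundle

    def get_latest_version(self) -> str:
        # With S3 object versioning enabled, every upload of the zip gets a VersionId.
        head = self.s3.head_object(Bucket=self.bucket, Key=self.key)
        return head["VersionId"]

    def fetch(self, version: str, dest: str) -> None:
        obj = self.s3.get_object(Bucket=self.bucket, Key=self.key, VersionId=version)
        with zipfile.ZipFile(io.BytesIO(obj["Body"].read())) as archive:
            archive.extractall(dest)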

Local Disk

Not every environment will need DAGs from an external system (imagine a local development environment or when you bake DAGs into an image). To support such use cases, the existing DAG directory will be exposed via a DAG bundle backend, but one that does not support versioning. Airflow will operate under the “latest only” paradigm to support this.

The local disk backend is something we will build for Airflow 3.0 in order to have a robust local development experience.
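
A minimal sketch of what that unversioned backend could look like under the hypothetical interface above (illustrative only):

# Hypothetical sketch of the unversioned local disk backend - illustrative only.
class LocalDagBundleBackend(BaseDagBundleBackend):
    supports_versioning = False

    def __init__(self, path: str):
        self.path = path  # e.g. today's DAG folder

    def get_latest_version(self) -> None:
        # No versioning - Airflow operates on "latest only".
        return None

    def fetch(self, version, dest: str) -> None:
        # Nothing to fetch: the bundle already lives on local disk at self.path.
        pass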

Configuring DAG bundle backends

DAG bundle backend configuration will be stored in the Airflow DB - this allows for configuring them via the Airflow UI, API, or CLI. We will also support importing from JSON, allowing for easy provisioning by deployment managers.

By default, the local disk backend pointing at today's DAG directory will be the only configured backend.

There will be no restriction on the number of DAG bundle backends that can be configured.

Parsing

Instead of constantly trying to parse DAG files, we will add support for a manifest file in DAG Bundles. This allows Airflow to support more flexible parsing rules, letting users tell us things like:

  • Whether we need to scan for files containing DAGs in any parts of the DAG folder structure.
  • How often we should scan for those files.
  • Or, a list of files we should parse.
  • How often files should be parsed (basically `min_file_process_interval`, but at a folder/file level), or whether they should only be parsed when the file's checksum/mtime changes.

For example, a manifest could look something like this:

# /manifest.yaml

directories_to_scan:
  - path: /glacier/
    scan_interval: 1 day
    min_file_parse_interval: 1 day
  - path: /fast/
    scan_interval: 10 minutes
    min_file_parse_interval: 1 minute
  - path: /static/
    scan_interval: never
    min_file_parse_interval: on change

dag_files:
  - path: /never_scanned/mydag.py
    min_file_parse_interval: on change

bundle_fetch_interval: 5 minutes

We will nail down the exact functionality that will be supported and the file structure during implementation.

A few notes:

  • Manifests will be optional.
  • Parsing will always happen on the latest version of the DAG bundles, as is the case today.
  • Today’s `dag_dir_list_interval` will be replaced with a new configuration option to control how often we look for new versions of a DAG bundle. We will also have UI/API/CLI support for “refreshing” on demand.
  • `.airflowignore` will still be supported.

Scheduler

The scheduler, when creating new DAG runs, will always use the latest DAG version.

While scheduling tasks, the scheduler will specify the DAG bundle version when sending the task to the executor, so that the task can run on that specific DAG bundle version.
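
Purely as an illustration (the real shape of this hand-off is part of AIP-72's task execution interface and is not designed yet), the information sent to the executor might include something like:

# Illustrative only - field names are hypothetical, not a final design.
workload = {
    "dag_id": "my_dag",
    "task_id": "my_task",
    "run_id": "scheduled__2024-01-01T00:00:00+00:00",
    "try_number": 1,
    "bundle_name": "main_dags",   # which configured DAG bundle backend to use
    "bundle_version": "3f9c2ab",  # pinned for the run if the backend supports versioning
}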

Worker

The executor will tell the worker what version of a DAG bundle a task needs to run against. The worker can then use the DAG bundle backend to get the DAG bundle and execute the task.

If the worker is capable of running more than one task (e.g. a LocalExecutor worker or CeleryExecutor worker), it can store the bundle on temp disk and use it for multiple tasks. These bundles can even be kept around when no task is currently using that version of the DAG bundle.
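
A minimal sketch of such per-version caching, assuming the hypothetical backend interface sketched earlier (locking and eviction omitted):

# Hypothetical sketch of per-version bundle caching on a worker - illustrative only.
import os


class WorkerBundleCache:
    def __init__(self, backend, cache_dir: str):
        self.backend = backend      # a DAG bundle backend instance
        self.cache_dir = cache_dir  # configurable temp location for bundles

    def get_bundle_path(self, version) -> str:
        # One directory per bundle version; tasks needing the same version share it,
        # and it can be kept around after those tasks finish.
        dest = os.path.join(self.cache_dir, version or "latest")
        if not os.path.isdir(dest):
            self.backend.fetch(version, dest)
        return dest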

Packaging (Where do DAG Bundle Backends live?)

The packaging decisions (e.g. where will the out-of-the-box DAG bundle backends live) will be deferred until we have a better understanding of how AIP-72 will be packaged and delivered to users. We will have to work closely with the AIP-72 folks to nail this aspect of our AIPs down, as they have some overlap.

Phases

Phase 1: DAG Bundle Backend framework and initial DAG Bundle Backends
Phase 2: Parser / Manifest changes and integrating versioning into AIP-72’s task execution interface

You may be surprised to see that Connections, Variables, and Plugins are not mentioned. They are out of scope for now.

Migration Considerations

There are no direct breaking changes, unless deployment managers or authorized users decide to remove the local DAG bundle backend that will be configured by default.


16 Comments

  1. What if we bring in the idea of Prefect Deployments? This has a bit of overhead (or a huge one, I never counted TBH) when it starts the flow, because it is required to copy the entire source, which could also be remote, into a temporary directory.

    1. I think that's exactly what "bundle" is - there are different names. I named them "snapshots" - but the concept is the same. Because they should contain (obtained this way or that way) sources + variables + other environment - connections - that we can restore in a somewhat reproducible way. And I think "Pluggable DAG bundle" will mean that we will have to find a way to efficiently switch between snapshots/bundles/deployments (whatever you call them) for the execution. What is interesting there is that there are also various "levels" of those snapshots - for example, in some cases you might want to have 'almost' the same bundle (for example, connections will need to be updated as the password changes since the last run, and your connections are generally the same +/- credentials).

      1. Fwiw I prefer "snapshots" over bundle – way more intuitive and conveys the connection to DAG versioning instantly. I was confused about what bundle meant in this context until I read the AIP. 

    2. I've left this intentionally vague. It's much easier to scope it at today's DAG dir only, and that is where this draft draws the line now. But Jarek's idea of bringing in "everything" is definitely on the table too. It raises questions like "should deployment managers be responsible for managing provider versions, or DAG authors?". Definitely a lot to consider when we get closer to this being a reality.

  2. I very much like this as the major "big thing" of the DAG versioning "bundle" of AIPs!

    I like the abstract way of describing a bundle and especially that you see the "bundle" of files together, which also considers cross-file dependencies for utilities and generator stuff.

    I assume the most common backend used for versioning today is GIT. I assume it would be really complex and error-prone to try to re-implement SCM-like logic. Also you need the Python source files available in a file-system-like manner in order to cross-`import` referenced files in the Python module loader.

    Being biased towards GIT, I don't know if you are aware of `git worktree` (https://git-scm.com/docs/git-worktree), which allows a temporary checkout (or many) from one GIT repo. If I think of this and assume I have a LocalExecutor or CeleryWorker, then there might be 16 task slots operating in parallel, all on different task versions and potentially different version hashes. So DAG code must be available in the file system in potentially multiple versions in parallel.
    In such cases you could think of: if a GIT repo is cloned and available, via `git worktree` the individual versioned file tree could be checked out into a TMP folder prior to task execution. Multiple tasks with the same version might even share the file tree to save IO. If the version hash corresponds to the GIT commit hash, then 90% of the complexity could be pushed down to logic in GIT. And you would have the benefit of 100% traceability of "what code was executed", as well as the ability, via the GIT hash, to reproduce the code view on demand in the webserver (if code versions are not kept as blob copies in the DB... though the DB would be needed to avoid GIT having to be available as a clone on the webserver).

    1. Yep, basically exactly what I was thinking for git.

      A key part though is these will be pluggable, so all the git-specific logic and, say, versioned S3 bucket-specific logic can be isolated, and all "core" Airflow needs to know is to ask for version x. The specifics of that interface still need to be ironed out, but I think it's important to have that flexibility long term.

  3. Like the idea of addressing this huge missing piece in Airflow!


    Have you considered taking this as an opportunity to abstract the way DAGs are defined together with the definition of a bundle? For example, this can be achieved by abstracting away how the DagBag is populated - for a given bundle it would defer the logic of presenting a DAG of a certain version to a pluggable bundle provider. The trivial and default implementation of a bundle provider - read the /dags directory and parse it. For a Git-based one - check out a version with worktree and parse it. For a custom backend that doesn't use Python but, for example, YAML files to define workflows - create DAGs from these directly (instead of having a special fake DAG file that reads the YAMLs to create DAGs).


    Time and time again I see unsuccessful attempts to contribute visual DAG editors (like the one Pinterest was proposing some time ago) and users implementing their own hacky solutions for not-Python-file-managed DAGs within Airflow's constraints. The right implementation of DAG bundles can reduce the number of workarounds in these solutions and will be a huge step towards the feasibility of visual DAG editing being part of the core Airflow experience one day.


    Looking forward to having this one polished and shipped!

    1. No, I didn't consider changing that. My initial reaction is that that wouldn't need to be coupled with this proposal either. I see that being more of a "pluggable parser" than a "pluggable source of files" that the parser looks at.

      That said, I do understand the desire for more flexibility in how Airflow discovers and parses DAGs. I'll definitely keep it in mind and make sure we don't make that type of thing harder to achieve.

    2. Igor Kholopov  Yup, the Pluggable dag parser is a good step forward. I was planning to get it for 3.1, but if you want to own it, please go for it (smile) 

      I added it as a line item for 3.1: Airflow 3 Workstreams#Othercandidates.1 – feel free to replace my name with yours on it

  4. Referring to the Airflow 3 discussion, I'd say we should be much bolder than FSSPEC or a common abstraction over file system versioning.

    I think we should consider a MUCH more decoupled architecture for Airflow 3 that might simplify a lot and be even more opinionated in how we actually implement versioning internally. If we decide to break with Airflow 2 DAG compatibility, I think we should consider a world where the Worker and Triggerer do not need DAG folder access at all (warning). If we consider "Task execution Isolation" by splitting off DB access and communication, we should also seriously consider sending not only "secrets", but also "DAG code" to the tasks. If we sign and verify the payload that we send to the Worker and Triggerer, and if we are able to extract only the files that are needed to parse and execute a given DAG (either automatically or via annotations), we should be able to store versions of DAGs locally on the Scheduler/DAG file processor side and send the code over to the task to execute. In this case we do not need to focus on FSSpec, or the underlying file system capability for the worker or triggerer, and all the complexity of using a distributed file system as part of the deployment. We could mandate (and be opinionated) on how the DAG file processor/scheduler stores the history (here I would side with Jens Scheffler that GIT is a good solution). And we could simply treat it as an internal implementation detail of how the scheduler/DAG file processor stores the DAG folder history (we could use GIT for storing the history, independently of how DAG files get delivered to the DAG folder available to the DAG file processor).

    That would be my bold proposal if we limit DAG versioning to Airflow 3 and break some compatibilities.


    1. Bold indeed. I like the general idea, but my initial reaction is that I'm a bit hesitant of Airflow trying to build the "payload" itself. I think having users (maybe with the help of Airflow somehow) build the "payload" themselves makes for a much clearer line. Say in docker "create a container with this image" vs "create a container from this single Dockerfile in a sea of Dockerfiles" (not a great analogy, but yeah). I think one benefit of that is, coupled with "clear interface for Task Execution", it opens the door for different DAGs to use different dependencies too.

      Anyways, lots to think through for this now that breaking changes are possible.

      1. > Anyways, lots to think through for this now that breaking changes are possible.

        Precisely. I think (and I had the same "revelation" during the AIP-67 multi-team discussions and voting) that IF we allow for breaking changes, then we can make very different decisions and make a number of simplifications or abstractions that would not be possible otherwise but would be way more future-proof. That's why I really started the discussion on Airflow 3, because it might be a HUGE difference if we decide this one to be part of Airflow 2 or Airflow 3 or both.

  5. I think generally the "per dag execution environment" should be clarified as an optional feature that might help with solving particular dependency conflicts, but we should not treat it as a "default" approach that will be used by many users.

    IMHO it introduces a number of problems with managing multiple environments and making sure that they are actually portable, it's not defined who should manage them and how, and I think they actually do not solve Airflow's "conflicts" problems but rather multiply them (because basically people will have to solve conflicts on their own). Also it adds quite a bit of complexity when it comes to managing changes in such environments. For example, when you upgrade to a new (security) version of a dependency - should that mean that all past DAG runs that used the past version of the environment should use the new one? Or when you add a backwards-incompatible dependency change to your DAG (and change the environment) - how does that impact the previous version of the environment (and how does it play with the above "security update")? We seem to delegate those problems to users, and that's fine, as long as it's going to be a "Power user" feature and we document and explain some of the scenarios involved there. And we have to be very careful as this is a bit of uncharted territory and I am quite sure pip install --target is just the tip of the iceberg (smile)

    I think though that it might be a good idea to add such an execution environment as an "optional" way of handling situations where particular tasks or DAGs should be run with a different set of dependencies (and for that, in very specific cases, it might be a great escape hatch), but I have huge doubts whether this will ever become a "standard" way of running Airflow tasks. It will come with a big implementation cost, but I have a feeling it will be used very rarely. Still, I am very optimistic about it as a feature, and I think it is a good idea (for Airflow 3.1+ as marked in the document). But we should not make it an "expectation" that a bundle **should** contain an execution environment. This should be "exceptional" - IMHO a "common" execution environment will be used by 9X% of tasks, because the overhead of users managing multiple execution environments will be huge.

    But also I am curious to see how it might work out. While sceptical in general about making it a "first class citizen" in Airflow and expecting people to do that "in general", I think it is fine if we treat it as "yet another way of dealing with conflicting dependencies" on top of what we already have: https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#handling-conflicting-complex-python-dependencies. And this should also be added on top of the "multi-team" approach where such environments will be defined at the "team level". I think we should give our users all those options.

    In short: I think we should be rather clear that such "per-DAG" execution environments are an additional "Power user" feature and document very well some of the limitations and constraints they might bring: dependency on the exact same libraries and OS, the time/traffic it will take to install such environments, security implications, and difficulties in running, testing, and especially maintaining those environments.

    1. 100%. I don't think I did a good job of articulating it, but I definitely view this as an optional power user feature only!

    2. I'm going to remove execution environments from this AIP. It's big enough without it. A little more detail in this comment and thread.

      1. Okay, execution environments are gone now!