Status

State: Draft
Discussion Thread: https://lists.apache.org/thread/qz6ckld9jsv6grsc02qnp3sf1jlrto87
Vote Thread:
Progress Tracking (PR/GitHub Project/Issue Label): https://github.com/apache/airflow/pull/39647
Created: 2024-05-26
In Release:
Authors: Bolke de Bruin

Abstract

This proposal aims to enhance Apache Airflow's DAG loading mechanism to support reading DAGs from ephemeral storage, superseding AIP-5 Remote DAG Fetcher. The objective is to generalize the DAG loader and processor to load DAGs via pathlib.Path rather than assuming direct access to the local filesystem. This includes implementing a custom module loader that supports loading from ObjectStoragePath locations and other Path locations, with caching capabilities provided by fsspec. Additionally, while this AIP does not implement DAG versioning directly, the proposed loader can easily be extended to support it, laying the foundational layer for AIP-63.

Motivation

Currently, Airflow's DAG loading mechanism is tightly coupled to the local filesystem, limiting its flexibility and scalability. As organizations increasingly adopt cloud-native architectures and ephemeral storage solutions, there is a growing need to support reading DAGs from various storage backends, including object storage services such as S3, GCS, and Azure Blob Storage. Traditionally, Airflow has relied on third-party syncing mechanisms like git-sync, which complicate deployments and have drawbacks in staying in sync. This proposal addresses that need by abstracting the DAG loading process to accommodate different storage backends seamlessly.

Proposal

Goals

  1. Generalize the DAG loader and processor to use pathlib.Path for file operations.
  2. Implement a custom module loader to support ObjectStoragePath and other Path-like abstractions.
  3. Integrate fsspec for caching mechanisms within the loader to optimize performance and reduce redundant data retrievals from remote storage.
  4. Lay the foundational layer for supporting DAG versioning as described in AIP-63.

Implementation Details

1. Generalize DAG Loader and Processor

  • Refactor DAG Loader:

    • Modify the DAG loader to use pathlib.Path for all file operations.
    • Ensure backward compatibility with existing DAG loading processes.
  • Update DAG Processor:

    • Adapt the DAG processor to handle pathlib.Path objects.
    • Abstract any direct filesystem access to use pathlib.Path methods (see the sketch after this list).
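
As a rough illustration of the refactor, the sketch below shows a hypothetical list_py_file_paths helper that discovers DAG files using only pathlib.Path methods, assuming ObjectStoragePath exposes the usual pathlib API (rglob, is_file); the paths and connection id are placeholders, not the actual implementation.

```python
from __future__ import annotations

from pathlib import Path

from airflow.io.path import ObjectStoragePath


def list_py_file_paths(dag_folder: Path) -> list[Path]:
    """Return all Python files below the DAG folder.

    Only pathlib.Path methods are used, so the same code works for a
    local directory and for object storage behind ObjectStoragePath.
    """
    return [p for p in dag_folder.rglob("*.py") if p.is_file()]


# The same discovery code path handles local and remote DAG folders:
local_dags = list_py_file_paths(Path("/opt/airflow/dags"))
remote_dags = list_py_file_paths(ObjectStoragePath("s3://my-bucket/dags/", conn_id="aws_default"))
```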

2. ObjectStorageLoader & ObjectStorageFinder

This supports loading Python modules from cloud storage and consists of a Finder, which locates arbitrary modules, and a Loader, which imports them.

    • Utilize the existing ObjectStoragePath class, which extends pathlib.Path and uses fsspec as the underlying implementation.
    • Ensure ObjectStoragePath integrates seamlessly with Airflow's DAG discovery process by adding the Finder.
    • Ensure caching for compiled py files and for downloaded DAGs and modules. 
    • Allow users to define cache size, eviction policies, and storage locations via Airflow configuration.
    • Optimize the custom module loader for minimal latency and efficient data retrieval (a sketch of the Finder and Loader follows this list).
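
A minimal sketch of what such a Finder and Loader pair could look like, built on Python's importlib machinery; the class layout is illustrative rather than the final design, and the bucket path and connection id are placeholders.

```python
import importlib.abc
import importlib.util
import sys

from airflow.io.path import ObjectStoragePath


class ObjectStorageLoader(importlib.abc.SourceLoader):
    """Loads module source from an ObjectStoragePath."""

    def __init__(self, fullname: str, path: ObjectStoragePath):
        self._fullname = fullname
        self._path = path

    def get_filename(self, fullname: str) -> str:
        return str(self._path)

    def get_data(self, path: str) -> bytes:
        # This is where an fsspec-backed cache would avoid repeated
        # downloads of unchanged files.
        return self._path.read_bytes()


class ObjectStorageFinder(importlib.abc.MetaPathFinder):
    """Locates modules below a remote base path."""

    def __init__(self, base: ObjectStoragePath):
        self._base = base

    def find_spec(self, fullname, path=None, target=None):
        candidate = self._base / (fullname.replace(".", "/") + ".py")
        if not candidate.exists():
            return None
        loader = ObjectStorageLoader(fullname, candidate)
        return importlib.util.spec_from_loader(fullname, loader, origin=str(candidate))


# Registering the finder makes modules stored under the bucket importable:
sys.meta_path.append(
    ObjectStorageFinder(ObjectStoragePath("s3://my-bucket/dags/", conn_id="aws_default"))
)
```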

3. Support for DAG Versioning (Foundational Layer)

  • DAG Versioning Integration:
    • Design the loader to support versioning mechanisms provided by fsspec or cloud storage backends.
    • Ensure the architecture allows easy extension to include versioning logic as specified in AIP-63.
    • Provide configuration options to enable versioning features, either leveraging built-in capabilities of fsspec and cloud storage or through an additional versioning layer (see the illustration after this list).
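
As an illustration of how a backend's built-in versioning could be leveraged, s3fs (through fsspec) can read a pinned object version when bucket versioning is enabled; the bucket, key, and version id below are placeholders.

```python
import fsspec

# Version-aware access requires support in the backend, e.g. s3fs against
# a bucket with versioning enabled.
fs = fsspec.filesystem("s3", version_aware=True)

# Read a pinned revision of a DAG file ("abc123" stands in for a real version id).
with fs.open("my-bucket/dags/my_dag.py", version_id="abc123") as f:
    source = f.read()
```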

Security Considerations

Confidentiality

We rely on the mechanisms provided by Airflow's Connections and ObjectStoragePath to ensure secure access to object storage. Storage will be read-only (no cache files will be written to object storage).
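
For illustration, remote access is typically configured through an Airflow connection that ObjectStoragePath resolves by conn_id; the bucket and connection id below are placeholders, and the path is only read, never written.

```python
from airflow.io.path import ObjectStoragePath

# Credentials come from the referenced Airflow connection; the DAG folder
# is only ever read from, never written to.
dag_folder = ObjectStoragePath("s3://my-dags-bucket/dags/", conn_id="aws_default")
source = (dag_folder / "my_dag.py").read_text()
```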

Availability

As DAGs and modules will need to be downloaded before Airflow can parse them, this can impact the performance of DAG processing. Configurable caching is implemented to reduce the impact.
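
As a sketch of the kind of caching involved, fsspec's chained "filecache" protocol downloads a file once into a local cache directory and serves subsequent reads from there; the paths below are placeholders.

```python
import fsspec

# The first access downloads the file into the local cache directory;
# later parses read the cached copy until it is invalidated or evicted.
with fsspec.open(
    "filecache::s3://my-bucket/dags/my_dag.py",
    filecache={"cache_storage": "/tmp/airflow-dag-cache"},
) as f:
    source = f.read()
```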

Integrity

Currently, no additional mechanisms are planned to verify the integrity of DAGs and modules; we rely on object storage semantics and fsspec to provide this.

Documentation

  • Update Airflow's documentation to include detailed instructions on configuring and using the new DAG loading mechanism.
  • Provide examples and best practices for leveraging ephemeral storage in Airflow.



Comments

  1. Thanks Bolke de Bruin for the AIP - unlike others replying-to-all on the list, I'll try to comment in Confluence. I have some general questions which might turn into concerns, but I am "open to be convinced":

    1. Object storage is (usually) charged per IOPS. As all nodes need continuous access to DAGs, is there a way to estimate the amount of IOPS needed, and therefore the costs generated, compared to a git-sync approach? A Git server might also be rate-limited.
    2. On the other hand, understanding that caching will compensate, how will I (as an admin) be able to flush all caches consistently in case of inconsistencies?
    3. I have doubts about performance, as local files will always be fast and present, but remote calls will introduce latency in DAG parsing. If we had parallel parsing and asyncio, that would compensate for the IO latency, but as long as that is not a mandatory option, this is to be investigated; I foresee problems in this area. I am not sure whether this could scale to thousands of DAGs.
    4. I also see (like other emails in the discussion) a challenge with non-Python resources. How about Dag-Factory or other generators using other files via __file__?
    5. Would the AIP also include a standard deployment option in the Helm chart?

    Overall, of course, I like the idea of de-coupling the core from IO/file access!

      1. The short answer is "not that I can think of right now, except for measuring it once available". The long answer is ymmv. Long-lived nodes with slowly changing DAGs will use the cache, so they will barely hit the object storage apart from the initial load. Checking for changes on S3 currently costs $0.0004 per thousand requests. git-sync will need to do the same and will sync all changes everywhere rather than selectively when needed.
      2. The cache eviction policy can be set, or you can simply remove the cached files.
      3. With file caching, files will be pulled locally first and parsed from there. If you have caching turned off, this will indeed introduce latency; however, there is no real downside to caching unless you change your DAGs extremely often.
      4. First and foremost, if you do not switch to remote storage, there is no effect on Dag-Factory or other generators. Relying on __file__ is, imho, a bad habit that only works coincidentally. Strategies to overcome/mitigate this have been outlined on the mailing list, but for reference: 1) allow the module loader to load resources by convention (e.g. everything under __module__/resources), 2) use a manifest file, 3) maybe use a zip file.
      5. Can you explain what you would expect?
      1. Regarding 5), I expect nothing explicit; I just wanted to ask for clarification whether the deployment is included in the AIP (in scope), such that a git-sync-free deployment via object storage will be added as a configuration option to the Helm chart as well. (I am not certain whether this is easily possible.)