(renamed from Airflow FS)


Status

State: Completed
Discussion Thread
Vote Thread: https://lists.apache.org/thread/wokt58k15g81cjnsytq9k1ofvspb4d5c
Progress Tracking (PR/GitHub Project/Issue Label): https://github.com/apache/airflow/pull/34729
Created


In Release: 2.8.0
Authors

Updated 2023-11-16: The AIP has been accepted as experimental in 2.8.

Summary

With Airflow Store (AS) we want to reduce code fragmentation, improve XCom, and lower DAG processing complexity by providing a unified abstraction for accessing object storage systems. By doing so, we aim to enhance the overall user experience, promote code reusability, and simplify the development of Airflow workflows that interact with different storage backends.

Motivation

One common challenge faced by DAG authors is the diversity of object storage systems used to store data inputs, outputs, and intermediate results. Currently, Airflow lacks a unified and consistent way to access these various storage systems. This leads to a number of "SourceToDest" Operators that grows roughly quadratically with the number of storage systems, since each pair of systems needs its own Operator in each direction. These Operators do not necessarily keep to a common API standard, which places a cognitive burden on DAG authors.

At the same time, DAG processing is entirely bound to a filesystem mounted on the system local to the DAG processor. When running in containers, a local filesystem is often ephemeral, and admins resort to things like git-sync or redeployments to deal with this. Object storage would be preferable, but because Airflow relies heavily on filesystem semantics this is currently not an option.

XCom was created to store small pieces of data to share between tasks. These values are stored in Airflow's database. We do see users wanting to share larger values (like DataFrames), and with the increasing use of TaskFlow it becomes more natural to do so as well. Databases are not very suitable for storing larger objects, and while users have been creating custom XCom backends that interact with object storage systems, Airflow can be improved to have first-class support for larger objects.

Inspiration

The "Airflow Store" proposal takes inspiration from successful projects like Databricks FS and Apache Iceberg, which provide unified access to distributed file systems and structured data tables, respectively. AS aims to build on these concepts and offer a similar level of abstraction for object storage systems, making it easier for Airflow users to interact with different storage backends consistently.

Posix Behavior

One of the core principles of AS is to follow POSIX behavior as closely as possible. This ensures that users can leverage their familiarity with standard Python file operations, such as reading, writing, listing, and deleting files, even when working with various object storage systems. This consistency simplifies DAG development and reduces the learning curve for Airflow users.
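As a rough illustration of the file operations meant here, the sketch below runs them against a temporary local directory using only the standard library pathlib module. That AS paths would accept the same calls for a remote location such as "s3://warehouse" is the intent of this proposal, not something this sketch demonstrates. The file name data.txt is made up for the example.

```python
import tempfile
from pathlib import Path

# A temporary local directory stands in for a bucket. The assumption is
# that an AS path would offer the same calls for remote object storage.
with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    data = base / "data.txt"

    data.write_text("hello")                            # write
    content = data.read_text()                          # read
    listing = sorted(p.name for p in base.iterdir())    # list
    size = data.stat().st_size                          # stat
    data.unlink()                                       # delete
    exists_after_delete = data.exists()
```

Object stores do not implement every POSIX guarantee, which is why the text says "as closely as possible".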

Considerations

What change do you propose to make?

One of the primary considerations for the adoption of AS is its integration into the core Airflow package, where it lives in airflow.io. This ensures availability across DAG Processing, XCom, Authoring and Providers. To achieve this, AS will be implemented on top of the well-established fsspec library, a BSD-licensed file system specification library. AS aims to provide a standard and unified approach to working with various object storage systems through the concept of mounts and mountpoints. This unified approach spares users from grappling with cross-filesystem issues, such as copying data between different cloud storage providers, and removes the need for specialised Operators that deal with this.

Why is it needed?

  • To reduce the number of basic (and not overly well maintained) Operators that do SourceToDest and vice versa operations. These can be replaced by a single FileTransferOperator
  • To have a unified interface to file operations in TaskFlow and traditional Operators
  • To allow DAG processing to use arbitrary locations (object storage)
  • To allow XCom to have a standardised way of dealing with larger data volumes
  • To simplify DAG CI/CD
  • To streamline pre-DAG to DAG (e.g. notebooks to DAG)

Further considerations

While nothing prevents users, or even XCom and DAG Processing, from using fsspec directly, there are several benefits that are hard to get when rolling your own. Obvious advantages of native fsspec support are simplified authorization (through Airflow connections) and a single FileTransferOperator instead of n^2 source-to-dest ones. In addition, for same-filesystem operations AS will use the most native way of performing them, whereas users would need considerable rigor to achieve the equivalent themselves. If bug fixes are required, there is only one place where we need to make them.
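To make the n^2 argument concrete, here is a minimal sketch under stated assumptions. The helper names pairwise_operator_count and transfer are hypothetical and not part of Airflow. The copy routine relies only on the path objects offering open(), which is the property a unified path abstraction would provide. Local pathlib paths stand in for remote storage.

```python
import shutil
import tempfile
from pathlib import Path


def pairwise_operator_count(n_backends: int) -> int:
    """Dedicated operators needed for every ordered pair of distinct backends."""
    return n_backends * (n_backends - 1)


def transfer(src: Path, dst: Path, chunk_size: int = 1024 * 1024) -> None:
    """Stream src to dst in chunks; only open() is required of both paths."""
    with src.open("rb") as fin, dst.open("wb") as fout:
        shutil.copyfileobj(fin, fout, chunk_size)


# Usage: local files stand in for two different storage backends.
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "in.bin"
    dst = Path(tmp) / "out.bin"
    src.write_bytes(b"payload")
    transfer(src, dst)
    copied = dst.read_bytes()

operators_for_four_backends = pairwise_operator_count(4)
```

With four backends, dedicated operators would already number twelve, while the generic routine stays a single function. A real implementation could additionally use a backend-native copy when source and destination share a filesystem.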

A downside is that fsspec becomes one of our dependencies while being managed by a different community, one that is orders of magnitude smaller than our own. There is therefore a certain risk to the speed at which bug fixes in fsspec can be delivered. That said, fsspec has had consistent releases since its inception in 2019, and other high-profile projects like Apache Arrow, Apache Iceberg and Pandas also depend on it. Should the project be abandoned, there is the possibility to switch to smart_open (https://pypi.org/project/smart-open/), which has similar goals but a less strict interface.

Proposal

ObjectStoragePath

ObjectStoragePath is the main API users will work with. It has a pathlib.Path-like interface with some extensions. It accepts two extra optional parameters, conn_id: str and store: ObjectStore (see below). The connection to the underlying store is initialised lazily, so that ObjectStoragePath instances can be instantiated at the global scope. ObjectStoragePath can be serialised.

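from airflow.decorators import task
from airflow.io.store.path import ObjectStoragePath

base = ObjectStoragePath("s3://warehouse")  # global scope, no connection is opened yet

@task
def stat_data():
    data = base / "data.gz"  # task scope
    data.stat()  # first access to the underlying store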
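The lazy initialisation described above can be sketched as follows. LazyStorePath is a hypothetical toy class, not the Airflow implementation. The dictionary standing in for a client and the connections_opened counter exist only to make the deferred connection visible.

```python
from typing import Optional


class LazyStorePath:
    """Toy path that opens its (fake) connection only on first use."""

    connections_opened = 0  # demonstration counter, not part of any real API

    def __init__(self, uri: str, conn_id: Optional[str] = None) -> None:
        self.uri = uri
        self.conn_id = conn_id
        self._client = None  # nothing is connected at construction time

    @property
    def client(self) -> dict:
        # Create the stand-in client the first time it is needed.
        if self._client is None:
            LazyStorePath.connections_opened += 1
            self._client = {"uri": self.uri, "conn_id": self.conn_id}
        return self._client

    def __truediv__(self, name: str) -> "LazyStorePath":
        # Joining paths is pure string work and needs no connection.
        return LazyStorePath(f"{self.uri.rstrip('/')}/{name}", self.conn_id)

    def stat(self) -> dict:
        # Touching the store triggers the connection.
        return {"name": self.uri, "conn_id": self.client["conn_id"]}


base = LazyStorePath("s3://warehouse", conn_id="aws_default")  # "global scope"
opened_at_import = LazyStorePath.connections_opened

data = base / "data.gz"   # still no connection
info = data.stat()        # the connection is created here
opened_after_use = LazyStorePath.connections_opened
```

Deferring the connection matters because DAG files are parsed frequently, and a module-level path that connected eagerly would hit the storage backend on every parse.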

ObjectStore

ObjectStore manages the filesystem or object storage. In general, users would not need to deal with its interface except when using a custom scheme or protocol which is not registered through one of the providers. 

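from airflow.decorators import task
from airflow.io.store import attach
from airflow.io.store.path import ObjectStoragePath

# CustomFileSystem stands for a user-defined fsspec filesystem (not shown here)
store = attach(protocol="custom", fs=CustomFileSystem())

@task
def use_custom_store():
    # resolve the store through the registered protocol
    o = ObjectStoragePath("custom://bla")
    # or pass the store explicitly
    o = ObjectStoragePath("bla", store=store)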
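One way to picture what attaching a store does is a registry keyed by protocol. The sketch below is a toy model under that assumption. The names attach_store, resolve_store and the placeholder CustomFileSystem class are hypothetical and do not reflect how Airflow implements attach.

```python
from typing import Any, Dict
from urllib.parse import urlsplit

_STORES: Dict[str, Any] = {}  # protocol -> filesystem object


def attach_store(protocol: str, fs: Any) -> Any:
    """Register fs under the given protocol and hand it back."""
    _STORES[protocol] = fs
    return fs


def resolve_store(uri: str) -> Any:
    """Look up the filesystem for a URI by its scheme (default: 'file')."""
    scheme = urlsplit(uri).scheme or "file"
    try:
        return _STORES[scheme]
    except KeyError:
        raise ValueError(f"no store attached for protocol {scheme!r}") from None


class CustomFileSystem:
    """Placeholder for a user-defined filesystem implementation."""


custom_fs = attach_store("custom", CustomFileSystem())
resolved = resolve_store("custom://bla")
```

In this model, a path created from "custom://bla" finds its store through the scheme, while passing store=... explicitly skips the lookup.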

Providers

Providers can support additional filesystems, beyond the ones provided by core fsspec, by providing a filesystem module that has a get_fs(conn_id: str) → AbstractFileSystem interface.
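The get_fs contract could look roughly like the sketch below. Everything in it is a stand-in: FakeFileSystem replaces a real fsspec filesystem, the module name example_provider_fs is invented, and load_filesystem only illustrates how a caller might use such a module. It does not describe Airflow's actual discovery mechanism.

```python
import importlib
import sys
import types


class FakeFileSystem:
    """Stand-in for an fsspec filesystem configured from a connection."""

    def __init__(self, conn_id: str) -> None:
        self.conn_id = conn_id


def get_fs(conn_id: str) -> FakeFileSystem:
    """What a provider's filesystem module would expose."""
    # A real provider would read credentials from the Airflow connection here.
    return FakeFileSystem(conn_id)


# Register an in-memory module so the example runs without a real provider.
provider_module = types.ModuleType("example_provider_fs")
provider_module.get_fs = get_fs
sys.modules["example_provider_fs"] = provider_module


def load_filesystem(module_name: str, conn_id: str):
    """Import a provider filesystem module and call its get_fs."""
    module = importlib.import_module(module_name)
    return module.get_fs(conn_id)


fs = load_filesystem("example_provider_fs", "my_conn")
```

Keeping the interface to a single function means a provider only has to translate an Airflow connection into a configured filesystem object.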

Implementation

Which users are affected by the change?

No users are affected by the change. If you do not want to make use of it, the old patterns still work.

What defines this AIP as "done"?

  • airflow.io is available
    • The AS interface should become one of the standard "citizens" of the Provider Package features.

      Existing provider packages must still be supported, and previous provider packages that do not know about AS should also be usable with newer Airflow versions.

    • filesystems:
      - airflow.providers.microsoft.azure.fs.adls
  • DAGs can work with AS
  • Initial implementation supports LocalFileSystem, S3, GCS, ADLS
    • These implementations must be hosted within the respective provider packages.

      • LocalFileSystem→ Okay to put this in core with the interface definitions
      • S3→AWS
      • GCS→Google
      • ADLS→Azure
    • An exception could be made for generic filesystems that don't have a real provider (for example, a ZipFileSystem)
  • Out of scope: XCom, DAG Processor


11 Comments

  1. How does this interact with Datasets? They currently have a URI, and if the URI of a dataset is an s3:// URI should we be able to do something like mount a Dataset?

  2. In general they are two separate concepts (see also the discussion here: https://github.com/apache/airflow/pull/34729#issuecomment-1749091768). So you attach a volume to a mount point so you can do operations on top of files there. In that way they are separate as a Dataset points to a set of data rather than a volume. 

    So this is how it works now in the PR:

    local_fs = fs.mount("file:///data", "/data")

    remote_fs = fs.mount("s3://my-warehouse", "/warehouse")

    out_ds = Dataset(remote_fs / "file.csv")

    Nevertheless, I think there is absolutely 'something' to make both concepts work more intimately together. After digesting the feedback here I was already thinking of having a pathlib.Path interface like so

    # referencing an absolute path

    data = afs.Path("s3://warehouse/data.gz", conn_id) 

    # referencing a mount point

    data = afs.Path("/mnt/warehouse/data.gz")

    Maybe it is an idea to have Dataset accept a Path? Or do you have something in mind?

    1. I don't have something in mind, other than a "these both take URLs and are both talking about data". I was kind of hoping you might (big grin)

      1. I might, there are some inklings of ideas (tongue)

      2. I thought about this as well (see Progress Tracking PR) but similarly there’s not a concrete use case for this. I’m currently collecting some use cases, and if that works out we can draft another AIP on top of this to propose that. I’d suggest leaving this out of this AIP for now to reduce the scope. It’d be easier for the discussion to focus on the lower-level design, which is more important in this proposal.

    2. As Airflow has an inherent platform complexity, the idea of linking datasets and the mounting concept looks great to me, as it would lower the complexity users need to understand in order to use it. If we keep them separate, might it confuse users? Also, using such Paths/URIs as event triggers sounds like a cool catch.

      1. I agree, but the idea needs some iterations and fleshing out. AFS is intended as a foundational layer. If you have a few examples of what you think would be great to have (in code) then that would help.

        1. Okay, thanks. Does not need to be "all complete with one PR" but in general it might be (cool) to also think "long term" and have a path over multiple iterations to get to it. (no pressure (big grin))

  3. I assume with merge of the linked PR and checking the details - except that it is marked as "experimental" now, it is completed, correct?

  4. I believe this AIP is done meanwhile?

  5. Bolke de Bruin Looks like the AIP is done? Can we update the status if so?