Status

State

Draft
Discussion Thread

https://lists.apache.org/thread/bn0oo47j48xh8r335gd2jrrjz0o7vnjl

Vote Thread
Vote Result Thread
Progress Tracking (PR/GitHub Project/Issue Label)
Date Created

Aug 01, 2024

Version Released
Authors

Motivation

What problem does it solve? / Why is it needed?

From the very beginning, Airflow has relied on a DAG parsing loop that continuously re-parses DAG files and maintains the state of DAGs in an Airflow deployment. This simple approach served Airflow well for a long time, but it ultimately makes a number of basic use cases hard or impossible to implement:

  • No way to synchronously update a DAG in a fast and reliable manner. To deploy a new version of a DAG, the user needs to deliver a new version of the code (1) to the components and wait until the DAG processor eventually picks up the newer version (2). For larger Airflow deployments the delay between (1) and (2) can be significant. There are workarounds (like bumping the priority), but they either don't achieve a fully synchronous parsing process or carry a large latency overhead (like `airflow dags reserialize -S <subdir>`).
  • Safe rollout of new DAG code is limited by the fact that any import error removes the previous, non-broken version of the DAG and disrupts its execution if it was running. This stems from the fact that DAG removal and the absence of a DAG on the next parsing iteration are equivalent in the current implementation.
  • AI agents are harder to integrate with Airflow deployments, as eventually-consistent DAG ingestion requires multiple tool invocations.
  • Many companies opt for building simpler (and more restrictive) workflow orchestration solutions on top of Airflow (e.g. keeping all DAGs as YAML files). All of these solutions have to hack around Airflow, because an entrypoint in Python files is the only way to define a DAG, even though generating the DAG itself from an external definition is trivial.
  • Most parsing iterations are throw-away CPU cycles. While dynamically generated DAGs (such as those depending on Variables) are supported, most DAGs do not require this and are immutable. If they are baked into the container image (a fairly common case), it would be sufficient to parse them once. But to satisfy a relatively small percentage of dynamic DAGs, there is no option to save resources by avoiding extra re-parsing iterations (partially solved by AIP-66 manifest options).

Considerations

What change do you propose to make?

Originally this AIP proposed a single set of changes across DAG processing. With multiple Airflow 3 improvements having landed (AIP-72, AIP-66, the DagBag refactoring into vanilla and DBDagBag), it makes sense to split this proposal into two relatively independent tracks:

Track 1: Abstract DAG importer 

Introduce a new pluggable interface: the "DAG importer". This mechanism abstracts away the transformation of bundle content into a DAG object that is ingestible by the DAG processor and executable by the task runner. DAG importers can be supplied by providers.

The importer assumes that DAGs are scoped by file-system-like paths. For most bundles, this is a real file system. Proposed interface:

from datetime import datetime
from pathlib import Path
from typing import Iterable

class AbstractDagImporter:
  # Get DAG objects from the specified path(s).
  def import_path(self, path: str, options: ImportOptions) -> Iterable[DagsImportResult]: ...

  # Get DAG paths in the subpath.
  def list_paths(self, path: str) -> Iterable[Path]: ...

  def path_exists(self, dag_path: str) -> bool: ...

  def modified_time(self, dag_path: str) -> datetime: ...

  # State persistence and restoration for passing the state across IPC
  def get_state(self) -> dict: ...

  def load_state(self, state: dict) -> None: ...

With its introduction, the split of responsibilities becomes the following:

  • DAG importer [pluggable] - how the DAG is constructed from sources and what paths exist in the sources.
  • DAG bundle [pluggable] - DAG sources storage management, sources version control.
  • DAG processor [non-pluggable] - What DAGs are (re)ingested and when, how they are persisted in metadata DB.
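To make this split concrete, here is a minimal sketch of a file-format importer satisfying the interface above. Everything besides the interface's method names is an assumption of this sketch: `YamlDagImporter` is hypothetical, and `DagsImportResult` is simplified to a plain dataclass rather than the real proposal type.

```python
# Toy file-format importer: treats every *.yaml file under the bundle
# root as one DAG definition (illustration only, not the proposed code).
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, Iterator


@dataclass
class DagsImportResult:
    dags: dict = field(default_factory=dict)           # dag_id -> DAG (stubbed)
    import_errors: dict = field(default_factory=dict)  # path -> error message


class YamlDagImporter:
    def __init__(self, bundle_root: str) -> None:
        self._root = Path(bundle_root)

    def import_path(self, path: str) -> Iterator[DagsImportResult]:
        # A real importer would build DAG objects; we record the file stem
        # as a pseudo dag_id to keep the sketch runnable.
        for p in self.list_paths(path):
            yield DagsImportResult(dags={p.stem: str(p)})

    def list_paths(self, path: str) -> Iterable[Path]:
        return sorted((self._root / path).rglob("*.yaml"))

    def path_exists(self, dag_path: str) -> bool:
        return (self._root / dag_path).exists()

    def modified_time(self, dag_path: str) -> datetime:
        mtime = (self._root / dag_path).stat().st_mtime
        return datetime.fromtimestamp(mtime, tz=timezone.utc)

    # State round-trip so a supervisor can recreate the importer over IPC.
    def get_state(self) -> dict:
        return {"root": str(self._root)}

    def load_state(self, state: dict) -> None:
        self._root = Path(state["root"])
```

Note that the importer never touches the metadata DB: deciding when to call `import_path` and how to persist the results remains the DAG processor's job, per the responsibility split above.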

Implementation

The DAG importer is a (potentially) stateful object that is created within a DAG bundle (1:1). It is supplied alongside the bundle path to the DagBag (in the DAG processor/worker) and when working with path metadata in the DAG processor. Since we want to make it pluggable, it should be possible to access (at least some of) the internal Task API to access objects like Connections. This in turn means that we should run it as a supervised process, so fetching path metadata (listing, checking existence, etc.) would need to happen in a separate process.
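The `get_state()`/`load_state()` pair suggests a simple pattern for moving importer state across that supervisor boundary. A sketch of the round-trip follows; the transport (JSON over IPC) and the `CachingImporter` class are assumptions of this sketch, only the hook names come from the proposed interface.

```python
# Sketch: serializing importer state across a process boundary as JSON.
import json


class CachingImporter:
    """Importer-like object that remembers which paths it already parsed."""

    def __init__(self) -> None:
        self._parsed = {}  # path -> last seen bundle version

    def mark_parsed(self, path: str, version: str) -> None:
        self._parsed[path] = version

    def get_state(self) -> dict:
        return {"parsed": self._parsed}

    def load_state(self, state: dict) -> None:
        self._parsed = dict(state["parsed"])


# Supervisor side: capture the state and ship it over IPC as JSON.
parent = CachingImporter()
parent.mark_parsed("dags/etl.py", "v1")
wire = json.dumps(parent.get_state())

# Supervised child side: restore the same state from the wire format.
child = CachingImporter()
child.load_state(json.loads(wire))
```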

Configuration

The configuration of the DAG importer is part of the bundle configuration. Example:

[
  {
    "name": "dags-folder",
    "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
    "kwargs": {
      "dag_importer_class": "airflow.dag_processing.importers.local_python_importer.LocalPythonImporter"
    }
  },
  {
    "name": "notebooks-git",
    "classpath": "airflow.providers.git.bundles.git.GitDagBundle",
    "kwargs": {
      "dag_importer_class": "airflow.providers.jupyter.importers.LocalNotebookImporter"
    }
  },
  {
    "name": "bq-pipelines",
    "classpath": "airflow.providers.google.cloud.bundles.pipelines.PipelinesBundle",
    "kwargs": {
      "dag_importer_class": "airflow.providers.google.cloud.importers.BQPipelinesBundle",
      "kwargs": {
        "project_id": "my-project",
        "location": "us-east1" 
      }
    }
  }
]
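One plausible way for the DAG processor to resolve the configured class is a standard dotted-path import. The sketch below uses a generic helper and a placeholder stdlib class path so it runs without Airflow installed; the helper name and the resolution flow are assumptions, not an Airflow API.

```python
# Sketch: resolving "dag_importer_class" from the bundle kwargs above.
import importlib


def import_string(dotted_path: str):
    """Import a class (or any module attribute) given its dotted path."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_path), attr_name)


bundle_config = {
    "name": "dags-folder",
    "kwargs": {
        # Placeholder stdlib class so the sketch is self-contained.
        "dag_importer_class": "collections.OrderedDict",
    },
}

importer_cls = import_string(bundle_config["kwargs"]["dag_importer_class"])
importer = importer_cls()  # nested "kwargs", if present, would be passed here
```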

Compatibility

With the assumption that the default DAG importer is a LocalPythonImporter (with the same behavior as the current DagBag), the new mechanism should be fully compatible with the current behavior of Airflow.

One non-trivial question is the interoperability between different bundles and importers. Based on our prototype, we've identified two "flavors" of importers:

a) File-format importers. These importers can naturally work with any file-system-based bundle (such as LocalDagBundle, Git, S3, etc.). They focus on importing different DAG formats. Examples are:

  • LocalPythonImporter - current Airflow Python files
  • Python notebook importer
  • YAML-format importer
  • dbt Pipeline importer

b) External importers. These only work with "synthetic" bundles: bundles that only pass "version" metadata to the importer and provide a common path, with the importer itself responsible for fetching the "source" from an external system.

An example is YAML configurations stored in an external database. This setup works well for visual pipeline constructors, which don't require advanced version management and don't want to get into the business of syncing files.

With external importers, the generation of DAG paths is fully in the hands of the importer logic, and the paths do not represent real local files. For example, they can be resource IDs in an API: `projects/my-project/pipelines/mypipeline1`. The external importer can use those as a 1:1 replacement for DAG import paths.
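The resource-ID-as-path idea can be sketched as follows. The external API is stubbed as a dict, and every name here (`ExternalPipelineImporter`, `fake_api`) is hypothetical; only the path-method names mirror the proposed interface.

```python
# Sketch of an external importer whose "paths" are API resource IDs,
# not files on disk.
from __future__ import annotations


class ExternalPipelineImporter:
    def __init__(self, pipeline_api: dict) -> None:
        # Stand-in for a client fetching pipelines from an external API.
        self._api = pipeline_api

    def list_paths(self, path: str) -> list[str]:
        # Resource IDs act as a 1:1 replacement for DAG import paths.
        return sorted(rid for rid in self._api if rid.startswith(path))

    def path_exists(self, dag_path: str) -> bool:
        return dag_path in self._api

    def import_path(self, path: str) -> dict:
        # A real importer would translate each definition into a DAG object;
        # here we just return the raw definitions keyed by resource ID.
        return {rid: self._api[rid] for rid in self.list_paths(path)}


fake_api = {
    "projects/my-project/pipelines/p1": {"steps": ["extract", "load"]},
    "projects/my-project/pipelines/p2": {"steps": ["transform"]},
}
importer = ExternalPipelineImporter(fake_api)
```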

PoC: https://github.com/IKholopov/airflow/tree/bq-pipeline-importer

Scope (what defines track done)

  • AbstractDagImporter is implemented and importers can be supplied via providers.
  • LocalPythonImporter is implemented (most of it is a migration of logic from DagBag, in turn simplifying DagBag further).
  • Bundles interface is extended to instantiate importers.
  • Importers support is integrated into DAG processor.

Track 2: Interactive DAG processor

In addition to a regular (async) DAG processor, create an alternative "Interactive" DAG processor, which will act as an internal control-plane API for DAGs parsing.

Proposed internal API:

# Parse and return DAGs/import errors for a bundle and a (relative) path within the bundle (no ingestion)
- POST /bundle/parse { "bundle", "path", "bundle_version" } -> { "import_errors", "dags" }
# Parse, ingest and return ingested DAG IDs/import errors for a bundle and a (relative) path within the bundle
- POST /bundle/parse_update { "bundle", "path", "bundle_version", "update_options" } -> { "import_errors", dict["dag_id", "dag_version"] }
# Disable DAGs (by ID or relative path)
- DELETE /bundle { "bundle", "path", "dag_ids" }
# Get bundle version metadata
- GET /bundle -> { "bundle", "bundle_version" }
# Pin bundle version for specified DAGs
- POST /bundle/set_version { "bundle", "dag_ids", "bundle_version" }
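A rough in-memory sketch of the parse vs. parse_update distinction follows; the HTTP layer is omitted, and every name here (`InteractiveProcessor`, `stub_importer`) is hypothetical.

```python
# Sketch: /bundle/parse is a dry run, /bundle/parse_update also ingests.
class InteractiveProcessor:
    def __init__(self, importer) -> None:
        # importer is a callable: path -> (dag_ids, import_errors)
        self._importer = importer
        self.ingested = {}  # dag_id -> dag_version persisted so far

    def parse(self, bundle: str, path: str, bundle_version: str) -> dict:
        # Parse only: report DAGs and import errors, persist nothing.
        dag_ids, errors = self._importer(path)
        return {"dags": list(dag_ids), "import_errors": errors}

    def parse_update(self, bundle: str, path: str, bundle_version: str) -> dict:
        result = self.parse(bundle, path, bundle_version)
        for dag_id in result["dags"]:
            # Ingest: record which version each DAG was parsed at.
            self.ingested[dag_id] = bundle_version
        return {
            "import_errors": result["import_errors"],
            "dag_versions": {d: bundle_version for d in result["dags"]},
        }


# Stub importer returning (dag_ids, import_errors) for any path.
def stub_importer(path: str):
    return ["etl_daily"], {}


proc = InteractiveProcessor(stub_importer)
```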

Implementation

A new flag `airflow dag-processor --interactive-api` will start the FastAPI application described above. It can run alongside a regular async dag-processor, but the sets of bundles must be non-overlapping between interactive and async dag-processors and designated as such in the bundle configuration (the `dag_bundle_config_list` Airflow configuration).

The DAG model is expanded with an optional `default_dag_version` relation to a DAG version, which can be used to pin an older version for an individual DAG.
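The pinning semantics could then reduce to a simple resolution rule. The helper below is an illustration under that assumption, not the proposed implementation; only the `default_dag_version` field name comes from the AIP.

```python
# Sketch: a pinned default_dag_version wins over the latest ingested version.
from typing import Optional


def resolve_dag_version(latest_version: str,
                        default_dag_version: Optional[str]) -> str:
    # No pin set -> run the latest ingested version; pin set -> run the pin.
    return default_dag_version if default_dag_version is not None else latest_version
```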

Pre-requisites

  • AIP-92
  • Callbacks migration (part of AIP-92) into a separate processing unit/workers.

Open questions

  • Detailed authn/authz for the DAG processor. One of the desired features is the ability to operate a "read-only" DAG processor, which would require authorization to access only designated connections via the internal Task SDK API and no write access to perform DAG ingestion in the DAG processing API. This AIP assumes that the question of a long-term authn/authz schema is figured out elsewhere, but we anticipate that additional work might be required to simplify the setup of read-only mode and prevent accidental misconfigurations (e.g. designate some connections as read-only and throw an error if read-only mode is requested but access to non-read-only connections is granted).

Compatibility

As this is an introduction of a new component, no real impact on the existing setups is anticipated. Configuration changes should be backwards compatible.

PoC: https://github.com/mpapierowski/airflow/tree/demo/mcp 

Scope

  • DagModel is expanded with a relation for setting a default DAG version
  • Interactive DAG processor (API) is implemented as a standalone FastAPI
  • The Airflow CLI command `dag-processor` is adjusted to start the interactive processor via flags


4 Comments

  1. Overall looks good to me - fits with other major AIPs in Airflow 3.0 (e.g. AIP-72) and brings the value of better DAG processing management. 

  2. Overall LGTM. Matches what I was imagining for this topic, and I don't see any conflicts (at this point at least (smile)).

  3. I like the way it is defined - and agree with Michal Modras and Jedidiah Cunningham. Also, the idea of making some refactorings in DagBag and related code in Airflow 3, without yet a full implementation of this AIP, is a good one.

    Maybe I'd add one more use case - this should also be a stepping stone for the long-term "workflow-in-workflow" cases (like Cosmos for dbt and the Databricks workflow support we have in a limited way) - at least the first part of it, where external workflows can be mapped (i.e. bundle-parsed) into an Airflow DAG. The next step would be to add "null" tasks and a way to interact with the workflow running elsewhere, but this one will be a good prerequisite to have.

  4. Igor Kholopov What timelines are you targeting to land this?