Status

State: Superseded by AIP-66
Discussion Thread
JIRA

AIRFLOW-4138

Created


Motivation

Currently, Airflow discovers DAGs by traversing all the files under $AIRFLOW_HOME/dags and looking for files that contain "airflow" and "DAG" in their content, which is not efficient. We need a better way for Airflow to discover DAGs.

Considerations

Is there anything special to consider about this AIP? Downsides? Difficulty in implementation or rollout, etc.?

What change do you propose to make?

I am proposing to introduce a DAG manifest, an easier and more efficient way for Airflow to discover DAGs. The DAG manifest would be composed of manifest entries, where each entry represents a single DAG and contains information about where to find it.

Format:

dag_manifest_entry:
    dag_id: the DAG ID
    uri: where the DAG can be found; DAG locations are given as URIs, e.g. s3://my-bucket/dag1.zip, local://dags/dag1.zip
    conn_id: connection ID to use to interact with the remote location
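
As a rough illustration only (not part of the proposal text), a manifest entry could be modeled in Python along these lines; the class name and field layout below are hypothetical:

from typing import NamedTuple, Optional


class DagManifestEntry(NamedTuple):
    """Hypothetical in-memory representation of one DAG manifest entry."""

    dag_id: str                    # the DAG ID
    uri: str                       # e.g. "s3://my-bucket/dag1.zip" or "local://dags/dag1.zip"
    conn_id: Optional[str] = None  # connection to use for a remote location, if any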

File-based DAG manifest

Airflow services will look at $AIRFLOW_HOME/manifest.json for the DAG manifest. The manifest.json contains all the DAG entries and would look like:

{
    "dag_1": {
        "uri": "local://dags/hello.py"
    },
    "dag_2": {
        "uri": "s3://dags/superhero.py"
    }
}
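
For illustration, a loader for such a file might look like the sketch below; the function is hypothetical and not an existing Airflow API:

import json
import os


def load_dag_manifest(airflow_home):
    """Load $AIRFLOW_HOME/manifest.json and return a dict of dag_id -> entry.

    Hypothetical sketch: assumes the manifest is a JSON object keyed by DAG ID,
    as in the example above, and that every entry carries at least a "uri".
    """
    manifest_path = os.path.join(airflow_home, "manifest.json")
    with open(manifest_path) as f:
        entries = json.load(f)

    for dag_id, entry in entries.items():
        if "uri" not in entry:
            raise ValueError("Manifest entry for %r is missing a 'uri'" % dag_id)
    return entries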

Custom DAG manifest

The manifest can also be generated by a callable supplied in airflow.cfg that returns a list of entries when called, e.g.:

[core]
# callable to fetch dag manifest list
dag_manifest_entries = my_config.get_dag_manifest_entries

For example, the DAG manifest can be stored on S3, and my_config.get_dag_manifest_entries would read the manifest from S3.
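
A minimal sketch of what such a callable could look like, assuming boto3 and placeholder bucket/key names:

# my_config.py -- hypothetical module referenced from airflow.cfg above
import json

import boto3


def get_dag_manifest_entries():
    """Return the DAG manifest entries stored as a JSON document on S3."""
    s3 = boto3.client("s3")
    obj = s3.get_object(Bucket="my-airflow-bucket", Key="dag_manifest.json")
    return json.loads(obj["Body"].read())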

What problem does it solve?

An easier and more efficient approach to Airflow DAG discovery.

Why is it needed?

  • With the manifest, people can state explicitly which DAGs Airflow should look at
  • Airflow no longer has to crawl through a directory importing various files, which can cause problems
  • Users are not forced to provide a way to crawl various remote sources
  • We can get rid of AIRFLOW-97, which requires strings such as "airflow" and "DAG" to be present in the DAG file.

Are there any downsides to this change?

  • An extra step is needed to add a new DAG, i.e. adding an entry to the DAG manifest.
  • Migration is required for upgrade. We may need to create a script that traverses all the files under $AIRFLOW_HOME/dags or remote storage (assuming we have a remote DAG fetcher) and populates entries in the DAG manifest (see the sketch below).
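
A minimal sketch of such a migration script, assuming the keyword heuristic Airflow uses today and a local DAGs folder; the function name and output location are assumptions:

import json
import os


def build_manifest_from_dags_folder(dags_folder, output_path):
    """Scan a local DAGs folder and write a file-based DAG manifest.

    Hypothetical sketch: keeps today's heuristic of only including .py files
    whose content mentions both "airflow" and "dag". Real DAG IDs are not
    known without importing the files, so the file stem is used as the key.
    """
    entries = {}
    for root, _dirs, files in os.walk(dags_folder):
        for name in files:
            if not name.endswith(".py"):
                continue
            path = os.path.join(root, name)
            with open(path, encoding="utf-8", errors="ignore") as f:
                content = f.read().lower()
            if "airflow" in content and "dag" in content:
                entries[os.path.splitext(name)[0]] = {"uri": "local://" + path}

    with open(output_path, "w") as f:
        json.dump(entries, f, indent=4)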

Which users are affected by the change?

All users attempting to upgrade to the latest version.

How are users affected by the change? (e.g. DB upgrade required?)

Users need to run a migration script to populate the DAG manifest.

Other considerations?

N/A

What defines this AIP as "done"?

Airflow discovers DAGs by looking at the DAG manifest, without traversing all the files in the filesystem.

11 Comments

  1. $AIRFLOW_HOME/manifest.json - either dag_manifest.json or dags/manifest.json would be better locations I think.

    > All users will be affected.

    Does this mean this is required? I think for many small installs the autodiscovery works, and it should remain functional.

    1. Yeah, if the DAG manifest file becomes the source of truth for DAGs and Airflow no longer scans files looking for the "airflow" and "dag" keywords in the content to discover DAG files, all users will be forced to create a dag_manifest.json.

    2. > small installs the autodiscovery works

      Can you add a bit more detail about what you mean by small installs?

  2. Interaction with AIP-5: should each repo have (the ability to have) its own manifest? Should the manifests list the remote repos instead of us needing to configure things "twice"?

    How do we surface to users that they forgot to add a DAG to a manifest? (I'm thinking of the "I added a new DAG and it isn't getting picked up!? WTF?!" questions in slack.)

    1. I think each repo should have its own manifest, and each manifest only needs to specify the subpath under that repo.

      > How do we surface to users that they forgot to add a DAG to a manifest? (I'm thinking of the "I added a new DAG and it isn't getting picked up!? WTF?!" questions in slack.)

      During the transition, we can probably add some tooling that scans the files looking for the "dag" and "airflow" keywords in the content and warns users if a DAG file is not in the DAG manifest. But in the end, we should onboard our users to understand that the DAG manifest file is where Airflow looks for DAGs.

  3. I think there might be added value in having a manifest, but only when we use it to introduce some kind of "consistency/versioning/atomic DAG sets". Just replacing folder scanning with a manifest has no real added value (in the end, folders in the filesystem are already a kind of implicit manifest: "Get me all the files that are in that folder"). There are a few gotchas (like analysing whether a file has a DAG inside, and the .airflowignore file where you can exclude certain files), but in essence the folder index is our manifest.

    As explained in Re: AIP-5 Remote DAG Fetcher, I think such a manifest would be much more valuable if it also solves the "consistency" problem between related DAGs. Currently, due to the way scanning works for the scheduler in Airflow, it might be that related DAGs/subdags/files in the DAG folder are not in sync. Having a manifest where we could define "bundles" of related files might be a way to address that. I can, for example, imagine that we have an ".airflow.manifest" file per subdirectory; when such a manifest is present, no regular scanning of that directory happens, but instead the files specified there are loaded and made available by the scheduler. We would then have to think a bit more about scenarios for handling versioning/atomicity in this setup. Then I think the manifest concept might be super useful.

    1. > Currently, due to the way scanning works for the scheduler in Airflow, it might be that related DAGs/subdags/files in the DAG folder are not in sync.

      Are you referring to DAGs in the scheduler/webserver/worker not being in sync?


    2. I agree that the DAG manifest itself would not add much value to Airflow, as the file index is already an implicit manifest. The real value comes out if we decide to support versioned DagRuns through AIP-5 Remote DAG Fetcher to guarantee code consistency between the scheduler and executor throughout a DagRun; the DAG manifest helps limit the number of files that need to be fetched on the executor in order to execute a task.

  4. I would suggest making this config file option into a list of callables. The default value would then be, say: `[airflow.models.deprecated_filesystem_crawler, airflow.manifest.default_manifest_file]`. You could then turn either of them off easily, or subclass one to tweak the behavior and replace it with your subclass. This would be a much better user experience than having a hard cutover to manifests.

  5. As for the relationship between this and AIP-5... The primary objection that people have had in AIP-5 is that it forces us to support arbitrary complexity of cloning/checkout code. I think that we do want some remote-connection code in Airflow core - not primarily for the synchronization itself, but so that we can have a reasonable API for displaying synchronization state. Something that lets you know at a glance whether a DAG is "currently deploying", "up to date", "failing to sync", and so on.

    I'd favor a combined definition here. As per AIP-20, build some plumbing to understand concepts like git versions. As per this AIP, expect a list of DAGs with URIs (or a callable to retrieve the DAG list).

    However, allow a callable to also be provided for that DAG (with implicit callables for common URI schemes). The goal of that callable is to return the current desired version, and either perform the fetch from S3/git/etc. itself, or to block until an external system reconciles to the correct version.

    Example 1, "git sync": your Manifest Callback returns that you should be on commit "C3D4". You are currently on "A1B2". This triggers your Fetcher Callback, which actually connects to Git and does an in-place update.

    Example 2, "DAG code generator": you use a framework that generates DAG code from a definition file and stores it in S3. You want to benefit from knowing the git commit, but the actual artifact isn't in git and isn't available for some time after the commit lands. You write a subclass of the Git fetcher above. It polls git to look for changes, but doesn't actually update to the new git commit until the corresponding S3 code is ready. This allows surfacing "current version" commit hashes in the UI, while still using a different backend (or multiple backends).

    Example 3, "pre-baked DAG Docker image": you have a Manifest Callback, which returns that your DAG should be on commit "C3D4". However, the Dockerized worker process hasn't been restarted yet and is still running commit "A1B2". You write a Fetcher Callback that blocks, but does not perform an update itself. This allows old Dockerized workers to continue working on their tasks, but refuse to take invalid new work. You write an external process that finds workers that are both idle and on an old manifest version, and kills them.

    In this framework, the callable could be one of a handful of core Airflow callables that implement the 80/20 most common set of remote syncing features. More complex cases would be handled by users writing their own class implementation.

    With this approach, if there is a fetching issue, Airflow is aware of the version drift and can surface a UI alert on which DAGs are affected by it. This helps operators understand the impact immediately.
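
For illustration only, the Manifest/Fetcher callbacks described in the comment above could be sketched roughly as follows; all names and signatures here are hypothetical (assuming Python 3.8+ typing.Protocol), not an existing Airflow API:

from typing import Optional, Protocol


class ManifestCallback(Protocol):
    """Answers: which version should this DAG currently be on?"""

    def desired_version(self, dag_id: str) -> str:
        """Return the desired version identifier, e.g. a git commit hash."""
        ...


class FetcherCallback(Protocol):
    """Reconciles the local copy of a DAG to the desired version."""

    def current_version(self, dag_id: str) -> Optional[str]:
        """Return the locally available version, or None if nothing is fetched yet."""
        ...

    def fetch(self, dag_id: str, version: str) -> None:
        """Bring the local copy to `version`, or block until an external system
        has done so (as in the pre-baked Docker image example)."""
        ...


def reconcile(dag_id, manifest, fetcher):
    """Return True when the DAG is on its desired version, fetching it first if needed."""
    desired = manifest.desired_version(dag_id)
    if fetcher.current_version(dag_id) != desired:
        fetcher.fetch(dag_id, desired)
    return fetcher.current_version(dag_id) == desired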