Status
Page properties | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Motivation
Currently Airflow requires DAG files to be present on a file system that is accessible to the scheduler, webserver, and workers. Given that more and more people are running Airflow in a distributed setup to achieve higher scalability, it becomes more and more difficult to guarantee a file system that is accessible and synchronized amongst services. By allowing Airflow to fetch DAG files from a remote source outside the file system local to the service, this grant a much greater flexibility, eases implementation, and standardizes ways to sync remote sources of DAGs with Airflow.
Considerations
Is there anything special to consider about this AIP? Downsides? Difficultly in implementation or rollout etc?
What change do you propose to make?
DAGs are persisted in remote filesystem-like storage and Airflow need to know where to find them. DAG repository is introduced to record the remote root directory of DAG files. When loading the DAGs, Airflow would first download DAG files from the remote DAG repository and cache them under the local filesystem directory under $AIRFLOW_HOME/dags (or other location). Airflow should only fetch DAGs from remote source if the local copy is stale.
DAG Repository
We will create a file, dag_repositories.json under $AIRFLOW_HOME, to record the root directory of DAGs on the remote storage system. Multiple repositories are supported.
Code Block | ||
---|---|---|
| ||
dag_repositories: [ "repo1": { "url": "s3://my-bucket/my-repo", "subpath": "dags" "conn_id": "blahblah" }, "repo2": { "url": "git://repo_name/dags", "conn_id": "blahblah2" } ] |
DagFetcher
The following is the DagFetcher interface, we will implement different fetchers for different storage system, GitDagFetcher and S3DagFetcher. Say we have a dag_repositories.json configuration like above. DagFetcher would download files under s3://my-bucket/dags to $CACHED_REPOS/dags and we will create a symlink to $AIRFLOW_HOME/dags so that the files can be discovered by Airflow.
Code Block | ||
---|---|---|
| ||
class BaseDagFetcher(): def fetch(repo_id, subpath=None): """ Download files from remote storage to local directory under $CACHED_REPO/repo_id. Create a symlink in $AIRFLOW_HOME/dags/repo_id to $CACHED_REPO/repo_id or $CACHED_REPO/repo_id/subpath if subpath is specified. """ |
Proposed changes
DagBag
We should ensure that we are loading the latest DAGs cache copy, thus we should fetch DAGs from remote repo before we load the DagBag.
Scheduler
Currently, DagFileProcessorManager periodically calls DagFileProcessorManager._refresh_dag_dir to look for new DAG files. We should change this method to fetch DAGs from remote source at first.
What problem does it solve?
In a distributed Airflow setup, we need to have an additional process to move DAG files to services like scheduler, webserver and workers and make sure DAG files are in sync. By allowing Airflow services to fetch DAGs from remote sources, we remove the need for such additional process that is a duplicate effort. It also standardize the way we synchronize DAGs among services.
Why is it needed?
Same as above.
Are there any downsides to this change?
The DAG loading performance may be impacted as Airflow services will need to do an additional step to fetch DAGs from remote sources. To minimize the performance impact, each Airflow service will keep a local cache of DAGs and will only fetch if local copy is stale.
Which users are affected by the change?
This would only impact users who specified remote sources of DAGs. Users who prefer to use Airflow in the old way (moving DAG files to $AIRFOW_HOME/dags by themselves) won't be affected.
How are users affected by the change? (e.g. DB upgrade required?)
DAG loading performance can be affected as there is an additional step to fetch DAGs from remote sources but we can minimize it by caching a local copy.
Other considerations?
What about DAG manifest? Will this design block us from supporting DAG manifest in the future?
No, it won't. Say the location of each DAG is specified in the DAG manifest. The fetcher would fetch DAGs based on the DAG manifest instead of dag_repositories.json. To ease the migration to adopt DAG manifest, we can create a script that scans DAG files on the remote sources to figure out the location and populate the DAG manifest accordingly. Check out AIP-20 DAG manifest if you are interested in learning more.
What about DAG versioning?
Enabling remote DAG fetching would open the gate for supporting DAG versioning in the future. Say we implemented AIP-20 DAG manifest afterwards, we will be able to support DAG-level versioning. We could record the version of DAGs we use for each DagRun. Therefore the current running DagRun would not be impacted if the DAG was updated in the middle.
What defines this AIP as "done"?
Airflow support at least fetching DAGs from one kind of remote source. I am thinking of supporting fetching from Git or S3 at first.