Motivation

Currently Airflow requires DAG files to be present on a file system that is accessible to the scheduler, webserver, and workers. Given that more and more people are running Airflow in a distributed setup to achieve higher scalability, it becomes more and more difficult to guarantee a file system that is accessible and synchronized amongst services. By allowing Airflow to fetch DAG files from a remote source outside the file system local to the service, this grant a much greater flexibility, eases implementation, and standardizes ways to sync remote sources of DAGs with Airflow.

Considerations

Is there anything special to consider about this AIP? Downsides? Difficultly in implementation or rollout etc? 

What change do you propose to make?

DAGs are persisted in remote filesystem-like storage and Airflow need to know where to find them. DAG repository is introduced to record the remote root directory of DAG files. When loading the DAGs, Airflow would first download DAG files from the remote DAG repository and cache them under the local filesystem directory under $AIRFLOW_HOME/dags (or other location). Airflow should only fetch DAGs from remote source if the local copy is stale.

DAG Repository

We will create a file, dag_repositories.json under $AIRFLOW_HOME, to record the root directory of DAGs on the remote storage system. Multiple repositories are supported.

repositories.json
dag_repositories: [
	"repo1": {
		"url": "s3://my-bucket/my-repo",
        "subpath": "dags"
		"conn_id": "blahblah"
	},
	"repo2": {
		"url": "git://repo_name/dags",
		"conn_id": "blahblah2"
	}
]

DagFetcher

The following is the DagFetcher interface, we will implement different fetchers for different storage system, GitDagFetcher and S3DagFetcher. Say we have a dag_repositories.json configuration like above. DagFetcher would download files under s3://my-bucket/dags to $CACHED_REPOS/dags and we will create a symlink to $AIRFLOW_HOME/dags so that the files can be discovered by Airflow.

class BaseDagFetcher():
    def fetch(repo_id, subpath=None):
    """
    Download files from remote storage to local directory under $CACHED_REPO/repo_id.
    Create a symlink in $AIRFLOW_HOME/dags/repo_id to $CACHED_REPO/repo_id or $CACHED_REPO/repo_id/subpath 
    if subpath is specified.
	"""

Proposed changes

DagBag

We should ensure that we are loading the latest DAGs cache copy, thus we should fetch DAGs from remote repo before we load the DagBag.

Scheduler

Currently, DagFileProcessorManager periodically calls DagFileProcessorManager._refresh_dag_dir to look for new DAG files. We should change this method to fetch DAGs from remote source at first.

What problem does it solve?

In a distributed Airflow setup, we need to have an additional process to move DAG files to services like scheduler, webserver and workers and make sure DAG files are in sync. By allowing Airflow services to fetch DAGs from remote sources, we remove the need for such additional process that is a duplicate effort. It also standardize the way we synchronize DAGs among services.

Why is it needed?

Same as above.

Are there any downsides to this change?

The DAG loading performance may be impacted as Airflow services will need to do an additional step to fetch DAGs from remote sources. To minimize the performance impact, each Airflow service will keep a local cache of DAGs and will only fetch if local copy is stale.

Which users are affected by the change?

This would only impact users who specified remote sources of DAGs. Users who prefer to use Airflow in the old way (moving DAG files to $AIRFOW_HOME/dags by themselves) won't be affected.

How are users affected by the change? (e.g. DB upgrade required?)

DAG loading performance can be affected as there is an additional step to fetch DAGs from remote sources but we can minimize it by caching a local copy.

Other considerations?

What about DAG manifest? Will this design block us from supporting DAG manifest in the future?

No, it won't. Say the location of each DAG is specified in the DAG manifest. The fetcher would fetch DAGs based on the DAG manifest instead of dag_repositories.json. To ease the migration to adopt DAG manifest, we can create a script that scans DAG files on the remote sources to figure out the location and populate the DAG manifest accordingly. Check out AIP-20 DAG manifest if you are interested in learning more.

What about DAG versioning?

Enabling remote DAG fetching would open the gate for supporting DAG versioning in the future. Say we implemented AIP-20 DAG manifest afterwards, we will be able to support DAG-level versioning. We could record the version of DAGs we use for each DagRun. Therefore the current running DagRun would not be impacted if the DAG was updated in the middle.

What defines this AIP as "done"?

Airflow support at least fetching DAGs from one kind of remote source. I am thinking of supporting fetching from Git or S3 at first.


63 Comments

  1. Ash Berlin-Taylor Tao Feng would be great if I can gather some feedbacks from you first before I send the AIP out for a broader discussion.

  2. Don't most cloud services and similar offer the ability to mount a shared filesystem?  And doesn't that solve this issue?   

    I think the approach support VCSes (git) and similar to be a bit hack-ish.   In general we envision our deployment process to include pulling from git (or other) repos into local, or network-mounted, file-system folders.  All that's needed is a way to "point" an airflow instance at that mounted folder and airflow already supports that.  

    I guess I'm unclear on the specific use case that is the driver for this proposal.

    1. Don't most cloud services and similar offer the ability to mount a shared filesystem?  And doesn't that solve this issue?  

      But what about those people who are hosting Airflow on their own datacenter without a shared filesystem. Should we push them to host their Airflow on a cloud service that support shared filesystem instead?

      I think the approach support VCSes (git) and similar to be a bit hack-ish.

      Can you elaborate on why you think this is hack-ish? This removes the need to setup a deployment pipeline to pull DAG files from git to the filesystem. Also this would open other opportunities for Airflow as it gained access to all versions of DAGs. We can pin a DagRun to a specific version of DAG file so that running DagRun would not be impacted by changes to the DAG code.

    2. Not all cloud services support an easy way to mount things as a shared filesystem. This problem is especially evident with running things in k8s on AWS. This AIP goes hand in hand with creating a DAG manifest. This is because the current method of having the scheduler crawl a file system to find DAGs is not the most ideal. Doing a Git pull does work for some people for mounting their DAGs, but this can become unscalable if you have many many DAGs. A lot of inspiration for this AIP came from Maxime Beauchemin, the creator of Airflow, who in a presentation last September brought up the idea of having remote DAGs.

      Moving towards a manifest with URIs allows for people to have more control over their DAG deployment, and

      • Allow for DAGs to be stored in any location (with a URI it can be on the file system, or git, or s3, or...) .
      • Allow for people to not have to deal with ensuring a network file system is mounted into each service
      • Allow for versioned DAG Runs which will be extremely helpful for consistency 
      • No longer does the scheduler have to crawl through a directory and open up files checking to see if a DAG is defined in it

      This change wouldn't break your current workflow there would simply be a way to generate a DAG manifest from a directory, but for a lot of more complicated cloud deployments, especially those with in k8s this will allow for Airflow services to be more stateless  

      1. I faced the issue when we moved to k8s and there is no One-to-Many easy solution on AWS. 

        So, I must create an extra step of resync the dags on update.

  3. Most data centers - self hosted - have NAS, so it's a non-issue.

    For others, the same workflow that pulls the Airflow config to create the image or deploy it can do multiple git pulls to a sibling folder and then set the env variable for the DAGS to point to the sibling folder.  So, whether the deployment is manual or automatic, is is easily scriptable (bash etc.) and you can get 100% of the desired functionality without coupling Airflow to git or any other infrastructure tech.  We do this and it's pretty simple to set up.



    1. Does this assume that airflow is deployed along with the dags?

      1. It doesn't assume that.  Both the model of deploying together and deploying the DAGs separately is supported.  We support the latter in several ways including simply sub-folders of a sibling "dag" folder that is outside the main airflow install folder and via one or more files that dynamically create DagBags from arbitrary folder locations.  Etc.

    2. I do agree that the DAG deployment pipeline is not complex. But I think if Airflow can fetch from git directly, it still removes the need for the deployment pipeline, even it is simple. If the user still prefers to deploy DAGs themselves, they can still deploy DAGs to $AIRFLOW_HOME/dags as Airflow still looks for DAGs in the old way. What I am proposing is just another way to deploy DAGs.


      Aside from that, exposing Airflow to each versions of DAGs means opens the opportunity to support versioned DagRun in the future. 

  4. I like the idea, I think this goes the right direction.

    I would also add that we should facilitate a "push" method apart (or instead) of the pulling.

    Maybe a rest endpoint that we could use on the github webhooks?

    I think more and more DAGs as "configurations" and less as actual scripts

  5. I think the idea of coupling a great workflow tool (Airflow) with one particular approach to deployment (direct pulls from git) to be limited, brittle, and add unnecessary bloat to Airflow.

    1. This isn't coupling Airflow to any sort of deployment approach, the URIs can be configured to point at literally anything. The dag fetcher will be such that you can build a fetcher for any sort of technology. Currently with Airflow the DAGs HAVE to be on a filesystem visible by it's services, that is much more restrictive than this is

    2. Airflow would not be limited to one particular approach of deployment. Airflow still scan DAG files under $AIRLFOW_HOME for DAGs. Fetcher actually give you more flexibility to deploy DAGs.

  6. The big issues that need to be overcome with a remote DAG fetcher is how to validate if the local cache is stale, and how do you handle DAG python dependencies (i.e. helper functions someone defined and saved in a separate file in the DAGs directory).

    I think handling python dependencies is the most difficult and most important to tackle.

    1. I am proposing that we register a list of URI to the root directory of remote repositories. The fetcher would recurse the remote directory, fetch DAG file and the dependencies under the same directory to local cache. And different fetchers would have different approach to validate the local cache copy. For example, GitFetcher would simply pull from master.


      1. This approach will cause problems if you have a very large DAG repository and you're using something like the k8s executor. That would mean every time a task pod spins up it'll have to fetch the entire repo, if you're repo is huge this will add a lot of overhead for executing tasks. As well it'll add a lot of overhead for restarting any of the services

        1. I see. Ian Davison can you tell me a bit more about how k8s executor get DAGs currently? I would imagine that it currently need to fetch the entire dags repository from somewhere when we spin up the pod. If so, do we see issues when the DAG repository is huge.

        2. Ian Davison  I would prefer bringing in the Remote DAG fetching and DAG manifest incrementally instead of at once to limit the scope of change of this AIP. Also DAG manifest will change the way users develop DAGs substantially and I think it deserves its own AIP-20 DAG manifest.


          Introducing remote DAG fetcher in the way I proposed is totally backward compatible as people can use Airflow in the old way and it won't degrade the performance of k8s executor as it already fetches entire git repo before spinning up the pod. We still can introduce DAG manifest afterwards with the current proposed implementation. We will provide a migration script that scans both local and remote repo for DAGs and populates the DAG manifest like in AIP-20 DAG manifest. So I think it is fine that we implement this AIP first without introducing DAG manifest.


          1. For the k8s executor for the task pods people will supply DAGs in one of three methods:

            1. Git pull on pod startup
            2. Bake the DAGs into the docker image used by the task pod
            3. Use a k8s Persistent Volume Claim which is essentially a shared network volume, however it's cloud dependent

            On of the hopes is that with a DAG fetcher and a manifest for the k8s executor it wouldn't be necessary to use a PVC anymore

            1. On of the hopes is that with a DAG fetcher and a manifest for the k8s executor it wouldn't be necessary to use a PVC anymore

              What is PVC?

              1. PVC = persistent volume claim.

  7. Chao-Han Tsai , sorry for the delay, just skim through the AIP. Here are my questions:

    1. How do we make sure the cache DAG in sync with remote one? Or how often do you fetch from remote dagbag to local cache?
    2. Where is dag_repositories.json located? If we want to add a new dag location, do we need to stop scheduler first?
    3. Correct me if my understanding is incorrect. Is it the idea to have scheduler spin up multiple processes to fetch DAGs from different remote location to local centralize location? It seems we could have two performance hits:
      1. scheduling latency will go up depending on how often we fetch. If we fetch too often, we will waste a lot of efforts; if not, users may have confusion on why their DAGs not show up(DAG freshness)
      2. It may hit network bound.


    1. Tao Feng thanks for the review!

      > How do we make sure the cache DAG in sync with remote one? Or how often do you fetch from remote dagbag to local cache?

      Each implementation of the DagFetcher will have different kind of cache invalidation mechanism. For example, GitDagFetcher can simply do a `git pull` and it will ensure that the local repo is the same as remote.

      Currently, `DagFileProcessorManager` periodically calls `DagFileProcessorManager._refresh_dag_dir` to look for new DAG files. I am thinking to change this method to fetch DAGs from remote source at first. So the frequency is determined by `dag_dir_list_interval` in airflow.cfg.

    2. > Where is dag_repositories.json located? If we want to add a new dag location, do we need to stop scheduler first?

      The path to `dag_repositories.json` can be specified in airflow.cfg or we can just put it under $AIRFLOW_HOME/dag_repositories.json so that airflow can discover it. We can make scheduler periodically reload `dag_repositories.json` to pick up newly added DAG repos.


    3. Correct me if my understanding is incorrect. Is it the idea to have scheduler spin up multiple processes to fetch DAGs from different remote location to local centralize location? It seems we could have two performance hits:

      1. scheduling latency will go up depending on how often we fetch. If we fetch too often, we will waste a lot of efforts; if not, users may have confusion on why their DAGs not show up(DAG freshness)
      2. It may hit network bound.

      Yeah the idea is that scheduler will spin up processes to fetch DAGs from remote sources. To avoid hitting network bound and increasing schedule latency, we kept local cache copy of DAGs and only fetch from remote when local cache is invalid. Let's take GitDagFetcher as an example, the fetcher would periodically performs a `git pull` and it would only fetch files when the local SHA is behind remote SHA. Most of the time, the local SHA should be the same as remote SHA and the latency introduced by `git pull` should be negligible. 

      And we should only need to fetch a commit or two each time when there are changes to remote given that we are frequently pulling from remote.


  8. How will the DAG model/ORM class be updated (as we are moving toward using that more as the primary source) so that we can track which repo (and which version?) it came from?

    We need to give some thought to how errors are handled and surfaced to any operators or monitoring - having the dag fetcher fail silently would be bad.

    1. How will the DAG model/ORM class be updated (as we are moving toward using that more as the primary source) so that we can track which repo (and which version?) it came from?

      I think we will need to record the repo_id for each DAG. In the future if we want to support versioned DagRun, we can record the version in DagRun table. So that when we execute the task, we can use the repo_id in DagModel and version in DagRun to fetch a specific version of repo. But I think that might worth another AIP?

    2. > We need to give some thought to how errors are handled and surfaced to any operators or monitoring - having the dag fetcher fail silently would be bad.

      When fetcher failed to update a repo, the scheduler should continue with the scheduling if the repo is not corrupted. We should offer statsd monitoring around fetcher failures.

    3. Ash Berlin-Taylor thanks for the review. Very insightful : )

  9. How do we map from the conn_id's you mentioned above to, say, a GitDagFetcherClass? How could we support pluggable fetchers?

    This AIP sounds like a nice addition but I think it needs fleshing out in more concrete detail about

    • how the cache would work,
    • what the API for the DagFetcher class is
    • what changes need making to the hooks(?) to support this
    • what we will use to write the Git based fetcher (I'd very much prefer it if we didn't shell out to make git calls but used a binding instead, or reused some existing tooling already written for this.)
    • how we minimize scheduler delay
    • anything we need to do to ensure that the versions between scheduler and workers don't get out of sync when running tasks.
    1. How do we map from the conn_id's you mentioned above to, say, a GitDagFetcherClass? How could we support pluggable fetchers?

      We will map from the URL to the corresponding fetcher implementation based on the the prefix. Say we have git://repo_name/dags and we will use the GitDagFetcher.

      I am thinking about supporting pluggable fetchers like how we support pluggable executors like in https://github.com/apache/airflow/blob/master/airflow/executors/__init__.py#L64

    2. > how the cache would work.

      I am thinking about letting user specifying a $cache_path and all the repos will be sync there like:

      $cache_path/repo1

      $cache_path/repo2

      And we will symlink it to $AIRFLOW_HOME/dags so that it is discoverable by Airflow. With regard to how we sync would be implementation details to different storage systems. For example, for git we should just kept pulling the latest.

      1. I think rather than symlinking we should update the DagBag etc to be able to read form multiple folders - that way the dags folder and repo caches could co-exist.

        1. I think even with the symlink approach, the dags folder and repo_cache can still co-exist. People still can deploy the DAG files to $AIRFLOW_HOME like the old ways and it still can be picked up by Airflow. It is just the symlink to cache repos and the DAG files will co-exist in the $AIRFLOW_HOME/dags, which I think may be messy.

          The reason I propose the symlink was to minimize the change but I agree we should make DagBag to support loading from multiple folders to achieve better isolation between the cache and the old $AIRFLOW_HOME/dags

      2. Any git sync want to allow a commitish, which could be: a specific commit, a branch, or a tag (could be lightweight or annotated). But changing commits also may require submodules (or may not  want them updated every pull), so options for that will probably be required too.

    3. > what the API for the DagFetcher class is

      class BaseDagFetcher():
          def fetch(repo_id, subpath=None):
          """
          Download files from remote storage to local directory under $CACHED_REPO/repo_id.
          Create a symlink in $AIRFLOW_HOME/dags/repo_id to $CACHED_REPO/repo_id or $CACHED_REPO/repo_id/subpath if subpath is      specified    """
      


      I am planning to do a quick POC first and the interface may change.


    4. > what changes need making to the hooks(?) to support this.

      Can you shed some light are what hooks you are referring to?

      1. I guess how do the hooks (which are what the conn_id points at right?) in the repo.json interact with the DagFetcher classes?

        1. During the fetch phase, we will initialize one process per repo. The process will use the prefix in the URL to find the corresponding fetcher implementation. For example, the URL is s3://bucket-name, and we will use the S3DagFecher. And the process will also pass the conn_id to the S3DagFetcher and it will retrieve the corresponding credential stored in https://github.com/apache/airflow/blob/6970b233964ee254bbb343ed8bdc906c2f7bd974/airflow/models/connection.py#L48



    5. > what we will use to write the Git based fetcher (I'd very much prefer it if we didn't shell out to make git calls but used a binding instead, or reused some existing tooling already written for this.).

      I am currently looking into https://github.com/gitpython-developers/GitPython, which is under BSD-3 license and I think it is already part of the Airflow dependency https://github.com/apache/airflow/blob/master/setup.py#L326.

      1. > GitPython is not suited for long-running processes (like daemons) as it tends to leak system resources


        I would have suggested libgit2/pygit2 but it's GPL'd (sad) https://github.com/libgit2/pygit2/blob/master/COPYING

        1. Thanks for pointing that out! I did not aware that GitPython isn't suitable for long running. I will do some research there.

        2. One thing I would like to point out is that we will spin up multiple processes and each one fetches one repo and exit the process after the fetch is complete. Since we will be using GitPython in a separate process that is not long running, we should be fine from the system leak. This is also one of the suggestion in the GitPython document: https://gitpython.readthedocs.io/en/stable/intro.html#leakage-of-system-resources

    6. > how we minimize scheduler delay

      We will need to block the scheduler to mutate the DAG files under $CACHE_REPOS that symlinked to $AIRFLOW/dags before scheduler runs self._refresh_dag_dir() to discover the DAG files. We will spin up one process to fetch DAG files for each repo. 

      With regard to the mutation I see there are two ways achieving it:

      (1) Pull synchronously during that time. I think GitDagFetcher may fall into this category as I think pulling changes from a Git repo should be fast each time given that we are pulling from remote periodically so the changes each time should be small.

      (2) Pulling changes asynchronously to other places and swap symlink to $AIRFLOW/dags during that time.

      1. I wrote a git poller that does this async: detect there's been a change, pull it, then stop the scheduler / swap symlink atomically / restart scheduler. I think it's generally a nice workflow.

        I would not make the assumption that pulling from git is always fast, or returns a sensible error. I think it's better to run async, and to warn if the async job has been failing/hanging for too long.

        1. I see. We can make GitDagFectcher async and swap symlink like you said : )

    7. > anything we need to do to ensure that the versions between scheduler and workers don't get out of sync when running tasks.

      The workers would always fetch the latest DAGs before the execution so it always runs the latest. It is possible that the DAG got updated between scheduling and execution and I think this is also an issue in the Airflow today.

      We will need to record the repo version/SHA in the DagRun so that when the task executes it will fetch that version of repo first before execution. I think this might worth another AIP.



      1. Not all remote stores will be versioned (like S3 buckets, which can or can not be depending on option). In those cases we just always have to pull latest. It might be wise to make a Versioned and Unversioned dagfetcher class.

        Also, always pulling latest before every task may not be desirable for load reasons on the backend. This should be probably be configurable.

        1. For a storage system like S3, we can enforce certain rules like storing the snapshot of DAG repo in:

          s3://path_to_repo/<version>

          and we save the version number in DagRun so that we use a specific version of repo throughout the DAG run. Of course, we will need some documentation around that.



  10. Similarly to few other voices in the discussion. I do not really see an added value in implementing this as part of Airflow core. The approach we have now which is "make the folder sync somehow" is good enough and much more flexible already. There are various ways you can make sure syncing is happening - depending on the data center, usual approach used by the organization, whether they USE GCS/S3/Azure/on-premises - they can make sure it happens in the way that will be most convenient for their deployments. Baking it in Airflow seems like an overkill. It goes against the approach which I think Airflow should follow "Do one thing only and do it well".

    For me Airflow should decide on how to push work to Celery/Kubernetes/LocalExecutor etc. and possibly - in the future - how to make sure code is consistent between schedulers and executors  (something that is being discussed in AIP-18 Persist all information from DAG file in DB ) but not how to "pull" Dag configuration files to the scheduler's folder. Just adding two/three synchronisation mechanisms to airflow in a "Plugginable" way would increase the code/architecture complexity with very limited added value I think. 

    The added value that I could see in this area revolves around AIP-20 DAG manifest - but (my comments will follow there soon) this should be more about making sure DAGs are "consistent".   I think current scanning mechanism is prone to inconsistencies - as Dags and related files (or even sub-workflows) might be simply inconsistent currently, especially with huge installations where scanning the directories might take a lot of time, there might be long periods where the loaded DAGs are not consistent and it might lead to a number of various problems (sometimes difficult to fix later). I think remote DAG fetcher does not address this in any way for now and we should give priority to AIG-20 instead. Maybe after this one is addressed we can come back to think about how to fetch the data. Then it might turn out that we want to synchronize DAGs in a completely different way than synchronising the whole folders. I can imagine various ways of doing it which do not involve whole folder syncing.

    1. Jarek Potiuk thanks for the review!

      I think the value of supporting remote dag fetching is to expose Airflow to external system where all versions of DAGs are stored to support versioned DagRun to guarantee code consistency between scheduler and executors. The scheduler records the repo version during scheduling and the executor can later on fetch a specific version of repo prior to task execution. Each task is responsible for fetching their own version of repo and we no longer need a shared $AIRFLOW_HOME/dags that are shared by all the processes and the executor become stateless.

      I agree that we can also achieve such code consistency among services through AIP-18 Persist all information from DAG file in DB but it will not be able to support dynamic DAGs, which seems to be a use case that is widely adopted by the users. Correct me if I am wrong.

      I do agree the fetcher will add more complexity to the Airflow core architecture and that would be the cost to take if we want to expose Airflow to an external DAG storage.

      I am thinking of implementing this AIP first to support versioned DagRun and synchronize at a repo level then work on AIP-20 DAG manifest to reduce the scope to DAG level so that the worker only need to fetch a bundle of files instead of cloning the whole repo.


      1. But this will only work (snapshotting the dags used) if we use GIT. If we use S3 or another source of remote files, then this snapshotting will not work in a generic way. So the whole point of remote DAG fetcher (generic fetching mechanism) cannot be really fulfilled.

        I think extending AIP-20 DAG manifest and starting from there might be a better idea. In manifest we can record DAGS + assets + versions of the files used - but we need to define how we want to do it. IMHO it should be somewhat automated. GIT is one of the options to use (and indeed quite a good one when I think about it). It can also have support for dynamically generated DAGs (depending at which stage those DAGs are generated)

        So maybe this "Remote DAG fetcher" as a standalone idea is not the "right" one. Maybe we should just join those two ideas - "Manifest + Git as the source of DAGs". We could record the version/hash of the files to be used in manifest and check them out in atomic way and let scheduler scan + use the right versions that has just been checked out.

        By adding manifest we can also achieve something that might be useful for packaging the dags for Kubernetes Executor with pre-baked DAGs. When we have the manifest and run a DAG from the manifest, we would know which DAGs we should also pull to build a pre-baked image. Currently Kubernetes Executor uses similar mechanism you describe with Git fetching the DAGs (on top of shared volumes), but it has long been planned to use pre-baked DAGs instead, I believe. Using Git makes the executor depends on shared Git service which can be avoided (and it requires baked-in authorisation). That could make AIP-18 Persist all information from DAG file in DB deprecated as well, as long as we can use Docker images with pre-baked DAGs in Celery and other executors (not only in Kubernetes).

        1. But this will only work (snapshotting the dags used) if we use GIT. If we use S3 or another source of remote files, then this snapshotting will not work in a generic way. So the whole point of remote DAG fetcher (generic fetching mechanism) cannot be really fulfilled.

          We still can snapshot the repo in S3 if user follow certain conventions like:

          s3://path_to_repo/<version>

          and we save the version number in DagRun so that we use a specific version of repo throughout the DAG run. Of course, we will need some documentation around that.

        2. I think extending AIP-20 DAG manifest and starting from there might be a better idea. In manifest we can record DAGS + assets + versions of the files used - but we need to define how we want to do it. IMHO it should be somewhat automated. GIT is one of the options to use (and indeed quite a good one when I think about it). It can also have support for dynamically generated DAGs (depending at which stage those DAGs are generated)


          So based on your suggestion, I would image that in the dag_manifest.json we will record the following (correct me if I am wrong):

          [
          	{
          		"dag_id": "abc",
          		"files": [
          			{
          				"file_name": "hello_dag.py",
          				"version": "Some git commit SHA"
          			},
          			{
          				"file_name": "constants.py",
          				"version": "Some git commit SHA"
          			}
          		]
          	}
          ]

          and we will need to provide some tooling to populate/update dag_manifest.json automatically whenever a user add/update a DAG, which may need some insight from you on we should approach this.

        3. Interesting. How does the scheduling to execution flow looks like if we are using pre-baked DAGs? Who will be responsible for baking DAG files into the image, it it Airflow scheduler or users?

          I can imagine that we will need to expose the image registry to Airflow and we will need to store the image SHA in the DagRun table so that we can have consistent code version during execution. 

          If we are pre-baking DAGs to images, do you think it make more sense to bake the whole DAG repo into the image instead just a DAG with its assets? 


  11. It would definitely be good to have a remote fetcher on VMs where we don't want all users to be able to write in DAGs directory.


    Problems with something like GCSFuse which uses GCS to store dags is they it is not that reliable and has several open issues. However, as Jarek Potiuk  mentioned we should weigh the advantages it has over just custom implementation. 


    And AIP-20 DAG manifest can probably cover this too.

  12. I can see the benefit of this – as right now I'm having difficulty in that my enterprise is enforcing deployments to AWS within the FARGATE clustering system, which appears to have NO concept of persistent storage or sharing of volumes among different services, and thus the only way to truly ensure that I'm able to pull and synchronize DAG updates post-deployment is to sync them with S3 buckets or other like technology.  GIT is out, as we are limited to only using resources within the VPC or AWS services that we can IAM, no Internet gateway or bi-directional data flow to the enterprise datacenters.

    I was in the process of inspecting the effort required to build a plug-in using Boto3 to do just this when I saw this AIP.

    I disagree with people thinking this is bloat or unnecessary - though it may be for your use case, I can assure you in my use case this is REQUIRED.

    Please keep pressing on this AIP, as I think it should be core functionality in a cloud-aware application to be able to use cloud-based storage.

  13. I came from Oozie Job World (with Py/Spark). When I submit the job to execution (to YARN):

    • it will build an isolated container with the code accessible from any node on the Hadoop cluster.
    • No extra care to deliver code on workers.
    • No meter if it's self-contained Python/Java code or dozens ETL's. 

    What's nice to have:

    • Guarantee that all Airflow nodes will have the same dag code at the same time. Even if it requires some flag or requires to wait more at the sync time.
    • If some celery/etc worker has no particular DAG or latest DAG version - do not try/run code on it.
    • I have problems to deliver DAG's dependencies to AWS EMR, docker container and similar services. We all must write workarounds to put the same code for the local development environment to the remote nodes. I think it's making hard to build, develop and deliver to production the same DAGs.

    What's not covered:

    We have ~> 70 data scientists & data engineers. We want to give an open and fast dev environment and stable production. 

    If we host all in git(e.g. GitHub) or AWS we must provide credentials to the all "clients" and workers. 

    It will be great if AIrflow server could sync DAGs on "master" only and we could get the code from this point of trust.


  14. Chao-Han Tsai any movement here?

    We see need for this with more than one group of developers trying to iterate / create new DAGs and maintain a production ready git flow our current solution is to use various per group owned DAG repos and then try to combine into one (git submodule potentially) but if Airflow core could handle multiple DAG locations that would make this multi-tenancy issue disappear! (smile) 

  15. I agree this and the general development of an external registry for DAGs could be useful and in some cases essential.   Would be really slow and overkill for just getting things onto S3, but couldn't you theoretically use Presto DB as the metadatabase (since SQLAlchemy supports this dialect) to achieve this, and then just push to the underlying s3 out of band?

    Still begs for a simpler solution, asking users to back their airflow with presto seems like a heavy lift.

  16. I have a question:

    Why does airflow dag not support reading directly from db, but reading from local DAG fetcher?
    The current way of discovering dag is scan local files and then synchronize to db. If I want to create a dag, I need to create a dag file in the scheduler dags_folder, and then synchronize the dag file to the web and worker. Why can't I store dag file to the db directly? then web, scheduler, and worker all obtain the dag file through the db?