Status
Motivation
Currently, Airflow requires DAG files to be present on a filesystem that is accessible to the scheduler, webserver, and workers. Given that more and more people are running Airflow in a distributed setup to achieve higher scalability, it becomes more and more difficult to guarantee a filesystem that is accessible to and synchronized amongst all services. Allowing Airflow to fetch DAG files from a remote source outside the filesystem local to each service grants much greater flexibility, eases implementation, and standardizes the way remote sources of DAGs are synced with Airflow.
Considerations
Is there anything special to consider about this AIP? Downsides? Difficulty in implementation or rollout, etc.?
What change do you propose to make?
DAGs are persisted in remote filesystem-like storage, and Airflow needs to know where to find them. A DAG repository is introduced to record the remote root directory of DAG files. When loading DAGs, Airflow would first download DAG files from the remote DAG repository and cache them in a local directory, e.g. $AIRFLOW_HOME/dags (or another location). Airflow should only fetch DAGs from the remote source if the local copy is stale.
DAG Repository
We will create a file, dag_repositories.json, under $AIRFLOW_HOME to record the root directories of DAGs on the remote storage systems. Multiple repositories are supported.
dag_repositories: [
    "repo1": {
        "url": "s3://my-bucket/my-repo",
        "subpath": "dags",
        "conn_id": "blahblah"
    },
    "repo2": {
        "url": "git://repo_name/dags",
        "conn_id": "blahblah2"
    }
]
DagFetcher
The following is the DagFetcher interface; we will implement different fetchers for different storage systems, e.g. GitDagFetcher and S3DagFetcher. Say we have a dag_repositories.json configuration like the one above. The DagFetcher would download the files under s3://my-bucket/my-repo/dags to $CACHED_REPO/repo1 and create a symlink in $AIRFLOW_HOME/dags so that the files can be discovered by Airflow.
class BaseDagFetcher:
    def fetch(self, repo_id, subpath=None):
        """
        Download files from remote storage to a local directory under
        $CACHED_REPO/repo_id.

        Create a symlink in $AIRFLOW_HOME/dags/repo_id to $CACHED_REPO/repo_id,
        or to $CACHED_REPO/repo_id/subpath if subpath is specified.
        """
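For illustration, a concrete fetcher built on this interface might look like the sketch below. This is only a rough sketch, not the proposed implementation: it assumes GitPython with simple clone-or-pull semantics, and the cache and dags paths are placeholders.

import os
import git  # GitPython, already an Airflow dependency


class GitDagFetcher(BaseDagFetcher):
    """Hypothetical example fetcher; error handling and auth are omitted."""

    def __init__(self, url):
        self.url = url

    def fetch(self, repo_id, subpath=None):
        cached_repo_root = os.path.expanduser("~/airflow/cached_repos")  # placeholder
        airflow_dags = os.path.expanduser("~/airflow/dags")              # placeholder
        local_path = os.path.join(cached_repo_root, repo_id)
        if os.path.isdir(os.path.join(local_path, ".git")):
            # Repo already cached: pull the latest changes.
            git.Repo(local_path).remotes.origin.pull()
        else:
            # First fetch: clone the remote repository into the cache.
            git.Repo.clone_from(self.url, local_path)
        # Expose the cached copy (or a subpath of it) under $AIRFLOW_HOME/dags.
        target = os.path.join(local_path, subpath) if subpath else local_path
        link = os.path.join(airflow_dags, repo_id)
        if not os.path.islink(link):
            os.symlink(target, link)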
Proposed changes
DagBag
We should ensure that we are loading the latest cached copy of the DAGs, so we should fetch DAGs from the remote repo before loading the DagBag.
Scheduler
Currently, DagFileProcessorManager periodically calls DagFileProcessorManager._refresh_dag_dir to look for new DAG files. We should change this method to first fetch DAGs from the remote source.
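As a rough, standalone illustration of where that fetch step would sit (the real DagFileProcessorManager does much more), consider the simplified helper below; refresh_dag_dir, the repositories dict, and the fetchers mapping are hypothetical names, not existing Airflow APIs.

import os

def refresh_dag_dir(repositories, fetchers, dags_folder):
    """Fetch every configured remote repo, then list DAG files as today."""
    for repo_id, repo in repositories.items():
        scheme = repo["url"].split("://", 1)[0]            # e.g. "git" or "s3"
        fetchers[scheme].fetch(repo_id, subpath=repo.get("subpath"))
    dag_files = []
    for root, _, files in os.walk(dags_folder):
        dag_files.extend(os.path.join(root, f) for f in files if f.endswith(".py"))
    return dag_files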
What problem does it solve?
In a distributed Airflow setup, we currently need an additional process to move DAG files to services such as the scheduler, webserver, and workers, and to make sure the DAG files stay in sync. By allowing Airflow services to fetch DAGs from remote sources, we remove the need for that duplicated effort. It also standardizes the way DAGs are synchronized among services.
Why is it needed?
Same as above.
Are there any downsides to this change?
DAG loading performance may be impacted as Airflow services will need to perform an additional step to fetch DAGs from remote sources. To minimize the performance impact, each Airflow service will keep a local cache of DAGs and will only fetch when the local copy is stale.
Which users are affected by the change?
This only impacts users who specify remote sources of DAGs. Users who prefer to use Airflow in the old way (moving DAG files to $AIRFLOW_HOME/dags themselves) won't be affected.
How are users affected by the change? (e.g. DB upgrade required?)
DAG loading performance can be affected as there is an additional step to fetch DAGs from remote sources, but we can minimize the impact by caching a local copy.
Other considerations?
What about DAG manifest? Will this design block us from supporting DAG manifest in the future?
No, it won't. Say the location of each DAG is specified in the DAG manifest; the fetcher would then fetch DAGs based on the DAG manifest instead of dag_repositories.json. To ease migration towards the DAG manifest, we can create a script that scans DAG files on the remote sources to figure out their locations and populates the DAG manifest accordingly. Check out AIP-20 DAG manifest if you are interested in learning more.
What about DAG versioning?
Enabling remote DAG fetching opens the gate for supporting DAG versioning in the future. Say we implement AIP-20 DAG manifest afterwards; we would then be able to support DAG-level versioning. We could record the version of the DAG used for each DagRun, so a currently running DagRun would not be impacted if the DAG is updated mid-run.
What defines this AIP as "done"?
Airflow supports fetching DAGs from at least one kind of remote source. I am thinking of supporting fetching from Git or S3 first.
64 Comments
Chao-Han Tsai
Ash Berlin-Taylor Tao Feng it would be great if I could gather some feedback from you first before I send the AIP out for broader discussion.
Ezra Epstein
Don't most cloud services and similar offer the ability to mount a shared filesystem? And doesn't that solve this issue?
I think the approach of supporting VCSes (git) and similar is a bit hack-ish. In general we envision our deployment process to include pulling from git (or other) repos into local, or network-mounted, filesystem folders. All that's needed is a way to "point" an Airflow instance at that mounted folder, and Airflow already supports that.
I guess I'm unclear on the specific use case that is the driver for this proposal.
Chao-Han Tsai
> Don't most cloud services and similar offer the ability to mount a shared filesystem? And doesn't that solve this issue?
But what about people who are hosting Airflow in their own datacenter without a shared filesystem? Should we push them to host their Airflow on a cloud service that supports a shared filesystem instead?
> I think the approach of supporting VCSes (git) and similar is a bit hack-ish.
Can you elaborate on why you think this is hack-ish? This removes the need to set up a deployment pipeline to pull DAG files from git to the filesystem. It also opens other opportunities for Airflow as it gains access to all versions of DAGs: we can pin a DagRun to a specific version of a DAG file so that a running DagRun would not be impacted by changes to the DAG code.
Ian Davison
Not all cloud services support an easy way to mount a shared filesystem. This problem is especially evident when running things in k8s on AWS. This AIP goes hand in hand with creating a DAG manifest, because the current method of having the scheduler crawl a filesystem to find DAGs is not ideal. Doing a git pull does work for some people for mounting their DAGs, but this can become unscalable if you have many, many DAGs. A lot of inspiration for this AIP came from Maxime Beauchemin, the creator of Airflow, who in a presentation last September brought up the idea of having remote DAGs.
Moving towards a manifest with URIs allows people to have more control over their DAG deployment.
This change wouldn't break your current workflow; there would simply be a way to generate a DAG manifest from a directory. But for a lot of more complicated cloud deployments, especially those in k8s, this will allow Airflow services to be more stateless.
Valeriys Soloviov
I faced this issue when we moved to k8s, and there is no easy one-to-many solution on AWS.
So I have to add an extra step to resync the DAGs on every update.
Ezra Epstein
Most data centers - self hosted - have NAS, so it's a non-issue.
For others, the same workflow that pulls the Airflow config to create the image or deploy it can do multiple git pulls into a sibling folder and then set the env variable for the DAGs to point to that sibling folder. So whether the deployment is manual or automatic, it is easily scriptable (bash etc.) and you can get 100% of the desired functionality without coupling Airflow to git or any other infrastructure tech. We do this and it's pretty simple to set up.
Sergio Kef
Does this assume that Airflow is deployed along with the DAGs?
Ezra Epstein
It doesn't assume that. Both deploying them together and deploying the DAGs separately are supported. We support the latter in several ways, including simply sub-folders of a sibling "dag" folder that is outside the main Airflow install folder, and via one or more files that dynamically create DagBags from arbitrary folder locations. Etc.
Chao-Han Tsai
I do agree that the DAG deployment pipeline is not complex. But I think if Airflow can fetch from git directly, it still removes the need for the deployment pipeline, even if it is simple. If users still prefer to deploy DAGs themselves, they can keep deploying DAGs to $AIRFLOW_HOME/dags, since Airflow still looks for DAGs in the old way. What I am proposing is just another way to deploy DAGs.
Aside from that, exposing Airflow to every version of the DAGs opens the opportunity to support versioned DagRuns in the future.
Sergio Kef
I like the idea, I think this goes the right direction.
I would also add that we should facilitate a "push" method in addition to (or instead of) pulling.
Maybe a rest endpoint that we could use on the github webhooks?
I think of DAGs more and more as "configurations" and less as actual scripts.
Ezra Epstein
I think the idea of coupling a great workflow tool (Airflow) with one particular approach to deployment (direct pulls from git) is limiting, brittle, and adds unnecessary bloat to Airflow.
Ian Davison
This isn't coupling Airflow to any sort of deployment approach; the URIs can be configured to point at literally anything. The DAG fetcher will be built so that you can write a fetcher for any sort of technology. Currently the DAGs HAVE to be on a filesystem visible to Airflow's services, which is much more restrictive than this is.
Chao-Han Tsai
Airflow would not be limited to one particular approach to deployment. Airflow still scans $AIRFLOW_HOME/dags for DAG files; the fetcher actually gives you more flexibility in deploying DAGs.
Ian Davison
The big issues that need to be overcome with a remote DAG fetcher are how to validate whether the local cache is stale, and how to handle DAG Python dependencies (i.e. helper functions someone defined and saved in a separate file in the DAGs directory).
I think handling python dependencies is the most difficult and most important to tackle.
Chao-Han Tsai
I am proposing that we register a list of URIs pointing to the root directories of remote repositories. The fetcher would recurse through the remote directory and fetch the DAG files and their dependencies under the same directory into the local cache. Different fetchers would have different approaches to validating the local cached copy; for example, GitDagFetcher would simply pull from master.
Ian Davison
This approach will cause problems if you have a very large DAG repository and you're using something like the k8s executor. That would mean every time a task pod spins up it has to fetch the entire repo; if your repo is huge this will add a lot of overhead to executing tasks. It will also add a lot of overhead when restarting any of the services.
Chao-Han Tsai
I see. Ian Davison can you tell me a bit more about how the k8s executor gets DAGs currently? I would imagine that it currently needs to fetch the entire DAGs repository from somewhere when it spins up the pod. If so, do we see issues when the DAG repository is huge?
Chao-Han Tsai
Looking at the code, it seems like it currently pulls the code from git before it spins up the pods:
https://github.com/apache/airflow/blob/1c43cde65c5c4fc6fae943927c5d75855ad545f0/airflow/contrib/executors/kubernetes_executor.py#L156-L158
Chao-Han Tsai
Ian Davison I would prefer bringing in remote DAG fetching and the DAG manifest incrementally rather than all at once, to limit the scope of change of this AIP. Also, the DAG manifest will change the way users develop DAGs substantially, and I think it deserves its own AIP-20 DAG manifest.
Introducing the remote DAG fetcher in the way I proposed is fully backward compatible, as people can keep using Airflow the old way, and it won't degrade the performance of the k8s executor since it already fetches the entire git repo before spinning up the pod. We can still introduce the DAG manifest afterwards on top of the proposed implementation. We will provide a migration script that scans both local and remote repos for DAGs and populates the DAG manifest as in AIP-20 DAG manifest. So I think it is fine to implement this AIP first without introducing the DAG manifest.
Ian Davison
For the k8s executor, people supply DAGs to the task pods in one of three methods:
One of the hopes is that with a DAG fetcher and a manifest for the k8s executor it wouldn't be necessary to use a PVC anymore
Chao-Han Tsai
> One of the hopes is that with a DAG fetcher and a manifest for the k8s executor it wouldn't be necessary to use a PVC anymore
What is PVC?
Ash Berlin-Taylor
PVC = persistent volume claim.
Chao-Han Tsai
Tao Feng Ash Berlin-Taylor Kaxil Naik Xiaodong DENG Fokko Driesprong
It would be great if I could get some feedback from you.
Tao Feng
Chao-Han Tsai, sorry for the delay, I just skimmed through the AIP. Here are my questions:
Chao-Han Tsai
Tao Feng thanks for the review!
> How do we make sure the cache DAG in sync with remote one? Or how often do you fetch from remote dagbag to local cache?
Each implementation of the DagFetcher will have a different kind of cache invalidation mechanism. For example, GitDagFetcher can simply do a `git pull`, which ensures that the local repo is the same as the remote.
Currently, `DagFileProcessorManager` periodically calls `DagFileProcessorManager._refresh_dag_dir` to look for new DAG files. I am thinking of changing this method to first fetch DAGs from the remote source, so the frequency would be determined by `dag_dir_list_interval` in airflow.cfg.
Chao-Han Tsai
> Where is dag_repositories.json located? If we want to add a new dag location, do we need to stop scheduler first?
The path to `dag_repositories.json` can be specified in airflow.cfg, or we can just put it at $AIRFLOW_HOME/dag_repositories.json so that Airflow can discover it. We can make the scheduler periodically reload `dag_repositories.json` to pick up newly added DAG repos.
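As a rough sketch of that reload step, a helper like the one below could parse the file; it assumes the file is stored as a plain JSON object mapping repo ids to their configs (a slight normalization of the example above), and the function name is hypothetical.

import json
import os

def load_dag_repositories(airflow_home):
    """Return a dict of repo_id -> repo config read from dag_repositories.json."""
    path = os.path.join(airflow_home, "dag_repositories.json")
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        # e.g. {"repo1": {"url": "s3://my-bucket/my-repo", "subpath": "dags", "conn_id": "blahblah"}}
        return json.load(f)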
Chao-Han Tsai
Yeah, the idea is that the scheduler will spin up processes to fetch DAGs from remote sources. To avoid becoming network bound and increasing scheduling latency, we keep a local cached copy of the DAGs and only fetch from the remote when the local cache is invalid. Taking GitDagFetcher as an example, the fetcher would periodically perform a `git pull`, and it would only fetch files when the local SHA is behind the remote SHA. Most of the time the local SHA should be the same as the remote SHA, and the latency introduced by `git pull` should be negligible.
And we should only need to fetch a commit or two each time there are changes on the remote, given that we are pulling frequently.
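To make that staleness check concrete, here is a minimal sketch using GitPython; the helper name and the hard-coded origin/branch refs are illustrative assumptions.

import git  # GitPython

def local_copy_is_stale(repo_path, branch="master"):
    """Return True if the remote branch has commits that the local checkout lacks."""
    repo = git.Repo(repo_path)
    repo.remotes.origin.fetch()                        # updates remote-tracking refs only
    local_sha = repo.head.commit.hexsha
    remote_sha = repo.commit("origin/" + branch).hexsha
    return local_sha != remote_sha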
Ash Berlin-Taylor
How will the DAG model/ORM class be updated (as we are moving toward using that more as the primary source) so that we can track which repo (and which version?) it came from?
We need to give some thought to how errors are handled and surfaced to any operators or monitoring - having the dag fetcher fail silently would be bad.
Chao-Han Tsai
> How will the DAG model/ORM class be updated (as we are moving toward using that more as the primary source) so that we can track which repo (and which version?) it came from?
I think we will need to record the repo_id for each DAG. In the future, if we want to support versioned DagRuns, we can record the version in the DagRun table, so that when we execute a task we can use the repo_id in the DagModel and the version in the DagRun to fetch a specific version of the repo. But I think that might be worth another AIP?
Chao-Han Tsai
> We need to give some thought to how errors are handled and surfaced to any operators or monitoring - having the dag fetcher fail silently would be bad.
When the fetcher fails to update a repo, the scheduler should continue scheduling as long as the local copy is not corrupted. We should offer statsd monitoring around fetcher failures.
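A minimal sketch of that monitoring, assuming Airflow's statsd wrapper (Stats, whose import location varies by version), a hypothetical fetcher object, and illustrative metric names:

import logging

from airflow.settings import Stats  # statsd wrapper; import path varies across Airflow versions

log = logging.getLogger(__name__)

def fetch_with_monitoring(fetcher, repo_id, subpath=None):
    """Run one fetch, count failures in statsd, and let scheduling continue."""
    try:
        fetcher.fetch(repo_id, subpath=subpath)
        Stats.incr("dag_fetcher.success")              # hypothetical metric name
    except Exception:
        # Surface the failure instead of letting the fetcher fail silently.
        Stats.incr("dag_fetcher.failure")              # hypothetical metric name
        log.exception("Failed to fetch DAG repository %s", repo_id)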
Chao-Han Tsai
Ash Berlin-Taylor thanks for the review. Very insightful : )
Ash Berlin-Taylor
How do we map from the conn_id's you mentioned above to, say, a GitDagFetcherClass? How could we support pluggable fetchers?
This AIP sounds like a nice addition but I think it needs fleshing out in more concrete detail about
Chao-Han Tsai
> How do we map from the conn_id's you mentioned above to, say, a GitDagFetcherClass? How could we support pluggable fetchers?
We will map from the URL to the corresponding fetcher implementation based on the prefix. Say we have git://repo_name/dags; we will use the GitDagFetcher.
I am thinking about supporting pluggable fetchers in the same way we support pluggable executors, as in https://github.com/apache/airflow/blob/master/airflow/executors/__init__.py#L64
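For illustration, the URL-prefix-to-fetcher resolution could look roughly like this; the registry dict and get_dag_fetcher helper are hypothetical, and the fetcher classes are the ones sketched elsewhere on this page. A pluggable mechanism would let deployments extend the registry, similar to how executors are resolved.

from urllib.parse import urlparse

# Hypothetical registry; a pluggable mechanism would allow extending it.
FETCHER_REGISTRY = {
    "git": GitDagFetcher,
    "s3": S3DagFetcher,
}

def get_dag_fetcher(url):
    """Pick a fetcher implementation based on the URL scheme, e.g. git:// or s3://."""
    scheme = urlparse(url).scheme
    try:
        return FETCHER_REGISTRY[scheme](url)
    except KeyError:
        raise ValueError("No DAG fetcher registered for scheme %r" % scheme)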
Chao-Han Tsai
> how the cache would work.
I am thinking about letting the user specify a $cache_path, and all the repos would be synced there, like:
$cache_path/repo1
$cache_path/repo2
And we will symlink it into $AIRFLOW_HOME/dags so that it is discoverable by Airflow. How we sync is an implementation detail of each storage system; for example, for git we would just keep pulling the latest.
Ash Berlin-Taylor
I think rather than symlinking we should update the DagBag etc. to be able to read from multiple folders - that way the dags folder and repo caches could co-exist.
Chao-Han Tsai
I think even with the symlink approach, the dags folder and the repo cache can still co-exist. People can still deploy DAG files to $AIRFLOW_HOME/dags the old way and they will still be picked up by Airflow; it is just that the symlinks to the cached repos and the DAG files will co-exist in $AIRFLOW_HOME/dags, which may be messy.
The reason I proposed the symlink was to minimize the change, but I agree we should make the DagBag support loading from multiple folders to achieve better isolation between the cache and the old $AIRFLOW_HOME/dags.
James Meickle
Any git sync will want to allow a committish, which could be a specific commit, a branch, or a tag (lightweight or annotated). Changing commits may also require submodules (or may not want them updated on every pull), so options for that will probably be required too.
Chao-Han Tsai
> what the API for the DagFetcher class is
I am planning to do a quick POC first and the interface may change.
Chao-Han Tsai
> what changes need making to the hooks(?) to support this.
Can you shed some light on what hooks you are referring to?
Ash Berlin-Taylor
I guess how do the hooks (which are what the conn_id points at right?) in the repo.json interact with the DagFetcher classes?
Chao-Han Tsai
During the fetch phase, we will initialize one process per repo. The process will use the prefix in the URL to find the corresponding fetcher implementation; for example, if the URL is s3://bucket-name, we will use the S3DagFetcher. The process will also pass the conn_id to the S3DagFetcher, which will retrieve the corresponding credentials stored in https://github.com/apache/airflow/blob/6970b233964ee254bbb343ed8bdc906c2f7bd974/airflow/models/connection.py#L48
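As a rough sketch of that flow, an S3DagFetcher could look something like the code below. It uses boto3 directly for brevity; in the proposal the credentials would come from the Airflow connection identified by conn_id, and the cache location is a placeholder.

import os
import boto3


class S3DagFetcher:
    """Hypothetical example fetcher; credential handling via conn_id is omitted."""

    def __init__(self, url, conn_id=None):
        # url is expected to look like s3://bucket-name/optional/prefix
        self.bucket, _, self.prefix = url[len("s3://"):].partition("/")
        self.conn_id = conn_id           # would map to an Airflow Connection in practice
        self.client = boto3.client("s3")

    def fetch(self, repo_id, subpath=None):
        """Mirror every object under the repo prefix into the local cache."""
        prefix = "/".join(p for p in (self.prefix, subpath) if p)
        local_root = os.path.join("/tmp/cached_repos", repo_id)   # placeholder cache path
        paginator = self.client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                key = obj["Key"]
                if key.endswith("/"):                 # skip "directory" placeholder keys
                    continue
                dest = os.path.join(local_root, os.path.relpath(key, prefix))
                os.makedirs(os.path.dirname(dest), exist_ok=True)
                self.client.download_file(self.bucket, key, dest)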
Chao-Han Tsai
> what we will use to write the Git based fetcher (I'd very much prefer it if we didn't shell out to make git calls but used a binding instead, or reused some existing tooling already written for this.).
I am currently looking into https://github.com/gitpython-developers/GitPython, which is under BSD-3 license and I think it is already part of the Airflow dependency https://github.com/apache/airflow/blob/master/setup.py#L326.
Ash Berlin-Taylor
> GitPython is not suited for long-running processes (like daemons) as it tends to leak system resources
I would have suggested libgit2/pygit2 but it's GPL'd https://github.com/libgit2/pygit2/blob/master/COPYING
Chao-Han Tsai
Thanks for pointing that out! I was not aware that GitPython isn't suitable for long-running processes. I will do some research there.
Chao-Han Tsai
One thing I would like to point out is that we will spin up multiple processes, each fetching one repo and exiting after the fetch is complete. Since we will be using GitPython in a separate, short-lived process, we should be safe from the resource leak. This is also one of the suggestions in the GitPython documentation: https://gitpython.readthedocs.io/en/stable/intro.html#leakage-of-system-resources
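A minimal sketch of that per-repo, short-lived process model, using the standard library's multiprocessing and the hypothetical get_dag_fetcher helper from above; since each child exits after a single fetch, any resources GitPython leaks die with the process.

from multiprocessing import Process

def _fetch_one_repo(repo_id, repo_conf):
    """Runs in a short-lived child process and exits when the fetch completes."""
    fetcher = get_dag_fetcher(repo_conf["url"])        # hypothetical helper sketched above
    fetcher.fetch(repo_id, subpath=repo_conf.get("subpath"))

def fetch_all_repos(repositories):
    """Fetch every repo in its own process, one process per repo."""
    processes = [
        Process(target=_fetch_one_repo, args=(repo_id, conf))
        for repo_id, conf in repositories.items()
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()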
Chao-Han Tsai
> how we minimize scheduler delay
We will need to block the scheduler while we mutate the DAG files under $CACHE_REPOS (which are symlinked into $AIRFLOW_HOME/dags), before the scheduler runs self._refresh_dag_dir() to discover the DAG files. We will spin up one process per repo to fetch its DAG files.
With regard to the mutation, I see two ways of achieving it:
(1) Pull synchronously during that window. I think GitDagFetcher may fall into this category: pulling changes from a Git repo should be fast each time, given that we pull from the remote periodically, so the changes each time should be small.
(2) Pull changes asynchronously to another location and swap the symlink in $AIRFLOW_HOME/dags during that window.
James Meickle
I wrote a git poller that does this async: detect there's been a change, pull it, then stop the scheduler / swap symlink atomically / restart scheduler. I think it's generally a nice workflow.
I would not make the assumption that pulling from git is always fast, or returns a sensible error. I think it's better to run async, and to warn if the async job has been failing/hanging for too long.
Chao-Han Tsai
I see. We can make GitDagFetcher async and swap the symlink like you said : )
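For reference, an atomic symlink swap on POSIX can be done by creating the new link under a temporary name and renaming it over the old one, roughly as below; this is only a sketch of the technique described above, with hypothetical paths.

import os

def swap_dags_symlink(new_target, link_path):
    """Atomically repoint link_path (e.g. $AIRFLOW_HOME/dags/repo1) at new_target."""
    tmp_link = link_path + ".tmp"
    if os.path.lexists(tmp_link):
        os.remove(tmp_link)
    os.symlink(new_target, tmp_link)
    # rename() on the same filesystem atomically replaces the existing symlink.
    os.rename(tmp_link, link_path)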
Chao-Han Tsai
> anything we need to do to ensure that the versions between scheduler and workers don't get out of sync when running tasks.
The workers would always fetch the latest DAGs before execution, so they always run the latest. It is possible that the DAG gets updated between scheduling and execution, but that is also an issue in Airflow today.
We would need to record the repo version/SHA in the DagRun so that when a task executes it fetches that version of the repo first. I think this might be worth another AIP.
James Meickle
Not all remote stores will be versioned (like S3 buckets, which may or may not be, depending on configuration). In those cases we just always have to pull the latest. It might be wise to have Versioned and Unversioned DagFetcher classes.
Also, always pulling the latest before every task may not be desirable for load reasons on the backend. This should probably be configurable.
Chao-Han Tsai
For a storage system like S3, we can enforce certain rules, like storing each snapshot of the DAG repo in:
s3://path_to_repo/<version>
and we save the version number in the DagRun so that we use a specific version of the repo throughout the DAG run. Of course, we will need some documentation around that.
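To make the convention concrete, pinning a DagRun to a snapshot could be as simple as the sketch below; the helper name and the idea of storing the version string on the DagRun are illustrative assumptions.

def versioned_repo_url(base_url, version):
    """Build the snapshot prefix, e.g. s3://path_to_repo/<version>."""
    return "%s/%s" % (base_url.rstrip("/"), version)

# Example: if a DagRun recorded version "v42", the worker would fetch from
# versioned_repo_url("s3://path_to_repo", "v42") before executing its tasks.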
Jarek Potiuk
Similarly to a few other voices in the discussion, I do not really see the added value of implementing this as part of Airflow core. The approach we have now, which is "make the folder sync somehow", is good enough and much more flexible already. There are various ways to make sure syncing happens - depending on the data center, the usual approach used by the organization, and whether they use GCS/S3/Azure/on-premises - so they can make it happen in the way that is most convenient for their deployments. Baking it into Airflow seems like overkill. It goes against the approach I think Airflow should follow: "do one thing only and do it well".
For me, Airflow should decide how to push work to Celery/Kubernetes/LocalExecutor etc. and possibly - in the future - how to make sure code is consistent between schedulers and executors (something that is being discussed in AIP-18 Persist all information from DAG file in DB), but not how to "pull" DAG configuration files into the scheduler's folder. Just adding two or three synchronisation mechanisms to Airflow in a "pluggable" way would increase the code/architecture complexity with very limited added value, I think.
The added value I could see in this area revolves around AIP-20 DAG manifest - but (my comments will follow there soon) this should be more about making sure DAGs are "consistent". I think the current scanning mechanism is prone to inconsistencies - DAGs and related files (or even sub-workflows) can simply be inconsistent today, especially in huge installations where scanning the directories might take a lot of time; there can be long periods where the loaded DAGs are not consistent, and this can lead to a number of problems (sometimes difficult to fix later). I think the remote DAG fetcher does not address this in any way for now, and we should give priority to AIP-20 instead. Maybe after that one is addressed we can come back and think about how to fetch the data. It might then turn out that we want to synchronize DAGs in a completely different way than syncing whole folders. I can imagine various ways of doing it that do not involve whole-folder syncing.
Chao-Han Tsai
Jarek Potiuk thanks for the review!
I think the value of supporting remote DAG fetching is to expose Airflow to an external system where all versions of DAGs are stored, in order to support versioned DagRuns and guarantee code consistency between the scheduler and executors. The scheduler records the repo version during scheduling, and the executor can later fetch that specific version of the repo prior to task execution. Each task is responsible for fetching its own version of the repo; we no longer need an $AIRFLOW_HOME/dags shared by all processes, and the executor becomes stateless.
I agree that we could also achieve such code consistency among services through AIP-18 Persist all information from DAG file in DB, but it would not be able to support dynamic DAGs, which seems to be a use case widely adopted by users. Correct me if I am wrong.
I do agree the fetcher will add more complexity to the Airflow core architecture; that is the cost to pay if we want to expose Airflow to external DAG storage.
I am thinking of implementing this AIP first to support versioned DagRuns and synchronize at the repo level, then working on AIP-20 DAG manifest to reduce the scope to the DAG level, so that the worker only needs to fetch a bundle of files instead of cloning the whole repo.
Jarek Potiuk
But this will only work (snapshotting the dags used) if we use GIT. If we use S3 or another source of remote files, then this snapshotting will not work in a generic way. So the whole point of remote DAG fetcher (generic fetching mechanism) cannot be really fulfilled.
I think extending AIP-20 DAG manifest and starting from there might be a better idea. In the manifest we can record DAGs + assets + versions of the files used - but we need to define how we want to do it. IMHO it should be somewhat automated. Git is one of the options to use (and indeed quite a good one when I think about it). It can also support dynamically generated DAGs (depending on the stage at which those DAGs are generated).
So maybe this "Remote DAG fetcher" as a standalone idea is not the "right" one. Maybe we should just join those two ideas - "Manifest + Git as the source of DAGs". We could record the version/hash of the files to be used in manifest and check them out in atomic way and let scheduler scan + use the right versions that has just been checked out.
By adding a manifest we can also achieve something that might be useful for packaging the DAGs for the Kubernetes Executor with pre-baked DAGs. When we have the manifest and run a DAG from it, we would know which DAGs we should pull to build a pre-baked image. Currently the Kubernetes Executor uses a mechanism similar to what you describe, fetching DAGs with Git (on top of shared volumes), but it has long been planned to use pre-baked DAGs instead, I believe. Using Git makes the executor depend on a shared Git service, which can be avoided (and it requires baked-in authorisation). That could make AIP-18 Persist all information from DAG file in DB deprecated as well, as long as we can use Docker images with pre-baked DAGs in Celery and other executors (not only in Kubernetes).
Chao-Han Tsai
> But this will only work (snapshotting the dags used) if we use GIT. If we use S3 or another source of remote files, then this snapshotting will not work in a generic way. So the whole point of remote DAG fetcher (generic fetching mechanism) cannot be really fulfilled.
We can still snapshot the repo in S3 if users follow certain conventions, like:
s3://path_to_repo/<version>
and we save the version number in the DagRun so that we use a specific version of the repo throughout the DAG run. Of course, we will need some documentation around that.
Chao-Han Tsai
> I think extending AIP-20 DAG manifest and starting from there might be a better idea. In the manifest we can record DAGs + assets + versions of the files used - but we need to define how we want to do it. IMHO it should be somewhat automated. Git is one of the options to use (and indeed quite a good one when I think about it). It can also support dynamically generated DAGs (depending on the stage at which those DAGs are generated).
So based on your suggestion, I would imagine that in dag_manifest.json we will record the following (correct me if I am wrong):
and we will need to provide some tooling to populate/update dag_manifest.json automatically whenever a user adds or updates a DAG, which may need some insight from you on how we should approach it.
Chao-Han Tsai
Interesting. What does the scheduling-to-execution flow look like if we are using pre-baked DAGs? Who will be responsible for baking DAG files into the image, the Airflow scheduler or users?
I can imagine that we will need to expose the image registry to Airflow and store the image SHA in the DagRun table so that we have a consistent code version during execution.
If we are pre-baking DAGs into images, do you think it makes more sense to bake the whole DAG repo into the image instead of just a DAG with its assets?
Kaxil Naik
It would definitely be good to have a remote fetcher on VMs where we don't want all users to be able to write to the DAGs directory.
The problem with something like GCSFuse, which uses GCS to store DAGs, is that it is not that reliable and has several open issues. However, as Jarek Potiuk mentioned, we should weigh the advantages it has over a custom implementation.
And AIP-20 DAG manifest can probably cover this too.
Ken Melms
I can see the benefit of this - right now I'm having difficulty because my enterprise is enforcing deployments to AWS within the Fargate clustering system, which appears to have NO concept of persistent storage or sharing of volumes among different services, so the only way to truly ensure that I'm able to pull and synchronize DAG updates post-deployment is to sync them with S3 buckets or similar technology. Git is out, as we are limited to only using resources within the VPC or AWS services that we can IAM; there is no Internet gateway or bi-directional data flow to the enterprise datacenters.
I was in the process of inspecting the effort required to build a plug-in using Boto3 to do just this when I saw this AIP.
I disagree with people thinking this is bloat or unnecessary - though it may be for your use case, I can assure you in my use case this is REQUIRED.
Please keep pressing on this AIP, as I think it should be core functionality in a cloud-aware application to be able to use cloud-based storage.
Valeriys Soloviov
I came from the Oozie job world (with Py/Spark). When I submit a job for execution (to YARN):
What's nice to have:
What's not covered:
We have roughly 70 data scientists and data engineers. We want to give them an open and fast dev environment and a stable production.
If we host everything in git (e.g. GitHub) or AWS, we must provide credentials to all the "clients" and workers.
It would be great if the Airflow server could sync DAGs on the "master" only, and we could get the code from that point of trust.
David Pryce
Chao-Han Tsai any movement here?
We see a need for this with more than one group of developers trying to iterate and create new DAGs while maintaining a production-ready git flow. Our current solution is to use various per-group-owned DAG repos and then try to combine them into one (potentially with git submodules), but if Airflow core could handle multiple DAG locations, that would make this multi-tenancy issue disappear!
Kyle Prifogle
I agree this and the general development of an external registry for DAGs could be useful and in some cases essential. Would be really slow and overkill for just getting things onto S3, but couldn't you theoretically use Presto DB as the metadatabase (since SQLAlchemy supports this dialect) to achieve this, and then just push to the underlying s3 out of band?
This still begs for a simpler solution; asking users to back their Airflow with Presto seems like a heavy lift.
luoyuliuyin
I have a question:
Why don't Airflow DAGs support being read directly from the DB, instead of through a local DAG fetcher?
The current way of discovering DAGs is to scan local files and then synchronize to the DB. If I want to create a DAG, I need to create a DAG file in the scheduler's dags_folder and then synchronize the DAG file to the web server and workers. Why can't I store the DAG file in the DB directly, so that the web server, scheduler, and workers all obtain the DAG file through the DB?
Kaxil Naik
Superseded by AIP-66