As a follow-up to AIP-19 (making the webserver stateless), I suggest storing the DAG structure and metadata in the database. Whereas AIP-19 targets making the webserver read only from the DB, this AIP targets Airflow core reading the dag.py and storing all information in the DB. Currently this is done only partially, and the webserver processes need to read the dag.py in order to fetch missing information, which is very costly. This AIP should:
- Simplify architecture (disentangle processes)
- Make tasks fetch-able from the DB without having to read the dag.py file
- Improve performance by not having to read the full dag.py file when running a single task
What problem does this solve?
Currently dag.py files are parsed by multiple processes (webserver, scheduler & workers). This is:
- Unclear from a code perspective
- Unnecessary, because we can assume a DAG's schedule interval does not change every few seconds, so dag.py can be parsed only when required instead of being re-parsed all the time. The cost grows with how expensive the file is to parse: parsing a dag.py means executing its top-level code, so a dag.py with a sleep(60) at the top level takes at least a minute to parse, every time it is parsed.
I suggest creating a "DagParser" that processes dag.py files only when necessary and stores the retrieved metadata in the database. Other processes should read only from the database.
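A minimal sketch of "parse only when necessary", assuming change detection by hashing the file contents (a stand-in for the file listeners mentioned in the proposal). `DagFileCache` is a hypothetical helper, not an Airflow API. It re-executes a file's top-level code only when its bytes change, so an expensive top level such as a `sleep()` is paid once per change rather than on every read.

```python
import hashlib
import runpy
from pathlib import Path


class DagFileCache:
    """Hypothetical helper: re-executes a dag.py only when its contents change."""

    def __init__(self):
        self._hashes = {}    # path -> sha256 of the file contents at last parse
        self._results = {}   # path -> module globals produced by the last parse
        self.parse_count = 0

    def get(self, path):
        path = Path(path)
        digest = hashlib.sha256(path.read_bytes()).hexdigest()
        if self._hashes.get(path) != digest:
            # "Parsing" a DAG file means running its top-level code,
            # so any sleep() or network call there is paid here.
            self._results[path] = runpy.run_path(str(path))
            self._hashes[path] = digest
            self.parse_count += 1
        return self._results[path]
```

Reading the same unchanged file twice triggers only one execution; editing the file triggers a fresh one. Note that this only detects changes to the file itself, which is exactly the limitation dynamic DAGs run into.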
Why is this change needed?
In the current architecture, dag.py parsing is entangled in multiple processes and is executed too often, leading to out-of-sync state and performance issues. See
I see Airflow being used in two ways:
- Static DAGs - the structure (tasks + dependencies) is static, does not change over time
- Dynamic DAGs - the structure changes over time, e.g. by reading the structure from a YAML file instead of the Python file itself. This makes it impossible to parse the dag.py file only once; it has to be re-parsed at the time it is executed.
With only static DAGs, we could parse dag.py at creation time and from then on read only the metadata from the database. With dynamic DAGs this is not possible: the dag.py code itself does not change, so the only way to detect a change in structure is to evaluate the dag.py file.
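To illustrate why a source-level check is not enough for dynamic DAGs, here is a small sketch in plain Python (no Airflow). The hypothetical `build_tasks` function stands in for a dag.py whose tasks come from external configuration (a dict here, instead of the YAML file mentioned above). The hash of the source never changes, but the hash of the generated structure does, and it can only be computed by evaluating the code.

```python
import hashlib
import json

# A "dynamic" DAG file: its source never changes, but the tasks it
# generates depend on external config (standing in for a YAML file).
DAG_SOURCE = '''
def build_tasks(config):
    tasks = ["extract", "transform", "load"]
    if config.get("end_of_week"):
        tasks.append("eow")
    return tasks
'''


def source_hash(source):
    """Hash of the code itself: all a file watcher could compare."""
    return hashlib.sha256(source.encode()).hexdigest()


def structure_hash(source, config):
    """Hash of the generated structure: requires evaluating the code."""
    namespace = {}
    exec(source, namespace)
    tasks = namespace["build_tasks"](config)
    return hashlib.sha256(json.dumps(tasks).encode()).hexdigest()
```

With `{"end_of_week": False}` and `{"end_of_week": True}` the source hash is identical while the structure hashes differ.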
So, I suggest the following (simplified) architecture:
The DagParser responsibilities:
- Read dag.py files:
  - When a dag.py file is added
  - When a dag.py file is edited (detect with file listeners)
  - With dynamic DAGs the dag.py will not change, so possibly periodically?
  - When a DagRun starts, to fetch the current structure
- Extract metadata (most importantly name, start_date and schedule_interval) and DAG structure (tasks + dependencies)
  - Tasks should be serialized with dill (currently also used by Airflow), which also supports e.g. lambda expressions
  - For fast comparison, compute and store a hash of each serialized byte stream. Whenever an already known dag.py file is parsed, we can look up the hashes and store a new task only if needed.
- Store metadata and DAG structure in the database tables DAG, Task and Edge
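A minimal sketch of the storage step, assuming SQLite and simplified DAG, Task and Edge tables whose column names are made up for illustration. It uses the standard-library `pickle` instead of dill so it runs without extra dependencies (dill offers a similar `dumps` and, unlike pickle, handles lambdas). Each task is hashed over its serialized bytes; a known hash means the identical task is already stored, so only changed or new tasks produce new rows.

```python
import hashlib
import pickle  # the proposal suggests dill; pickle keeps this sketch stdlib-only
import sqlite3

# Hypothetical, simplified schema for the DAG, Task and Edge tables.
SCHEMA = """
CREATE TABLE IF NOT EXISTS dag  (dag_id TEXT PRIMARY KEY, start_date TEXT, schedule_interval TEXT);
CREATE TABLE IF NOT EXISTS task (task_hash TEXT PRIMARY KEY, dag_id TEXT, task_id TEXT, payload BLOB);
CREATE TABLE IF NOT EXISTS edge (dag_id TEXT, upstream TEXT, downstream TEXT,
                                 PRIMARY KEY (dag_id, upstream, downstream));
"""


def store_dag(conn, dag_id, start_date, schedule_interval, tasks, edges):
    """Persist DAG metadata, tasks and edges; return how many new task rows were stored.

    tasks: mapping task_id -> picklable task definition
    edges: iterable of (upstream_task_id, downstream_task_id)
    """
    conn.execute(
        "INSERT OR REPLACE INTO dag VALUES (?, ?, ?)",
        (dag_id, start_date, schedule_interval),
    )
    new_tasks = 0
    for task_id, definition in tasks.items():
        payload = pickle.dumps((dag_id, task_id, definition))
        task_hash = hashlib.sha256(payload).hexdigest()
        # Known hash: the identical task is already stored, so the insert is ignored.
        cur = conn.execute(
            "INSERT OR IGNORE INTO task VALUES (?, ?, ?, ?)",
            (task_hash, dag_id, task_id, payload),
        )
        new_tasks += cur.rowcount
    conn.executemany(
        "INSERT OR IGNORE INTO edge VALUES (?, ?, ?)",
        [(dag_id, up, down) for up, down in edges],
    )
    conn.commit()
    return new_tasks
```

Because old task rows are never overwritten, previous versions of a task remain in the table. Hashing bytes assumes serialization is deterministic for identical inputs, which holds within one Python version but is worth verifying for whichever serializer is chosen.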
The scheduler responsibilities:
- Check if the current datetime has reached the next execution datetime for the DAG
- If so, kick off the parsing process, to fetch the structure of a DAG at that point in time. This is required because we want to support dynamic DAGs, which must be reprocessed at the time of execution.
- Based on the fetched information, create a DagRun, any new tasks and edges, and the corresponding TaskInstances
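The scheduler steps can be sketched as one loop iteration in plain Python. Everything here is hypothetical: `DagRun` is a simplified dataclass rather than Airflow's model, the schedule interval is assumed to be a fixed `timedelta`, and `parse_dag` stands for the DagParser call that re-evaluates dag.py at execution time. The structure returned by that call is copied into the DagRun, so each run keeps the tasks and edges it was created with.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta


@dataclass
class DagRun:
    dag_id: str
    execution_date: datetime
    tasks: list   # structure captured when the run was created
    edges: list
    task_instances: dict = field(default_factory=dict)


def schedule_tick(dag_id, next_execution, interval, now, parse_dag):
    """Return (dag_run_or_None, next_execution) for one scheduler iteration.

    parse_dag is a hypothetical callable returning (tasks, edges) as the
    DAG code generates them at this moment; this is what supports dynamic DAGs.
    """
    # Compare with "<" rather than testing equality: a scheduler loop
    # rarely wakes up at exactly the scheduled instant.
    if now < next_execution:
        return None, next_execution
    tasks, edges = parse_dag(dag_id, next_execution)
    run = DagRun(dag_id, next_execution, list(tasks), list(edges))
    run.task_instances = {task_id: "scheduled" for task_id in run.tasks}
    return run, next_execution + interval
```

A `parse_dag` that adds an "eow" task only on Fridays produces three TaskInstances for a Wednesday run and four for a Friday run, each recorded with its own structure.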
Other things not included in this picture:
- Currently, the webserver parses DAG files itself. This should be removed (out of scope for this AIP): the webserver should read ONLY from the database, or via an API, and NOT handle dag.py parsing itself.
I think there's another critical reason to do this: lineage. Dags evolve over time and, precisely because they are code, they can react to the environment at the time they are run and potentially generate different tasks.
This "what happened" view is currently lost completely from the UI. The graph rendered in the UI is always based on the Dag and the environment "right now", so it presents an "as-is" view of history, when what would be far more useful for troubleshooting is an "as-was" view.
So if on a particular day, let's say Wednesday, the DagRun steps generated were a → b → c, then there should be some way of definitively retrieving that view when I look at it after the Dag has completed. If, two days later, I try to view that DagRun with the current setup, and the Dag code generates a special "end of week" step on a Friday, then the DagRun view rendered in the UI includes this final eow step, which was never actually there. Because there is no task instance record for it, it shows up as if it has never been run (which is true, it wasn't). But it's confusing for people trying to support these things.
Worse, imagine the opposite. On Friday a → b → c → eow was run, but I look at it five days later. Because it's not Friday, eow is not generated, so it just vanishes from the historical view of the Dag in the UI. I can go and find out what happened in the database, but it's not obvious at all, and indeed a lot of the dependencies are lost.
What I'm saying is, there is at least one other very good reason to do this which has nothing to do with making the Webserver stateless, or changing how Dags are parsed and used during execution. There's just a much-needed feature missing right now, which is the "as-was" view of history, aka "Show me what happened when this DagRun happened, not a hybrid of what it might have looked like if I ran it now."
I've got plenty of examples of this if anyone cares to take a look.
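The Wednesday/Friday scenario can be reproduced in a few lines of plain Python. This is a sketch with hypothetical names: `generate_structure` stands in for evaluating the Dag code, and `snapshots` stands in for a per-DagRun structure table in the database. The "as-is" view re-evaluates the code on the day you look; the "as-was" view reads back what was recorded when the run happened.

```python
from datetime import date


def generate_structure(run_date):
    """Stand-in for evaluating the Dag code: the eow step only exists on Fridays."""
    tasks = ["a", "b", "c"]
    if run_date.weekday() == 4:
        tasks.append("eow")
    return tasks


# Hypothetical per-DagRun snapshot store: structure recorded when the run happened.
snapshots = {}


def record_run(run_id, run_date):
    snapshots[run_id] = generate_structure(run_date)


def as_is_view(viewing_date):
    # Current behaviour: whichever run you select, the graph reflects
    # the Dag code evaluated on the day you are looking at it.
    return generate_structure(viewing_date)


def as_was_view(run_id):
    # Proposed behaviour: read the structure stored for that run.
    return snapshots[run_id]
```

Viewing Wednesday's run on Friday, `as_is_view` shows a phantom eow step; viewing Friday's run five days later, eow disappears. `as_was_view` returns the recorded structure in both cases.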
Indeed! Stateless webserver + all data in DB are related but not the same. IMO storing all state in the webserver would allow "truly" dynamic DAGs like you describe, and preserve DAG structure history for every DAG run.
What's your opinion on the following: you have a dynamic DAG, where tasks are generated based on whatever. You run the DAG and all is fine, but then you want to rerun some tasks from the past. With the current code, the tasks will look different, e.g. 3 days ago 5 tasks were generated and now 10 tasks would be generated. Do you (1) choose to run with the code from 3 days ago (requires keeping track of all code ever run), (2) choose to run the code as it is right now (current behaviour, which overrides the DAG structure from 3 days ago with the current code) or (3) ask the user whether to rerun with the old or the current code?
Ideally I'd pick option #3, but haven't got clear in my head how that would work out yet.
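One building block that option 3 would likely need, assuming each DagRun's structure is stored as proposed, is a way to show the user how the stored structure differs from what the current code generates. A minimal sketch with a hypothetical `rerun_plan` helper, comparing task ids only (a fuller version would also compare edges):

```python
def rerun_plan(stored_tasks, current_tasks):
    """Compare a past DagRun's stored task ids with those the current code generates.

    Returns None when they match (the rerun is unambiguous); otherwise the
    differences a user would need to decide on before rerunning.
    """
    stored, current = set(stored_tasks), set(current_tasks)
    if stored == current:
        return None
    return {
        "only_in_stored_run": sorted(stored - current),
        "only_in_current_code": sorted(current - stored),
    }
```

For the example above (5 tasks three days ago, 10 now), this reports the five tasks that exist only in the current code. It does not address running the old code itself, which is option 1.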
Quick follow on question from re-reading your comment. You say "IMO storing all state in the webserver" but I think you meant "database"?
There are challenges with automatically running with old code. Airflow doesn't define any repository for the dags and associated libraries; their deployment is managed on the file system, so attempting to keep multiple copies could be problematic at best. I think that would have to fall outside this particular AIP.
In my opinion, for whatever it's worth, option 2 is already there and quite possibly what a lot of people want a lot of the time (the best example being: I need to re-run with the latest code because there was a bug in the old code and I've fixed it).
Option 3 is something that a support person would have to find a way to do manually, since effectively you're trying to "roll back a code deployment at the same time as the latest code is there." To make that work, Airflow would likely have to become a lot more opinionated about how it is deployed and run in order to maintain some form of isolation, whereas it's a lot more practical for teams using Airflow to design their operators with backwards compatibility in mind and let configuration drive what those operators do, or in one-off situations to do something manually to fix problems. I may be making a bolder statement about option 3, though: I would almost view it as "totally outside the scope of Airflow at this time." If you need to do that, figure out a way to do it by spinning up an Airflow instance with the old code, etc. If it happens regularly, keep that backup Airflow instance ready in case you need it. I think ultimately that's much easier for a support or devops team to manage.
To your general comment on "truly" dynamic Dags, we make heavy use of them. I run a multi-tenant platform with a single dag per client, but that dag makes heavy use of nested sub dags which are designed to be shared across all clients, and they generate the tasks "that client needs." I have configuration in variables which tells me, for instance, "This client has customer satisfaction data" whereas "That client doesn't." As such, for the second client, none of the csat tasks are ever generated. But if one day (very likely in my case) that client says "I've got csat data now", I just have to tweak a variable and, lo and behold, those tasks are generated. It makes processing for a large number of clients, with highly shared code, much, much easier to manage.
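The pattern described here, shared code that generates per-client tasks from configuration, can be sketched in plain Python. The config dict stands in for Airflow Variables, and the client names and step names are hypothetical. Flipping a single flag changes which tasks are generated without touching the code, which is also why the generated structure, not the source, is what needs to be stored per run.

```python
# Hypothetical per-client configuration, standing in for Airflow Variables.
CLIENT_CONFIG = {
    "client_a": {"has_csat": True},
    "client_b": {"has_csat": False},
}

SHARED_STEPS = ["ingest_orders", "build_reports"]
CSAT_STEPS = ["ingest_csat", "score_csat"]


def tasks_for_client(client, config=None):
    """Generate the task ids a client's DAG needs from shared, common code."""
    config = CLIENT_CONFIG if config is None else config
    tasks = list(SHARED_STEPS)
    if config.get(client, {}).get("has_csat"):
        tasks += CSAT_STEPS  # csat tasks exist only for clients that have csat data
    return tasks
```

`tasks_for_client("client_b")` yields only the shared steps until its `has_csat` flag is set to `True`.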
Ideally I'd love it if a task could actually cause the downstream scheduled tasks to change. For instance, if I want to spawn many copies of an operator to load a large set of data, I'd love the output of one operator, which tells me how much work there is to do, to decide how many of those operators get spawned. As of today I need to either rely on the underlying operators to do that behind the scenes, or back into it from a starting point of "N" processes and make the ones that aren't needed exit immediately, which isn't ideal. This is most certainly beyond the scope of this AIP though!
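The "start from N and let the unneeded ones exit" workaround can be sketched in plain Python. The names are hypothetical; in a real DAG each slot would be its own task and `needed` would have to be passed from the upstream task (for example via XCom). The sketch also shows why it isn't ideal: N is fixed at parse time, so idle slots still exist as tasks, and any work beyond N slots is not covered.

```python
MAX_WORKERS = 8  # "N": fixed when the DAG is parsed, before the amount of work is known


def plan_chunks(total_items, chunk_size):
    """Upstream step: decide how many parallel loaders are actually needed."""
    return -(-total_items // chunk_size)  # ceiling division


def loader(slot, needed, total_items, chunk_size):
    """One of the MAX_WORKERS pre-generated loaders; unneeded slots exit immediately."""
    if slot >= needed:
        return None  # nothing to do for this slot
    start = slot * chunk_size
    return range(start, min(start + chunk_size, total_items))
```

With 25 items in chunks of 10, three loaders do work and the remaining five return immediately.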
Long-winded answer, sorry. In short: I agree with you. Option 2 is already there and Option 1 is a whooooooooooole other world of pain and complexity, so supporting Options 2 and 3 would be my pragmatic view of it (and as noted, I think I have a more draconian view of option 3 than you do!)