As a follow up on AIP-19 (making the webserver stateless) I suggest to store the DAG structure and metadata in the database. Where AIP-19 targets the webserver to read only from the DB, this AIP targets Airflow core reading the dag.py and storing all information in the DB. Currently, this is done only partially, and the webserver processes need to read the dag.py in order to fetch missing information which is very costly. This AIP should:
- Simplify architecture (disentangle processes)
- Make tasks fetch-able from the DB without having to read the dag.py file
- Improve performance by not having to read the full dag.py file when running a single task
What problem does this solve?
Currently dag.py files are parsed by multiple processes (webserver, scheduler & workers). This is:
- Unclear from a code perspective
- Unnecessary because we assume DAG schedule interval does not change every few seconds, and dag.py can be parsed only when required, to avoid re-parsing all the time. This problem is especially large with larger dag.py files. For example, a dag.py with a sleep(60) at the top level will take a long time to parse.
I suggest to make a "DagParser" that processes dag.py files only when necessary and storing the retrieved metadata in the database. Other processes should read only from the database.
Why is this change needed?
In the current architecture the dag.py parsing is entangled in multiple processes and is executed too often, leading to out of sync state and performance issues. See
I see Airflow being used in two ways:
- Static DAGs - the structure (tasks + dependencies) is static, does not change over time
- Dynamic DAGs - the structure changes over time, e.g. by reading the structure from a YAML file instead of the Python file itself. This makes it impossible to parse the dag.py file only once. You will need to re-parse the dag.py file at the time it is executed.
With only static DAGs, we could parse dag.py at the creation time and from then on only read the metadata from the database. However, with dynamic DAGs this is not possible since the dag.py code itself does not change, thus we can only detect changes by evaluating the dag.py file.
So, I suggest the following (simplified) architecture:
The DagParser responsibilities:
- Read dag.py files:
- When a dag.py file is added
- When a dag.py file is edited (detect with file listeners)
- With dynamic DAGs the dag.py will not change, so possibly periodically?
- When a DagRun starts, to fetch the current situation
- Extract metadata (name, start_date and schedule_interval most importantly) and DAG structure (tasks + dependencies)
- Tasks should be serialized with dill (currently also used by Airflow), which also supports e.g. lambda expressions
- For fast comparison, compute and store a hash from each byte stream. Whenever an already known dag.py file is parsed, we can lookup the hashes and store a new task if needed.
- Store metadata and DAG structure in the database tables DAG, Task and Edge
The scheduler responsibilities:
- Check if current datetime == next execution datetime for DAG
- If so, kick off the parsing process, to fetch the structure of a DAG at that point in time. This is required because we want to support dynamic DAGs, which must be reprocessed at the time of execution.
- Based on the fetched information, create a DagRun, possible new tasks, edges, and corresponding TaskInstances
Other things not included in this picture:
- Currently, the webserver parses DAG files itself. This should be removed (out of scope for this AIP), and the webserver should read ONLY from the database, or via an API. But NOT handle dag.py parsing itself.