Video of current problem: https://www.youtube.com/watch?v=sNrBruPS3r4
We propose to persist all information from a DAG (.py) file in the Airflow metastore and to make the webserver read state from the metastore instead of parsing the DAG files in the webserver process itself. This has several advantages:
Webserver Gunicorn processes read state from the DB instead of each process managing its own state by parsing the DAGs itself. The current approach results in DAGs oddly disappearing and reappearing for several minutes after you make a change (e.g. add a new DAG). The DB will be the single source of truth.
Once the scheduler has processed a DAG, it should be visible to all Gunicorn workers. The webserver will no longer scan the DAG files; it will only read them for specific actions (a sketch of both actions follows this list):
Trigger: this requires the DAG to be processed once. The DAG does not need to live in memory; it can be parsed on the fly.
Code: only the source code needs to be shown; executing the DAG is not required here.
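As an illustration of how these two actions could work without a resident DagBag, here is a minimal sketch that parses a single DAG file on the fly for a trigger and just reads the file contents for the code view. It uses the standard DagBag API; the helper names are hypothetical and not part of this proposal.

```python
# Minimal sketch, not the proposed implementation: parse one DAG file on demand
# for a trigger action, and read the raw file for the "Code" view.
from airflow.models import DagBag


def load_dag_for_trigger(fileloc, dag_id):
    """Parse only the given DAG file and return the DAG object.

    The DAG does not need to stay in webserver memory; it can be loaded on the
    fly, used to create the DagRun, and then discarded.
    """
    dagbag = DagBag(dag_folder=fileloc, include_examples=False)
    return dagbag.get_dag(dag_id)


def read_dag_code(fileloc):
    """Viewing the code only needs the file contents, not DAG execution."""
    with open(fileloc) as f:
        return f.read()
```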
Required changes
The webserver should not use the DagBag but the metastore DB as the single source of truth. This prevents the DagBags in different Gunicorn workers from going out of sync with each other, and the webserver no longer needs to process the DAG files at all.
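As a rough illustration of that change, a webserver lookup could resolve a DAG from the metastore instead of from a process-local DagBag. The table name, columns, and session handling below are assumptions made for the sake of the example, not the schema this AIP defines.

```python
# Hypothetical sketch: the webserver reads DAG state from the metastore rather
# than from an in-memory DagBag. Table and column names are illustrative only.
import json

from sqlalchemy import Column, String, Text
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class SerializedDag(Base):
    """Webserver-facing state of a DAG, written by the scheduler after parsing."""

    __tablename__ = "serialized_dag"

    dag_id = Column(String(250), primary_key=True)
    fileloc = Column(String(2000), nullable=False)
    data = Column(Text, nullable=False)  # JSON blob describing the DAG structure


def get_dag_from_db(session, dag_id):
    """Return the stored DAG structure, or None if the scheduler has not stored it yet."""
    row = session.query(SerializedDag).filter_by(dag_id=dag_id).one_or_none()
    return json.loads(row.data) if row else None
```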
Related JIRA issue: https://jira.apache.org/jira/browse/AIRFLOW-3562
In order to achieve this, all DAG components should be stored in the database. This is currently not the case, so the following components should be persisted in the database:
Biggest database change: DagEdges: https://jira.apache.org/jira/browse/AIRFLOW-3585
Other missing pieces will be identified while working on this.
Edges in the database
Storing edges in the database is needed to visualize the graph of a DAG.
There are three options for doing this:
Persist a single version of the DAG edges (latest only).
The problem is that the edges are overwritten whenever the DAG is updated, so only the latest version is stored.
Add edges for each DagRun.
In this implementation, history is maintained.
Each DagRun has its own complete version of the DAG in the database, which is rather redundant.
Versioned graphs.
To get the best of both, I suggest storing each version of a DAG graph under its own ID, to which every DagRun can be linked, so we preserve DAG history without storing redundant data (see the schema sketch below).
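A rough sketch of what option 3 could look like as a schema; all table and column names here are hypothetical and only meant to make the idea concrete. The scheduler writes one row per distinct graph, the edges hang off that version, and each DagRun references the version it was started with.

```python
# Hypothetical schema for option 3 (versioned graphs); names are illustrative only.
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class DagGraphVersion(Base):
    """One row per distinct graph of a DAG, written by the scheduler when it parses."""

    __tablename__ = "dag_graph_version"

    id = Column(Integer, primary_key=True, autoincrement=True)
    dag_id = Column(String(250), nullable=False, index=True)
    graph_hash = Column(String(64), nullable=False)  # used to detect an unchanged graph


class DagEdge(Base):
    """Edges of one graph version; enough to draw the graph view, not to execute tasks."""

    __tablename__ = "dag_edge"

    id = Column(Integer, primary_key=True, autoincrement=True)
    graph_version_id = Column(Integer, ForeignKey("dag_graph_version.id"), nullable=False)
    upstream_task_id = Column(String(250), nullable=False)
    downstream_task_id = Column(String(250), nullable=False)


# A DagRun would additionally carry a graph_version_id foreign key, so history is
# preserved while identical graphs are stored only once.
```

With a hash per version, a re-parse that produces an identical graph can simply reuse the existing row instead of writing a duplicate.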
8 Comments
Ash Berlin-Taylor
On option 3, would each DAG run store a copy of the graph even if it was identical to the previous one, or would it do some form of diff to share versions between DAG runs?
Would the scheduler/DAG parser "assert" the latest version of the edges each time it parses the DAG? If the graph changes between parses, but no runs used the older version, would that version simply disappear? (I think that's fine.)
What happens to the graph of a DAG run if the DAG file changes mid-way through the run? (e.g. task 1 has run, task 2 is running, task 3 would be next, but the DAG file is updated and tasks 4 and 5 replace task 3)
I'm in favour of this, we just need to answer these questions.
Bas Harenslak
Most straight-forward IMO is:
Ash Berlin-Taylor
That is much easier said than done: the only thing that can be executed is what is in the file at present. The graph edges cannot contain enough information to execute a task.
Peter van 't Hof
You're right. For now the goal is to fix the webserver issue; executing a task will still require the DAG file after this change.
This change could be used at a later stage for scheduler/task execution. That would be nice to look at, but doing it in this AIP would be too complex.
Dan Davydov
Copying my comments from GitHub:
There is already a serialized representation of DAGs, so DB serialization should probably go through that, even though it might be a bit more work.
In the long run I envision DAG serialization happening on new Airflow clients, which send a request to a new Airflow service that basically serves as a CRUD wrapper around a DB, storing both the SimpleDag and some kind of reference to an encapsulation of all the Python dependencies of the DAG (e.g. a Docker image name). This way all three of the webserver, worker, and scheduler could use the same data model and source of truth. It might make sense to figure out the long-term plan first via an AIP and some brainstorming sessions, and to make sure there is an easy path forward from any intermediate proposals, so we don't make our lives harder later by undoing changes or figuring out how to do migrations. Security is another thing to keep in mind when thinking about this problem, since this work would be required for multi-tenancy in Airflow. This AIP should cover the transition concerns.
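To make the serialization discussion concrete, the blob stored in the DB could be as simple as a JSON document with the task ids and edges the webserver needs for its views. This is only an illustrative sketch of one possible shape, not the existing SimpleDag representation Dan refers to.

```python
# Illustrative only: serialize the webserver-facing structure of a DAG to JSON.
import json


def serialize_dag_structure(dag):
    """Return a JSON string with the task ids and edges of an airflow.models.DAG."""
    return json.dumps({
        "dag_id": dag.dag_id,
        "tasks": [task.task_id for task in dag.tasks],
        "edges": [
            {"upstream": task.task_id, "downstream": downstream_id}
            for task in dag.tasks
            for downstream_id in task.downstream_task_ids
        ],
    })
```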
Bas Harenslak
In hindsight, this AIP was defined too broadly and stalled in discussions. Julian de Ruiter and I created AIP-18 and AIP-19 to follow up with a better-defined scope.