State: Abandoned in favour of AIP-18 and AIP-19
Discussion Thread: [DISCUSS] AIP-12 Persist DAG into DB
JIRA: AIRFLOW-3562
Created: 2019-01-30

Video of current problem: https://www.youtube.com/watch?v=sNrBruPS3r4

We propose to persist all information of a DAG.py file in the Airflow metastore and make the webserver read the state from the metastore instead of parsing the dags in the webserver process itself. This has several advantages:

  • Webserver Gunicorn processes read state from the DB instead of each webserver process managing its own state by reading the DAGs itself. This currently results in the weird disappearing and reappearing behaviour for several minutes after you make a change (e.g. add a new DAG). The DB will be the single source of truth.

  • Once the scheduler has processed the DAG, it should be visible to all Gunicorn workers. The webserver will no longer scan the DAG files; it will only use them for specific actions:

    • Trigger; This requires the DAG to be processed once. This does not need to live in memory, and can be done on the fly.

    • Code; Just to view the code, execution of the DAG is not required here.

Required changes

  • The webserver should not use the DagBag but the metastore DB as the single source of truth. This will prevent DagBags in different Gunicorn workers from going out of sync with each other. The webserver also no longer needs to process the DAG files.
    Related JIRA issue: https://jira.apache.org/jira/browse/AIRFLOW-3562
    In order to achieve this all DAG components should be stored in the database. This is currently not the case so the following components should be persisted in the database:

Edges in the database

Adding edges to the database is needed to be able to visualize the graph of a DAG.
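As a rough illustration of what persisting edges could look like, the sketch below flattens a task graph into rows and reads them back without parsing any DAG file. It is not taken from the proposal: the `dag_edge` table, its column names, and the `dag_to_edges` and `load_edges` helpers are all hypothetical, and a plain dict stands in for a parsed DAG. Tasks without any edges would not appear here, so task metadata would still need its own table.

```python
import sqlite3

# Hypothetical stand-in for a parsed DAG: task id -> downstream task ids.
dag_tasks = {
    "extract": ["transform"],
    "transform": ["load", "report"],
    "load": [],
    "report": [],
}


def dag_to_edges(dag_id, tasks):
    """Flatten the task graph into (dag_id, upstream, downstream) rows."""
    return sorted(
        (dag_id, upstream, downstream)
        for upstream, downstreams in tasks.items()
        for downstream in downstreams
    )


# In-memory SQLite stands in for the Airflow metastore (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_edge ("
    " dag_id TEXT NOT NULL,"
    " upstream_task_id TEXT NOT NULL,"
    " downstream_task_id TEXT NOT NULL,"
    " PRIMARY KEY (dag_id, upstream_task_id, downstream_task_id))"
)

# Scheduler side: persist the edges after parsing the DAG file.
conn.executemany(
    "INSERT INTO dag_edge VALUES (?, ?, ?)",
    dag_to_edges("example_dag", dag_tasks),
)
conn.commit()


def load_edges(conn, dag_id):
    """Webserver side: read the edges needed to draw the graph view."""
    rows = conn.execute(
        "SELECT upstream_task_id, downstream_task_id FROM dag_edge"
        " WHERE dag_id = ? ORDER BY upstream_task_id, downstream_task_id",
        (dag_id,),
    )
    return rows.fetchall()


edges = load_edges(conn, "example_dag")
```

With rows like these, every Gunicorn worker would see the same graph as soon as the scheduler commits it.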

To do this there are 3 options:

  1. Persist single version of DAG edges (latest only)

    • Problem is that each version is overwritten if the DAG is updated, so only the latest is stored

  2. Adding edges for each DagRun.

    • In this implementation history is maintained.

    • Each DagRun has its own complete version of the DAG in the database, which is rather redundant.

  3. Versioned graphs.

    • To get the best of both, I suggest storing each version of a DAG graph under some ID to which every DagRun can be linked, so that we preserve DAG history without storing redundant data.
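One possible reading of option 3 is sketched below, purely as an assumption: the version ID is a hash of the canonicalised edge list, so an unchanged graph maps to the same row and each DagRun only stores a reference. The `dag_version` and `dag_run` tables shown here, their columns, and the helper functions are hypothetical and do not reflect the actual Airflow schema.

```python
import hashlib
import json
import sqlite3

# In-memory SQLite stands in for the Airflow metastore (assumption).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag_version (
        version_id TEXT PRIMARY KEY,
        dag_id     TEXT NOT NULL,
        edges_json TEXT NOT NULL
    );
    CREATE TABLE dag_run (
        run_id         INTEGER PRIMARY KEY AUTOINCREMENT,
        dag_id         TEXT NOT NULL,
        execution_date TEXT NOT NULL,
        version_id     TEXT NOT NULL REFERENCES dag_version (version_id)
    );
    """
)


def graph_version_id(dag_id, edges):
    """Derive a stable ID from the DAG id and its sorted, de-duplicated edges."""
    canonical = json.dumps([dag_id, sorted(set(edges))], separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def store_version(conn, dag_id, edges):
    """Insert the graph only if this exact version is not stored yet."""
    version_id = graph_version_id(dag_id, edges)
    conn.execute(
        "INSERT OR IGNORE INTO dag_version VALUES (?, ?, ?)",
        (version_id, dag_id, json.dumps(sorted(set(edges)))),
    )
    return version_id


def start_dag_run(conn, dag_id, execution_date, edges):
    """Pin the run to the graph version that is current when it starts."""
    version_id = store_version(conn, dag_id, edges)
    cursor = conn.execute(
        "INSERT INTO dag_run (dag_id, execution_date, version_id)"
        " VALUES (?, ?, ?)",
        (dag_id, execution_date, version_id),
    )
    return cursor.lastrowid


edges_v1 = [("a", "b"), ("b", "c")]
edges_v2 = [("a", "b"), ("b", "d")]  # the DAG file changed between runs

start_dag_run(conn, "example_dag", "2019-01-01", edges_v1)
start_dag_run(conn, "example_dag", "2019-01-02", edges_v1)
start_dag_run(conn, "example_dag", "2019-01-03", edges_v2)

version_count = conn.execute("SELECT COUNT(*) FROM dag_version").fetchone()[0]
run_count = conn.execute("SELECT COUNT(*) FROM dag_run").fetchone()[0]
```

In this sketch three runs share two stored graph versions. Hashing only the edges ignores task-level changes, so a real design would have to decide what else counts as a new version.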

8 Comments

  1. On option 3, would each DAG run store a copy of the graph even if it was identical to the previous one, or would it do some form of diff to share versions between DAG runs?

    Would the scheduler/DAG parser "assert" the latest version of the edge each time it parses the dag? If the graph changes between parses, but no RUNs used the older version, would that version simply disappear? (I think that's fine)

    What happens to the graph of a DAG run if the dag file changes mid-way through the run? (i.e. task 1 has run, task 2 is running, task 3 would be next, but the DAG file is updated and task 4 and 5 replaces task 3)

    I'm in favour of this, we just need to answer these questions.

  2. Most straight-forward IMO is:

    • Iff a DAG (tasks & edges) changes, store the complete new version
    • Store the DAG separate from the DagRun, and set a foreign key in the dag_run table to the correct DAG version, so that each version is stored only once
    • Make each DagRun execute only a single DAG version
    • Set the DAG version at the start of a DagRun and don't allow changes half-way through a DagRun
    • With all the above, we would persist a complete DAG version in the DB on every change of a DAG. Once a DagRun starts, it would run the latest version of a DAG, and if there is more than one new DAG version since the last run, we would end up with a "dangling" DAG version. That version would simply never be used but would still exist in the DB, and we'd need a separate "DAG version cleaner process" to remove such versions. I'd say this has low priority and can be picked up at a later point in time.
  3. Make each DagRun execute only a single DAG version

    That is much easier said than done: the only thing that can be executed is what is in the file at present, and the graph edges cannot contain enough information to execute a task.

  4. You're right. For now the goal is to fix the webserver issue. The DAG file is still required to execute a task after this change.

    This change can be used at a later stage inside scheduler/task execution. That would be nice to look at, but doing it in this AIP would be too complex.

  5. Copying my comments from github:

    There is already a serialized representation of DAGs so DB serialization should probably go through that even though that might be a bit more work.

    In the long run I envision DAG serialization happening on new Airflow clients, which send a request to a new Airflow service which basically serves as a CRUD wrapper around a DB to store both the SimpleDag as well as some kind of reference to some encapsulation of all of the python dependencies for the DAG (e.g. docker image name). This way all 3 of the webserver/worker/scheduler could use the same data model and source of truth. I feel it might make sense to figure out the long term plan first via an AIP and some brainstorming sessions and make sure there is an easy path forward from any intermediate proposals so we don't make our lives harder later undoing changes/figuring out how to do migrations. Security is another thing to keep in mind when thinking about this problem too since this work would be required for multi-tenancy in Airflow. This AIP should cover the transition concerns.


    • Make each DagRun execute only a single DAG version
    • Regarding the tree view in the UI: I believe it should always show a single DagRun per execution_date. Does the dag_run table still keep a single instance per execution_date?
    • When clearing an existing DagRun to rerun it, is the information currently persisted in the DB enough to kick off a DagRun based on an older DAG version?
  6. In hindsight, this AIP was defined too broadly and stalled in discussions. Julian de Ruiter and I created AIP-18 and AIP-19 to follow up with a better-defined scope.