Status

State: Completed

Discussion Thread: Airflow DAG Serialisation
JIRA: AIRFLOW-5088

Github PRs

DAG serialization using JSON: https://github.com/apache/airflow/pull/5701 (merged)

DAG persistence in DB: https://github.com/apache/airflow/pull/5743 (merged)

In Release: 1.10.7

Motivation

This doc proposes an approach that serializes and persists DAGs using JSON in order to decouple the Airflow webserver from DAG parsing and, as a secondary goal, to accelerate the scheduler.

Compared to the current DAG persistence/webserver proposals (AIP 12, 18, 19), this proposal is more lightweight: it minimizes the changes to current Airflow code, and all new features are pluggable. The core part is serializing/de-serializing current Airflow DAGs/Operators in JSON. It can be used as an intermediate step towards the long-term optimization.

A part of this proposal (stringifying DAGs) is already tested and launched in Google Cloud Composer.

Considerations

The major consideration is the scope overlap and compatibility with other ongoing AIPs on DAG persistence/webserver optimizations.


1. Overview

This proposal adds two features:

  • When [core] store_serialized_dags is enabled, the scheduler's DAG processing processes serialize DAGs to JSON and write them into the Airflow DB (a new column in DAG table). The webserver reads DAGs directly from the DB:
    • It solves the scalability issues (DAG loading timeout, OOM) of the Airflow webserver (AIRFLOW-4924);
    • Loading DAGs in the webserver can be done in the gunicorn worker process, or in a separate background process;
    • Users can set [core] min_serialized_dag_update_interval to reduce DB write traffic;
    • To maintain backwards compatibility, the default value of [core] store_serialized_dags would be False, in which case the webserver works as usual and parses files from the local filesystem.
  • (Secondary goal) When [scheduler] dagcached_accelerator is enabled, the scheduler launches an additional process to feed the DAG scheduling loop with cached DAGs, which is much faster than the DAG processing processes:
    • It speeds up scheduling actions;
    • Users do not need a large [scheduler] max_threads when they have hundreds of DAGs or more;
    • The trade-off is that the Airflow scheduler may schedule stale DAGs.

In summary, the scheduler's DAG processing processes serialize DAGs to JSON and persist them in the DB, and the webserver process reads DAGs from the DB. If dagcached_accelerator is enabled, the scheduler launches an additional process to feed the DAG scheduling loop with cached DAGs, which is faster than loading DAGs from files.

Users set [scheduler] dagcached_accelerator_interval to control how frequently DAGs are fed to the scheduler loop. The rate at which DAGs are refreshed from files is controlled by the DAG parsing processes, i.e., [scheduler] max_threads and [scheduler] min_file_process_interval.
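
As a rough illustration of how these options could be read, the sketch below parses an airflow.cfg-style snippet with Python's standard configparser. The option names come from this proposal; every value in the sample, the helper name load_settings, and all fallback defaults other than store_serialized_dags = False are assumptions made for the example.

```python
import configparser

# Option names are taken from the proposal; all values are illustrative assumptions.
SAMPLE_CFG = """
[core]
store_serialized_dags = True
min_serialized_dag_update_interval = 30

[scheduler]
dagcached_accelerator = False
dagcached_accelerator_interval = 10
"""


def load_settings(text):
    """Parse the proposal's feature flags from an airflow.cfg-style string."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return {
        # Defaults to False so that existing deployments keep parsing DAG files.
        "store_serialized_dags": parser.getboolean(
            "core", "store_serialized_dags", fallback=False
        ),
        # Assumed default: the accelerator is off unless explicitly enabled.
        "dagcached_accelerator": parser.getboolean(
            "scheduler", "dagcached_accelerator", fallback=False
        ),
    }


settings = load_settings(SAMPLE_CFG)
```

With an empty configuration, both flags fall back to False, which matches the backwards-compatible default described above.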

2. DAG Serialization

2.1 Current Options in Airflow (do not work)

DagPickle

Airflow uses Pickle as its default serialization approach and has a DagPickle table to store pickled DAGs in the DB. However, this method does not work for some DAGs or operators, because they may contain non-picklable fields.

For example, functions that are not defined globally are known to be non-picklable. In addition, user-defined modules cannot be unpickled when they are read from the DB, because these modules have not been imported.
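
The limitation can be reproduced with the standard library alone. In this minimal sketch, is_picklable and make_callback are hypothetical helper names; the locally defined function stands in for a callback created inside a DAG file.

```python
import pickle


def is_picklable(obj):
    """Return True if obj can be serialized with pickle.dumps."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def make_callback():
    # A function defined inside another function cannot be looked up by
    # name at import time, so pickle has nothing to store a reference to.
    def on_failure(context):
        return "failed"

    return on_failure


builtin_ok = is_picklable(len)             # built-in, importable by name
local_ok = is_picklable(make_callback())   # locally defined function
```

Pickle stores functions by reference (module plus qualified name) rather than by value, which is why the local function fails while the built-in succeeds.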

SimpleDag

Quoting from Airflow code, it is “a simplified representation of a DAG that contains all attributes required for instantiating and scheduling its associated tasks.” It does not contain enough information for the webserver.

2.2 JSON vs Pickle

JSON is preferred over Pickle in serializing DAGs:

2.3 How to Serialize?

Here is the proposed serialization approach:

---------------------------------------------------------------------------------------------------------------

https://github.com/apache/airflow/blob/916011df6969f44a278fea631aa65aa741e423e4/airflow/dag/serialization/serialization.py#L170-L205

---------------------------------------------------------------------------------------------------------------

At de-serialization time, it reconstructs DAGs and Operators. The reconstructed DAGs and Operators have the same classes as the original ones, but we call them the 'stringified' versions, because a few fields are turned into strings.

These DAGs/Operators contain enough information for the webserver and scheduler, but they cannot be used by workers, so they do not replace DagPickle.
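
To make the idea of 'stringified' fields concrete, here is a minimal sketch of encoding a task's fields as JSON. It is not the code from the linked file: serialize_value, notify, and the sample fields are hypothetical, and keeping only a function's qualified name is an assumption made to keep the example short.

```python
import datetime
import json


def serialize_value(value):
    """Convert a value into JSON-compatible data (hypothetical helper)."""
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    if isinstance(value, (datetime.datetime, datetime.date)):
        return value.isoformat()
    if isinstance(value, dict):
        return {str(k): serialize_value(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [serialize_value(v) for v in value]
    if callable(value):
        # Functions are "stringified": only a textual reference survives.
        return getattr(value, "__qualname__", repr(value))
    # Anything else falls back to its string form.
    return str(value)


def notify(context):
    """Placeholder callback used only for this example."""


task_fields = {
    "task_id": "extract",
    "retries": 3,
    "start_date": datetime.datetime(2019, 8, 1),
    "on_failure_callback": notify,
}

encoded = json.dumps(serialize_value(task_fields), sort_keys=True)
decoded = json.loads(encoded)
```

After the round trip, on_failure_callback is a plain string rather than a callable, which is enough for display purposes but not for execution on a worker.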

Error Prone Use Cases

Here are a few potential incompatibilities (please add more) and the proposed solutions:

  • Functions (e.g., callbacks) become strings:
    • OK for now: displaying functions' source code in webserver works well
  • Non-Airflow Operators:
    • Operators defined in airflow.operators and airflow.contrib.operators can be created because these classes are imported in Airflow. However, user-defined operators are not available;
    • The proposed solution is to create a BaseOperator, then replace its class name with the '__class__.__name__' field (e.g., FooOperator). FooOperator can then be displayed correctly in the webserver;
  • Functions in templates:
    • The Airflow webserver may render templates using functions (e.g., user-defined macros). For this, Airflow would need to parse the DAG files instead of reading from the DB.
    • The proposed solution is to render the templates before serialization. However, the correct rendering context (e.g., the execution date) is not available at that point. The solution is TBD.
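
One possible way to realize the BaseOperator substitution for user-defined operators is to create a placeholder subclass at de-serialization time. This is a sketch under assumptions: the BaseOperator below is a stand-in class so the example runs without Airflow, and rebuild_operator is a hypothetical helper.

```python
class BaseOperator:
    """Stand-in for Airflow's BaseOperator, kept minimal for this sketch."""

    def __init__(self, task_id):
        self.task_id = task_id


def rebuild_operator(encoded):
    """Rebuild a display-only operator from serialized fields (hypothetical)."""
    class_name = encoded["__class__.__name__"]
    # Create a BaseOperator subclass on the fly that carries only the
    # original class name; none of the user's operator code is imported.
    placeholder_cls = type(class_name, (BaseOperator,), {})
    return placeholder_cls(task_id=encoded["task_id"])


op = rebuild_operator({"__class__.__name__": "FooOperator", "task_id": "foo"})
```

The resulting object reports FooOperator as its class name while behaving like a plain BaseOperator, so it is only suitable for display.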

* many thanks to the review comments in https://github.com/apache/airflow/pull/5594 that led to the solutions

Below are the cases where the Webserver will still need access to the python DAG files on the local system.

  • The Rendered Template tab will still have to parse the Python file, as it needs details such as the execution date and even the data passed by upstream tasks using XCom.
  • The Code View will have to open the DAG file and show it using Pygments. It does not need to parse the Python file.

3. Life Cycle of a Cached DAG

3.1 DB Schema Change

Storing serialized DAGs requires a new table, serialized_dags, plus a few more columns for metadata. The fileloc column (full DAG file path) also needs to be a DB index, in order to:

  • Be able to read all DAGs related to one DAG file from DB;
  • Mark DAGs as deleted if the DAG file is deleted:
    • It is necessary to mark deleted DAGs, otherwise deleted DAGs always show up in webserver (it reads from DB);
    • If we go with the option to use a new table, then the rows are deleted for deleted DAGs (instead of marking them DELETED);
    • Detecting which files are deleted is done by the DAG parsing manager process of the scheduler.

Column fileloc currently has a length of 2000 in Airflow code, which is over the limit for indexing. We should either decrease its size or index a hash of it, fileloc_hash, instead.
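
A hash column of this kind could be computed as in the sketch below. The scheme (SHA-1, first 8 bytes, top bit dropped) is an illustrative assumption, not necessarily what the linked PR uses; the function name mirrors the fileloc_hash column name for readability only.

```python
import hashlib


def fileloc_hash(fileloc):
    """Map a file path of any length to a non-negative 63-bit integer."""
    digest = hashlib.sha1(fileloc.encode("utf-8")).digest()
    # Keep 8 bytes of the digest and drop the top bit so that the value
    # fits into a signed 64-bit integer column.
    return int.from_bytes(digest[:8], "big") >> 1


long_path = "/dags/" + "a" * 2000 + ".py"
hashed = fileloc_hash(long_path)
```

Because different paths can in principle collide on the same hash, a lookup by fileloc_hash should still compare the full fileloc value of the matching rows.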

Sample table created based on https://github.com/apache/airflow/pull/5743:

3.2 Creating and refreshing DAGs

The serialized dag is inserted or updated when the scheduler parses a DAG from a DAG file.
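
The insert-or-update step, combined with the write throttling controlled by [core] min_serialized_dag_update_interval, can be sketched with an in-memory SQLite database. The table name serialized_dag_demo, its columns, the helper names, and the 30-second interval are assumptions for illustration; the real schema is defined in the linked PR.

```python
import sqlite3

# Seconds; stands in for [core] min_serialized_dag_update_interval.
MIN_UPDATE_INTERVAL = 30


def create_table(conn):
    # Hypothetical minimal table used only for this example.
    conn.execute(
        "CREATE TABLE serialized_dag_demo ("
        "dag_id TEXT PRIMARY KEY, fileloc TEXT, data TEXT, last_updated REAL)"
    )


def write_dag(conn, dag_id, fileloc, data, now):
    """Insert or update one serialized DAG; return True if a write happened."""
    row = conn.execute(
        "SELECT last_updated FROM serialized_dag_demo WHERE dag_id = ?",
        (dag_id,),
    ).fetchone()
    if row is not None and now - row[0] < MIN_UPDATE_INTERVAL:
        # Written too recently: skip the write to reduce DB traffic.
        return False
    conn.execute(
        "INSERT OR REPLACE INTO serialized_dag_demo VALUES (?, ?, ?, ?)",
        (dag_id, fileloc, data, now),
    )
    conn.commit()
    return True


conn = sqlite3.connect(":memory:")
create_table(conn)
```

Calling write_dag twice for the same DAG within the interval leaves the stored row unchanged; the second call only takes effect once the interval has elapsed.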

3.3 Loading DAGs

Airflow webserver calls collect_dags of DagBag to read all JSON DAGs in DB. 

To minimize code changes in the Airflow scheduler, and because there may be a huge number of DAGs (thousands), the scheduler accelerator does not load all JSON DAGs in one read. Today Airflow loads DAGs as Python modules from a file:

  • m = imp.load_source(mod_name, filepath)

The proposal is to implement a method load_dag_module_from_db to replace imp.load_source (when store_serialized_dags is enabled). It loads all DAGs of one file each time, which is the same behavior as vanilla Airflow, so no other code change is needed.
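
The shape of such a replacement can be sketched as follows. Only the name load_dag_module_from_db comes from this proposal; its signature, the FAKE_DB dictionary standing in for the database, and the use of plain dicts instead of reconstructed DAG objects are assumptions.

```python
import json
import types

# Stand-in for the DB: maps a DAG file path to the JSON DAGs parsed from it.
FAKE_DB = {
    "/dags/etl.py": [
        json.dumps({"dag_id": "etl_daily"}),
        json.dumps({"dag_id": "etl_hourly"}),
    ],
}


def load_dag_module_from_db(mod_name, filepath, db=FAKE_DB):
    """Return a module-like object holding every DAG stored for filepath."""
    module = types.ModuleType(mod_name)
    for encoded in db.get(filepath, []):
        # A real version would rebuild a DAG object here instead of a dict.
        dag = json.loads(encoded)
        setattr(module, dag["dag_id"], dag)
    return module


m = load_dag_module_from_db("etl", "/dags/etl.py")
```

Returning a module object keeps the caller's contract unchanged: code that previously scanned the attributes of the module returned by imp.load_source can scan this object in the same way.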

3.4 Deleting DAGs

JSON DAGs should be marked as DELETED (or removed) to exclude them from webserver (we assume webserver has no access to DAG folder). How it works has been explained in 3.1.

3.5 (optional) Caching DAGs in memory for scheduling

The scheduler accelerator can add an additional in-memory cache layer of DAGs that is periodically refreshed from the DB. It reduces DB read traffic, but makes the implementation more complex. Its merit should be evaluated by measurement.
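
A minimal sketch of such a cache layer is shown below, assuming a caller-supplied fetch_all function that returns all DAGs from the DB as a dictionary. The class name DagCache and its parameters are hypothetical.

```python
import time


class DagCache:
    """In-memory copy of DB DAGs, reloaded at most once per refresh interval."""

    def __init__(self, fetch_all, refresh_interval, clock=time.monotonic):
        self._fetch_all = fetch_all  # callable returning {dag_id: dag}
        self._refresh_interval = refresh_interval  # seconds
        self._clock = clock
        self._dags = {}
        self._loaded_at = None

    def get(self, dag_id):
        now = self._clock()
        expired = (
            self._loaded_at is None
            or now - self._loaded_at >= self._refresh_interval
        )
        if expired:
            # One bulk read replaces the whole cache.
            self._dags = dict(self._fetch_all())
            self._loaded_at = now
        return self._dags.get(dag_id)
```

Between refreshes the cache may return DAGs that are older than what is stored in the DB, which is the same staleness trade-off noted in the overview.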

4. Long-Term Roadmap with Other Proposals

This AIP aims at implementing DAG persistence with minimal change on Airflow. It is short-term/fast/easy to launch and test. It can also be backported to previous versions.

If we can agree upon the basic JSON serialization approach, other longer term proposals can be gradually applied on it. A timeline with other proposals is compiled here:

Terms

  • serialized DAG: a new serializable DAG class used by webserver/scheduler

Proposed timeline

  1. (this) JSON Serialization of DAGs
    1. will be out with https://github.com/apache/airflow/pull/5743

  2. (this, optional) Asynchronous  DAG loading in webserver
    1. the webserver process uses a background process to collect DAGs, solving the scalability issue before DAG persistence in DB is out
    2. webserver process itself does not need to restart every 30s to collect DAGs

  3. (this) DAG persistence in DB for webserver
    1. minimal Airflow code change
    2. an optional feature enabled via configuration
    3. rolled out with Airflow 1.10.5
    4. will be out with https://github.com/apache/airflow/pull/5743
  4. (this, optional) Using DAG cached in DB for scheduling

  5. Defining serialized DAG for webserver
    1. this proposal keeps all fields of DAG/Operator, however, some fields are not used by webserver or scheduler
    2. trimming these fields is easy: just provide a list of fields to include or exclude (Sec 2.3): _serialize_object(x, visited_dags) => _serialize_object(x, visited_dags, include=['foo'], exclude=['bar'])
    3. we should carefully check all webserver/scheduler code to make sure trimmed fields are not used, e.g., task.owner is used in webserver
    4. will be out with https://github.com/apache/airflow/pull/5743

  6. (long-term) Defining serialized DAG for scheduler
    1. Once we have 'stringified DAG' or 'serialized DAG', the SimpleDag/SimpleTaskInstance classes used by the scheduler are no longer needed
    2. adding more fields to stringified DAGs to be compatible with scheduler

  7. (long-term) Directly reading DAGs from DB in webserver
    1. let the webserver process fetch data from DB, instead of making a DAG bag and refreshing it
    2. it solves the webserver inconsistency issue

  8. (long-term) Event-driven DAG parsing
    1. Instead of polling DAG files for updating/deleting DAGs, event based approaches, e.g., inotify (https://pypi.org/project/inotify_simple/) can be used

  9. Remove pickling related code
    1. We no longer require pickling and it is not safe so let us remove it for 2.0
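
The field trimming mentioned in item 5 of the timeline (include/exclude lists) could look like the sketch below. trim_fields is a hypothetical helper operating on an already serialized dictionary; it is not the _serialize_object function itself.

```python
def trim_fields(fields, include=None, exclude=None):
    """Keep only the include keys (if given), then drop the exclude keys."""
    if include is None:
        kept = dict(fields)
    else:
        kept = {k: v for k, v in fields.items() if k in include}
    for key in exclude or ():
        kept.pop(key, None)
    return kept


task = {"task_id": "t1", "owner": "airflow", "foo": 1, "bar": 2}
for_webserver = trim_fields(task, exclude=["bar"])
```

As noted in the same item, a field such as task.owner must stay in the include list (or out of the exclude list) because the webserver uses it.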


10 Comments

  1. > The webserver directly reads DAGs from DB

    If under this proposal the webserver is reading from the DB, then the scheduler should be the thing responsible for writing that row, not any part of the webserver process.

    1. Yes. We plan to let SchedulerJob write DAGs into DB. Webserver does not need the access to DAG files.

  2. > Non-Airflow Operators

    Does the webserver need to execute any code from these custom operators? (No, certainly not most of it) So do we actually need to even create instances of the operator classes? (Possibly, it may make the code change simpler if we do).

    For this to work too we'd have to also record things like ui_color and template_fields but that is doable.

    I like the general solution you've got here for this problem though.

    1. Yes, we do not actually need instances of the operators. The proposal creates one just to stay aligned with what the Airflow UI expects today.

      We just need the operator's class name to be displayed (ui_color can be inferred from it?)

    2. We chose to keep the original DAG/task objects as intact as possible, because we would like to leave the webserver untouched. We are not sure which fields may be accessed by the webserver.

      A dedicated serialized DAG class with only the necessary fields would definitely be a better long-term plan.


  3. One thing that I'd like you to look at please: Currently in airflow.utils.dag_processing we have "SimpleDag and SimpleTaskInstance" classes which is what the DAG parsing processes return to the scheduler - I think this change should either look at augmenting those classes (to encapsulate this "stringification" logic) or removing these classes entirely.

    1. Yes. I agree that we should probably get rid of SimpleDag and SimpleTaskInstance if we have serializable DAGs. Otherwise there are many forms of DAG/task that make the code redundant.

      I will add this idea to this proposal doc.

      1. Thank you Zhou. I would be a great proponent of removing these classes as well, mostly because that would allow us to greatly simplify the class hierarchy.


  4. There are 4 different parties that are all organizing their own efforts of solving serialization. We probably need someone to lead this, aggregate notes, put together an AIP with tradeoffs of different considerations and how various issues will be addressed.

    Attached are my own notes on the problems that need to be solved and some strawman solutions to them.

    I think it's quite important to be diligent here, and pursue the long-term solution rather than the short-term one in order to get the serialization format correct for the most part as migrating later will be a lot of work.


    1. Kaxil and I will lead this and try and align the streams - I suspect there will be a number of AIPs that get us gradually where we want to go, rather than one giant one that does it all.