You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 15 Next »

Status

StateDraft
Discussion Thread
JIRA


Motivation

This doc proposes an approach that serializes/persists DAGs using JSON to decouple Airflow webserver from DAG parsing and accelerating scheduler (secondary goal).

Compared to the current DAG persistence/webserver proposals (API 12, 18, 19), this proposal is more lightweight: it minimizes the change to current Airflow code. All new features are pluggable. The core part is serializing/de-serializing current Airflow DAGs/Operators in JSON. It can be used as an intermediate move towards the long-term optimization.

A part of this proposal (stringifying DAGs) is already tested and launched in Google Cloud Composer.

Considerations

Major consideration is the scope overlap and compatibility with other ongoing AIPs on DAG persistence/webserver optimzations.


1. Overview

This proposal adds two features:

  • When [core] dag_cached is enabled, the DAG processing processes of scheduler JSONify and write DAGs into Airflow DB (a new column in DAG table). The webserver directly reads DAGs from DB:
    • It solves webserver scalability issues (DAG loading timeout, OOM) of Airflow webserver (AIRFLOW-4924);
    • Loading DAG in webserver can be done in the gunicorn worker process, or a separate background process (asynchronous DAG loading: https://github.com/apache/airflow/pull/5594).
  • (Secondary goal) When [scheduler] dagcached_accelerator is enabled,  the scheduler launches an additional process to feed the DAG scheduling loop with cached DAGs, which is much faster than DAG processing processes:
    • It speeds out scheduling actions;
    • Users do not need a large [scheduler] max_threads when they have hundreds+ of DAGs.

DAG processing processes of scheduler JSONify DAGs and persist them into DB. Webserver process  reads DAGs from DB. If dagcached_accelerator is enabled,  the scheduler launches an additional process to feed the DAG scheduling loop with cached DAGs, which is faster than loading DAG from files. 

Users set [scheduler] dagcached_accelerator_interval to control how frequently DAGs are fed to the scheduler loop. The rate DAGs being refreshed from files is controlled by DAG parsing processes, i.e., [scheduler] max_threads and [scheduler] min_file_process_interval.

2. DAG Serialization

2.1 Current Options in Airflow (not work)

DagPickle

Airflow uses Pickle as the default approach for serializing. Airflow has table DagPickle to store pickled DAGs in DB. However, this method does not work for some DAGs or operators, because they may contain non-picklable fields.

For example, functions not defined globally are known to be non-picklable. In addition, customer defined modules can not be unpickled when reading them from DB, because these modules have not been imported.

SimpleDag

Quoting from Airflow code, it is “a simplified representation of a DAG that contains all attributes required for instantiating and scheduling its associated tasks.”. It does not contain enough information required by the webserver.

2.2. JSON vs Pickle

JSON is preferred over Pickle in serializing DAGs:

2.3 How to Serialize?

Here is the proposed serialization approach:

---------------------------------------------------------------------------------------------------------------

def serialize(x):  # it serializes anything: one or a list/dict of DAGs/Operators

    return json.dumps(_serialize(x, {}))


def _serialize_object(x, visited_dags):  # helper fn to serialize an object as a JSON dict

    return {k: _serialize(v, visited_dags) for k, v in vars(x).items()}


def _serialize(x, visited_dags):  # helper fn to run DFS recursion

    try:

        if _is_primitive(x):  # e.g., int, bool, float, byte, str

            return x

        elif isinstance(x, dict):

            return {k: _serialize(v, visited_dags) for k, v in x.items()}

       elif isinstance(x, list):

            return [_serialize(v, visited_dags) for v in x]

        elif isinstance(x, models.DAG):

            if x.dag_id in visited_dags:

                return visited_dags[x.dag_id]

            else:

                new_x = {'_type': 'DAG'# _type is the type hint for de-serialization

                visited_dags[x.dag_id] = new_x  # placeholder for deeper recursions

                new_x['_var'] = _serialize_object(x, visited_dags)  # _vars records an object as JSON dict for de-serialization

                return new_x

        elif isinstance(x, models.BaseOperator):

            return {'_type': 'Operator',

                        '_class': x.__class__.__module__ + '.' + x.__class__.__name__,   # it creates a x.__class__ object in de-serialization time

                        '_var': _serialize_object(x, visited_dags)}

        elif isinstance(x, datetime.datetime):

            return {'_type': datetime, '_var': x.isoformat()} 

        elif isinstance(x, datetime.timedelta):

            return {'_type': timedelta, '_var': x.total_seconds()} 

        elif callable(x):  # all functions are replaced by strings of their source code

            return get_python_source(x)

        elif isinstance(x, set):

            return {'_type': 'set', '_var': [_serialize(v, visited_dags) for v in x]}

        elif isinstance(x, tuple):

            return {'_type': 'tuple', '_var': [_serialize(v, visited_dags) for v in x]}

        else:

            return str(x)  # anything else falls back to whatever __str__ provides

    except Exception:

        return 'failed_to_jsonify'

---------------------------------------------------------------------------------------------------------------

At de-serialization time, it reconstructs DAGs and Operators. The reconstructed DAGs and Operators have the same class as the original ones, but we call them 'stringified' version, because a few fields are made into strings.

These DAGs/Operators contain enough information for webserver and scheduler, not can not be used by workers (to replace DagPickle).

Error Prone Use Cases

Here are a few potential incompatibilities (please add more) and the proposed solutions:

  • Functions (e.g., callbacks, user-defined-filters, user-defined-macros) become strings:
    • OK for now: displaying functions' source code in webserver works well
  • Non-Airflow Operators:
    • Operators defined in airflow.operators and airflow.contrib.operators can be created because these classes are imported in Airflow. However, user defined operators are not available;
    • The propose solution is to create a BaseOperator, then replace its class name by the '_class' field (e.g., FooOperator). FooOperator can then be displayed correctly in webserver;
  • Functions in templates:
    • Airflow webserver may render template using functions (e.g., user-defined-macros), however, it does not work because functions are 'strings' now. The proposed solution is to render the templates before serialization.

* many thanks to the review comments in https://github.com/apache/airflow/pull/5594 that lead to the solutions

3. Life Cycle of a Cached DAG

3.1 DB Schema Change

Adding a dag_json column in the DAG table (or create a new table JsonDag). It also needs to make fileloc a DB index, to:

  • Be able to read all DAGs related to one DAG file from DB;
  • Mark DAGs as deleted if the DAG file is deleted:
    • It is necessary to mark deleted DAGs, otherwise deleted DAGs always show up in webserver (it reads from DB);
    • If we go with the option to use a new table, then the rows are deleted for deleted DAGs (instead of marking them);
    • Detecting which files are deleted is done by DAG parsing manager process of the scheduler.

Column fileloc currently has a length of 2000 in Airflow code. It is over the limit of indexing. We should either decrease its size or use a hash fileloc_hash for indexing.

3.2 Creating and refreshing DAGs

dag_json is inserted or updated when the scheduler parses a DAG from a DAG file.

3.3 Loading DAGs

Airflow webserver calls collect_dags of DagBag to read all JSON DAGs in DB.

To minimize the code change to Airflow scheduler, also considering the case that there is a huge number of DAGs (thousands), we do not load all JSON DAGs in scheduler accelerator. Airflow loads DAGs as a Python modules from a file:

  • m = imp.load_source(mod_name, filepath)

The proposal is to implement a method load_dag_module_from_db to replace imp.load_source (when dag_cached is enabled). It loads all DAGs in a file each time.

3.4 Deleting DAGs

JSON DAGs should be marked as DELETED (or removed) to exclude them from webserver (we assume webserver has no access to DAG folder). How it works has been explained in 3.1.


  • No labels