

There seems to be a bit of mysticism around the scheduler in Airflow, which makes people reluctant to review PRs in this area or to contribute. That should not be the case. The scheduler is the core of Airflow, so it needs to be the most understood and most readable piece of code. Here is a small write-up to get you started.

A DAG consists of Tasks, and those tasks obviously need to run. To do that, we schedule the DAG in "schedule_dag" (jobs.py), turning it into a DagRun. This means that a DagRun is an instantiation of a DAG in time. Then we need to process the tasks (process_dag in jobs.py) of the currently active DagRuns. We simply loop over the available tasks and check whether they are runnable (if ti.is_runnable()). If a task is runnable, it gets sent to the executor.

That's all there is to the basic concept. Really? Yes:

def _do_dags(self, dagbag, dags, tis_out):
    """
    Iterates over the dags and schedules and processes them
    """
    for dag in dags:
        self.logger.debug("Scheduling {}".format(dag.dag_id))
        dag = dagbag.get_dag(dag.dag_id)
        if not dag:
            # The DAG could not be loaded from the dagbag; skip it
            continue
        try:
            self.schedule_dag(dag)          # create a DagRun if one is due
            self.process_dag(dag, tis_out)  # collect runnable task instances
            self.manage_slas(dag)           # check for missed SLAs
        except Exception as e:
            self.logger.exception(e)

It is a bit messier than that at the moment. That is because DagRuns are not yet first-class citizens. In the past, Airflow only had a notion of a DagRun: it mentioned them in the code, but they did not really exist; instead, DAGs were instantiated into "Tasks in Time": TaskInstances. This creates architectural issues, but also real-life ones. Some of the issues in the current scheduler with "depends_on_past" stem from this, because a Task is not really able to answer "which task instance is the first in time?". It is also the reason why scheduler loop times increase as DAGs get more complex (many tasks). The good news is that work is underway to improve this (see the roadmap).

States and race conditions in the scheduler

States are used in Airflow to understand what the different tasks and DagRuns are doing. We currently know the following states:

NONE = None
QUEUED = "queued"
SCHEDULED = "scheduled"
REMOVED = "removed"
RUNNING = "running"
SUCCESS = "success"
SHUTDOWN = "shutdown"  # External request to shut down
FAILED = "failed"
UP_FOR_RETRY = "up_for_retry"
UPSTREAM_FAILED = "upstream_failed"
SKIPPED = "skipped"

The scheduler processes tasks that have a state of NONE, SCHEDULED, QUEUED, or UP_FOR_RETRY. NONE is a newly created TaskInstance, QUEUED is a task that is waiting for a slot in an executor, and UP_FOR_RETRY means a task that failed before but needs to be retried. If a task has a state of NONE, it is set to SCHEDULED if the scheduler determines that it needs to run. Tasks in the SCHEDULED state are sent to the executor, at which point they are put into the QUEUED state until they actually run.
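
As a minimal sketch (not the actual jobs.py code), the transitions just described look roughly like this; ti_should_run and the executor hand-off are hypothetical stand-ins for the scheduler's real dependency checks:

SCHEDULED = "scheduled"
QUEUED = "queued"

def advance_task_instance(ti, executor):
    # A newly created TaskInstance has no state yet (NONE).
    if ti.state is None and ti_should_run(ti):  # hypothetical dependency check
        ti.state = SCHEDULED
    # SCHEDULED task instances are handed to the executor and marked QUEUED
    # until the executor actually starts running them.
    elif ti.state == SCHEDULED:
        executor.queue_task_instance(ti)  # hand the task over to the executor
        ti.state = QUEUED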

Unfortunately a race condition remains for UP_FOR_RETRY tasks, as another scheduler can pick those up. To eliminate this, the check for UP_FOR_RETRY needs to move from the TaskInstance to the scheduler. That would be the end of it, were it not for the fact that we also have backfills (see below).

Better state handling

Work in progress

In order to remove race conditions, and to be able to kill any of Airflow's components and still continue where we left off, better state handling is needed. This means that at hand-over to a different process, only certain states can be set by each process. For example, the scheduler should only have an outgoing state of "SCHEDULED". The executor should set a state of "LAUNCHED". A task instance can set UP_FOR_RETRY, RUNNING, UPSTREAM_FAILED, or SUCCESS, but only handles LAUNCHED.

Queues for workers should be handled in the worker; for pools, most likely in the scheduler.
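
One way to express the proposed hand-over rules is a small table of which outgoing states each component may set. This is just an illustrative sketch of the proposal above, not existing code (LAUNCHED does not exist yet):

# Proposed hand-over rules (illustrative only): each component may only set
# these states when passing a task instance on to the next component.
ALLOWED_OUTGOING_STATES = {
    "scheduler": {"SCHEDULED"},
    "executor": {"LAUNCHED"},  # proposed new state
    "task instance": {"UP_FOR_RETRY", "RUNNING", "UPSTREAM_FAILED", "SUCCESS"},
}

def check_transition(component, new_state):
    # Refuse state changes that a component is not supposed to make.
    if new_state not in ALLOWED_OUTGOING_STATES[component]:
        raise ValueError("%s may not set state %s" % (component, new_state))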

Multiprocess scheduling

Overview

In previous versions of the scheduler, user-supplied DAG definitions were parsed and loaded in the same process as the scheduler. Unfortunately, this made it possible for bad user code to adversely affect the scheduler process. For example, if a user DAG definition includes a `sys.exit(-1)`, parsing the DAG definition would cause the scheduler process to exit.
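
For example, a contrived DAG definition file like the following would have killed a scheduler that parsed it in its own process, because the offending call runs at import time:

# bad_dag.py -- a contrived example of a DAG definition file that kills
# whichever process imports it.
import sys
from airflow import DAG

dag = DAG("bad_dag")
sys.exit(-1)  # executed while the file is being parsed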

To help mitigate such cases, the scheduler parses DAGs in child processes. This gives better isolation and faster performance. The logic for scheduling is as follows (a rough sketch follows the list):

  1. Enumerate all the files in the DAG directory.
  2. Start a configurable number of processes and for each one, assign a DAG file to process.
  3. In each child process, parse the DAG file, create the necessary DagRuns given the state of the DAG's task instances, and for all the task instances that should run, create a TaskInstance (with the `SCHEDULED` state) in the ORM.
  4. Back in the main scheduler process, query the ORM for task instances in the `SCHEDULED` state. If any are found, send them to the executor and set the task instance state to `QUEUED`.
  5. If any of the child processes have finished, create another process to work on the next file in the series, provided that the number of running processes is less than the configured limit. 
  6. Once a process has been launched for all of the files in the DAG directory, the cycle is repeated. If the process to parse a particular DAG file is still running when the file's turn comes up in the next cycle, a new process is not launched and a process for the next file in the series is launched instead. This way, a DAG file that takes a long time to parse does not necessarily block the processing of other DAGs.
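
A rough, hypothetical sketch of that loop is below. None of the helper names (parse_dag_file, scheduled_task_instances, send_to_executor) are real Airflow APIs; the sketch only illustrates the shape of the logic:

import multiprocessing
import os

MAX_PARSER_PROCESSES = 2  # the configured limit on simultaneous child processes

def parse_dag_file(filepath):
    # Child process: parse the file, create the necessary DagRuns and put
    # runnable task instances into the SCHEDULED state in the ORM.
    pass

def scheduler_loop(dag_dir):
    running = {}  # filepath -> multiprocessing.Process
    while True:
        files = sorted(os.path.join(dag_dir, f)
                       for f in os.listdir(dag_dir) if f.endswith(".py"))
        for filepath in files:
            if filepath in running and running[filepath].is_alive():
                continue  # still being parsed from a previous cycle; skip this turn
            if sum(p.is_alive() for p in running.values()) >= MAX_PARSER_PROCESSES:
                break  # limit of simultaneous parser processes reached
            proc = multiprocessing.Process(target=parse_dag_file, args=(filepath,))
            proc.start()
            running[filepath] = proc
        # Main process: hand SCHEDULED task instances to the executor and mark
        # them QUEUED so they are not sent twice.
        for ti in scheduled_task_instances():  # hypothetical ORM query
            send_to_executor(ti)               # hypothetical executor hand-off
            ti.state = "queued"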

The reason that tasks are created in the `SCHEDULED` state, but then set to the `QUEUED` state once they are sent to the executor, is to ensure that a task instance isn't repeatedly sent to the executor if the executor is slow and a DAG definition file is processed multiple times before the executor has a chance to run the task. When the child process examines a DAG for potential tasks to put into the `SCHEDULED` state, it skips task instances that are in the `QUEUED` state.
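
In the child process, that check might look roughly like this (again with hypothetical helper names):

for ti in runnable_task_instances(dag):  # hypothetical: TIs whose dependencies are met
    if ti.state == "queued":
        continue  # already handed to the executor; don't schedule it again
    ti.state = "scheduled"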

Configuration

  • With the above processing scheme, the `num_runs` argument doesn't make as much sense. Instead, the `run_duration` argument is useful for controlling how long the scheduler runs in the loop.
  • Since the scheduler can run indefinitely, it's necessary to periodically refresh the list of files in the DAG definition directory. The refresh interval is controlled with the `dag_dir_list_interval` configuration parameter.
  • In cases where there are only a small number of DAG definition files, the loop could potentially process the DAG definition files many times a minute. To control the rate of DAG file processing, the `min_file_process_interval` can be set to a higher value. This parameter ensures that a DAG definition file is not processed more often than once every `min_file_process_interval` seconds (a sample configuration follows this list).
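
For illustration, these three parameters live in the `[scheduler]` section of airflow.cfg; the values below are examples, not recommended defaults:

[scheduler]
# Restart the scheduler loop after this many seconds (-1 runs it indefinitely).
run_duration = 1800
# How often (in seconds) to re-scan the DAG definition directory for new files.
dag_dir_list_interval = 300
# Do not re-parse the same DAG definition file more often than once every N seconds.
min_file_process_interval = 30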

Code Layout

The DagFileProcessor launches a child process to parse a DAG definition file. For the DAGs found in the file, it examines DAG runs and the state of the task instances. If there are task instances that should run, it creates them in the `SCHEDULED` state.

The DagFileProcessorManager coordinates how DagFileProcessors are launched. It keeps track of which files need to be processed and ensures that once a DagFileProcessor is done, the next file in the series is processed. It also controls the number of simultaneous DagFileProcessors and ensures that it does not exceed the configured limit.

The SchedulerJob coordinates parsing of the DAG definition files using the DagFileProcessorManager and sends task instances in the `SCHEDULED` state to the executor.
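
A hypothetical skeleton (not the real class signatures) of how these three pieces fit together:

class DagFileProcessor(object):
    """Wraps one child process that parses a single DAG definition file and
    creates SCHEDULED task instances for it."""
    def start(self): pass   # launch the child process
    def done(self): pass    # has the child process finished?

class DagFileProcessorManager(object):
    """Tracks which files still need processing, launches DagFileProcessors
    and enforces the limit on simultaneous processors."""
    def heartbeat(self): pass  # reap finished processors, launch the next ones

class SchedulerJob(object):
    """Drives the loop: lets the manager make progress on parsing, then sends
    SCHEDULED task instances to the executor."""
    def _execute(self): pass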

Pools

All tasks start with State.NONE. Bolke de Bruin's latest PR (AIRFLOW-128) introduces State.SCHEDULED, set when the TI has been sent to the executor by the scheduler but not yet run. When the executor launches a TI in a separate process, it eventually calls TI.run(), which does one of two things:

  1. If the TI can be run, sets State.RUNNING and calls the execute() method.
  2. If the TI has a pool, sets State.QUEUED and returns

The scheduler has a prioritize_queued method which loads up all the queued tasks and tries to run them if there are slots available in their respective pools. That second run is the one that actually moves pooled tasks to State.RUNNING.
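
A simplified, hypothetical sketch of that behaviour (not the real prioritize_queued signature):

def prioritize_queued(queued_tis, pool_open_slots, run_task_instance):
    """queued_tis: task instances in the QUEUED state.
    pool_open_slots: dict mapping pool name -> number of free slots.
    run_task_instance: callable that re-runs a task instance (the "second run")."""
    # Higher-priority tasks get first pick of the free slots.
    for ti in sorted(queued_tis, key=lambda t: t.priority_weight, reverse=True):
        if pool_open_slots.get(ti.pool, 0) > 0:
            pool_open_slots[ti.pool] -= 1  # claim a slot in the task's pool
            run_task_instance(ti)          # this run sets State.RUNNING and executes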

Backfills

Backfills are a bit of an awkward duck in the pond. They do not know about DagRuns, won't create them, and don't keep to the schedule, so they can break "depends_on_past". They execute outside the scheduler and can therefore oversubscribe workers (using more resources than assigned). Backfills just create TaskInstances and start running them. In order to fix the scheduler and the race condition, the scheduler and the backfills first need to become aware of each other. This will make depends_on_past work and keep things in a consistent state. Preventing backfills from oversubscribing should be managed by the scheduler.
