Tracking:

https://github.com/airbnb/airflow/compare/master...jlowin:dagrun-refactor 


Introduction

The primary issue with DAG execution is that there are two completely separate execution avenues in Airflow: SchedulerJob and BackfillJob. DagRuns were recently added to track DAG execution state, but they are used inconsistently, adding to the confusion. In brief, there are two competing issues:

  1. The Scheduler uses DagRuns and runs any tasks in RUNNING DagRuns, but it also runs any queued tasks regardless of whether they belong to an active DagRun. Backfill does not use DagRuns at all; it loops over the tasks in its DAG, brute-forcing them into the executor until they finish or fail. As a result, the two interfere with each other: the Scheduler takes a lock on the DAG it's running, but Backfill ignores locks. This might be acceptable if we could guarantee that Scheduler and Backfill were never run simultaneously (we can't!), but operators like SubDagOperator use Backfill internally, so the two must play nicely together.

  2. DagRuns were originally intended to track the state of a DAG's execution, just as TaskInstances do for tasks, but the implementation is incomplete. DagRuns must be unique for each (DAG, execution_date) combination, yet they allow arbitrary attributes like run_id and external_trigger. If there can only be one DagRun per execution date, and that DagRun is marked as external_trigger, then the scheduler won't be able to create a non-external_trigger DagRun for the same date! (I'm actually surprised this hasn't bitten anyone yet... probably because DagRuns are not heavily used outside the Scheduler.)

What's needed is to formalize the concept of a DagRun and make it the canonical way to execute a DAG and track the state of that DAG's execution, just as we do for TaskInstances.

This refactor does just that.

Description of New Workflow

DagRuns represent the state of a DAG at a certain point in time (perhaps they should be called DagInstances?). To run a DAG, or to manage the execution of a DAG, a DagRun must first be created. This can be done manually (simply by creating a DagRun object) or automatically, using methods like dag.schedule_dag(). Therefore, any process can schedule new runs or introduce ad-hoc runs at any time, simply by creating the appropriate object.
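
For illustration, here is a minimal sketch of the manual path, assuming the refactored model's constructor accepts the columns shown in the table later in this document; the import paths and session handling are assumptions, not the final API:

from datetime import datetime

from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State

# Manual path: create a DagRun object directly and persist it.
# Constructor arguments are assumed from the table definition below.
session = settings.Session()
run = DagRun(
    dag_id='example_dag',
    execution_date=datetime(2016, 5, 1),
    state=State.RUNNING)
session.add(run)
session.commit()

# Automatic path: ask the DAG to create its next scheduled run,
# via the dag.schedule_dag() method named in this proposal.
# run = dag.schedule_dag()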

Merely creating a DagRun is not enough to actually run the DAG (just as creating a TaskInstance is not the same as actually running a task); we need a Job for that. The DagRunJob is fairly simple in structure: it maintains a set of DagRuns that it is tasked with executing, and it loops over that set until all of the DagRuns either succeed or fail. New DagRuns can be passed to the job explicitly via DagRunJob.submit_dagruns() or picked up by its DagRunJob.collect_dagruns() method, which is called during each loop. While a DagRunJob is executing a specific DagRun, it locks it, and other DagRunJobs will not try to execute locked DagRuns. This way, many DagRunJobs can run simultaneously in either a local or distributed setting, and they can even be pointed at the same DagRuns, without worrying about collisions or interference.
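
A minimal sketch of how that pickup-and-lock step might work; the query, the method signature, and the lock bookkeeping are illustrative assumptions (the lock_id column it relies on is part of the DagRun table shown later):

from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State

def collect_dagruns(job):
    # Sketch: pick up RUNNING DagRuns that no other job has locked,
    # and stamp them with this job's id so that concurrent DagRunJobs
    # skip them. Row-level locking guards against two jobs racing.
    session = settings.Session()
    runs = (
        session.query(DagRun)
        .filter(DagRun.state == State.RUNNING)
        .filter(DagRun.lock_id.is_(None))
        .with_for_update()
        .all())
    for run in runs:
        run.lock_id = job.id
    session.commit()
    job.submit_dagruns(runs)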

The basic DagRunJob loop works like this:

  1. refresh dags
  2. collect new dagruns
  3. process dagruns (including updating dagrun states for success/failure)
  4. call executor/own heartbeat

By tweaking the DagRunJob, we can easily recreate the behavior of the current SchedulerJob and BackfillJob. The Scheduler simply runs forever, picking up ALL active DagRuns in collect_dagruns(); Backfill generates DagRuns corresponding to the requested start/end dates and submits them to itself before entering its loop.


Changes

Issues

Classes

Notable additions and changes.

DagRun

Description

DagRun has been substantially updated. The new table looks like this:


__tablename__ = "dag_run"

dag_id = Column(String(ID_LEN), primary_key=True)
execution_date = Column(DateTime, default=func.now(), primary_key=True)
start_date = Column(DateTime, default=func.now())
end_date = Column(DateTime)
state = Column(String(50))
conf = Column(PickleType)  # arbitrary (pickled) run configuration
lock_id = Column(Integer)  # id of the DagRunJob currently holding this run's lock

__table_args__ = (
    # enforce one DagRun per (dag_id, execution_date)
    Index('dr_dag_date', dag_id, execution_date, unique=True),
)
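
Since the unique index allows at most one DagRun per (dag_id, execution_date), any code that creates runs has to tolerate an existing row. A hedged sketch of a get-or-create helper (this helper is hypothetical, not part of the model):

from airflow import settings
from airflow.models import DagRun

def get_or_create_dagrun(dag_id, execution_date):
    # Hypothetical helper: the unique index on (dag_id, execution_date)
    # guarantees at most one matching row can exist.
    session = settings.Session()
    run = (
        session.query(DagRun)
        .filter_by(dag_id=dag_id, execution_date=execution_date)
        .first())
    if run is None:
        run = DagRun(dag_id=dag_id, execution_date=execution_date)
        session.add(run)
        session.commit()
    return run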
Methods

DAG

Methods

DagRunJob

Description

DagRunJob is a Job that has methods for executing and managing DagRuns. It has the following _execute structure:

def _execute(self):

    self.executor.start()

    i = 0
    while self.dagruns:
        # periodically do a full refresh of the DAG definitions
        self.refresh_dags(full_refresh=(i % self.refresh_dags_every == 0))
        # pick up any new (unlocked) DagRuns
        self.collect_dagruns()
        # run ready tasks and update DagRun states for success/failure
        self.process_dagruns()
        self.executor.heartbeat()
        self.heartbeat()
        i += 1

    self.executor.end()
Methods

SchedulerJob

Description

SchedulerJob is a subclass of DagRunJob with the following _execute. Note that the only real differences from DagRunJob are the loop criteria and the calls to schedule_dagruns() and manage_slas(). The Scheduler runs in a loop, just as the current Scheduler does, exiting when num_runs is reached (or never, if num_runs is not set).

self.executor.start()

i = 0
while not self.num_runs or self.num_runs > i:
    try:
        loop_start_dttm = datetime.now()
        self.logger.info('Starting scheduler loop...')
        try:
            self.refresh_dags(
                full_refresh=(i % self.refresh_dags_every == 0))
            # create DagRuns for any DAGs that are due to run
            self.schedule_dagruns()
            self.collect_dagruns()
            self.process_dagruns()
            # check for missed service-level agreements
            self.manage_slas()

        except Exception as e:
            self.logger.exception(e)

        self.logger.info('Done scheduling, calling heartbeat.')

        self.executor.heartbeat()
        self.heartbeat()
    except Exception as e:
        self.logger.exception(e)

    i += 1

self.executor.end()
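
schedule_dagruns() itself is not shown above. A rough sketch of what it might do, assuming the job tracks its DAGs in self.dags and using the existing dag.following_schedule() helper; the scheduling bookkeeping here is simplified and assumed:

from datetime import datetime

from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State

def schedule_dagruns(self):
    # Sketch: for each DAG this job knows about, create the next
    # scheduled DagRun once its execution_date has come due.
    session = settings.Session()
    for dag in self.dags:
        last_run = (
            session.query(DagRun)
            .filter_by(dag_id=dag.dag_id)
            .order_by(DagRun.execution_date.desc())
            .first())
        if last_run is None:
            next_date = dag.start_date
        else:
            next_date = dag.following_schedule(last_run.execution_date)
        if next_date and next_date <= datetime.now():
            # the unique index prevents duplicate runs for this date
            session.add(DagRun(
                dag_id=dag.dag_id,
                execution_date=next_date,
                state=State.RUNNING))
    session.commit()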
Methods

BackfillJob

Description

BackfillJob is a subclass of DagRunJob with this _execute structure:

self.heartbeat()
self.executor.start()

# create one DagRun for every execution_date in the requested range
runs = [
    DagRun(dag_id=self.dag.dag_id, execution_date=dttm)
    for dttm in self.dag.date_range(
        start_date=self.bf_start_date, end_date=self.bf_end_date)]

self.submit_dagruns(runs)
self.target_runs = runs

while self.dagruns:
    self.collect_dagruns()
    self.process_dagruns()
    self.executor.heartbeat()
    self.heartbeat()

    # log progress after every pass through the loop
    progress = self.get_progress()
    self.logger.info(' | '.join([
        '[backfill progress: {pct_complete:.1%}]',
        'total dagruns: {total_dagruns}',
        'total tasks: {total_tasks}',
        'finished: {finished}',
        'succeeded: {succeeded}',
        'skipped: {skipped}',
        'failed: {failed}',
        ]).format(**progress))

self.executor.end()


BackfillJob prints progress like this:

[2016-04-28 18:26:00,011] {jobs.py:912} INFO - [backfill progress: 0.0%] | total dagruns: 1 | total tasks: 2 | finished: 0 | succeeded: 0 | skipped: 0 | failed: 0

Methods

The BackfillJob adds no new methods; its only difference from DagRunJob is that it generates and submits a list of DagRuns to itself.
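
For reference, a hedged sketch of what get_progress() (presumably inherited from DagRunJob, per the note above) might compute to feed the progress line; the run.get_task_instances() accessor is an assumption:

from airflow.utils.state import State

def get_progress(self):
    # Sketch: summarize task instance states across the backfill's
    # target runs; how task instances are looked up is assumed.
    tis = [
        ti for run in self.target_runs
        for ti in run.get_task_instances()]
    succeeded = len([ti for ti in tis if ti.state == State.SUCCESS])
    skipped = len([ti for ti in tis if ti.state == State.SKIPPED])
    failed = len([ti for ti in tis if ti.state == State.FAILED])
    finished = succeeded + skipped + failed
    return {
        'pct_complete': float(finished) / len(tis) if tis else 0.0,
        'total_dagruns': len(self.target_runs),
        'total_tasks': len(tis),
        'finished': finished,
        'succeeded': succeeded,
        'skipped': skipped,
        'failed': failed,
    }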