
Motivation

Currently a DAG can be scheduled either by providing a cron expression (or one of the @ presets) or by passing a timedelta. This works well for DAGs that need to operate on a regular schedule, but there are many other use cases that are currently hard to express "natively" in Airflow, or that require confusing workarounds.

Beyond the need for more powerful scheduling options, the execution_date concept is one of the most confusing things for newcomers to Airflow to learn. Part of this is simply the name -- the immediate thing that springs to mind is "the time when the task is executing", i.e. right now, but that is not the meaning it holds (it is always at least one interval older than that, if not more when a task is being re-run).

In this AIP we propose a new name for execution_date, introduce a new concept called "data interval", and present an interface for more powerful pluggable scheduling behaviours.

Desirable Schedule patterns

A few examples of the kinds of workflows/schedules that I would like Airflow to be able to handle (and, where one exists, the current way of achieving this):

  1. Daily on weekdays

    If you tried the "naive" expression "0 0 * * MON-FRI" this would 'work', but not quite as users might expect: the data for Friday is not processed until the start of the next interval, which is Monday at midnight.

    Execution     When it's run

    Monday        Tuesday 00:00:01
    Tuesday       Wednesday 00:00:01
    Wednesday     Thursday 00:00:01
    Thursday      Friday 00:00:01
    Friday        Monday 00:00:01

    The current workaround is to have a task at the start of the DAG that skips if the run falls on a Saturday or Sunday (see the sketch after this list), but we should be able to do better.


  2. 15th or last working day before

    This is a common day for payroll in the US (where companies frequently pay twice a month).

    Right now this is tricky to express in Airflow -- the best effort would be to schedule runs on the 15th and the four days before it, and then skip a run if there is a later working day in that window.

  3. Trading days only

    (i.e. dates when Financial markets are open)

    This is an extension of the above Mon-Fri case, but with the additional "filter" that some weekdays are excluded. The exact days vary from country to country and year to year.

    Again, achieving this would involve a check/skip as the first step of the DAG, as in the sketch after this list.

  4. Run at the exact time specified

    The current behaviour of running the DAG run after the period is over (i.e. daily after that day is over) makes sense for a "traditional" data processing job, where you can't process a day's worth of data until the day is over.

    However there are plenty of valid workflows where having this "execute after period is over" behaviour is confusing.


    One such example where this workflow would be useful is hitting an API to collect snapshot data, where having the data "stamped" with the time of access is the most intuitive.


    There is no real workaround for this -- other than perhaps using next_execution_date instead of execution_date in templates.

    This has been asked for many times, most recently in https://github.com/apache/airflow/pull/5787 and https://github.com/apache/airflow/issues/8237.
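
For reference, here is a minimal sketch of the check/skip workaround mentioned in use cases 1 and 3, assuming Airflow 2.x context injection (the task id and callable name are illustrative):

Code Block
from airflow.operators.python import ShortCircuitOperator


def _is_weekday(execution_date):
    # Airflow injects ``execution_date`` from the template context based on
    # the callable's signature. Monday is 0 and Sunday is 6, so anything
    # >= 5 is a weekend and everything downstream gets skipped.
    return execution_date.weekday() < 5


only_on_weekdays = ShortCircuitOperator(
    task_id="only_on_weekdays",
    python_callable=_is_weekday,
)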

Many of these use cases will not be tackled directly in this AIP; instead we will make scheduling pluggable enough to support them, including allowing users to write their own behaviours. These features could then be added by providers, meaning they can have release schedules separate from Airflow core.

Proposal

Deprecate execution_date

The name execution_date is just too confusing, and at this point the only practical option is to rename it to something else.

We propose renaming execution_date to data_interval_start, and next_execution_date to data_interval_end.

Accessing the deprecated parameters in the context dict will raise a DeprecationWarning (which should then cover access via Python/custom operators as well as Jinja templating).
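
As an illustration of how the warning could be raised (a sketch only; the class name and key mapping are assumptions, not the final implementation), the context can be wrapped in a mapping that warns when the old keys are read:

Code Block
import warnings
from collections.abc import MutableMapping


class DeprecatingContext(MutableMapping):
    """Dict-like wrapper that warns when deprecated context keys are read."""

    DEPRECATED = {
        "execution_date": "data_interval_start",
        "next_execution_date": "data_interval_end",
    }

    def __init__(self, context):
        self._context = context

    def __getitem__(self, key):
        if key in self.DEPRECATED:
            warnings.warn(
                f"{key!r} is deprecated, use {self.DEPRECATED[key]!r} instead",
                DeprecationWarning,
                stacklevel=2,
            )
        return self._context[key]

    def __setitem__(self, key, value):
        self._context[key] = value

    def __delitem__(self, key):
        del self._context[key]

    def __iter__(self):
        return iter(self._context)

    def __len__(self):
        return len(self._context)

Since Jinja resolves {{ execution_date }} through the same mapping lookup, one wrapper would cover both Python and template access.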

Other names considered and rejected:

  • transaction_start/end: Confusing with DB transactions
  • period_start/end: Mathematically wrong (the period end would be analogous to the end_date of the dag, i.e. the point at which there are no more intervals)
  • batch_start/end: "But what about streaming?"
  • data_window: not as clear

Replace execution_date with run_id on TaskInstance table

With the deprecation of execution_date in the context, we should also remove it from TaskInstance.

Currently the link between TaskInstance and DagRun is implicit, via the (dag_id, execution_date) columns. The OpenAPI specification for DagRuns already uses run_id as the "key" in the URL, as we foresaw this type of work happening.

Now that DagRun is firmly entrenched in Airflow (and has been for a long time) we can make this relationship explicit: a TaskInstance will be linked to its DagRun via run_id, and execution_date will be removed from TaskInstance (with a deprecated property that looks it up via the DagRun, so this is not a breaking change).

Making this link explicit means that deleting a DagRun will also delete its TaskInstances (see the sketch below).
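
A rough sketch of what the explicit relationship could look like at the ORM level (a simplified model, not the actual Airflow schema; column sizes and relationship options are illustrative):

Code Block
from sqlalchemy import Column, ForeignKeyConstraint, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class DagRun(Base):
    __tablename__ = "dag_run"

    dag_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)

    task_instances = relationship(
        "TaskInstance",
        back_populates="dag_run",
        cascade="all, delete-orphan",  # deleting a DagRun deletes its TIs
        passive_deletes=True,          # rely on ON DELETE CASCADE in the DB
    )


class TaskInstance(Base):
    __tablename__ = "task_instance"

    task_id = Column(String(250), primary_key=True)
    dag_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)

    __table_args__ = (
        ForeignKeyConstraint(
            ["dag_id", "run_id"],
            ["dag_run.dag_id", "dag_run.run_id"],
            ondelete="CASCADE",
        ),
    )

    dag_run = relationship("DagRun", back_populates="task_instances")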

Rename DagRun.execution_date to schedule_date

We need a single date value that is the "canonical" date for a DagRun, which can be used for sorting DagRuns in the UI, so the current execution_date column on DagRun needs to be kept -- but we should rename it, and we have chosen schedule_date.

We will also clearly document the meaning of this column: it is the time when the DagRun is scheduled to run (or, more precisely, the earliest time it is runnable -- if the scheduler is backed up it may not start right away).

Summary of Renames


Old Name                 New Name

execution_date           data_interval_start
next_execution_date      data_interval_end
tomorrow_ds              <no replacement, deprecate>
tomorrow_ds_nodash       <no replacement, deprecate>
yesterday_ds             <no replacement, deprecate>
yesterday_ds_nodash      <no replacement, deprecate>
prev_execution_date      prev_schedule_date
next_execution_date      next_schedule_date


ds, ds_nodash, ts, ts_nodash, and ts_nodash_with_tz will be kept, but their values will be based upon data_interval_start (this means the current values would be unchanged).
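
For illustration, the kept values would then be derived along these lines (a sketch; the formats shown match the existing macros):

Code Block
import pendulum

data_interval_start = pendulum.datetime(2021, 5, 1, tz="UTC")

ds = data_interval_start.strftime("%Y-%m-%d")               # "2021-05-01"
ds_nodash = data_interval_start.strftime("%Y%m%d")          # "20210501"
ts = data_interval_start.isoformat()                        # "2021-05-01T00:00:00+00:00"
ts_nodash = data_interval_start.strftime("%Y%m%dT%H%M%S")   # "20210501T000000"
ts_nodash_with_tz = ts.replace("-", "").replace(":", "")    # "20210501T000000+0000"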

Introduce the concept of a "Timetable" class

To support all the different use cases, and to allow for cases we haven't yet thought of, we propose adding a new concept to Airflow: the Timetable class.

The schedule_interval parameter to DAG will not be removed, or even deprecated, but will be transparently upgraded. A single parameter makes sense (and is clean) for many current use cases, so we don't want to remove it.

In this AIP we won't include many Timetable classes -- just enough to replicate the current functionality.

AbstractTimetable interface

The scheduler will use the dag.timetable property to make all scheduling decisions, delegating to the timetable instance (and deprecating DAG methods such as following_schedule).

The currently proposed interface is this:

Code Block
from abc import ABC, abstractmethod
from typing import NamedTuple, Optional, Tuple

import pendulum
from pendulum import DateTime
from sqlalchemy.orm import Session


class DagRunInfo(NamedTuple):
    run_date: Optional[DateTime]
    """
    The earliest time this DagRun could be created and its tasks scheduled, or None for "do not schedule".

    Returning None is not a "terminal" state; the timetable may return a date the next time it is asked.
    """

    data_interval: Optional[Tuple[DateTime, DateTime]]
    """
    The data interval this DagRun should operate over, as ``[start, end)``
    (meaning start is inclusive, but end is exclusive).
    """


class AbstractTimetable(ABC):
    @abstractmethod
    def next_dagrun_info(
        self,
        date_last_automated_dagrun: Optional[pendulum.DateTime],
        session: Session,
    ) -> Optional[DagRunInfo]:
        """
        Get information about the next DagRun of this dag after
        ``date_last_automated_dagrun`` -- the schedule date, and the earliest
        it could be scheduled.

        :param date_last_automated_dagrun: The max(schedule_date) of existing
            "automated" DagRuns for this dag (scheduled or backfill, but not
            manual).
        :param session: The database session, in case the timetable needs to
            query existing runs.
        """

Create TimeDeltaTimetable, DataTimetable, and CronTimetable classes

The TimeDeltaTimetable and DataTimetable classes represent the current gamut of types you can pass to schedule_interval, and these will be used to seamlessly upgrade that parameter.

DataTimetable encompasses the current behaviour of taking a cron expression and computing data_interval_start based on the "previous" cron schedule; TimeDeltaTimetable covers the current timedelta case.

CronTimetable is new, and represents the "run at the exact time specified" use case -- i.e. for DAGs that do not want a concept of a data interval.

And this AIP would be incomplete without addressing one of the most common asks: to run at the "end" of the interval. We slightly invert this and instead rephrase the ask as "run at this exact time"; this is provided by the CronTimetable class.

Code Block
from datetime import timedelta
from typing import Optional


class CronTimetable(AbstractTimetable):
    # No data interval: data_interval_start == data_interval_end == run_date
    def __init__(self, cron_expression: str):
        ...


class DataTimetable(CronTimetable):
    # Current behaviour: the data interval is derived from consecutive cron
    # points, or from an explicit ``interval`` when one is given
    def __init__(self, cron_expression: str, interval: Optional[timedelta] = None):
        ...


class TimeDeltaTimetable(AbstractTimetable):
    def __init__(self, schedule: timedelta, interval: Optional[timedelta] = None):
        ...


Example Uses

A few small illustrative examples of how the problems outlined at the start can be addressed.

Use case 1: Daily on weekdays

DataTimetable is for DAGs that have a data interval -- i.e. all current DAGs.

Code Block
from datetime import timedelta

@dag(timetable=DataTimetable('0 0 * * MON-FRI', interval=timedelta(days=1)))
def process():
    ...

When passed just a cron_expression argument, DataTimetable keeps the current behaviour, where data_interval_end (and the run_date) is set to the next cron point.

By passing an explicit interval here we make data_interval_end be start + interval, and this is what gets Friday's data processed on Saturday at midnight, instead of Monday at midnight.

Use case 4: Run at the exact time specified

CronTimetable is for DAGs that don't want a data interval. In this case data_interval_start, data_interval_end, and run_date all have the same value.

Code Block
@dag(timetable=CronTimetable('* 0 * * *'))
def capture_snapshot():

    @task
    def run_pgdump(run_date):
        # ``run_date`` would be provided from the task context
        execute_pg_dump(
            filename=f"mydb_{run_date.isoformat()}.pgdump",
        )

    run_pgdump()


Implementation Details

This is not an exhaustive list of steps, just a list of a few salient points.

  • Error if schedule_interval and timetable args are both provided to DAG.
  • Convert schedule_interval into an instance of DataTimetable or TimeDeltaTimetable as appropriate (a sketch of this conversion follows this list).
  • Proxy next/previous schedule methods on DAG to the Timetable instance.
  • Since timetables will be evaluated in the scheduler we will need to "register" them in a similar manner to OperatorLinks (blindly taking a class name from the serialized format and importing the module/creating an instance is a possible security risk).
  • Add data_interval_start and data_interval_end columns to the DagRun table.
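
A minimal sketch of the conversion mentioned above, assuming the classes from the previous section (the function name is illustrative, and normalisation of @ presets is elided):

Code Block
from datetime import timedelta


def timetable_from_schedule_interval(schedule_interval):
    """Map the legacy ``schedule_interval`` argument to a Timetable instance."""
    if schedule_interval is None:
        return None  # unscheduled DAG, unchanged behaviour
    if isinstance(schedule_interval, timedelta):
        return TimeDeltaTimetable(schedule_interval)
    # Cron expressions (and @ presets, normalised elsewhere) keep the
    # current data-interval behaviour via DataTimetable.
    return DataTimetable(schedule_interval)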

Future Enhancements

Once we have a basic pluggable timetable/schedule_interval, here is some functionality that could be added, either with just a PR, a third-party "timetable provider" module, or a future AIP.

  1. "Rollup", an alternative to backfill

    This covers a few use cases, but once we have an explicit start and end of interval defined, it is desirable to be able to re-run the same (for example) daily DAG over a longer period.

    This could be an alternative to backfill, where the Trigger interface in the UI could show optional date range inputs letting the user say "run this dag for a 1 month period".

    This behaviour also leads to a new alternative method of doing catchup: if three dag runs are missed, rather than creating three individual daily runs (for example) we could instead create one DAG run with a three-day data interval. This would be opt-in on a per-DAG basis, as it depends entirely on how the tasks are written.

  2. @sunset

    This is an extension of the previous "run at exact time" use case, where an astronomer (not the company, but someone who looks at stars!) wants to use Airflow to automate processes that happen at the start of each night. (A sketch of how this might look appears after this list.)

    This combines two features: running at the exact time specified (not the end of the period), and additionally where the exact time changes from day-to-day.

    (Background: Celery has support for this https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html?highlight=sunset#solar-schedules)

  3. Regular schedule, but delay start

    Often when processing data from third parties the delivery of the data is delayed, so a common pattern is to want to wait for a fixed offset. This could be done with a time sensor, but it would be more efficient to delay the initial scheduling.

    For example, you might have a daily DAG where you want to process "yesterday's" data (i.e. the current traditional ETL workload) but you want to start the job two hours late.

    Setting 0 2 * * * would "work", but then execution_date doesn't line up with the data correctly.

  4. Rolling windows

    For example, you might want to look at a 7 day rolling window of data, but run the DAG daily.

    This can be achieved right now by having a daily DAG and then "manually" looking at an extra six days, but being able to formally specify this and have the data interval reflect it would be useful, especially when it comes to tracking data lineage.

  5. More complex time schedules

    We could introduce the ability to express really powerful time schedules using boolean logic. For example:

    Code Block
    tt = Timetable(
        _or(
            _and(Hours([3, 9, 18]), Minutes([2, 30, 19])),
            _and(Hours([17]), Minutes([0, 15, 45])),
        )
    )


    This would run at 3:02, 3:19, 3:30, 9:02, 9:19, 9:30, 17:00, 17:15, 17:45, 18:02, 18:19 and 18:30
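
Finally, as a taste of how the @sunset idea (enhancement 2 above) could be built on the proposed interface, here is a sketch using the third-party astral library; the class name and the lack of error handling are assumptions for illustration:

Code Block
from datetime import timedelta

import pendulum
from astral import LocationInfo
from astral.sun import sun


class SunsetTimetable(AbstractTimetable):
    """Hypothetical: run at sunset each day, for a fixed location."""

    def __init__(self, location: LocationInfo):
        self.location = location

    def next_dagrun_info(self, date_last_automated_dagrun, session):
        if date_last_automated_dagrun is None:
            day = pendulum.now("UTC").date()
        else:
            day = date_last_automated_dagrun.date() + timedelta(days=1)
        # astral returns timezone-aware UTC datetimes for the given day.
        sunset = pendulum.instance(sun(self.location.observer, date=day)["sunset"])
        # No data interval: start, end, and run_date coincide.
        return DagRunInfo(run_date=sunset, data_interval=(sunset, sunset))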