Status

State: Completed
Discussion Thread

Scoping out a new feature for 2.1: improving schedule_interval

[DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs


Project: AIP-39 Pluggable schedule_interval
In Release: 2.2.0

Motivation

Currently a DAG can be scheduled either by providing a cron expression (or one of the @ presets) or by passing a timedelta. This works well for DAGs that need to operate on a regular schedule, but there are many other use cases that are currently hard to express "natively" in Airflow, or that require confusing workarounds.

In addition to the need for more powerful scheduling options, the execution_date concept is one of the most confusing things for newcomers to Airflow to learn. Part of this is simply the name -- the immediate thing that springs to mind is "the time when the task is executing", i.e. right now, but that is not the meaning it holds (it is always at least one interval older than that, if not more when a task is being re-run).

In this AIP we propose a new name for execution_date, introduce a new concept called "data interval", and present an interface for more powerful pluggable scheduling behaviours.

Desirable Schedule patterns

A few examples of the kinds of workflows/schedules that I would like Airflow to be able to handle (and, where possible, the current way of achieving this):

  1. Daily on weekdays

    If you tried the "naive" expression of "0 0 * * MON-FRI" this would 'work', but not quite as users might expect. This is because the data for Friday is not processed until the start of the next interval, which is Monday at midnight:

    Execution     When it's run
    Monday        Tuesday 00:00:01
    Tuesday       Wednesday 00:00:01
    Wednesday     Thursday 00:00:01
    Thursday      Friday 00:00:01
    Friday        Monday 00:00:01

    The current workaround for this is to have a task at the start of the DAG that skips the run if it falls on a Saturday or Sunday (a sketch of this workaround appears after this list), but we should be able to do better.


  2. 15th or last working day before

    This is a common payroll date in the US (where companies frequently pay twice a month).

    Right now this is tricky to express in Airflow -- the best effort would be to schedule runs on the 15th and on the 4 days before it, and then skip any run that is not the 15th or the last working day before it.

  3. Trading days only

    (i.e. dates when Financial markets are open)

    This is an extension of the above Mon-Fri case, but with the additional "filter" that some weekdays are excluded. The exact days vary from country to country and year to year.

    Again, achieving this currently involves a check/skip as the first step of the DAG (the same workaround sketched after this list).

  4. Run at the exact time specified

    The current behaviour of running the DAG run after the period is over (i.e. daily after that day is over) makes sense for a "traditional" data processing job, where you can't process a day's worth of data until the day is over.

    However there are plenty of valid workflows where having this "execute after period is over" behaviour is confusing.


    One example where this behaviour would be useful is hitting an API to collect snapshot data, where having the data "stamped" with the time of access is the most intuitive.


    There is no real workaround for this -- other than perhaps using next_execution_date instead of execution_date in templates.

    This has been asked for many times, most recently https://github.com/apache/airflow/pull/5787 and https://github.com/apache/airflow/issues/8237
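
For reference, here is a rough sketch of the check/skip workaround mentioned in use cases 1 and 3 as it looks in Airflow today (DAG and task names are purely illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import ShortCircuitOperator


def _is_weekday(execution_date, **_):
    # Skip the run if its data date falls on a Saturday (5) or Sunday (6).
    return execution_date.weekday() < 5


with DAG("weekday_only_example", start_date=datetime(2021, 1, 1), schedule_interval="@daily") as dag:
    check_weekday = ShortCircuitOperator(task_id="check_weekday", python_callable=_is_weekday)
    # Downstream processing tasks are chained after check_weekday, e.g. check_weekday >> process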

We will not directly tackle many of these use cases in this AIP; instead we will make scheduling pluggable enough to support them, including allowing users to write their own behaviours. These features could then be added by providers, meaning they can have release schedules separate from Airflow core.

Proposal

Deprecate execution_date

The name execution_date is just too confusing, and at this point the only practical option is to rename it to something else.

We propose renaming execution_date to data_interval_start, and next_execution_date to data_interval_end.

Accessing the deprecated parameters in the context dict will raise a DeprecationWarning (which should cover access both from Python/custom operators and from Jinja templating).
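
One possible shape for this (purely illustrative, not the final implementation) is a mapping that warns when a deprecated key is read; since both operator code and Jinja template rendering ultimately read values out of the context mapping, a single hook should cover both:

import warnings


class Context(dict):
    """Task context that warns on access to deprecated keys (sketch only)."""

    _DEPRECATED = {
        "execution_date": "data_interval_start",
        "next_execution_date": "data_interval_end",
    }

    def __getitem__(self, key):
        if key in self._DEPRECATED:
            warnings.warn(
                f"'{key}' is deprecated, use '{self._DEPRECATED[key]}' instead",
                DeprecationWarning,
                stacklevel=2,
            )
        return super().__getitem__(key)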

Other names considered and rejected:

  • transaction_start/end: Confusing with DB transactions
  • period_start/end: Mathematically wrong (the period end would be analogous to the end_date of the dag, i.e. the point at which there are no more intervals)
  • batch_start/end: "But what about streaming?"
  • data_window: not as clear

Replace execution_date with run_id on TaskInstance table

With the deprecation of execution_date in the context, we should also remove it from TaskInstance too.

Currently the link between TI and DagRun is implicit via the (dag_id, execution_date) columns. The OpenAPI specification for DagRuns already uses run_id as the "key" in the URL, as we foresaw this type of work happening.

Now that DagRun is firmly entrenched in Airflow (and has been for a long time) we can make this relationship explicit: a TI is linked to its DagRun via run_id, and execution_date is removed from TI (with a deprecated property that looks the value up via the DagRun, so it is not a breaking change).

Making this link explicit would mean that deleting a DagRun would also delete the TaskInstances for it.
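
A rough sketch of what the explicit link could look like at the ORM level (column types, names, and keys are simplified assumptions, not the final schema):

from sqlalchemy import Column, DateTime, ForeignKeyConstraint, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class DagRun(Base):
    __tablename__ = "dag_run"
    dag_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)
    logical_date = Column(DateTime)  # renamed from execution_date, see below
    task_instances = relationship("TaskInstance", back_populates="dag_run", cascade="all, delete-orphan")


class TaskInstance(Base):
    __tablename__ = "task_instance"
    dag_id = Column(String(250), primary_key=True)
    task_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)  # replaces execution_date in the key
    __table_args__ = (
        ForeignKeyConstraint(
            ["dag_id", "run_id"],
            ["dag_run.dag_id", "dag_run.run_id"],
            ondelete="CASCADE",  # deleting a DagRun also deletes its TaskInstances
        ),
    )
    dag_run = relationship("DagRun", back_populates="task_instances")

    @property
    def execution_date(self):
        # Deprecated accessor: the value now lives on the DagRun.
        return self.dag_run.logical_date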

Rename DagRun.execution_date to logical_date

We need a single date value that is the "canonical" date for a DagRun, one that can be used for sorting DagRuns in the UI view, so the current "execution_date" column on DagRun needs to be kept -- but it should be renamed, and we've chosen "logical_date".

(Note: The original proposal voted on used the name "schedule_date", but when documenting the field we realised this name is also hard to explain. Since existing documentation already describes execution_date as "the DAG run's logical date" in multiple places, we feel logical_date is the more intuitive name.)

We will also clearly document the meaning of this column -- it is the time when the DagRun is scheduled to be run (or, more technically, the earliest time it is runnable; if the scheduler is backed up it may not start right away).

Summary of Renames


Old Name                New Name
execution_date          data_interval_start
next_execution_date     data_interval_end
tomorrow_ds             <no replacement, deprecate>
tomorrow_ds_nodash      <no replacement, deprecate>
yesterday_ds            <no replacement, deprecate>
yesterday_ds_nodash     <no replacement, deprecate>
prev_execution_date     prev_logical_date
next_execution_date     next_logical_date


ds, ds_nodash, ts, ts_nodash, and ts_nodash_with_tz will be kept, but their values will be based upon data_interval_start (this means the current values are unchanged).
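
To make the last point concrete, here is roughly how the kept short-hand values relate to the new name (dates are illustrative):

import pendulum

data_interval_start = pendulum.datetime(2021, 6, 18, tz="UTC")

ds = data_interval_start.strftime("%Y-%m-%d")       # "2021-06-18" -- same value execution_date produces today
ds_nodash = data_interval_start.strftime("%Y%m%d")  # "20210618"
ts = data_interval_start.isoformat()                # "2021-06-18T00:00:00+00:00"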

Introduce the concept of a "Timetable" class

To support all the different use cases, and to allow for cases we haven't yet thought of, we propose adding a new concept to Airflow: the Timetable class.

The schedule_interval parameter to DAG will not be removed, or even deprecated; it will be transparently upgraded. A single parameter makes sense (and is clean) for many current use cases, so we don't want to remove it.

In this AIP we won't include many Timetable classes, just enough to replicate the current functionality.

AbstractTimetable interface

The scheduler will use the dag.timetable property to make all scheduling decisions, delegating to the timetable instance (and deprecating DAG methods such as following_schedule).

The current proposed interface is this:

from abc import ABC, abstractmethod
from typing import NamedTuple, Optional, Tuple

import pendulum
from pendulum import DateTime
from sqlalchemy.orm import Session


class DagRunInfo(NamedTuple):
    run_date: Optional[DateTime]
    """
    The earliest time this DagRun could be created and its tasks scheduled, or None for "do not schedule".

    Returning None is not a "terminal" state; the timetable can return a date the next time it is asked.
    """

    data_interval: Optional[Tuple[DateTime, DateTime]]
    """
    The data interval this DagRun should operate over, as ``[start, end)``
    (meaning start is inclusive, but end is exclusive).
    """


class AbstractTimetable(ABC):
    @abstractmethod
    def next_dagrun_info(
        self,
        date_last_automated_dagrun: Optional[pendulum.DateTime],
        session: Session,
    ) -> Optional[DagRunInfo]:
        """
        Get information about the next DagRun of this dag after ``date_last_automated_dagrun`` -- the
        data interval it covers, and the earliest it could be scheduled.

        :param date_last_automated_dagrun: The max(execution_date) of existing
            "automated" DagRuns for this dag (scheduled or backfill, but not
            manual)
        """

Create TimeDeltaTimetable, DataTimetable, and CronTimetable classes

The TimeDeltaTimetable and DataTimetable classes represent the current gamut of types you can pass to schedule_interval, and these will be used to seamlessly upgrade existing parameters.

DataTimetable encompasses the current behaviour of taking a cron expression and computing data_interval_start based on the "previous" cron schedule; TimeDeltaTimetable covers the current timedelta case.

CronTimetable is new and represents the "run at the exact time specified" use case -- i.e. for DAGs that do not want a concept of a data interval.

And this AIP would be incomplete without addressing one of the most common asks: to run at the "end" of the interval. We slightly invert this, and instead rephrase the ask as "run at this exact time", and this is provided by the CronTimetable class.

from datetime import timedelta
from typing import Optional


class CronTimetable(AbstractTimetable):
    # No data interval: data_interval_start == data_interval_end (== run_date)
    def __init__(self, cron_expression: str):
        ...


class DataTimetable(CronTimetable):
    # Current behaviour: cron expression, plus an optional explicit interval length
    def __init__(self, cron_expression: str, interval: Optional[timedelta] = None):
        ...


class TimeDeltaTimetable(AbstractTimetable):
    # Current timedelta case: the schedule defines the spacing between runs
    def __init__(self, schedule: timedelta, interval: Optional[timedelta] = None):
        ...

Example Uses

A few small illustrative examples of how the problems outlined at the start can be addressed.

Use case 1: Daily on weekdays

DataTimetable is for DAGs that have a data interval -- i.e. all current dags.

@dag(timetable=DataTimetable('0 0 * * MON-FRI', interval=timedelta(days=1)))
def process():
    ...

When passed just a cron_expression argument the DataTimetable will have the current behaviour, where the data_interval_end (and the run_date) is set to the next interval.

By passing an explicit interval here, we make the data_interval_end be start+interval, and this is what gets us to having Friday's data be processed on Saturday at midnight, instead of Monday at midnight.
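
Concretely, for the Friday run the timetable above would produce values along these lines (dates are illustrative; 2021-06-18 is a Friday):

import pendulum

DagRunInfo(
    run_date=pendulum.datetime(2021, 6, 19, tz="UTC"),  # Saturday 00:00 -- when the run can start
    data_interval=(
        pendulum.datetime(2021, 6, 18, tz="UTC"),  # Friday 00:00 (data_interval_start)
        pendulum.datetime(2021, 6, 19, tz="UTC"),  # Saturday 00:00 (data_interval_end = start + interval)
    ),
)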

Use case 4: Run at the exact time specified

CronTimetable is for DAGs that don't want a data interval. In this case data_interval_start, data_interval_end and run_date all have the same value.

@dag(timetable=CronTimetable('* 0 * * *'))
def capture_snapshot():

    @task
    def run_pgdump(run_date):
        execute_pg_dump(
            filename=f"mydb_{run_date.isoformat()}.pgdump",
        )

Implementation Details

This is not an exhaustive list of steps, just a list of a few salient points.

  • Error if schedule_interval and timetable args are both provided to DAG.
  • Convert schedule_interval into an appropriate Timetable instance (DataTimetable or TimeDeltaTimetable -- a sketch of this follows the list).
  • Proxy the next/previous schedule methods on DAG to the Timetable instance.
  • Since timetables will be evaluated in the scheduler we will need to "register" them in a similar manner to OperatorLinks (blindly taking a class name from the serialized format and importing the module/creating an instance is a possible security risk).
  • Add data_interval_start and data_interval_end columns to the DagRun table.
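
As an illustration of the conversion step above, the upgrade could look something like this (the function name and abbreviated preset map are assumptions, not part of the proposal):

from datetime import timedelta

# Abbreviated preset map -- the real mapping covers all @ presets.
CRON_PRESETS = {"@hourly": "0 * * * *", "@daily": "0 0 * * *", "@weekly": "0 0 * * 0"}


def upgrade_schedule_interval(schedule_interval):
    """Upgrade a legacy schedule_interval value to a Timetable instance (sketch)."""
    if schedule_interval is None:
        return None  # the DAG is not scheduled automatically
    if isinstance(schedule_interval, AbstractTimetable):
        return schedule_interval  # already a timetable, nothing to do
    if isinstance(schedule_interval, timedelta):
        return TimeDeltaTimetable(schedule_interval)
    # A cron expression or @ preset keeps today's data-interval behaviour.
    return DataTimetable(CRON_PRESETS.get(schedule_interval, schedule_interval))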

Future Enhancements

Once we have a basic pluggable timetable/schedule_interval, here is some functionality that we could add, either with just a PR, a third-party "timetable provider module", or a future AIP.

  1. "Rollup", an alternative to backfill

    This covers a few use cases: once we have an explicit start and end of interval defined, it is desirable to be able to re-run the same (for example) daily dag over a longer period.

    This could be an alternative to backfill, where the Trigger interface in the UI could show optional date range inputs letting the user say "run this dag for a 1 month period".

    This behaviour also leads to a new alternative method of doing catchup: if three dag runs are missed, rather than creating three individual daily runs (for example) we could instead create one DAG run with a three-day data interval. This would be opt-in on a per-dag basis, as it depends entirely on how the task is written.

  2. @sunset

    This is an extension of the previous "run at exact time" use case, where an astronomer (not the company, but someone who looks at stars!) wants to use Airflow to automate processes that happen at the start of each night.

    This combines two features: running at the exact time specified (not the end of the period), and additionally where the exact time changes from day-to-day.

    (Background: Celery has support for this https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html?highlight=sunset#solar-schedules)

  3. Regular schedule, but delay start

    Often when processing data from third parties the delivery of the data is delayed, so a common pattern is to wait for a fixed offset. This could be done with a time sensor, but it would be more efficient to delay the initial scheduling.

    For example, you might have a daily DAG where you want to process "yesterday's" data (i.e. current traditional ETL workload) but you want to start the job 2 hours late.

    Setting 0 2 * * * would "work" but then execution_date doesn't line up with the data correctly.

  4. Rolling windows

    For example, you might want to look at a 7 day rolling window of data, but run the DAG daily.

    This can be achieved right now by having a daily dag and then "manually" looking at an extra 6 days, but being able to formally specify this and have the data interval reflect it would be useful, especially when it comes to tracking data lineage. (A sketch of how this might look appears after this list.)

  5. More complex time schedules

    We could introduce the ability to have really powerful time schedules using boolean logic. For example:

    tt = Timetable(
        _or(
            _and(Hours([3, 9, 18]), Minutes([2, 30, 19])),
            _and(Hours([17]), Minutes([0, 15, 45])),
        )
    )


    This would run at 3:02, 3:19, 3:30, 9:02, 9:19, 9:30, 17:00, 17:15, 17:45, 18:02, 18:19 and 18:30
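
As an example of the rolling-window enhancement (item 4), under the interval semantics proposed above this might be expressible directly with DataTimetable -- a daily cron paired with a seven-day interval, so each run's data interval covers the preceding week (purely illustrative, assuming those semantics hold):

from datetime import timedelta


@dag(timetable=DataTimetable("0 0 * * *", interval=timedelta(days=7)))
def rolling_week():
    # Each run's data interval spans the seven days ending at the run's start time,
    # while a new run still starts every day at midnight.
    ...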

10 Comments

  1. Ash Berlin-Taylor

    Here are my thoughts.

    1. I'd call it `CronSchedule`, `TimetableSchedule`, etc.

    2. I would remove data_interval_start / end concept from this proposal and combine CronSchedule and DataSchedule.  When interval is None they are the same.

    And I would make CronSchedule be 

    def __init__(self, cron_expression, delay) # i.e. rename interval to delay.

    I think that it simply isn't necessary to add the data-awareness component along with this change, and it touches on a bit of complexity that I think is better left until later with an AIP that focuses on it more exclusively.  

    I think that everything you are enabling with `data_interval_start` / end could already be accomplished (and done so more explicitly) with reference to `prev_run_date` / `next_run_date`.  

    So I think it's best to not introduce the feature until it unlocks something, because it will only constrain future development in this area without adding a lot of value.  Dags will be easier to reason about when they reference `prev_schedule ..` `next_schedule` rather than `data_interval_start` / end (particularly when the latter are actually derived from the former anyway).

    What do you think?


    1. 1) Yeah, sure

      2) data_interval being separate from scheduling interval is the key part of this proposal – without it we can't do the Mon-Fri case and have Friday's data be processed at 00:00:00 on Saturday (without making it run every day and skip Saturday and Sunday).

    2. Daniel Standish Ash Berlin-Taylor

      1. I have existing thoughts on Timetable.  I'll continue our discussion in the developer email thread.
      2. I think you're drawing some inaccurate conclusions about what's possible with this interface. I'll address the main ones individually, but the overall reason is that because this is just an interface, the complexity of the logic that determines the next dagrun info is arbitrary. This method is free to run calculations, access a data store, make an HTTP request, etc. This arbitrary logic extends to both when the next DAG should next be scheduled, and the duration of the data interval.
        • “[data_interval is] always going to be derived from `date_last_automated_dagrun`”.
          • While `date_last_automated_dagrun` is the only argument passed into `next_dagrun_info`, a more complete way of thinking about it is that `next_dagrun_info` returns the next time to schedule a DAG after arbitrary date X. Rather than grabbing the next date from a predetermined sequence, as Airflow does currently with cron, this allows each execution to have its scheduled_date and data_interval calculated independently. It can also access the context_variables passed in.
        • “I.e. more complicated things like replacing backfill, or having "inital load" behavior differ from incremental behavior (this is a common scenario, or using watermarks.  Because the only input  seems to be the prev dag run date – and anything you can do with this you could simply do in a template (and in a more transparent way). “
          • Backfilling can be performed by including logic that checks a context variable for the last successful execution and then extends the data interval back to that point.
        • “You also have the problem of configuring this behaviour at the dag run level as opposed to the task – maybe within your dag you have 1 task that only looks back 1 day but 1 other task that has a 7-day rolling window. “
          • This introduces a lot of complexity that seems out of scope with updating the DAG-level schedule_interval.
        • “my mind is like "let's stick with template calculations" until we have something that is more powerful. “
          • I’d argue that this is an exceedingly powerful design, in terms of its ability to add flexibility with little complexity. It isn’t powerful in the sense of “solving more problems than before”; rather, it tries to come up with better solutions to the subset of problems already addressed by data_interval and the scheduling logic.
        • “It doesn't seem that you need the data interval concept in order for a Schedule abstraction to be useful.  Without it you could still use it to implement arbitrary schedules (and to have things "run at this time"), which is what people have been asking for.  Only the data interval logic would stay with the tasks (like it is now) rather than be moved to the schedule class.”
          • That would minimize the usefulness of this considerably, as the interval and the scheduled time are inextricably linked. Further, schedule_interval is currently handled at the DAG level. So this is just updating the existing DAG-level functionality; it shouldn't impact the ability of the Task-level logic to operate the same way it currently does.
        • “If you wanted one dag to have rolling window and another to be daily intervals, you'd need DailySunsetSchedule in addition to Rolling7DaySunsetSchedule?”
          • This is true, but I’m not sure why it’s a problem. Creating Rolling7DaySunsetSchedule is just a matter of extending DailySunsetSchedule and modifying one or two properties.
        • “It could take into account task context* and therefore it could determine if it is initial load.  It could also be independent of other tasks in the same dag”.
          • Not sure I follow this logic entirely. We want to minimize coupling, but I agree that making macros or Dag-level properties would be useful.




      1. James Timmins thanks for the thorough replies.

        Honestly when reading the proposal the method set_context_variables completely slipped past me – did not see it.  (FWIW though it is present in the code block, it's not mentioned anywhere in the text of the AIP or used in examples – might be helpful for readers to add an example that makes use of this).

        Without set_context_variables you'd be unable to know which dag you're in, let alone which task or task instance, which is why it didn't seem valuable.  But being able to mutate task context is clearly more powerful.

        I'll think through some of the use cases and provide more feedback later.

  2. James Timmins when set_context_variables is used for the purpose of modifying the data interval, the actual intervals processed aren't recorded anywhere.  This would seem to somewhat limit the backfill and lineage use cases, because you would not know after the fact what data was actually processed.  The dag run data interval might tell you one thing but the reality might be different.  You have any concerns with that?


  3. Interval is proposed to be stored on the DagRun table in two new columns:

    Add data_interval_start and data_interval_end columns to DagRun table


    1. I understand that.  But it's not clear what happens when set_context_variables is used to modify the interval.

      Part of the proposal is this:

      "Backfilling can be performed by including logic that checks a context variable for the last successful execution and then extends the data interval back to that point"

      That suggests you'd do something like this:

      def set_context_variables(context):
          context['data_interval_start'] = context['prev_start_date_success']

      But if you modify the  interval using context variables it's not clear if it's stored anywhere.  It would not be the same as the interval stored on dag run.

      So if you then wanted to run "airflow backfill" and have it "fill in the gaps", the dag run table would not tell you the truth, and there would be nowhere to go to find the truth.

      In post_execute you could write out the actual calculated interval.  But not to dag_run table because it might be different than other tasks.  


      1. For backfill that would take start and end dates as input from the user.


        Perhaps having the `set_context_variables` method is just more confusing – it's certainly not needed for anything in the initial proposal, so we should probably remove it for clarity.

        1. we should probably remove it for clarity

          Either way.

          I've been struggling with this data interval implementation because I think there's a need for a more comprehensive data interval framework that's pluggable at the task level.  Admittedly, since I don't have that vision all worked out, it's not that helpful.  And you guys didn't seem too intrigued by the thought (smile) so...

          But I don't think there's anything about this AIP that would prevent us from adding that later.  So it's not a reason to oppose this proposal.

          For example, you are adding a data interval at the scope of the dag run, but later we could extend this so that data intervals within the scope of each task and task group could be implemented.

          It still does seem to me that this proposal would accomplish its mission just the same without data interval:

          • abstracting schedule_interval into classes gives us power to define arbitrary schedules
          • adding "delay" lets us solve the M-F midnight use case.
          • renaming execution date to schedule date (and letting the schedule date coincide roughly with start time) reduces a long-standing and painful ambiguity

          Tasks could simply reference the "primitives" `schedule_date` and `delay` to articulate their data intervals.

          But anyway please do go ahead and move forward with this and don't let my reservations hold you back or make you feel like I'm not enthusiastic or supportive of this effort in general cus I am.  When you have a PR to test out I'll be happy to try it out and we can continue the discussion with more concrete examples.

          Small notes:
          1. I agree with others on the mailing list thread that Schedule is better than Timetable.  Again naming need not block the AIP.
          2. In `set_context_variables` it seems that you don't need `dagrun` as a param because it's already present in context.
          3. I suggest `update_context_variables` because the context already exists at this point
          4. Just a comment that sometimes one might have integer and binary intervals. I know that datetime is implied by the `interval` param but a datetime could be stored as string values.

          Thanks

        2. One thing about cron_expression + interval: since interval is a timedelta, you cannot have the interval equal the schedule interval (i.e. the behavior for all dags right now).

          So for example we can support your M-F example.  But it seems DataTimetable can't be used to achieve the existing behavior for that cron expression, i.e. where the "interval" or "delay" is exactly the schedule interval (so that Monday morning at midnight the interval start is the last schedule date and the end is the current schedule date).

          For that I think you'd need to use CronTimetable and reference prev_schedule_date (or make another subclass).

          That sound right?