Status

State: Accepted
Discussion Thread:
Vote Thread: https://lists.apache.org/thread/cb2d9v3qr7h9mn54j6klxwtmxssq90yz
Vote Result Thread: https://lists.apache.org/thread/yjpwx2tmrg6go3xmynwpxzr3ypttqcyn
Progress Tracking (PR/GitHub Project/Issue Label):
Date Created: 19.09.2024 20:24
Version Released: Target 3.1
Authors: Dennis Ferruzzi

Summary

The existing SLA implementation in Airflow before 3.0 has been problematic and a constant point of debate. In the Airflow 3 planning meetings, it was discussed and decided that the move to a new major version is a good time to tear out the old implementation and replace it with something new. Since AIP-57 has been abandoned, this proposal replaces and supersedes AIP-57. That AIP and the discussion around it are not part of this proposal; unless explicitly mentioned here, nothing from those discussions or that AIP carries over into this one.

The old SLA feature was removed in Airflow 3.0 (PR here) and will be replaced with Deadline Alerts in 3.1. The purpose of this AIP is to discuss the implementation of the new Deadline Alerts feature which replaces the SLA feature.

One of the main points of contention with the SLA feature revolves around when you start counting. If you set an SLA of "1 hour", is that one hour after the task was scheduled, one hour after it starts running, one hour after it was queued, or something else entirely? This leads to frequent questions and confusion. This AIP revolves around the idea of allowing the user to define that starting point and treating the new SLA as a calculated "need-by date". To avoid confusion, the new feature will be referred to as Deadlines or Deadline Alerts rather than SLA, and the implementation will allow a much more versatile experience for the user.

Motivation

Problems in the Old Implementation

(credit for this section to Sung Yun in AIP-57)

  1. Sla_miss was evaluated only if the task_id had ever been in a SUCCESS or SKIPPED state. This means that if the task had never succeeded in the first place - due to it being blocked, still running, or yet to be scheduled - the sla_miss_callback was not executed. This is a detrimental misfeature of the old SLA implementation, as users often want to use SLAs for delay detection, enabling them to investigate possible issues in their workflow pre-emptively.
  2. If a dag is ever changed to rename the task_ids or add new task_ids, the first runs of the new task_ids will never have their SLAs evaluated. 
  3. SLA was defined as a timedelta relative to dagrun.data_interval_end, based on the timetable of the scheduled job. Some DAGs are triggered externally - either manually or via a Dataset - meaning they don’t have a fixed schedule or start time, preventing the evaluation of an SLA miss.
  4. Sla_miss_callback was an attribute of the DAG, but the SLAs were defined as attributes of the individual tasks. This is counterintuitive, and does not allow users to run callbacks that are customized at the task level if deadlines are missed.
  5. The old implementation triggered the parsing of DAG files when an sla_miss_callback was sent to the DagFileProcessor. This means that if an SLA was defined within a DAG, we could end up parsing the DAG file more frequently than the frequency defined by min_file_process_interval, because sla_miss_callbacks were sent to the DagFileProcessor every time dag_run.update_state was called. This was especially bad compared to the other DAG-level callbacks, because the sla_miss_callback was requested every time we examined and attempted to schedule the dagrun, even if there were no state changes within the DAG.

Considerations

As stated above, this AIP intentionally ignores all previous discussion on the topic of fixing, changing, redefining, or re-implementing "SLA". Previous discussions have been ongoing for a very long time, across many different mediums, and have gone through multiple proposals. It is impossible to summarize or collate all of those discussions, so treat this as a "blank slate" proposal and do not assume any changes beyond what is explicitly stated herein.

Definitions

As this is a topic that has been discussed many times, in an effort to avoid confusion, I am using the following definitions in this project.

Reference / Reference Timestamp

An Enum will contain a selection of options, similar to how a user currently provides a TriggerRule. The implementation will include an interface which allows future work to easily add new reference timestamps, but they will not be a freeform input for the user. These options will resolve to a datetime() representing the point in time to start counting from. The value can be dynamic (examples include NEXT_DAGRUN_EXECUTION_DATE or DAGRUN_QUEUED_AT, which map to their respective database fields), or a static date/time value provided as a string and cast to a datetime() object.
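Purely as illustration, the enum could look something like this minimal sketch (the class name, members, and field mappings here are assumptions, not a final API):

import enum

class DeadlineReference(enum.Enum):
    # Each member resolves to the datetime stored in the named database field.
    DAGRUN_QUEUED_AT = "dag_run.queued_at"
    NEXT_DAGRUN_EXECUTION_DATE = "dag.next_dagrun"
    FIXED_DATETIME = "fixed"  # a static date/time supplied by the user as a string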

Delta

A datetime.timedelta. When added to the reference timestamp, you get the “need-by date” (see below).

Need-by

“Need-by” will be defined as the reference timestamp (see above) plus some delta.  For example,  `need_by = DeadlineAlerts.NEXT_DAGRUN_EXECUTION_DATE + 1_hour`
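Spelled out as runnable code, that example amounts to the following sketch (resolve_reference is a hypothetical helper that looks up the enum member's underlying database field):

from datetime import timedelta

# need-by = resolved reference timestamp + delta
need_by = resolve_reference(DeadlineReference.NEXT_DAGRUN_EXECUTION_DATE) + timedelta(hours=1)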

Callback

A Callable (or, by extension, a functools.partial) to execute when a miss is detected. This implementation will allow for a custom callback, as seen in success_callback and failure_callback, and special attention will go into making sure that using any existing or custom Notifiers is as seamless as possible.

Why is it needed?

Before diving into the challenges with the current SLA feature, let's consider what our users are expecting from an SLA feature in Airflow:

(Credit for user stories to Shubham Mehta )

Scenario 1: The Mission-Critical Daily Report

Elmo, a data engineer at a large e-commerce company, manages a DAG that generates a critical daily sales report. This report must be ready by 9 AM every day for the executive team's business review.

Elmo expects:

  • To set a DAG-level deadline of 9 AM for the entire workflow
  • To receive immediate notifications if the DAG is running behind schedule and might miss the deadline
  • To easily adjust the Alert for weekends and holidays, while having the ability to define the deadline in a local timezone
  • To see a historical view of how often the DAG meets its SLA, helping him identify trends and potential improvements

Scenario 2: The Long-Running Processing Task

Rosita, a data scientist at a finance firm, works with a DAG that includes a data processing task which uses EMR to transform data extracted from an external API. This task occasionally runs much longer than expected, delaying downstream processes.

Rosita needs:

  • To set a maximum execution time for the extract task
  • To receive notifications when the task exceeds its expected duration
  • To log detailed information when the expected time limit is breached, helping her investigate and optimize the task

Scenario 3: The ETL Pipeline

Zoe, an engineer at a tech startup, manages a DAG with multiple stages: data ingestion, preparation, processing, and reporting. She needs to ensure each stage completes within a specific timeframe.

Zoe requires:

  • To set individual Alerts for different tasks within her DAG
  • To receive notifications if any task in the pipeline exceeds its timeframe
  • To easily view which parts of her pipeline are consistently meeting or missing their deadlines

What change do you propose to make?

  1. In dag_processing/processor.py add "save next SLA": 
    1. If this DAG has a Deadline configured, then calculate the “need-by” and store the timestamp in a sorted-by-closest-timestamp object (or a db table like the old SLAMiss table?)
  2. In the scheduler loop add “check for Deadline misses”:
    1. If the first/soonest deadline has passed: pop the timestamp, queue the callback to run on the worker [possible future work to add a new process similar to a Triggerer], and recurse. (See the sketch after this list.)
  3. Add the Deadline Alert handler back in where the previous SLA callback handler used to live.
  4. When a dagrun ends, attempt to remove its Deadline timestamp(s).
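As a minimal sketch of the "check for Deadline misses" step, assuming the sorted-by-closest-timestamp object from step 1 is an in-memory min-heap (queue_callback_on_worker is a hypothetical helper):

import heapq
from datetime import datetime, timezone
from typing import Callable

# Each entry is (need_by, dagrun_id, callback); heapq keeps the soonest deadline first.
deadline_heap: list[tuple[datetime, str, Callable]] = []

def check_for_deadline_misses() -> None:
    """Pop and handle every deadline whose need-by time has already passed."""
    now = datetime.now(timezone.utc)
    while deadline_heap and deadline_heap[0][0] <= now:
        need_by, dagrun_id, callback = heapq.heappop(deadline_heap)
        # Hand the callback off to a worker rather than blocking the scheduler loop.
        queue_callback_on_worker(dagrun_id, callback)  # hypothetical helper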

Examples

I may go with either the class init() pattern or the builder pattern; that will be decided at implementation, but usage would look something like one of these. I am currently leaning towards the format of Example 2.

# Example 1
with DAG(
    dag_id="email_if_not_done_1_hour_after_dagrun_queued_example",
    schedule=None,
    start_date=yesterday,
    tags=["deadline_alerts_example"],
    deadline=Deadline(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=datetime.timedelta(hours=1),
        callback=send_smtp_notification(
            from_email="someone@mail.com",
            to="someone@mail.com",
            subject="[Error] The dag {{ dag.dag_id }} failed",
            html_content="debug logs",
        )
     )
) as dag:
    ...

# Example 2
with DAG(
    dag_id="custom_callback_if_not_done_1_hour_after_dagrun_scheduled_example",
    schedule=daily,
    start_date=yesterday,
    tags=["deadline_alerts_example"],
    deadline=DeadlineAlerts.DAGRUN_SCHEDULED_AT.build(
        interval=datetime.timedelta(hours=1),
        callback=functools.partial(my_custom_callback, arg1="Something went wrong!", arg2="Error Code: 123"),
    )
) as dag:
    ...

# Example 3
with DAG(
    dag_id="same_as_above_but_longer",
    schedule=daily,
    start_date=yesterday,
    tags=["deadline_alerts_example"],
    deadline=DeadlineAlerts.DAGRUN_SCHEDULED_AT
        .with_interval(datetime.timedelta(days=1))
        .callback(my_custom_callback)
        .callback_kwargs({"arg1": "Something went wrong!", "arg2": "Error Code: 123"})
        .build()
) as dag:
    ...

# Example 4
with DAG(
    dag_id="email_if_not_run_in_over_24_hours_example",
    schedule=daily,
    start_date=yesterday,
    tags=["deadline_alerts_example"],
    deadline=DeadlineAlerts.DAG_LAST_EXECUTION_TIME.build(
        interval=datetime.timedelta(hours=24),
        callback=send_smtp_notification(
            from_email="someone@mail.com",
            to="someone@mail.com",
            subject="[Error] The dag {{ dag.dag_id }} failed",
            html_content="debug logs",
        )
    )
) as dag:
    ...

Metrics

New metrics will embrace an OTel-first design, meaning tags will be considered from the beginning, not as an afterthought.

  1. deadline_alerts_triggered: A counter tagged with the dag_id so a user can see how many times a given DAG has caused a Deadline Alert. (See the sketch after this list.)
  2. Other New Metrics Here??
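As a sketch only, the counter could be emitted through Airflow's existing Stats facade, which accepts a tags argument when the configured metrics backend (e.g. OTel) supports tagging; dag_run is assumed to be in scope:

from airflow.stats import Stats

# Increment the tagged counter whenever a Deadline miss fires.
Stats.incr("deadline_alerts_triggered", tags={"dag_id": dag_run.dag_id})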

Manually-triggered DAG support

Code will be added to the dagrun trigger so that a manually-triggered DAG run will set “scheduled_start_time” to “utcnow()”.
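A rough sketch of that change, assuming the “scheduled_start_time” field named above (the exact call site in the dagrun trigger path is an implementation detail):

from airflow.utils import timezone
from airflow.utils.types import DagRunType

# A manual trigger has no scheduled slot, so anchor its deadline bookkeeping to "now".
if dag_run.run_type == DagRunType.MANUAL:
    dag_run.scheduled_start_time = timezone.utcnow()  # hypothetical field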

Extensibility

Changes will be made in such a way as to minimize conflict with the other potential Deadline types mentioned below under "Future Work" [Appendix 1].

Current Reference Candidates [See also: Appendix 2]

  • dag_run - execution_date
  • dag_run - queued_at
  • job - start_date
  • serialized_dag - last_updated
  • dataset - created_at (future work)
  • dataset - updated_at (future work)


What problem does it solve?

Allowing the user to decide which point in the DAG's lifecycle the need-by time is based on will help clear up much of the confusion around the old SLA implementation.

Are there any downsides to this change?


Which users are affected by the change?

All users who currently use the SLA feature will need to migrate. The SLA feature has been removed in 3.0 and replaced with a notification to this effect, and the documentation for Deadline Alerts will include examples of setting an Alert that works the same way as the old SLA, to help minimize the transition effort for users who liked the old way.

How are users affected by the change? (e.g. DB upgrade required?)

The SLAMiss table has been removed in 3.0 and this implementation will likely replace it with a similar DeadlineAlerts table which maps dagrun_id::timestamp.
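Purely illustrative, such a table might look like the following sketch (the model name and columns are assumptions):

from sqlalchemy import Column, ForeignKey, Integer
from airflow.models.base import Base
from airflow.utils.sqlalchemy import UtcDateTime

class DeadlineAlert(Base):  # hypothetical model
    """Maps a dag run to its computed need-by timestamp."""

    __tablename__ = "deadline_alert"

    id = Column(Integer, primary_key=True)
    dagrun_id = Column(Integer, ForeignKey("dag_run.id"), nullable=False)
    deadline = Column(UtcDateTime, nullable=False, index=True)  # the need-by time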


What is the level of migration effort (manual and automated) needed for the users to adapt to the breaking changes? (especially in context of Airflow 3)

DAGs which implement the old SLA feature will need to be manually migrated. Documentation will include examples showing the old vs new configurations to make that as easy as possible. (See the sketch below for a rough idea.)
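A rough before/after sketch, assuming the proposed Deadline API from the examples above (my_callback is a placeholder, and the reference that best reproduces the old data_interval_end-based behavior will be spelled out in the docs):

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# Old (Airflow 2.x): sla set on the task, sla_miss_callback set on the DAG.
with DAG(dag_id="old_style", sla_miss_callback=my_callback) as dag:
    BashOperator(task_id="t1", bash_command="make_report", sla=timedelta(hours=1))

# New (proposed): a single Deadline configured on the DAG.
with DAG(
    dag_id="new_style",
    deadline=Deadline(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,  # choose the reference that fits
        interval=timedelta(hours=1),
        callback=my_callback,
    ),
) as dag:
    BashOperator(task_id="t1", bash_command="make_report")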

What defines this AIP as "done"?

This AIP will only cover DAG-level Deadlines. 


Appendices

Appendix 1: Future Work

Future work could add:

  • Task-level Deadlines (Task within a DAG did not finish before a given time)
  • Dataset Deadlines (Dataset has not been modified before a given time)
  • Update the Alerts view on the frontend to include a user-defined failure code or message.
  • A new sidecar process, equivalent to a Triggerer, which can offload the processing of the Deadline Alert callbacks may be future work.

Appendix 2: Other timestamps in the databases that could possibly be used

This is a list of all timestamps in the DB, included purely to inspire ideas of other ways in which this feature might be used. Many of these are not practical to use as the basis for a Deadline, but use cases may come up with interesting ideas, so these are included here for reference. The list is formatted as table_name - field_name.

  • log - dttm
  • log - execution_date
  • job - end_date
  • job - latest_heartbeat
  • job - start_date
  • callback_request - created_at
  • dag_code - last_updated
  • dag_pickle - created_dttm
  • ab_user - changed_on
  • ab_user - created_on
  • ab_user - last_login
  • ab_register_user - registration_date
  • sla_miss - execution_date
  • sla_miss - timestamp
  • import_error - timestamp
  • serialized_dag - last_updated
  • dag_schedule_dataset_alias_reference - created_at
  • dag_schedule_dataset_alias_reference - updated_at
  • dataset - created_at
  • dataset - updated_at
  • dag_schedule_dataset_reference - created_at
  • dag_schedule_dataset_reference - updated_at
  • task_outlet_dataset_reference - created_at
  • task_outlet_dataset_reference - updated_at
  • dataset_dag_run_queue - created_at
  • dataset_event - timestamp
  • dag - last_expired
  • dag - last_parsed_time
  • dag - last_pickled
  • dag - next_dagrun
  • dag - next_dagrun_create_after
  • dag - next_dagrun_data_interval_end
  • dag - next_dagrun_data_interval_start
  • dag_warning - timestamp
  • log_template - created_at
  • dag_run - data_interval_end
  • dag_run - data_interval_start
  • dag_run - end_date
  • dag_run - execution_date
  • dag_run - last_scheduling_decision
  • dag_run - queued_at
  • dag_run - start_date
  • dag_run - updated_at
  • task_instance - end_date
  • task_instance - queued_dttm
  • task_instance - start_date
  • task_instance - trigger_timeout
  • task_instance - updated_at
  • dag_run_note - created_at
  • dag_run_note - updated_at
  • task_reschedule - end_date
  • task_reschedule - reschedule_date
  • task_reschedule - start_date
  • task_fail - end_date
  • task_fail - start_date
  • xcom - timestamp
  • task_instance_note - created_at
  • task_instance_note - updated_at
  • task_instance_history - end_date
  • task_instance_history - queued_dttm
  • task_instance_history - start_date
  • task_instance_history - trigger_timeout
  • task_instance_history - updated_at
  • trigger - created_date
  • session - expiry


29 Comments

  1. "Anchors" is a terrible name and I need alternatives.  I hate calling it an anchor but I can't seem to think of a name that works better.  Please suggest.

    1. "sla_reference" is my preference

  2. one of the challenges with using the name "alerts" is that there's already some alerting in airflow, namely email on failure / retry.  not sure if there's a way to unify these.

    1. That's fair.  I wanted to use a name other than SLA to help avoid confusion, especially in the transition period when some users will still be on Airflow 2.x.  But maybe that's not a good enough reason for a new name.... or maybe we need a different name.   I'm always open to better naming suggestions. 

      1. I like departing from SLA. Maybe we should simply name it "alert misses" or something like that to indicate that those are special kinds of alerts?  We could then rename "alerts" to "alert_status" for example or smth like that.

      2. These aren't really alerts to me – alerting is richer than just running callbacks (i.e. PagerDuty/OpsGenie etc., where you have on-call schedules and warning/error thresholds etc.)


        So -1 to calling these  "alerts" to me.


        What's wrong with keeping these called SLAs?

        1. SLA — service level alerts (smile)

        2. I'd say the callback can be anything - emails, slack messages, etc - which trigger on a condition, so they are definitely 'alerts' in my mind.

  3. I worry that we are making the same sort of mistakes we did with the timetables – namely making the interface too complex for the majority of our users so they never actually use the feature.

    1. I am not too worried about complexity but I generally agree with Ash Berlin-Taylor - we might fall into the same trap. It is a bit low-level (because you need to understand how the "anchor" + "time-delta" will map to your "business" expectations).

      But I think, compared to Timetables, this one is something that can be controlled/authored by DAG authors (so it's easier to experiment with and test than Timetables), and also the API is rather straightforward; there are not as many gotchas as we had with Timetables, which is why I am not too worried.

      I think MAYBE a good way of making it simple would be to actually produce a list of use cases and how they map onto combinations of "anchor" and "delta". What I miss in the AIP now is how the use-cases described at the beginning map into the actual parameters set for alerts (maybe the examples map to the use cases above, but it's not really clear that that is the case).

      Also, an even better thing that we could do next is to get some predefined combos / constants / tuples / whatever that could be used to represent those use cases - so that less advanced users could understand what kind of Alerts they are adding on a "business" level rather than having to reason about the anchor + delta and what it means.

      1. >get some predefined combos / constants/tuples whatever - that could be used to represent those use cases

        Yeah, that's a solid idea.  There can (and should) be some "built-ins" here.

    2. I have had a similar thought. The interface does feel a little bit like what the internal classes might look like, but the user-facing interface might want to be simpler.

      I guess we should ask ourselves: does the user really need the flexibility of choosing any "time field" as a column to measure against? Maybe they do. If not, maybe we should just make it easy and give them a few different classes or modes.

      Thinking out loud, one way would be to do something along these lines

      • DurationSla(timedelta=timedelta(minutes=30))
        • the "soft timeout" that sung was proposing, which would evaluated against when the task or dag run started.
      • CompletionSla(timedelta=timedelta(minutes=30))
        • this would be the "old" sla – measured against the earliest time the dag should have been scheduled, according to the schedule
      • NewWeirdSla?
        • maybe we could do something like, set the expectation that the dag should run at least once daily by 9am.  or at least once daily between the hours of X and X.  What would the interface for that look like?  But I can imagine that being useful for non-scheduled dags.

      That would be going the classes approach.  Alternatively could do modes.

      • TaskSla(window=timedelta(hours=1), mode='completion')
        • task should complete within 1 hr of the earliest time it should have been scheduled according to the schedule
      • TaskSla(window=timedelta(hours=1), mode='duration')
        • task should take no longer than 1 hr
      • DagSla(window=timedelta(hours=1), mode='completion')
        • dag should complete within 1 hr of earliest time it should have been scheduled
      • DagSla(window=timedelta(hours=1), mode='duration')
        • dag should not take longer than 1 hr after it starts, no matter when it starts
      • TaskSla(cadence='0 9 * * * *', window=timedelta(hours=-2), mode='...schedule?')
        • this is setting the expectation that the dag should run at least once in each interval defined by the cron schedule, no matter what the normal schedule is.
        • by specifying window, basically this means we should expect it to see a run between 7 and 9am daily.

      The point here is making it simpler for users because we sorta know what the most common use cases will be and they may not really need the full flexibility.  It's a thought anyway.

      Another thought... on naming.

      Another way to think about this is an expectation framework. An interface for defining expectations around when tasks should run / complete.

      Expectation could be a bit too general because this is about expectations around task execution and not e.g. data quality. But I dunno, it kind of feels right.

      TaskExpectation

      DagExpectation etc

      1. The class approach is intriguing. I'll need to think on that a bit, but I like it as an idea.

        I think the complexity is toned down by providing the enum that the user can pick from. So on the implementation end, we can add any timestamp to that enum, and the user could skip the enum entirely and use whatever they want to put in there, but the "advertised" use that will be in the docs and guides would be to use that enum, which effectively limits their options and leaves the "BYOTimestamp" option for power users who want some funky anchor like "user_created_at" for creating a user directory and firing off a welcome package or something. But I do agree that it has the potential to get very complicated.

      2. I've updated the examples with a few ideas of how a class could look, and I'm liking the way the builder pattern in Example 2 looks from a user perspective. Coincidentally, it also entirely removes the need for a name for the Anchor/Reference/whatever, which is very nice.

  4. Could you create a "fuller" example showing this in a sample dag please? I think I know how this would show up, but it would be useful to see in context.

    1. Yeah, I'll add some more detailed examples.

      1. +1 on the fuller examples, please!

    2. I've replaced the examples with more detailed ones.

      1. Ash Berlin-Taylor Vikram Koka  - Do those examples look better to you?

        1. Much clearer, thanks!

        2. Yes, thank you Dennis Ferruzzi 
          This definitely made it much easier for me to understand!

  5. Okay, my 2c:

    1. I think we should use notifiers, not "raw" callbacks, and
    2. I'm not 100% sold on the "raw" alert dict for default case, i.e. maybe we should package that up in a function or object for common use cases

    On the name side of things: my issue with "alert" is that it makes me think it would also cover cases where the dag fails, but that's not what it is. It's only for SLA/lateness.

    1. > I think we should use notifiers, not "raw" callbacks, and

      I don't want to restrict callbacks to only being a Notifier.  Notifiers are a very valid usecase and I can make sure they are a thoroughly documented use example, but I want to be sure that a user had flexibility here to do more than just get notified.  As one example, what if a given data source is updated every hour and the user wants to say if the DAG hasn't completed within an hour of that refresh, then don't bother continuing to process the stale data.  A Notification may (should?)  be part of that callback, but they may also want to terminate the DAG rather than waste resources processing stale data.

      > I'm not 100% sold on the "raw" alert dict for default case, i.e. maybe we should package that up in a function or object for common use cases

      I can get behind this. I think it was mentioned elsewhere in the comments as well, but I think making this into some form of structured data object or class is a good idea rather than a raw dict. Do you want me to update the examples before I call a vote or does this comment work for you?

      1. I've updated the examples to drop the raw dict and replaced them with a couple of ideas showing both the builder pattern and the class init() pattern in use. I like the way the builder pattern looks here, especially the format shown in Example 2, so I'll lean towards that. Coincidentally, it also entirely removes the need for a name for the Anchor/Reference/whatever, which is very nice.

    2. Dennis Ferruzzi Any thoughts on the "this alert is only for SLA/lateness, so it's not all alerts" naming sticking point? I hate to be stuck on a naming issue, but I do think this is an important point.


      Other than that I'm really liking the latest changes to this proposal now!

      1. Agree with Ash here!!
        In terms of naming, I think both "Deadline Alerts" and "SLA Alerts" are good options.
        The case for "SLA Alerts" is pretty strong - people already know what SLA means in Airflow (Dennis - just look at the internal AWS channel with hundreds of SLA mentions), and users already thought SLA callbacks would work the way we're proposing.
        As for "Deadline Alerts", it clearly tells users what the feature does - it alerts you when your stuff doesn't finish by a certain deadline, and it'll work well for future things like dataset checks too.
        While both names would work, I slightly favor "Deadline Alerts" since it's more straightforward and abides by Dennis's principle of not carrying any baggage from the old system.
        One thing's for sure though - we shouldn't just call it "Alerts" and start voting with it, as that'll just lead to endless naming debates instead of focusing on the actual feature.

      2. I can get behind "Deadlines" or "Deadline Alerts"

  6. I didn't want to complicate the inline comment thread above, but here is the section I would like to understand better. 
    ---

    In the scheduler loop add “check for Deadline misses”:

      1. If the first/soonest sla has passed: pop the timestamp, queue the callback to run on the worker [possible future work to add a new process similar to a Triggerer], recurse.

    ---

    I have two questions / thought around this:
    a. Now that 3.0 is out, would a medium term choice be to run the callback as part of the DagFileProcessor? 
    I know that there is planned work to move Dag-level callbacks to the Worker as part of AIP-72. Is that the other option here and is that therefore a dependency? 

    b. Based on the feedback in the Airflow meetup at SF a month or so ago, there seems to be strong user desire to run the loop for "check for Deadline misses" in a process/component separate from the scheduler / worker, so that the "observer" doesn't affect the running of the tasks. 
    I therefore believe that there should be provision to run this loop outside of the scheduler for larger deployments, rather than defaulting to being directly within the scheduler loop.

    Dennis, you were in the same meetup as well, so curious about your updated thoughts on this.