Status

State: Accepted
Discussion Thread:
Vote Thread: https://lists.apache.org/thread/cb2d9v3qr7h9mn54j6klxwtmxssq90yz
Vote Result Thread: https://lists.apache.org/thread/yjpwx2tmrg6go3xmynwpxzr3ypttqcyn
Progress Tracking (PR/GitHub Project/Issue Label):
Date Created: 05.09.2024

Version Released: Target 3.1
Authors: Dennis Ferruzzi



Summary

The existing SLA implementation before Airflow 3.0 has been problematic and a constant point of debate. In the Airflow 3 planning meetings, it was decided that the move to a new major version is a good time to tear out the old implementation and replace it with something new. Since AIP-57 has been abandoned, this proposal replaces and supersedes AIP-57. That AIP and the discussion around it are not part of this proposal; unless mentioned here, nothing in those discussions or that AIP is considered part of this one.

The old SLA feature was removed in Airflow 3.0 (PR here) and will be replaced in 3.1 with Deadline Alerts. The purpose of this AIP is to discuss the implementation of the new Deadline Alerts feature that replaces the SLA feature.

One of the main points of contention with the SLA feature revolves around when you start counting. If you set an SLA of "1 hour", is that one hour after it was scheduled, one hour after it starts running, one hour after it was queued, or something else entirely? This leads to frequent questions and confusion. This AIP revolves around the idea of allowing the user to define that starting point and treating the new SLA as a calculated "need-by date". To avoid confusion, the new feature will be referred to as Deadlines or Deadline Alerts rather than SLA, and the implementation will allow a much more versatile experience for the user.

Motivation

Problems in the Old Implementation

(credit for this section to Sung Yun in AIP-57)

  1. sla_miss was evaluated only if the task_id had ever been in SUCCESS or SKIPPED state. This means that if the task had not succeeded in the first place - due to it being blocked, still running, or yet to be scheduled - the sla_miss_callback was not executed. This is a detrimental misfeature of the old SLA implementation, as users often want to use SLAs for delay detection, enabling them to investigate possible issues in their workflow pre-emptively.
  2. If a DAG was ever changed to rename task_ids or add new task_ids, the first runs of the new task_ids would never have their SLAs evaluated.
  3. SLA was defined as a timedelta relative to dagrun.data_interval_end based on the timetable of the scheduled job. Some DAGs are triggered externally - either manually or via a Dataset - meaning they don't have a fixed schedule or start time, preventing the evaluation of an SLA miss.
  4. sla_miss_callback was an attribute of the DAG, but SLAs were defined as attributes of the individual tasks. This is counterintuitive, and does not allow users to run callbacks customized at the task level when deadlines are missed.
  5. The old implementation triggered the parsing of DAG files when an sla_miss_callback was sent to the DagFileProcessor. This means that if an SLA was defined within a DAG, we could end up parsing the DAG file more frequently than the frequency defined by min_file_process_interval, because the sla_miss_callbacks were sent to the DagFileProcessor every time dag_run.update_state was called. This is especially bad for sla_miss_callbacks compared to the other DAG-level callbacks, because sla_miss_callback was requested every time we examined and attempted to schedule the dagrun, even if there were no state changes within the DAG.

Considerations

As stated above, this AIP intentionally ignores all previous discussion on the topic of fixing, changing, redefining, or re-implementing "SLA". Previous discussions have been ongoing for a very long time, across many different mediums, and have gone through multiple proposals. It is impossible to summarize or collate all those discussions, so treat this as a "blank slate" proposal and do not assume any changes beyond what is explicitly stated herein.

Definitions

As this topic has been discussed many times, in an effort to avoid confusion, the following definitions are used in this project.

Reference / Reference Timestamp

An Enum will contain a selection of options, similar to how a user currently provides a TriggerRule. The implementation will include an interface which allows future work to easily add new reference timestamps, but they will not be a freeform input for the user. These options resolve to a datetime() representing the point in time to start counting from. This value can be dynamic (examples include NEXT_DAGRUN_EXECUTION_DATE or DAGRUN_QUEUED_AT, which map to their respective database fields), or a static date/time value provided as a string and cast to a datetime() object.
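A minimal sketch of how such an Enum and its resolution might look. All member names and the resolve helper here are illustrative only; the real interface will be decided at implementation.

```python
# Hypothetical sketch only: member names and resolve_reference() are not the final API.
from datetime import datetime
from enum import Enum

class DeadlineReference(Enum):
    # Dynamic references map to fields on the dag/dagrun database rows.
    DAGRUN_QUEUED_AT = "queued_at"
    NEXT_DAGRUN_EXECUTION_DATE = "next_dagrun_execution_date"
    # A static reference is supplied by the user as a date/time string.
    FIXED_DATETIME = "fixed_datetime"

def resolve_reference(reference, row_fields, fixed=None):
    """Resolve a reference option to the concrete datetime to start counting from."""
    if reference is DeadlineReference.FIXED_DATETIME:
        # Static value: cast the user-provided string to a datetime.
        return datetime.fromisoformat(fixed)
    # Dynamic value: look up the corresponding database field.
    return row_fields[reference.value]
```

The key property is that every option, dynamic or static, resolves to a plain datetime, so the rest of the Deadline machinery never needs to know which kind it was.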

Delta

A datetime.timedelta. When added to the reference timestamp, you get the "need-by date" (see below).

Need-by

“Need-by” will be defined as the reference timestamp (see above) plus some delta.  For example,  `need_by = DeadlineAlerts.NEXT_DAGRUN_EXECUTION_DATE + 1_hour`

Callback

A Callable (or, by extension, a functools.partial) to execute when a miss is detected. This implementation will allow for either a custom callback, as seen in success_callback and failure_callback, or a Notifier; special attention will go into making sure that using any existing or custom Notifier is as seamless as possible.

Why is it needed?

Before diving into the challenges with the current SLA feature, let's consider what our users are expecting from an SLA feature in Airflow:

(Credit for user stories to Shubham Mehta )

Scenario 1: The Mission-Critical Daily Report

Elmo, a data engineer at a large e-commerce company, manages a DAG that generates a critical daily sales report. This report must be ready by 9 AM every day for the executive team's business review.

Elmo expects:

  • To set a DAG-level deadline of 9 AM for the entire workflow
  • To receive immediate notifications if the DAG is running behind schedule and might miss the deadline
  • To easily adjust the Alert for weekends and holidays, while having the ability to define the deadline in a local timezone
  • To see a historical view of how often the DAG meets its SLA, helping him identify trends and potential improvements

Scenario 2: The Long-Running Processing Task

Rosita, a data scientist at a finance firm, works with a DAG that includes a data processing task which uses EMR to transform data extracted from an external API. This task occasionally runs much longer than expected, delaying downstream processes.

Rosita needs:

  • To set a maximum execution time for the extract task
  • To receive notifications when the task exceeds its expected duration
  • To log detailed information when the expected time limit is breached, helping her investigate and optimize the task

Scenario 3: The ETL Pipeline

Zoe, an engineer at a tech startup, manages a DAG with multiple stages: data ingestion, preparation, processing, and reporting. She needs to ensure each stage completes within a specific timeframe.

Zoe requires:

  • To set individual Alerts for different tasks within her DAG
  • To receive notifications if any task in the pipeline exceeds its timeframe
  • To easily view which parts of her pipeline are consistently meeting or missing their deadlines

What change do you propose to make?

  1. In dag_processing/processor.py add "save next Deadline":
    1. If this DAG has a Deadline configured, calculate the "need-by" and store the timestamp in a sorted-by-closest-timestamp object (or a db table like the old SLAMiss table?)
  2. In the scheduler loop add "check for Deadline misses":
    1. If the first/soonest deadline has passed: pop the timestamp, queue the callback to run on a worker [possible future work to add a new process similar to the Triggerer], and repeat.
  3. Add the Deadline Alert handler back in where the previous SLA callback handler used to live.
  4. When a dagrun ends, attempt to remove its Deadline timestamp(s).
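The scheduler-side check can be sketched with an in-memory min-heap keyed on the need-by timestamp. This is illustrative only: all names are hypothetical, and the real implementation would persist deadlines (likely in a table, per step 1) and dispatch callbacks to a worker rather than calling them inline.

```python
# Illustrative sketch of the "check for Deadline misses" step; not the final design.
import heapq
from datetime import datetime, timedelta, timezone

deadlines = []  # min-heap of (need_by, dagrun_id, callback), soonest first

def register_deadline(need_by, dagrun_id, callback):
    """Store a computed need-by timestamp, keeping the soonest on top."""
    heapq.heappush(deadlines, (need_by, dagrun_id, callback))

def check_deadline_misses(now):
    """Pop and fire every deadline whose need-by time has passed."""
    missed = []
    while deadlines and deadlines[0][0] <= now:
        need_by, dagrun_id, callback = heapq.heappop(deadlines)
        missed.append(dagrun_id)
        callback(dagrun_id)  # in Airflow this would be queued on a worker instead
    return missed
```

Because the heap keeps the soonest deadline on top, each scheduler loop iteration only inspects the head entry and exits immediately when nothing has expired, keeping the per-loop cost low.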

Examples

I may go with either the class init() pattern or the builder pattern; that will be decided at implementation, but usage would look something like one of these. I am currently leaning towards the format of Example 2.

Code Block
# Example 1
with DAG(
    dag_id="email_if_not_done_1_hour_after_dagrun_queued_example",
    schedule=None,
    start_date=yesterday,
    tags=["deadline_alerts_example"],
    deadline=Deadline(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=datetime.timedelta(hours=1),
        callback=send_smtp_notification(
            from_email="someone@mail.com",
            to="someone@mail.com",
            subject="[Error] The dag {{ dag.dag_id }} failed",
            html_content="debug logs",
        )
    )
) as dag:
    ...


Code Block
# Example 2
with DAG(
    dag_id="custom_callback_if_not_done_1_hour_after_dagrun_scheduled_example",
    schedule=daily,
    start_date=yesterday,
    tags=["deadline_alerts_example"],
    deadline=DeadlineAlerts.DAGRUN_SCHEDULED_AT.build(
        interval=datetime.timedelta(hours=1),
        callback=functools.partial(my_custom_callback, arg1="Something went wrong!", arg2="Error Code: 123"),
    )
) as dag:
    ...


Code Block
# Example 3
with DAG(
    dag_id="same_as_above_but_longer",
    schedule=daily,
    start_date=yesterday,
    tags=["deadline_alerts_example"],
    deadline=DeadlineAlerts.DAGRUN_SCHEDULED_AT
        .with_interval(datetime.timedelta(hours=1))
        .callback(my_custom_callback)
        .callback_kwargs({"arg1": "Something went wrong!", "arg2": "Error Code: 123"})
        .build()
) as dag:
    ...


Code Block
# Example 4
with DAG(
    dag_id="email_if_not_run_in_over_24_hours_example",
    schedule=daily,
    start_date=yesterday,
    tags=["deadline_alerts_example"],
    deadline=DeadlineAlerts.DAG_LAST_EXECUTION_TIME.build(
        interval=datetime.timedelta(hours=24),
        callback=send_smtp_notification(
            from_email="someone@mail.com",
            to="someone@mail.com",
            subject="[Error] The dag {{ dag.dag_id }} failed",
            html_content="debug logs",
        )
    )
) as dag:
    ...

Metrics

New metrics will embrace an OTel-first design, meaning tags will be considered from the beginning, not as an afterthought.

  1. deadline_alerts_triggered: A counter tagged with the dag_id so a user can see how many times a given DAG has caused a Deadline Alert.
  2. Other New Metrics Here??
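To make the tagged-counter idea concrete, here is a hedged sketch of how the scheduler might emit the proposed metric. The `Stats` shim below is a self-contained stand-in (Airflow's own Stats facade would be used in practice), and `on_deadline_missed` is a hypothetical hook name.

```python
# Self-contained stand-in for a tagged metrics facade; names are illustrative.
class Stats:
    counters = {}

    @classmethod
    def incr(cls, name, tags=None):
        # Key each counter by metric name plus its sorted tag set.
        key = (name, tuple(sorted((tags or {}).items())))
        cls.counters[key] = cls.counters.get(key, 0) + 1

def on_deadline_missed(dag_id):
    # Emit the proposed counter, tagged with the dag_id so per-DAG
    # miss counts can be charted in an OTel backend.
    Stats.incr("deadline_alerts_triggered", tags={"dag_id": dag_id})
```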

Manually-triggered DAG support

Code will be added to the dagrun trigger so that manually-triggered DAG runs set "scheduled_start_time" to "utcnow()".
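A minimal sketch of that change, with hypothetical function and field names: a manually-triggered run has no schedule to anchor on, so it gets the current time as its reference point.

```python
# Illustrative sketch of the proposed dagrun-trigger change; names are hypothetical.
from datetime import datetime, timezone

def create_dagrun(run_conf, scheduled_start_time=None):
    if scheduled_start_time is None:
        # Manual trigger: no schedule exists, so anchor deadlines on "now".
        scheduled_start_time = datetime.now(timezone.utc)
    return {"conf": run_conf, "scheduled_start_time": scheduled_start_time}
```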

Extensibility

Changes will be made in such a way as to minimize conflict with the other potential Deadline types mentioned below under "Future Work" [Appendix 1].

Current Reference Candidates [See also: Appendix 2]

  • dag_run - execution_date
  • dag_run - queued_at
  • job - start_date
  • serialized_dag - last_updated
  • dataset - created_at (future work)
  • dataset - updated_at (future work)


What problem does it solve?

Allowing the user to decide which point in the DAG's lifecycle the need-by time is based on will help clear up much of the confusion around the old SLA implementation.

Are there any downsides to this change?


Which users are affected by the change?

All users who currently use the SLA feature will need to migrate. The SLA feature has been removed in 3.0 and replaced with a notification to this effect, and the documentation for Deadline Alerts will include examples showing how to set an Alert which works the same way as the old SLA, to help minimize the transition effort for users who liked the old way.

How are users affected by the change? (e.g. DB upgrade required?)

The SLAMiss table has been removed in 3.0 and this implementation will likely replace it with a similar DeadlineAlerts table which maps dagrun_id::timestamp.
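The shape of that replacement table might look something like the dataclass below. This is a sketch of the dagrun_id-to-timestamp mapping only; the real migration would define a SQLAlchemy model, and all field names here are hypothetical.

```python
# Hypothetical row shape for the proposed DeadlineAlerts table; not a real model.
from dataclasses import dataclass
from datetime import datetime

@dataclass
class DeadlineAlertRow:
    dagrun_id: int       # FK to dag_run.id
    deadline: datetime   # the computed need-by timestamp
    callback: str        # serialized reference to the callback to fire on a miss
```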


What is the level of migration effort (manual and automated) needed for the users to adapt to the breaking changes? (especially in context of Airflow 3)

DAGs which implement the old SLA feature will need to be manually migrated. Documentation will include examples showing the old vs new configurations to make that as easy as possible.

What defines this AIP as "done"?

This AIP will only cover DAG-level Deadlines. 


Appendices

Appendix 1: Future Work

Future work could add:

  • Task-level Deadlines (Task within a DAG did not finish before a given time)
  • Dataset Deadlines (Dataset has not been modified before a given time)
  • Update the Alerts view on the frontend to include a user-defined failure code or message.
  • A new sidecar process, equivalent to the Triggerer, which can offload the processing of the Deadline Alert callbacks.

Appendix 2: Other timestamps in the databases that could possibly be used

This is a list of all timestamps in the DB, included purely to inspire ideas of other ways this feature might be used. Many of these are not practical to use as a basis for a Deadline, but use cases may come up with interesting ideas, so they are included here for reference. The list is formatted as table_name - field_name.

  • log - dttm
  • log - execution_date
  • job - end_date
  • job - latest_heartbeat
  • job - start_date
  • callback_request - created_at
  • dag_code - last_updated
  • dag_pickle - created_dttm
  • ab_user - changed_on
  • ab_user - created_on
  • ab_user - last_login
  • ab_register_user - registration_date
  • sla_miss - execution_date
  • sla_miss - timestamp
  • import_error - timestamp
  • serialized_dag - last_updated
  • dag_schedule_dataset_alias_reference - created_at
  • dag_schedule_dataset_alias_reference - updated_at
  • dataset - created_at
  • dataset - updated_at
  • dag_schedule_dataset_reference - created_at
  • dag_schedule_dataset_reference - updated_at
  • task_outlet_dataset_reference - created_at
  • task_outlet_dataset_reference - updated_at
  • dataset_dag_run_queue - created_at
  • dataset_event - timestamp
  • dag - last_expired
  • dag - last_parsed_time
  • dag - last_pickled
  • dag - next_dagrun
  • dag - next_dagrun_create_after
  • dag - next_dagrun_data_interval_end
  • dag - next_dagrun_data_interval_start
  • dag_warning - timestamp
  • log_template - created_at
  • dag_run - data_interval_end
  • dag_run - data_interval_start
  • dag_run - end_date
  • dag_run - execution_date
  • dag_run - last_scheduling_decision
  • dag_run - queued_at
  • dag_run - start_date
  • dag_run - updated_at
  • task_instance - end_date
  • task_instance - queued_dttm
  • task_instance - start_date
  • task_instance - trigger_timeout
  • task_instance - updated_at
  • dag_run_note - created_at
  • dag_run_note - updated_at
  • task_reschedule - end_date
  • task_reschedule - reschedule_date
  • task_reschedule - start_date
  • task_fail - end_date
  • task_fail - start_date
  • xcom - timestamp
  • task_instance_note - created_at
  • task_instance_note - updated_at
  • task_instance_history - end_date
  • task_instance_history - queued_dttm
  • task_instance_history - start_date
  • task_instance_history - trigger_timeout
  • task_instance_history - updated_at
  • trigger - created_date
  • session - expiry


Test Plan

Each DeadlineReference type is tested with both supported callback types (Async Notifier, Custom Async Callback) using either one or two 30-second tasks against short (10s) and long (100s) intervals to verify callback behavior when task duration exceeds or falls within the deadline interval. For Async Notifier tests, use SlackWebhookNotifier as it is currently the only notifier supporting async operations. For Custom Async Callback implementation details, refer to the Deadlines documentation page.


For example: a Dag with one task which sleeps for 30s and an interval of 100s should not execute the callback. A Dag with one task which sleeps for 30s and an interval of 10s has missed the Deadline and should call the callback.
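The pass/fail rule behind the test matrix reduces to a single comparison, shown here as a small helper (the function name is illustrative, not part of the test harness): a deadline is missed when the total run time of the sequential tasks exceeds the configured interval.

```python
# Captures the expected-result rule of the test matrix: missed iff run time > interval.
from datetime import timedelta

def deadline_missed(task_seconds, n_tasks, interval):
    """Return True when n_tasks sequential tasks of task_seconds each exceed the interval."""
    return timedelta(seconds=task_seconds * n_tasks) > interval
```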

Test Procedure:

For each test case:

  1. Create a new DAG configured with the specified DeadlineReference, interval, and callback in the Dag's deadline.
  2. Configure the specified number of 30-second tasks within the DAG, run sequentially.
  3. Deploy and run the DAG in Airflow 3.1 or greater.
  4. Monitor the deadline database table for changes.

Expected Results:

  • For test cases where "Deadline Expired" is True:
    • The overall run time of the Dag exceeds the interval: 
      • The callback should be automatically executed
      • The corresponding row in the deadline table should be updated with a callback_state
  • For test cases where "Deadline Expired" is False:
    •  The overall run time of the Dag does not exceed the interval:
      • No callback should be executed
      • The corresponding row in the deadline table should be automatically removed by the system

Note: All database changes (state updates and row removals) are performed automatically by the system. The tester's role is to verify these changes occur as expected.

Baseline Test:

TC-000 should create a Dag without a Deadline and run it to confirm that the environment is working as expected.


Test Case | DeadlineReference                    | Callback Type         | Number of 30s Tasks | Interval Length | Deadline Expired
TC-001    | DeadlineReference.DAGRUN_QUEUED_AT   | Async Notifier        | 1 | 100s | False
TC-002    | DeadlineReference.DAGRUN_QUEUED_AT   | Async Notifier        | 1 | 10s  | True
TC-003    | DeadlineReference.DAGRUN_QUEUED_AT   | Async Notifier        | 2 | 100s | False
TC-004    | DeadlineReference.DAGRUN_QUEUED_AT   | Async Notifier        | 2 | 10s  | True
TC-005    | DeadlineReference.DAGRUN_QUEUED_AT   | Custom Async Callback | 1 | 100s | False
TC-006    | DeadlineReference.DAGRUN_QUEUED_AT   | Custom Async Callback | 1 | 10s  | True
TC-007    | DeadlineReference.DAGRUN_QUEUED_AT   | Custom Async Callback | 2 | 100s | False
TC-008    | DeadlineReference.DAGRUN_QUEUED_AT   | Custom Async Callback | 2 | 10s  | True
TC-009    | DeadlineReference.DAGRUN_LOGICAL_DATE | Async Notifier        | 1 | 100s | False
TC-010    | DeadlineReference.DAGRUN_LOGICAL_DATE | Async Notifier        | 1 | 10s  | True
TC-011    | DeadlineReference.DAGRUN_LOGICAL_DATE | Async Notifier        | 2 | 100s | False
TC-012    | DeadlineReference.DAGRUN_LOGICAL_DATE | Async Notifier        | 2 | 10s  | True
TC-013    | DeadlineReference.DAGRUN_LOGICAL_DATE | Custom Async Callback | 1 | 100s | False
TC-014    | DeadlineReference.DAGRUN_LOGICAL_DATE | Custom Async Callback | 1 | 10s  | True
TC-015    | DeadlineReference.DAGRUN_LOGICAL_DATE | Custom Async Callback | 2 | 100s | False
TC-016    | DeadlineReference.DAGRUN_LOGICAL_DATE | Custom Async Callback | 2 | 10s  | True
TC-017    | DeadlineReference.FIXED_DATETIME     | Async Notifier        | 1 | 100s | False
TC-018    | DeadlineReference.FIXED_DATETIME     | Async Notifier        | 1 | 10s  | True
TC-019    | DeadlineReference.FIXED_DATETIME     | Async Notifier        | 2 | 100s | False
TC-020    | DeadlineReference.FIXED_DATETIME     | Async Notifier        | 2 | 10s  | True
TC-021    | DeadlineReference.FIXED_DATETIME     | Custom Async Callback | 1 | 100s | False
TC-022    | DeadlineReference.FIXED_DATETIME     | Custom Async Callback | 1 | 10s  | True
TC-023    | DeadlineReference.FIXED_DATETIME     | Custom Async Callback | 2 | 100s | False
TC-024    | DeadlineReference.FIXED_DATETIME     | Custom Async Callback | 2 | 10s  | True