Status
Motivation
The critical section in Airflow's scheduler uses an optimistic strategy: it fetches a batch of scheduled task instances from the database (up to max_tis_per_query), then filters them in Python against all concurrency limits:
- pool slots
- max_active_tasks_per_dag
- max_active_tis_per_dag
- max_active_tis_per_dagrun
- executor slots (an exception, as it is local to the scheduler)
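The optimistic flow can be sketched roughly as follows. This is a simplified illustration, not the actual scheduler code; the names (`ConcurrencyLimits`, `optimistic_critical_section`) are made up for the sketch:

```python
from dataclasses import dataclass


@dataclass
class TaskInstance:
    dag_id: str
    pool: str
    pool_slots: int = 1


@dataclass
class ConcurrencyLimits:
    """Hypothetical container for the limits checked in the critical section."""
    open_pool_slots: dict            # pool name -> free slots
    max_active_tasks_per_dag: int
    running_per_dag: dict            # dag_id -> currently running count

    def allows(self, ti: TaskInstance) -> bool:
        if self.open_pool_slots.get(ti.pool, 0) < ti.pool_slots:
            return False
        if self.running_per_dag.get(ti.dag_id, 0) >= self.max_active_tasks_per_dag:
            return False
        return True


def optimistic_critical_section(batch, limits, max_tis):
    """Fetch once, then drop every TI that fails a limit check in Python.

    With a constrained batch, most TIs are dropped and far fewer than
    max_tis are queued: the starvation pattern described above.
    """
    queued = []
    for ti in batch:
        if len(queued) >= max_tis:
            break
        if limits.allows(ti):
            limits.open_pool_slots[ti.pool] -= ti.pool_slots
            limits.running_per_dag[ti.dag_id] = limits.running_per_dag.get(ti.dag_id, 0) + 1
            queued.append(ti)
    return queued
```

Running it with a batch dominated by one DAG shows the pattern: with max_active_tasks_per_dag = 2, only 2 of 10 fetched TIs are queued, no matter how large max_tis is.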
This works well enough for standard workloads. However, in large-scale deployments with thousands of tasks (often driven by dynamic task mapping) it causes starvation: the scheduler fixates on tasks from one constrained group, discards most of them after checks, and queues far fewer tasks than possible per cycle. The issue appears identically across every one of these limits. It was first noted with prioritized pools nearly full, starving lower-priority ones despite free slots (Issue 45636). Christos Bisias highlighted the same with huge DAGs hitting max_active_tasks (mail discussion). The pattern repeats symmetrically for the other concurrency limits: instead of skipping to viable tasks, the scheduler loops over the same ineligible set.
Narrow fixes like PR 54103 tackle one limit but ignore the rest, and production environments differ widely, from DAG run floods to mapped task surges or complex priorities. Building all limits into the initial query has proven tough, often infeasible in the current design. PR 53492 tried window functions and lateral joins for a full solution, but orthogonal limits (pools independent of DAG caps) led to poor SQL performance and unresolved edge cases. PR 55537 used pessimistic DB procedures, which looks promising as it solves the problem, but still needs thorough examination and community discussion. Rethinking priorities in Airflow seems to be the simplest method, though not yet examined practically (no POC created). In that approach, DAG-level priorities are introduced, while the current task-level priorities become internal to each DAG run. It relies on changing the task-creation method: only tasks that can run, with all concurrency limits considered per DAG run, are created. It also simplifies the main scheduler loop, which fetches max_tis tasks once and moves them to the executor in a single iteration, because any task in the scheduled state is guaranteed to be able to run without further checks (other than executor slots).
Large-scale reliability is the goal here. Dynamic mapping enables massive DAGs for per-item batching (one task per file or data pointer), so Airflow has to handle it without breaking. Community benchmarks on real heavy loads would help confirm solutions, and we need queuing that is truly resistant to starvation under any concurrency limit. This proposal serves as an updated summary of our research into improving the scheduler's throughput and eliminating starvation. As there is still uncertainty regarding the desired outcome, we present all the strategies we tried or considered, and expect a fruitful discussion.
Terminology
Optimistic scheduling refers to planning based on best-case scenario assumptions: making a single simple query, iterating over the results, and choosing the tasks that can be scheduled from those the query returned.
Pessimistic scheduling is based on worst-case scenario assumptions: performing an exhaustive scan on all tasks until we either run out of tasks to examine or reach the desired number of tasks that can run.
Hybrid scheduling is a general term for an adaptive strategy that can behave like an optimistic or a pessimistic algorithm, depending on the actual workload.
Task scheduling in this AIP refers to the process of enqueueing tasks (sending tasks to executor, making them run)
Considerations
What change do you propose to make?
Change the methodology by which the scheduler decides which tasks to run, allowing for increased scheduler throughput and running more tasks across a wider variety of workflows without causing noisy neighbours during scheduling.
This will be done by changing the semantics of priorities in Airflow. They are currently not clearly defined, yet they are at the root of scheduler starvation and cluster slowness, since they decide the order in which tasks are examined, and that order does not change dynamically. If long-running tasks are blocked by a concurrency limit (e.g. max_active_tasks_per_dag), their slot in the batch is not freed for scheduling until they actually run. So even when every fetched task could run, we will not schedule max_tis tasks; in the best case, the scheduler can only schedule max_tis - 1 tasks.
The proposed change is to introduce dynamic priorities: priorities will be computed at query time from multiple factors, including the time since a task was last examined and task urgency (how close we are to missing the SLA), combined with some kind of obsolescence algorithm and general heuristics. This allows better scheduling decisions, where we do not starve tasks that can run, while still letting higher-priority tasks run before lower-priority ones.
This AIP is open for discussion; the chosen method might change as new ideas arise. Currently, the proposed solution is for the priority semantics to change.
Why is it needed?
Large-scale Airflow deployments with dynamic task mapping—generating thousands of tasks per DAG—suffer severe scheduler starvation. The optimistic strategy fetches batches of tasks but discards most due to concurrency limits, queuing almost nothing per cycle while workers idle and tasks pile up in scheduled state indefinitely. This change ensures scalable, equitable queuing across all limits, avoiding partial fixes and enabling reliable performance at extreme volumes.
Integration With Deadline Alerts
With the new addition of deadline alerts, task callbacks move away from the dag-processor into a task of their own: they now take up an executor slot and act just like any other task. As part of this AIP, we need to allow the callbacks to run as quickly as possible (or at least make the behaviour configurable). The new priority algorithm will therefore need some kind of configurable weight for those callbacks, deciding whether they are treated as higher-priority or lower-priority tasks.
The callbacks are assumed not to rely on, or be subject to, any concurrency limits (unlike regular tasks), so if we prioritize them, then as soon as a query selects a callback it is guaranteed to be moved to the executor to prepare for running, which simplifies the problem. For this, though, we need to be able to distinguish between callbacks and regular tasks, both from inside the database and from Python.
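One possible shape for the configurable callback weight; all names here (`effective_priority`, `is_deadline_callback`, `callback_priority_boost`) are illustrative, not an existing Airflow API:

```python
def effective_priority(base_priority: int,
                       is_deadline_callback: bool,
                       callback_priority_boost: int = 0) -> int:
    """Return the priority used for ordering in the scheduling query.

    callback_priority_boost is an operator-configurable weight: positive
    values make deadline-alert callbacks jump the queue, negative values
    defer them behind regular tasks, and 0 keeps today's behaviour.
    """
    if is_deadline_callback:
        return base_priority + callback_priority_boost
    return base_priority
```

Because callbacks bypass concurrency checks, boosting them means a single query both selects and guarantees their dispatch.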
Maintaining backwards compatibility
In order to maintain backwards compatibility, we have to design the algorithm so that users may opt in to its new parts. This means introducing some kind of "weights" that decide the influence of each scheduling parameter (e.g. urgency, time since last examined), with all defaults set to 0 (other than the current priority_weight).
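The opt-in could look like a small set of weights that default to zero, so scoring degenerates to the current priority_weight-only behaviour unless a user turns a factor on. The field names here are hypothetical:

```python
from dataclasses import dataclass


@dataclass
class SchedulingWeights:
    """New factors default to 0, so the score reduces to the current
    behaviour (priority_weight only) unless the user opts in."""
    priority: float = 1.0   # weight of the existing priority_weight
    urgency: float = 0.0    # weight of closeness to the SLA/deadline
    aging: float = 0.0      # weight of time since last examined

    def score(self, priority_weight, urgency, seconds_waiting):
        return (self.priority * priority_weight
                + self.urgency * urgency
                + self.aging * seconds_waiting)
```

With the defaults, tasks are ordered exactly as today; setting `aging` above zero enables the obsolescence behaviour described later.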
HITL Urgency - manual intervention
TBD
How is the airflow scheduler different?
Many studied and optimized schedulers (and scheduling algorithms) assume preemption: tasks can be preempted as needed and resume when resources become available. This is not the case in Airflow, which cannot preempt a task mid-run and has to wait for the task to finish completely.
This introduces challenges and edge cases that do not exist in other schedulers. For example: many tasks are running, and a high-priority task occupies more than one pool slot (say 10); it is constantly postponed because there are never enough free pool slots. This is an issue with the general optimistic approach, where a task that cannot run is simply ignored, but here the effect may be magnified (if urgency is not a factor): small tasks keep running and never let the big task in. A mitigation is to move the big task to a different pool, so other tasks cannot prevent it from running.
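The multi-slot edge case can be reproduced with a deliberately crude toy model (illustrative only, not scheduler code): small tasks last exactly one cycle and are always examined first, so a task needing 10 slots never finds room while single-slot tasks keep arriving.

```python
def simulate_big_task(pool_size, big_task_slots, small_tasks_per_cycle, cycles):
    """Toy model: each cycle, last cycle's small tasks finish, new small
    tasks are scheduled greedily (one slot each), and the big task runs
    only if enough free slots remain afterwards."""
    big_task_ran = False
    for _ in range(cycles):
        free = pool_size
        scheduled_small = min(small_tasks_per_cycle, free)
        free -= scheduled_small
        if free >= big_task_slots:
            big_task_ran = True
    return big_task_ran
```

With a 12-slot pool, a 10-slot task, and 5 small tasks per cycle, the big task never runs regardless of how many cycles pass; with only 2 small tasks per cycle it runs immediately.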
The variety of Airflow jobs makes the challenge even bigger: we can have tasks that run for seconds and tasks that run for days, and a given task may run for more or less time depending on the amount of data, the type of the task, and even external factors (such as a slow k8s cluster, unavailable data, and more).
Are there any downsides to this change?
There are strategy-specific downsides documented below. The overall effect should be positive for all users.
Which users are affected by the change?
- Large-scale deployments
- Deployments shared by several teams
- Power users - clients that heavily utilize concurrency limits and priority weights
How are users affected by the change? (e.g. DB upgrade required?)
A DB upgrade might be required, depending on the solution of choice.
- Single-limit pre-filtering:
- No db migration is needed.
- Window functions:
- 2 integer fields need to be added to the task_instance table, nullable and populated gradually as tasks are created.
- SQL Procedures:
- Same as window functions.
- Priority Semantic Changes:
- Possibly adding a priority field to the dag and or dagrun table.
- Possibly adding a new table to collect heuristics about tasks, so they won't need to be recomputed every time at runtime.
Implementation Considerations
Priority Semantics Changes
A consensus within the community suggests that rethinking priorities is a viable option among the presented alternatives.
In this approach the semantics of priorities change, allowing new prioritization algorithms that let the scheduler make better decisions based on multiple parameters, without starving tasks.
Priority use cases
Real use cases must be considered before introducing a mechanism. While prioritizing individual tasks appears to bring a nice-to-have granularity, it should be asked whether this granularity has real use cases. Is there a reason we want to define a total order on all the tasks running in a cluster? Do we think about comparing priority of an email-sending task in one DAG to a data pipeline triggered in another one?
Some may think that priorities, if at all, should be defined on the DAG level. It follows from the notion of DAG being a single, irreducible workflow from the user's point of view.
Current task priorities are used opportunistically - they have impact only on the tasks that are eligible to run in one iteration. Nothing prevents a less-prioritized task from "sneaking" into an executor slot just one cycle before the more prioritized one comes in. There are even edge cases where a low-prioritized task "steals" the slot of a more prioritized one.
A crucial fact to remember is:
Priorities in Airflow have an impact when a cluster has insufficient resources or when concurrency limits are reached.
This is true, because otherwise we would expect things to "run on time". Priorities are helpful when we don't have enough pool slots, or the scheduler is choking on too many tasks.
Difficulties with task-level priorities
Pessimistic scheduling
Current task-level priorities make it challenging to implement a pessimistic scheduling algorithm in Python code. If we assume:
(0) A large number of tasks, i.e. too many to be fetched entirely into code in one iteration.
(0') We want to schedule exactly max_tis task instances in every scheduler cycle if available. If not, schedule every task whose concurrency limits allow it to run.
The problem is described in points:
(1) The goal is to get N of the most prioritized tasks whose concurrency limits are met.
(2) Due to high workflow variety, it's hard to maintain a consistent view of tasks that are eligible to be scheduled.
(3) From (0'), (1) and (2) it follows that in every iteration we have to build a view of all the tasks sorted by their priority, and validate for every task if it's eligible to run until max_tis tasks are found that can run.
(4) Doing (3) in code considering (0) means possibly multiple fetches from the DB and in-code iteration. RTT and memory-buffer allocations are factors that make it extremely slow (from benchmarks).
If these points are correct (subject to discussion), then we must change the behavior of task priorities or remove them altogether in order to achieve a pessimistic in-code scheduling.
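Points (0)-(4) amount to the following paginated scan, sketched here with hypothetical helpers (`fetch_page` stands in for a priority-ordered DB query, `is_eligible` for the concurrency checks):

```python
def pessimistic_scan(fetch_page, is_eligible, max_tis, page_size=500):
    """Walk the priority-ordered task list page by page until max_tis
    eligible TIs are found or the list is exhausted.

    Each page is a DB round trip, so with (0) -- far more tasks than fit
    in one fetch -- the RTTs and buffer allocations dominate, which is
    what makes the in-code pessimistic approach slow in practice.
    """
    chosen, offset = [], 0
    while len(chosen) < max_tis:
        page = fetch_page(offset, page_size)
        if not page:
            break  # ran out of tasks to examine
        for ti in page:
            if is_eligible(ti):
                chosen.append(ti)
                if len(chosen) == max_tis:
                    break
        offset += page_size
    return chosen
```

The algorithm itself is correct and starvation-free; the objection above is purely about its cost when the task table is large.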
Non-pessimistic scheduling
Purely optimistic scheduling with constant priorities leads to starvation and decreased throughput, mostly because of priority weights and a lack of variety between scheduler iterations. So to continue with non-pessimistic scheduling, we must still change the way priorities work.
Theoretical summary
| Strategy Type | Optimistic | Pessimistic | Hybrid |
|---|---|---|---|
| Problem | Constant priority weights lead to starvation due to dropped tasks and low variety between cycles. | Can't be done in Python code due to technical difficulty in processing large amounts of tasks stored in SQL. | |
| Known solutions | | SQL stored procedures. | Change priorities mid-run, use task obsolescence or other heuristics that allow more variety for the scheduler choosing tasks to run. |
Rethinking Priorities
Due to the complications of introducing a pessimistic approach (mainly maintenance and readability constraints), and after a discussion in the community, we have decided to rethink and redefine the meaning of priorities: give a concrete definition of what priorities mean in Airflow, including expected-behaviour examples, and combine that definition with an aging algorithm (along with possibly other heuristics). This is a major semantic change: a priority now just means that a higher-priority task will be considered more often than a lower-priority one, while lower-priority tasks gain preference as they get older (looked at less often).
Deciding what priorities will mean is the first step before we can move on to implementation, while the aging algorithm can be implemented either way, regardless of the chosen semantics.
Choosing the priority definition is a crucial part of the solution, and there are a few proposed strategies, arranged by the behavioral change they introduce, least to most.
Proposed Priority Strategies
As we converged on a solution, we decided to move forward with changing priorities in Airflow. We have a few possible strategies, each with their upsides and downsides.
In order to decide on the proposed solution, we need the community to help decide on the strategy of choice.
The strategies listed below are:
Keeping task-level priorities but making them weaker
Introducing DAG-level priorities, changing task-level priorities
Introducing DAG-level priorities, giving up task-level priorities
Completely Removing All Priorities At All Levels
Keeping task-level priorities but making them weaker
Retain task-level priority weights, but make them flexible to avoid starvation.
Use last_scheduling_decision field of a task or a DAG and apply a mid-run "weight rule" that changes task priorities based on obsolescence heuristic.
Mitigate starvation by using round-robin style scheduling where the priority of the tasks determine the order in which dagruns will be looked at.
An example is taking the highest priority task (awaiting scheduling) of each dag, and multiplying the priority by the delta since the dagrun was considered.
This is a classic solution against starving less-prioritized tasks - punish tasks that couldn't be scheduled for too long.
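The example above, priority multiplied by the time since the DAG run was last considered, yields an ordering like this (an illustrative sketch; the dict keys are placeholders, not actual model fields):

```python
from datetime import datetime, timedelta, timezone


def dagrun_order(dagruns, now):
    """Order DAG runs by priority * waiting time: a low-priority run that
    has waited long enough eventually outranks a fresh high-priority one."""
    def key(run):
        waited = (now - run["last_scheduling_decision"]).total_seconds()
        return run["priority"] * waited
    return sorted(dagruns, key=key, reverse=True)
```

A run with priority 1 that waited 1000 s (score 1000) is examined before a run with priority 10 that waited 10 s (score 100), which is exactly the round-robin-style punishment described above.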
This causes the least api-breakage for users, while providing a good solution to starvation.
It is, however, harder to implement, requiring SQL queries to update the priorities mid-run.
Adds even more to opportunistic nature of priorities and draws us away from the original use-case - DAGs with long but important tasks can be deprioritized at critical moments.
Starvation is mitigated, but the algorithm still remains optimistic - meaning some time may be needed to adjust for starved, highly-prioritized tasks and move forward.
Although the feature remains, logic still changes which can lead to possible breakages.
Though, even with all of that, to avoid deprioritizing important tasks, the user will be able to define a policy that controls how quickly such deprioritization is allowed.
The logic change matches the opportunistic nature of task priorities that become even more opportunistic. You apparently can't rely much on priority weights anymore, but you couldn't rely on them before either.
Conclusion
API breakage is minimal, though logic still changes considerably. Imposes higher maintenance burden and more implementation complexity, yet mitigates starvation in a classic way of punishing long-awaiting tasks.
Introducing DAG-level priorities, changing task-level priorities
Introduce DAG-level priorities, where each DAG run created inherits the priority of the DAG it belongs to.
DAG-level priority acts as a global priority deciding which DAGs will be looked at first.
Task-level priority acts as an intra-dagrun prioritization, where tasks of the same dagrun are considered in order of their priority.
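The two-level ordering described above (DAG-level priority as the global order, task-level priority within a DAG run) can be sketched as a compound sort key. The dict keys here are illustrative:

```python
def examination_order(task_instances):
    """Sort candidates by DAG-level priority first (global order across
    DAGs), then by task-level priority within each DAG run."""
    return sorted(
        task_instances,
        key=lambda ti: (ti["dag_priority"], ti["task_priority"]),
        reverse=True,
    )
```

All tasks of the highest-priority DAG are examined before any task of a lower-priority DAG, while the user's task-level weights still decide ordering inside each run.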
Use heuristics to punish DAG runs whose tasks couldn't be scheduled for too long.
This stems from a use case of prioritizing entire workflows. Classic solution against starving less-prioritized DAGs - punish DAGs that couldn't be scheduled for too long.
While also adding more control over the ordering in which tasks are considered, giving the user fine-grained priority control.
This allows us to accurately match the workflow model of Airflow, starvation is mitigated by deprioritizing DAG runs that waited for too long. Meets well the prioritization use case.
This is not without drawbacks, behavioral API breakage is added (long-existing feature changed). Unexpected reliance on task-level priorities may change the execution of users' workflows.
Adds even more to opportunistic nature of priorities and draws us away from the original use-case - DAGs with long but important tasks can be deprioritized at critical moments (depending on the algorithm of choice).
Starvation is mitigated, but the algorithm still remains optimistic - meaning some time may be needed to adjust for starved, highly-prioritized tasks and move forward.
To solve this, we can also apply the mitigations from the previous strategy: to avoid deprioritizing important DAGs, the user will be able to define a policy that controls how quickly such deprioritization is allowed.
Conclusion
API breakage is minimal, though logic still changes considerably. Imposes higher maintenance burden and more implementation complexity, yet mitigates starvation in a classic way of punishing long-awaiting tasks.
Introducing DAG-level priorities, giving up task-level priorities
The approach suggests to introduce DAG-level priorities, where each DAG run created inherits the priority of the DAG it belongs to.
DAG-level priority acts as a global priority deciding which DAGs will be looked at first. To retain optimistic scheduling and prevent starvation at the same time, as in the previous strategy, a notion of obsolescence is needed.
Task level priorities are ignored and not taken into account while performing scheduling decisions, and are kept solely for backwards compatibility.
The approach stems from a use case of prioritizing entire workflows. Classic solution against starving less-prioritized DAGs - punish DAGs that couldn't be scheduled for too long, just as before.
Accurately matches the workflow model of Airflow, starvation is mitigated by deprioritizing DAG runs that waited for too long. Meets well the prioritization use case.
Behavioral API breakage is introduced (long-existing feature removed). Unexpected reliance on task-level priorities may break the execution of users' workflows.
It adds even more to opportunistic nature of priorities and draws us away from the original use-case - DAGs with long but important tasks can be deprioritized at critical moments (depending on the algorithm of choice).
Starvation is mitigated, but the algorithm still remains optimistic - meaning some time may be needed to adjust for starved, highly-prioritized tasks and move forward.
Just like the previous strategy, to avoid deprioritizing important DAGs, the user will be able to define a policy that controls how quickly such deprioritization is allowed.
Conclusion
This approach matches the use case of prioritizing important workflows under resource deficit and makes priorities more comprehensible and easier to control for the end user, despite giving up some granularity. It imposes some behavioral API breakage, as task-level priorities won't do anything anymore.
Completely removing all priorities at all levels
Suggests removing all priorities and using plain round-robin scheduling by last_scheduling_decision of the DAG run.
Check only the concurrency limits, and create tasks without looking or caring about ordering.
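Without priorities, the ordering reduces to oldest-decision-first, which can be sketched in one line (the dict key is a placeholder for the DAG run's last_scheduling_decision field):

```python
def round_robin_order(dagruns):
    """Plain round-robin: the DAG run examined longest ago goes first,
    with no priority weighting at all."""
    return sorted(dagruns, key=lambda r: r["last_scheduling_decision"])
```

Every run is guaranteed to be examined eventually, which is why this trivially eliminates starvation at the cost of the prioritization feature.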
This approach is plain and simple, easy to implement, and does not induce any complexity; leaving priority_weight in place without actually using it retains backwards compatibility.
The issue with the given approach is that API breakage is introduced (long-existing feature removed). Unexpected reliance on task-level priorities may change or break users' workflows.
The use case of prioritizing under low resources is not met, which forces users to carefully adjust workflows to the existing CPU power, risking important tasks stalling.
A decision can be made that running Airflow on insufficient resources is an inherent problem not addressed by the project.
Conclusion
Simplest implementation and maintenance, yet removes a widely used feature. Makes scheduling the simplest job possible. The use case of prioritizing on low resources is not met.
API breakage in the long run: the priority_weight field will just be a placeholder on the task until it is removed.
Migration
A major change such as refactoring priority weights requires a careful consideration for possible breakages and user migrations. As it's hard to know how people use priority weights, there is a need for a community brainstorm regarding possible breakages and how we can mitigate them.
A hybrid approach where task-level priority weights are retained is the easiest one to handle, as a toggle for enabling the hybrid approach may be added. For example, the obsolescence algorithm may be turned on/off or tuned with user-specified policy, see Separation of mechanism and policy.
Theoretically, every approach taken may be enabled or disabled with a toggle for several releases, allowing users to test their workflows with the new strategy (in case task-level priority weights are removed).
Proposed Obsolescence Algorithms
Obsolescence Algorithm WORK IN PROGRESS
Implementing any change in the priority semantics still leaves room for starvation, where equally prioritized dagruns are considered over and over, without considering other, lower-priority dagruns' tasks, even if those tasks could run. This is where an obsolescence algorithm comes into play.
Having an obsolescence algorithm means that even if there are more prioritized dagruns, their relative priority changes, as we take into account the time since the last scheduling decision on each dagrun: the older the dagrun, the higher its priority, while a higher-priority dagrun will still be looked at before a lower-priority dagrun that has waited the same amount of time.
This introduces the complication of choosing the obsolescence algorithm such that it remains simple, easy to implement and understand, while still allowing for round-robin style prioritized scheduling.
As part of the algorithm, we might also look into DAG-defined SLAs and prioritize DAGs with a closer SLA, in addition to the aging part, where we consider the current time minus last_scheduling_decision.
A few algorithms emerge that allow the scheduler to make smarter decisions and ensure tasks do not starve, ordered from simplest to most complex.
| Algorithm | Simple Weighted Aging | Improved Weighted Aging | Weighted Aging and SLA Urgency |
|---|---|---|---|
| Motivation and Explanation | The simplest algorithm, which will work for most cases. To get the runtime priority of a workflow, take the defined priority of the workflow, apply a simple arithmetic operation (such as multiplying the values), and order by the result. This lets higher-priority tasks be looked at more often, though it can cause issues when a priority is too high. The choice of the arithmetic operation can vary the behaviour drastically, giving the priority more or less effect on scheduling decisions. | Similar to the simple weighted aging approach, yet the aging function is a smarter algorithm allowing complex scheduling behaviour with simple logic, where no workflow types are left to starve. A promising choice, as it is both simple and effective at solving the issue, yet it will most likely lack tunability: there will probably be only one or two variables to tune, changing either the weight of the priority or the weight of the waiting time. | A weighted aging algorithm with task urgency, needed to solve the "starvation of highly prioritized tasks at critical moments": we consider the SLA urgency of a given DAG along with its priority. This allows a complete optimistic solution for task scheduling while significantly mitigating starvation across workflows. It is the most complex option, yet it solves all of our issues and can be extended in various directions, with countless improvements that can be built upon it. The average dagrun or task runtime could also be added to the urgency heuristic for more accurate predictions, though that might introduce more complexity than the potential performance gain. All while allowing for inter-dagrun priorities. The proposed aging formula: score(J, t) = w_p * P_base(J) + w_a * waiting_time(J, t) + w_d * urgency(J, t), where P_base(J) is the base priority of the job J, waiting_time(J, t) is the time since J was last examined, urgency(J, t) is how close the SLA is (SLA end time - current time, possibly minus the average job/workflow runtime), w_a is the arrival weight, w_d is the deadline/SLA weight, t is the current time, and score is the runtime priority. |
| Advantages | | | |
| Drawbacks | | | |
| Summary | The simplest of the bunch, yet might not fully solve all issues for all possible scenarios. | An improved alternative to simple weighted aging, fitting a wider spectrum of workflows while not increasing implementation complexity. | The most complex behaviour-wise, yet it solves the widest variety of use cases, allowing for tunability and behavioural control. Not more complex than any of the other proposals, while solving starvation of tasks in a relatively optimistic manner. |
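The score formula from the table can be written directly as a function. This is a sketch; the sign convention for urgency is one plausible choice the text leaves open (closer or overdue SLAs score higher), and the parameter names mirror the formula rather than any existing Airflow field:

```python
from datetime import datetime, timedelta, timezone


def score(base_priority, last_examined, sla_deadline, now,
          w_p=1.0, w_a=0.0, w_d=0.0, avg_runtime=timedelta(0)):
    """score(J, t) = w_p * P_base(J) + w_a * waiting_time(J, t) + w_d * urgency(J, t)

    waiting_time: seconds since the DAG run was last examined (aging).
    urgency: negated time left until the SLA deadline, optionally shifted
    earlier by the average runtime, so a closer or overdue SLA raises the score.
    """
    waiting = (now - last_examined).total_seconds()
    time_to_sla = (sla_deadline - avg_runtime - now).total_seconds()
    urgency = -time_to_sla
    return w_p * base_priority + w_a * waiting + w_d * urgency
```

With w_a = w_d = 0 this degenerates to the current static priority_weight ordering, which is what makes the weights a natural opt-in surface.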
The proposed solutions are simplest to implement at the task-creation stage: concurrency limits can be assessed once per scheduler loop, ensuring only eligible tasks are created. This streamlines the current scheduler loop by executing a single database query and dispatching tasks directly to their respective executors, bypassing redundant checks. Furthermore, this design enables more flexible, round-robin-inspired scheduling and reduces the risk of task starvation. Re-evaluating priority logic also allows for future innovation; the concepts considered to date are summarized above.
What defines this AIP as "done"?
The Airflow scheduler's performance improves in terms of tasks queued over a time frame, and unnecessary starvation is eliminated by the improved task-queueing algorithm.
Appendix
Alternatives considered
| Strategy | Current optimistic | Single-limit pre-filtering | Window functions | SQL procedures / exhaustive linear scan | Priority Semantics Changes |
|---|---|---|---|---|---|
| Description | Fetch up to max_tis_per_query scheduled TIs in one query, then filter them in Python against all concurrency limits. | Use a single limit, like max_active_tasks, and eliminate starvation on that limit using a lateral join / window function. | Nested window functions/lateral joins incorporate multiple limits in a single query. | DB-stored procedures (PostgreSQL/MySQL) or Python (SQLite) scan sorted TIs exhaustively until enough queueable ones are found. | Increase the scheduler's variety of choice by giving up the concept of task-level priorities. |
| Motivation | Simple, fast for small/medium workloads; no complex SQL. | Improves performance by considering just the one most common concurrency limit, which works for many users. | Theoretically, could allow checking multiple concurrency limits in a single SQL query. | Universal solution for all orthogonal limits; eliminates post-fetch drops entirely. Runs on the DB side and prevents RTT latencies / slow Python processing. | Priorities make it very hard to come up with a viable in-code solution, as every single task would have to be considered in each iteration. Changing the definition of priorities (which as of now is not clearly defined) will allow us to consider tasks in a simple manner without causing a maintenance overhead, while solving starvation. |
| Advantages | Minimal query overhead; works across DBs including SQLite; easy to maintain/debug, as most logic is in Python. | Boosts throughput for large DAGs (hundreds of tasks); avoids wasting cycles re-querying saturated DAGs. | Theoretically comprehensive; a single query for all DB vendors. | Throughput gain even with multiple limits, fewest scheduler iterations, blazingly fast linear scanning. Solves all starvation cases by using a simple and algorithmically efficient scan on the DB side, where the data resides. | Per-task priorities are incomprehensible and significantly complicate scheduling; they directly rule out any scheduling strategy relying on obsolescence, meaning round-robin style scheduling is not possible. Using an obsolescence-based priority algorithm instead allows for clearly defined priority behaviour, while keeping scheduling decisions efficient and fair. |
| Drawbacks | Throughput degradation that leads to starvation in case the average number of schedulable tasks per query is low across iterations. | Considers just one limit and solves just part of the problem, while some limits continue to starve. For example, while max_active_tasks is handled in the query, pool limits may cause tasks to be dropped. | Window functions' behavior is not suitable for Airflow task concurrency model, and they don't solve all cases while imposing a large performance overhead. | Requires several implementations for different DB vendors, may be considered a technical debt. The logic is difficult to test. SQLite doesn't support procedures at all. | Requires a complete refactor of priority logic which is a breaking change. Breakage in the current expected behaviour, may have an effect on existing workflows. Defining a good obsolescence scheduling heuristic can be hard and tedious, as it requires trial and error. |
| Drawback mitigations | Deploy multiple Airflow clusters for large workloads. | It solves the common case of DAG run level limit. Don't define complex concurrency constraints in workloads. | Cases where tasks are dropped with this approach are rare. | DB vendors are not added too often, as critical section algorithm isn't frequently updated, if at all. If it is, adding the logic in several places isn't a big overhead. Keep the logic as simple as possible so no testing is needed. Mimic the algorithm in Python for SQLite. | The change is beneficial to the project as it both contributes to solving starvation and provides a more useful priority feature, while allowing to control which workflows are more important than the others, without starving other workflows. |
| Summary | Serves typical workloads well but fundamentally incapable of handling large-scale starvation without repeated post-fetch drops. | Helpful short-term throughput boost for one common limit, but can't be a long-term solution since other concurrency limits like pools and max_active_tis_per_dag/dagrun remain frequently used and broken. | Theoretically elegant but ruled out: unsuitable for Airflow's multi-dimensional concurrency model and imposes a significant performance overhead (up to 10x slower on large tables). | A viable path forward; benchmarks show 1.7-2x throughput gains across single/dual/triple limit workloads. Remaining technical issues (metrics, SQLite, DB fields) are resolvable, and drawback mitigation is possible. | Promising alternative if the community reaches consensus on desired priority semantics; enables simpler in-code round-robin scheduling by reducing per-task priority complexity. |
| POC | Runs in production | PR 54103 | PR 53492 | PR 55537 | Still to come, may be several POCs depending on the number of implementations |
27 Comments
Ash Berlin-Taylor
Given this is such a power-user-only feature, this is a firm No from me right now.
`package: "com.my.scheduler.NewScheduler"`
If you can write your own scheduler class already, you can make it runnable with `python -m com.my.scheduler.NewScheduler` and not run `airflow scheduler` – we don't need to change anything in Airflow to support this.
Jarek Potiuk
Yeah. Quite agree with it. If you want to make your own scheduler that works differently but still uses airflow DB, you can definitely do it.
natanel
Hmm, I understand. The point of the AIP is not to run my own scheduler, but rather to allow faster development of the scheduler, while letting all Airflow users who experience the problem try the new scheduler for themselves without the full commitment of changing all schedulers together.
Are there any changes that, in your opinion, would make it more appealing? The main problem I am trying to solve is that scheduler changes are sensitive and need a lot of testing beforehand. While forking is an option, in my experience people tend not to use a fork because they do not want to fall out of sync with Airflow; allowing the scheduler to be swapped in would open up testing to a larger part of the community.
I am looking at what was done with the edge executor, and I want to be able to apply the same principles here.
I would more than appreciate if I could receive additional information and clarification, as I feel like I do not fully understand the given feedback.
Is there anything that can turn the "no" into a "yes" from your side?
Thank you.
Jarek Potiuk
I think the main point is that by replacing the scheduler, you have a different Airflow, and the issue with it is that you basically replace **everything** the scheduler does. I think if you would like a different scheduling algorithm (and implementation), this should not be done via a generic "pluggable" scheduler option, but by proposing a completely new, complete scheduling algorithm that you want to implement, describing all the details and logic, and adding a new `airflow experimental_xxx_scheduler` command → but that would have to:
* have a concrete implementation POC PR of it
* have a design and description of the scheduling algorithms used and logic of the new scheduler
* have an explanation of the use case where current scheduler limitations show and how the other scheduler implements it (and explain why current scheduler cannot be improved)
* have some benchmark comparison
This would be a good start and something that could spark discussion - for example, whether we can instead modify the current scheduler to do similar things, or whether it would give us the understanding that we should redesign the scheduler and either merge the new scheduler into the old one, or extend the new scheduler to be more flexible. Eventually we might even replace the old scheduler with it, or indeed realise that there are some common parts and eventually attempt to make the scheduler pluggable, if we see enough overlap and common parts that can be reused.
My rule of thumb for such things is that you come up with a good abstraction if you have 3 working implementations that you can look at commonalities and turn into common abstraction.
Starting by adding a new kind of abstraction when we have no idea what could be common between such classes and what a "scheduler" API could look like is just plain wrong.
And really - the abstraction you propose is already there - it's called "Job".
Your proposal does not do anything other than creating another Job type - we already have SchedulerJob, TriggererJob and other Jobs - what you are adding is another "OtherSchedulerJob". If you want "another scheduler" to run there, you need to create the JobRunner anyway and implement all of it. And the only thing your proposal adds, instead of creating a new class and implementing a new command in the code (which is easy), is a more declarative way of choosing which job should be run when you run `airflow scheduler`. Which makes very little sense and only adds confusion.
Simply put - first propose an implementation of the new scheduler, and then we can start worrying about what kind of abstraction (if any) we want for it.
Jarek Potiuk
Also, If you want to **really** iterate on your own scheduler as 3rd-party component, We already have this one merged: https://github.com/apache/airflow/pull/59805
This allows you to develop your own provider that simply adds a new CLI command to Airflow. So you can totally implement and iterate over your own scheduler implementation by creating a provider which has a new CLI command added (say `experimental-scheduler`). All you will need in this case is to expose the new CLI command and implement the execution of the command.
That is - I think - exactly the kind of declarative way (in the form of provider metadata) of adding a new "whatever" command to Airflow - including a new scheduler command that you run instead of the regular scheduler, which I understand is the gist of your proposal.
natanel
I understand. So, in your opinion, it is better to turn this AIP into a scheduler-improvement AIP, and to ship the experimental scheduler as a provider, to get the ease of running the new scheduler along with running our benchmarks?
Did I understand correctly?
Jarek Potiuk
If you have a concrete scheduler-improvement proposal - yes.
natanel
Alright. I have already tried this before: held discussions, showed good benchmarks. I will do it here to keep the discussion more organized, so that everything is not scattered across threads.
Jarek Potiuk
As mentioned above - it does mean having a POC implementation, and having your own provider with a CLI that does what you want is a really necessary prerequisite.
Theo S.
I've recently done some work to rewrite this AIP so it clearly reflects the end goal of eliminating starvation rather than making the scheduler injectable.
All the queueing strategies that have been tried until now are documented in one place, and I hope we can finally start a community discussion about those. After some time of trial and error, we can consider this list as a comprehensive set of possible strategies in the current setting. Now that we have them on the table, let's take a closer look and decide on our next step.
As I have some bias towards the stored procedures approach, which is the most efficient so far, I would certainly like to refine the implementation and make it production-ready as a provider package so it can be tested by the community. In mail discussions, the approach was met with some friction due to its drawbacks summarized in the AIP, so I wonder if there is interest in testing it further. In addition, the provider will have to contain an alembic script if the two fields for the `TaskInstance` concurrency limits are not added to the DB before that.
Theo S.
I will take a closer look at the priority refactoring as there is some agreement about the need to do so.
natanel
Looks like a great direction for us to investigate. I think this allows something I proposed in one of the discussions, where we only create tasks which can be run, and all other tasks stay in the none state, as at task creation we can decide, without much overhead, whether to create the given task.
This does require priorities to be set at the dagrun level, with DAGs also including a priority. This is a breaking change, though it is possible to keep backwards compatibility, and it is a pretty minimal change.
Instead of, as proposed, finding a better algorithm to fetch tasks, we can simplify it by only creating tasks that are guaranteed to be able to run.
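A minimal sketch of that admission idea, with hypothetical names (nothing here is existing Airflow API): a task row is only materialized in a schedulable state if every relevant concurrency counter still has room, so anything in the scheduled state is guaranteed runnable.

```python
def admit_on_creation(limits):
    """Hypothetical admission check at task-creation time.

    limits maps a limit name to (used, cap) for the candidate task's
    pool, max_active_tasks, max_active_tis_per_dag(run), etc. The task
    is created as schedulable only if every counter has room; otherwise
    it stays in the 'none' state until slots free up.
    """
    return all(used < cap for used, cap in limits.values())

# A task blocked by a full pool is simply not created yet:
blocked = admit_on_creation({"pool": (5, 5), "max_active_tasks": (3, 16)})
admitted = admit_on_creation({"pool": (4, 5), "max_active_tasks": (3, 16)})
```

With this gate, the main loop no longer needs per-task concurrency checks; the counters are settled once, at creation time.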
[~asquator] do you agree with this approach?
I would love to hear other opinions about the given proposal.
Theo S.
Well, with priorities completely removed we can do the good old round robin scheduling.
Priorities on DAG level are somewhat problematic, because filtering is still needed.
natanel
Removing priorities altogether still requires you to check concurrency limits, meaning round robin still won't work: you no longer have priority weights, but you still have a sorting order, so removing priorities won't help here.
By changing the semantics of priorities, so that task priorities are internal to the dagrun while dagruns decide priority at the global scope, we can ensure no starvation occurs during task creation.
This is a change in semantics, yet it comes with a promise of no starvation (up to the limit of the executor).
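A sketch of how ordering could look under these semantics (the field names are hypothetical, not existing Airflow columns): dag-run priority dominates globally, and task-level priority weight only breaks ties among tasks of the same dag run.

```python
def order_schedulable(tis):
    """Sort candidate task instances so that dag-run priority decides
    globally and task priority only orders tasks within one dag run."""
    return sorted(
        tis,
        key=lambda ti: (-ti["dagrun_priority"], -ti["priority_weight"]),
    )

candidates = [
    {"task_id": "b", "dagrun_priority": 1, "priority_weight": 100},
    {"task_id": "a", "dagrun_priority": 2, "priority_weight": 1},
]
# "a" comes first: its dag run outranks "b"'s despite the lower task weight.
ordered = [ti["task_id"] for ti in order_schedulable(candidates)]
```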
Jens Scheffler
Thanks for the write-up. Some points from my side on top of inline comments:
Take a 2-CPU machine and a Dag with N tasks in some layout: scheduling them is possible and done in < time compared to ? time today. Or: with a Dag of type XYZ the Scheduler crashes today in OOM; after this change it can run and schedule at least N tasks per minute on a 2-core machine.
Some additional questions:
One additional proposal:
Have you considered the strategy that data is aligned ONCE when tasks are visited first time and a (shadow) table is built-up that allows effective querying based on needed values to decide scheduling - a bit of redundancy to the TI table - but preparing indexes and data dependencies such that querying is optimized for this? This would prevent looping over-and-over for the same tasks and would shift the complexity to a linear problem at time of task creation (trigger Dag run where DB entries are created or when Dag is starting to run).
I have worked in the past in a project where we implemented a large scale "fair" scheduler. We were scheduling ~500k tasks with limits per type. Scanning over 500k tasks of course was not possible until we created multiple sorted queues (which in theory you could map from algorithm to a Redis or DB table queue) and if tasks are pre-sorted in queue then pulling from multiple/many queues might be easy if you know which pools/limits are exhausted. We had multiple queues so that if one type had too much workload we just took to the next queue. You could say the tasks were sorted in queues of queues or a tree-like structure. In your text above this would be similar like a table containing the "total order" which then can be directly selected on.
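A toy version of that queues-of-queues idea (all names are illustrative, not from any Airflow code): one pre-sorted FIFO queue per pool, and the puller skips pools whose slots are exhausted instead of re-examining their tasks over and over.

```python
from collections import deque

def pull_batch(queues, free_slots, batch_size):
    """Round-robin over per-pool FIFO queues, skipping pools whose
    slots are exhausted, so one saturated pool never starves the rest.

    queues: {pool: deque of task ids}; free_slots: {pool: free slot count}.
    """
    batch = []
    active = deque(pool for pool, q in queues.items() if q)
    while active and len(batch) < batch_size:
        pool = active.popleft()
        if free_slots.get(pool, 0) > 0 and queues[pool]:
            batch.append(queues[pool].popleft())
            free_slots[pool] -= 1
        # Re-visit the pool only while it has both tasks and slots left.
        if queues[pool] and free_slots.get(pool, 0) > 0:
            active.append(pool)
    return batch

# A saturated "high" pool (0 free slots) is skipped in O(1); "low" still runs.
queues = {"high": deque([1, 2, 3]), "low": deque([4, 5])}
batch = pull_batch(queues, {"high": 0, "low": 2}, batch_size=3)
```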
As an example, which might not fully match here: in the solution we had in the past, we called it "SLA Driven Scheduling", which you also thought a bit about - each priority class had an SLA like "High: needs to be processed in 5min in 95% of cases", "Medium: needs to be processed in 60min in 95% of cases" and "Batch: needs to be processed in 12h in 95% of cases". As the priority was based on SLAs, the queues were sorted by "target time", and a Batch task pending for 11:55h had effectively the same priority as a "High" one. Besides the classes of "fairness" (==Pools), we could directly pull tasks in SLA order from the queue. High-priority tasks were injected by SLA time so they could "overtake" tasks which were less important.
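Sketching that SLA-driven ordering (the SLA classes and numbers are made up for illustration): each task's effective priority is its absolute deadline, submit time plus SLA, so a long-pending batch task naturally overtakes a fresh high-priority one.

```python
import heapq

def sla_deadline(submitted_at, sla_seconds):
    """Effective priority is the absolute target time: submit + SLA."""
    return submitted_at + sla_seconds

now = 100_000.0  # fixed "current time" for the example
queue = []
# A batch task (12h SLA) pending for 11h57m vs a fresh "High" (5min SLA):
heapq.heappush(queue, (sla_deadline(now - 11.95 * 3600, 12 * 3600), "old_batch"))
heapq.heappush(queue, (sla_deadline(now, 5 * 60), "fresh_high"))
# The batch task's deadline (~3 min away) is earlier, so it is pulled first.
first = heapq.heappop(queue)[1]
```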
One further consideration/option:
Have you considered re-implementing the Scheduler in Rust? UV and RUFF were real game-changers in performance with tight Python bindings... and native code supports better parallelism with multi-threading. Today the Scheduler is very CPU intensive, but also very single-threaded. If I could be 10 times faster in native code and could throw 16 cores at the problem, I might be 160 times faster. If we talk about mapped tasks which are in the same class of dependency and pool, then caching of scheduling evaluations could also speed things up by a factor.
Of course, re-writing the Scheduler in Rust would be a larger endeavor, but it might lower the CPU footprint a lot (assuming the DB is not the limiting factor).
Taking into consideration that the Scheduler has multiple loops also to schedule CRON/Timetable expressions and creates Dag runs... just the part of task scheduling could be extracted into a parallel loop.
Theo S.
Thank you for the review!
> We also suffer from scheduler slowness and this mainly limits us from mapped tasks >1024. We would like to have 100k mapped possibilities
Glad to hear we're not alone, this is exactly the use case that inspired this discussion.
> As comments above I assume also a lot of CPU work today in Scheduler is spent in loading Dag structure and making a dependency check.
This is an interesting issue, but I haven't looked into it yet. I believe there's room for improvement (as in every part of the scheduler), but I don't have enough expertise in this area. This discussion mostly deals with starvation caused by concurrency limits, which shows up as:
Typical log
Not scheduling since the slots of (concurrency limit X) (for task Y) are exhausted
The OOM issue you experience probably deserves a discussion of its own. The task group issue you described also fits there.
> I am missing for the AC's a concrete metric to measure.
The first metric you proposed is the one helpful here. OOM means insufficient resources or unoptimized memory management (may be relevant to what you said about loading large DAGs into memory). Starvation is dealing with throughput, so in benchmarks we're looking at throughput metrics:
It will be helpful to look into the stored procs PR just to see the benchmark data and graphs (very beautifully formatted and thoroughly explained) to get a notion of the desired improvement. The DAGs themselves also appear there.
> Have you considered re-implementing the Scheduler in Rust?
I heard about this idea from the community. Simply said - it IS worth it. It's a big endeavor indeed, and one day resources will be found to do it (well, it's a real money cost that someone has to have the will to pay).
> Today the Scheduler is very CPU intensive. But also very single-threaded.
Multi-threaded/async scheduler is a great idea. The scheduler has to be divided into several independent parts which run in different execution contexts. The task enqueuer is one of them. I outlined the idea in AIP-70 with the motivation for it. In short - there should not be a linear dependency between code parts that can run on their own, maybe in parallel.
> Have you considered the strategy that data is aligned ONCE when tasks are visited first time and a (shadow) table is built-up that allows effective querying based on needed values to decide scheduling - a bit of redundancy to the TI table - but preparing indexes and data dependencies such that querying is optimized for this? This would prevent looping over-and-over for the same tasks and would shift the complexity to a linear problem at time of task creation (trigger Dag run where DB entries are created or when Dag is starting to run).
It sounds promising; we have to understand what the priority model should be to allow this. In the SLA scheduling you described, you had a fixed number of SLA pools. Airflow allows you to create any number of pools you like, priorities are completely decoupled from pools, and there are concurrency limits orthogonal to pools. Actually, a full linear scan is FINE here; it would work exceptionally well. If we could make a full linear scan of the `task_instance` table, we could later think of optimizing it to avoid scanning some parts of the table. The problem is more technical - doing a linear scan from Python with `fetchmany` is slow due to memory allocations and RTTs. Updating the shadow table at runtime is also pretty slow (I tried it to implement a linear scan while avoiding fetching the same tasks). If the tasks are stored in Redis but you don't know how many times you're going to fetch from Redis, you can get pretty slow execution times in a similar fashion. Maybe coupling an execution pool with `priority_weight`s may ease the job here? I mean define priority per pool, like many would do.
Theo S.
Other important decisions to make are:
Current priority behavior doesn't guarantee anything, as it may or may not spare a slot for prioritized tasks depending on other tasks.
natanel
I think that, as for the first point, there is not much change: even now priorities are not strict; the only thing a priority does is make the task be considered more often, before other tasks, just like we are proposing here, where we consider higher-priority dagruns more often than lower-priority ones.
As for the second point, it can be configured with the weights of the dynamic scoring algorithm of choice; a user may even set the weights to 0, making workflow consideration rely strictly on the user-set priority.
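One way such a weighted score could look (purely illustrative; the weight names and defaults are invented): waiting time supplies the obsolescence term, and zeroing one weight collapses the score to either strict user priority or pure FIFO aging.

```python
def dynamic_score(user_priority, queued_at, now, w_age=1.0, w_prio=60.0):
    """Hypothetical obsolescence score: seconds spent waiting plus a
    weighted user priority. w_age=0 means strict user-set priority;
    w_prio=0 means pure FIFO aging."""
    return w_age * (now - queued_at) + w_prio * user_priority

now = 1_000.0
old_low = dynamic_score(user_priority=1, queued_at=0.0, now=now)
fresh_high = dynamic_score(user_priority=5, queued_at=990.0, now=now)
# With aging on, the long-waiting low-priority run eventually wins;
# with w_age=0, the user-set priority decides unconditionally.
strict = dynamic_score(1, 0.0, now, w_age=0.0) < dynamic_score(5, 990.0, now, w_age=0.0)
```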
Any other suggestions are welcome, as this is a very important part of the AIP.
natanel
Thank you Jens Scheffler for the review! You have given valuable insights on the problem from areas where we have not experienced many issues.
It seems like we are suffering from mostly the same issues, which have caused us to start separating clusters.
> We also suffer from scheduler slowness and this mainly limits us from mapped tasks >1024. We would like to have 100k mapped possibilities
As Theo S. said, this is mainly the reason we created this AIP.
About the OOM: we also seem to be getting one; once every 3 days the scheduler dies from an OOM, where the memory slowly creeps up. I think it might be unrelated, as it happens due to a memory leak. In addition, the solution will in theory reduce memory usage, as we query less from the database (a single query in the scheduler rather than the loop with the 'task_id not in' part of the query, which is quite memory heavy), moving more processing to the database while keeping the smallest amount of data on the scheduler that we can to make correct scheduling decisions.
The problem of having to load the entire serialized_dag object into memory and parse the JSON could also be part of the solution, yet it depends on how big of a change we are aiming at. As I see it as a major performance bottleneck, I will think of possible solutions: maybe normalizing the table a little and removing the JSON schemas in favour of SQL joins, or even a table of dependencies between task_ids. I will think about this issue, as I believe we can benefit from a fix here.
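A sketch of what such a normalized dependency table could look like (the schema and names are invented for illustration, not Airflow's actual tables): one row per upstream edge, so a dependency check becomes a plain indexed query instead of parsing the serialized JSON in Python.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE task_dependency (
    dag_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    upstream_task_id TEXT NOT NULL,
    PRIMARY KEY (dag_id, task_id, upstream_task_id)
);
""")
# One row per edge of a tiny extract -> transform -> load DAG:
conn.executemany(
    "INSERT INTO task_dependency VALUES (?, ?, ?)",
    [("etl", "transform", "extract"), ("etl", "load", "transform")],
)

def upstream_of(conn, dag_id, task_id):
    """Dependency lookup as a simple query, no JSON parsing needed."""
    rows = conn.execute(
        "SELECT upstream_task_id FROM task_dependency "
        "WHERE dag_id = ? AND task_id = ?",
        (dag_id, task_id),
    )
    return [r[0] for r in rows]

deps = upstream_of(conn, "etl", "load")
```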
About the SLA-driven scheduling: it is proposed here, where we check how close we are to missing the SLA; the closer we are, the more prioritized a workflow becomes. We can also include the average runtime of the workflow to aid in this calculation. The problem is that we are strongly coupled to the SQL database, meaning all computation will have to be done either in the same query, or on an additive basis where we do continuous upserts to the statistics we want to collect.
> Have you considered the strategy that data is aligned ONCE when tasks are visited first time and a (shadow) table is built-up that allows effective querying based on needed values to decide scheduling - a bit of redundancy to the TI table - but preparing indexes and data dependencies such that querying is optimized for this? This would prevent looping over-and-over for the same tasks and would shift the complexity to a linear problem at time of task creation (trigger Dag run where DB entries are created or when Dag is starting to run).
If I understood correctly, I think I proposed this in the mailing discussion: we do a single DB query before we go over all dagruns to decide which dagruns will be looked at, and add another query that gets us all the data about the tasks of the given dagruns needed to decide whether we can run each task, which makes it a linear problem. Is that what was meant?
> Have you considered re-implementing the Scheduler in Rust?
Hmm, we have considered this, yet only in short discussions; we have not thought too much about it. It might be a good idea, as it could solve memory issues along with bringing performance gains, and it can also be done in parts, extracting one part of the scheduler loop to Rust at a time. It is an idea worth thinking about, yet I am afraid the community or other Committers/PMC members might sadly be against such a change. I am all for it as an incremental improvement.
Ash Berlin-Taylor
Am I reading this right, there are four alternate strategies that are being considered in this AIP?
natanel
I am not quite sure I understand what you mean by strategy. We showed 4 strategies, 3 of which we tried and decided not to move forward with; we continue with 1 of them, where we change the semantics of priorities, and then propose a few ways to do it (i.e. introducing DAG-level priorities). This is the part where we ask the community what is preferred. Later we have a section on obsolescence algorithms, which will apply regardless of the choice made before.
I hope this clarifies the concerns; if not, let me know and I will explain more and change the document as needed.
Ash Berlin-Taylor
Oh that didn't come across to me at all.
I think the "Alternatives considered" table confuses me. I'm not sure what to suggest as I'm not sure what the approach suggested is.
natanel
Hmm, okay, I will try to move things around to clarify the suggested approach.
natanel
I have made a few changes; I would appreciate a review. I have hopefully made it clearer what the proposed approach is.
Ash Berlin-Taylor
Honestly, no, not really.
There are 3 or 4 tables (which don't render that well as they are so wide, so it's hard to read them), and it's not clear what is actually being proposed. Should I expect a concrete proposal of "we will do X", or is this still at the "here are the options as we see them" stage?
natanel
Hmm, I will try changing a few more things, I really see it as so:
We started with a vague general idea of an improvement, tried a few solutions and decided to move on with one of them (rethinking priorities)
Now we are in a state where we have decided what we will do, and we need to decide how it will be done. So we have "multiple options as we see them" and need to choose which one to go forward with. The last table (before the appendix) describes an obsolescence algorithm, which can be chosen during implementation and is not impacted by the earlier choice.
Theo S.
Hello,
Actually yes, this document was created as a summary of long mail and git discussions, so it's easier for the community to review and share opinions. My goal was to objectively describe the trade-offs in the most complete way, and this is the reason for the large table.
No solution here is ideal, and the preferred one strictly depends on the desired properties - whether it's the best performance, extendability, or determinism - at the cost of the others. As the change is rather radical, community engagement is a key factor. It's better to stop for a while and discuss things before jumping into more PRs. Once something concrete is shaped, the chosen strategy can be elaborated on more, and implementation will follow.