Status
Motivation
With the completion of AIP-51 - Executor Decoupling, it is now easier than ever to create new Airflow executors. Each executor has its own set of pros and cons; these are often tradeoffs between latency, isolation, and compute efficiency, among other properties. Currently, only one executor can be configured for an entire Airflow environment. This means users must pick one executor, with its specific set of pros and cons, that attempts to best fit the entire set of tasks for the environment. This often involves compromising for some set of tasks which do not work well with that particular executor.
This design describes a proposal to support multiple executors concurrently in a single Airflow environment. This allows users to make better use of the strengths of all the available executors and avoid their weaknesses: a specific executor can be used for the set of tasks where its particular merits and benefits make the most sense, and so on.
Users will be able to configure a default executor for their entire environment (today’s standard) and, if they wish, a specific executor for each DAG or even for individual tasks.
Considerations
What problem does it solve?
To set the context, there are three main types of Airflow executors (at the time of writing):
- Local Executors: Airflow tasks are executed on the same host that the executor (i.e. scheduler) is running on. E.g.: LocalExecutor
- Very easy to use, fast, very low latency, and few requirements for setup
- But also limited in capabilities and shares resources with the Airflow scheduler
- Remote Queued/Batched Executors: Airflow tasks are sent to a central queue where remote workers pull tasks to execute. Often workers are persistent and run multiple tasks at once. E.g.: CeleryExecutor
- More robust since workers are decoupled from the scheduler process; workers can be large hosts that churn through many tasks (often in parallel), which is cost effective, and latency can be relatively low since some workers can be provisioned to run at all times and take tasks immediately from the queue.
- Shared workers suffer from the noisy neighbour problem, with tasks competing for resources or for how the environment/system is configured. They can also be expensive if your workload is not constant: you may have workers sitting idle, or you have to manage scaling them up and down.
- Remote Containerized Executors: Airflow tasks are executed ad hoc inside containers/pods. Each task is isolated in its own environment. E.g.: KubernetesExecutor or AwsEcsExecutor
- Each Airflow task is isolated to one container, so there is no noisy neighbour problem; the execution environment can be customized for specific tasks (system libs, binaries, dependencies, amount of resources, etc.); and it is cost effective since the workers are only alive for the duration of the task.
- There is startup latency since the container or pod needs to start; it can be expensive if you’re running many short/small tasks; and while there are no workers to manage, you must manage something like a Kubernetes environment.
Perhaps you have a small subset of tasks that must run with a very low latency SLA but use minimal resources (e.g. simple fire-and-forget tasks); you could use a local type executor for those. Perhaps you have a larger burst of low latency SLA tasks that need a bit more resources; you could use a queue-based executor, with a big worker waiting to take tasks and more workers scaled up if required. The last portion of your tasks may need a very specific execution environment or cannot run alongside other tasks, but you don’t mind paying the container startup time (no strict latency targets); you could use a containerized executor for those.
Currently in Airflow, you have to choose just one executor that does its best to satisfy all those use cases, but it will often not excel at all of them.
Hybrid executors solve this problem by allowing multiple executors to be used concurrently, each for the tasks it is best suited for.
Why is it needed?
There are currently two “hardcoded” hybrid executors, the LocalKubernetesExecutor and the CeleryKubernetesExecutor.
Their implementation is not native or intrinsic to core Airflow; the solution is hacked together. These hybrid executors make use of the queue field on task instances to indicate and persist which sub-executor to run on, which is a misuse of this field and makes it impossible to use it for its intended purpose when using hybrid executors.
Also, executors such as these require hand crafting a new “concrete” class for each permutation of possible executor combinations. This is untenable as more executors are created and is a source of coupling of sorts; supporting all possible future combinations would lead to tens or hundreds of classes. Bespoke coding effort should not be required to use any combination of executors.
However, the existence and usage of these hardcoded hybrid executors is evidence of a community desire for such a natively supported feature. Together they account for ~20% of the executors used in production by our users (according to the 2023 Airflow survey). Some anonymous user anecdotes that describe the value achieved from hybrid execution include:
CeleryKubernetes to accommodate large/spiky workloads
We changed it from Celery to CeleryKubernetes to open to possibility we can run the DAG in an isolated pod.
Changed CeleryExecutor to CeleryKubernetesExecutor to run some heavy and long tasks in isolated environment
switched from Kubernetes to CeleryKubernetes as delays due to pod spin-up time is unacceptable for some tasks/workflows
Many of these fall into the categories discussed in the section What problem does it solve?
What change do you propose to make?
Configuring multiple executors
The first user touch point to the hybrid executor feature will be the configuration of multiple executors. The current executor configuration is documented here.
A short name alias can be used to configure core Airflow executors:
export AIRFLOW__CORE__EXECUTOR='LocalExecutor'
Or a module path to an executor class:
export AIRFLOW__CORE__EXECUTOR='airflow.executors.local_executor.LocalExecutor'
To maintain backwards compatibility and avoid introducing new Airflow configuration, the same configuration option will be used for multiple executors, using comma-separated list notation. The first executor in the list (either on its own or along with other executors) will behave the same as Airflow does today: it is the default executor for the environment. Any Airflow task or DAG that does not specify/override a specific executor will use this environment-level executor. All other executors in the list will be initialized and ready to run tasks if specified on an Airflow task or DAG. If an executor is not present in this configuration list, it cannot be used to run tasks.
A notation will also be added to allow users to specify a short name alias for custom executors along with the module path. This gives users an easier way to refer to their executor in task and DAG configuration than using the full module path or class name. Some examples:
export AIRFLOW__CORE__EXECUTOR='LocalExecutor'
export AIRFLOW__CORE__EXECUTOR='LocalExecutor,CeleryExecutor'
export AIRFLOW__CORE__EXECUTOR='LocalExecutor,CeleryExecutor,KubernetesExecutor'
export AIRFLOW__CORE__EXECUTOR='KubernetesExecutor,my.custom.module.ExecutorClass'
export AIRFLOW__CORE__EXECUTOR='LocalExecutor,my.custom.module.ExecutorClass:ShortName'
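For example, assuming the ShortName alias from the last line above is configured, a DAG author could then reference it directly in a task definition (a hypothetical illustration of the proposed notation):

from airflow.decorators import task

@task(executor="ShortName")  # resolves to my.custom.module.ExecutorClass
def run_on_custom_executor():
    print("running on the custom executor")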
New field on the TaskInstance DB table, ORM object, and BaseOperator
A new field, executor (type str), will be added to the BaseOperator/TaskInstance models. It will represent the executor that the user wishes to run the task on, and will be persisted to the Airflow database as a new column on the TaskInstance table. This will allow future retries, for example, to be executed on the same executor that the Airflow task originally ran with, for consistency.
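As a minimal sketch of the schema change, assuming an Alembic migration in the usual Airflow style (the revision identifiers below are placeholders, not real revisions):

import sqlalchemy as sa
from alembic import op

# Placeholder revision identifiers for illustration only.
revision = "aaaa0000bbbb"
down_revision = "cccc1111dddd"


def upgrade():
    # Nullable: tasks with no explicit executor fall back to the
    # environment default and persist nothing (see the note below).
    op.add_column("task_instance", sa.Column("executor", sa.String(1000), nullable=True))


def downgrade():
    op.drop_column("task_instance", "executor")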
To specify an executor for a task/Operator, the user will make use of the new executor parameter:
from airflow.decorators import task

@task(executor="LocalExecutor")
def hello_world():
    print("hello world!")
or
from airflow.operators.bash import BashOperator

BashOperator(
    task_id="hello_world",
    executor="LocalExecutor",
    bash_command="echo 'hello world!'",
)
To specify an executor for an entire DAG, we can simply use the existing Airflow mechanism of default arguments. These are a group of keyword arguments passed to the DAG definition, which are then applied to every task in that DAG (unless explicitly overridden by a specific task):
from airflow import DAG
from airflow.decorators import task

@task
def hello_world():
    print("hello world!")

@task
def hello_world_again():
    print("hello world again!")

with DAG(
    dag_id="hello_worlds",
    default_args={"executor": "AwsEcsExecutor"},  # Applies to all tasks in the DAG
    schedule=None,
) as dag:
    # All tasks will use the executor from default args automatically
    hw = hello_world()
    hw_again = hello_world_again()
IMPORTANT NOTE: task instances that run on the default/environment executor (i.e. with no specific override provided) will not persist the executor in the same way. This is to maintain compatibility with current Airflow behaviour, where it is assumed that any task can run on any currently configured executor (see section How are users affected by this change). If a task/DAG does not specifically configure an executor, its tasks will run on whatever the currently configured default/environment executor is (just as today). However, if the user intentionally marks a task for a particular executor (at the DAG level or the task level), we persist this and then only run the task on that executor (when it is configured and available).
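To make that fallback concrete, the scheduler-side resolution could look roughly like the following sketch (resolve_executor and executors_by_name are illustrative names, not the actual implementation):

def resolve_executor(ti, default_executor, executors_by_name):
    """Pick the executor for a task instance: explicit override first,
    otherwise the environment default (sketch only)."""
    if ti.executor:  # persisted only when the DAG/task set one explicitly
        if ti.executor not in executors_by_name:
            # Requested executor is not configured; surface a warning/error
            # (see "Other considerations?") rather than silently falling back.
            raise ValueError(f"Executor {ti.executor!r} is not configured")
        return executors_by_name[ti.executor]
    return default_executor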
Providing Configuration Overrides to Executors
Hybrid executors will also make use of the existing executor_config field to allow users to pass configuration/runtime options through to their configured executor per task or per DAG. This config is passed as a dictionary of key/value pairs to a given Operator or in the default arguments of a DAG, similar to the executor field above. The executor config will be passed to the explicitly specified executor if one exists, otherwise to the default/environment-level executor.
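For illustration, the pod_override key below is the existing KubernetesExecutor convention for executor_config; under this proposal the same dictionary would simply be routed to whichever executor the task specifies:

from kubernetes.client import models as k8s

from airflow.operators.bash import BashOperator

isolated = BashOperator(
    task_id="isolated",
    executor="KubernetesExecutor",
    bash_command="echo 'running in a customized pod'",
    # Interpreted by the executor the task runs on; pod_override is the
    # existing KubernetesExecutor convention for customizing the pod.
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[k8s.V1Container(name="base", image="python:3.11-slim")]
            )
        )
    },
)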
Scheduler Job Runner and Backfill job runner changes
The Scheduler and Backfill job runners are the main components that need modification to support multiple executors. The changes will be focused on enabling/configuring multiple executors per job runner and iterating over those executors for many of the scheduling decisions.
Care will need to be taken during development not to slow down the scheduler loop with excessive DB queries. Focus will be put into testing the impact of either using separate queries per executor (e.g. for tasks ready to be executed) or single large queries for all executors then locally processing which tasks should be sent to which executor within the scheduler code.
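A simplified sketch of the per-executor portion of the loop being described (the helper query_tis_for_executor, the name attribute, and the overall shape are illustrative, not the actual scheduler internals):

def process_executors(executors, session):
    # Give each configured executor its own slice of scheduling work,
    # keeping per-executor slot accounting separate.
    for executor in executors:
        open_slots = executor.slots_available
        if open_slots <= 0:
            continue
        # One option discussed above: a separate query per executor for
        # schedulable task instances targeting it (vs. one large query
        # split locally across executors).
        for ti in query_tis_for_executor(executor.name, limit=open_slots, session=session):
            executor.queue_task_instance(ti)
        executor.heartbeat()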
Executor Loader
Changes to the Executor Loader module will handle the upgrades to the core.executor config: parsing, loading/importing/initializing multiple executors, as well as the short name aliases.
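A rough sketch of the parsing this implies (ExecutorName and parse_executor_config are illustrative names, not the final Executor Loader API):

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ExecutorName:
    # Parsed entry from the [core] executor list (illustrative).
    module_path: Optional[str]
    alias: str


def parse_executor_config(value: str) -> List[ExecutorName]:
    names = []
    for entry in (e.strip() for e in value.split(",")):
        if ":" in entry:
            # "my.custom.module.ExecutorClass:ShortName" notation
            module_path, alias = entry.rsplit(":", 1)
            names.append(ExecutorName(module_path, alias))
        elif "." in entry:
            # Bare module path; the class name doubles as the name
            names.append(ExecutorName(entry, entry.rsplit(".", 1)[-1]))
        else:
            # Core short-name alias such as "LocalExecutor"
            names.append(ExecutorName(None, entry))
    return names

# The first parsed entry is the environment default executor.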
DAG and SLA callbacks
Possible modifications to DAG success/failure and SLA callbacks may be required. For the new task-based SLA callbacks (being developed as part of AIP-57) it will be possible to determine the executor to use, since the TI context will be passed to the callback. However, for a DAG-level callback it is not as clear which executor should be used, since many different executors may be used across the tasks of a DAG. A safe proposal is to use the default/environment executor for DAG SLA callbacks.
Similarly, success/failure callbacks at the Airflow task level will use the executor that ran the task, and DAG-level success/failure callbacks will use the default/environment executor.
The existing, and soon to be deprecated, SLA callback mechanism will not be upgraded to support multiple executors (beyond ensuring that the existing behaviour continues to work as normal).
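Summarizing those routing rules as a sketch (executor_for_callback is a hypothetical helper, not the actual callback code):

def executor_for_callback(ti, default_executor, executors_by_name):
    """Task-level callbacks follow the executor that ran the task;
    DAG-level callbacks (no task instance) use the environment default."""
    if ti is not None and ti.executor:
        return executors_by_name.get(ti.executor, default_executor)
    return default_executor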
Are there any downsides to this change?
The change should be fully backwards compatible: if users are not interested in using this new feature, there will be no behaviour change to their Airflow environments (see section How are users affected by the change). This should mitigate most downsides.
Other downsides include the increase in complexity of the scheduling/backfilling code within Airflow. Care will need to be taken not to introduce any egregious latency increases to scheduling decisions within Airflow.
Which users are affected by the change?
No specific users are targeted by this proposal, but a couple of user roles will have touch points with it:
- DAG authors: These users will now have the ability to configure which executor runs their DAG and individual tasks
- Administrators: Users who have access to modify Airflow config will now have the ability to configure multiple executors for their environment.
How are users affected by the change? (e.g. DB upgrade required?)
To maintain backwards compatibility, the user experience will not change unless the user intentionally configures multiple executors. Users interested in this feature will have to specify multiple executors in their Airflow config; once configured, they may specify which executors they’d like to run their DAGs and tasks. Only then will the behaviour of Airflow change. Tasks will run as usual, with no experience change in that regard (other than which executor executes each task, if configured).
A DB upgrade is required to add the new field which will track the executor to be used for an individual task instance.
Other considerations?
Warning logging will be added to the scheduler and backfill job runners to notify users if we arrive in a state where tasks are set to run on a particular executor but that executor is not configured/enabled. Future UI-based warnings may also be warranted. There is precedent for this for several other failure states (e.g. missing scheduler heartbeats, DAGs failing to parse, etc.), which display a dialog/banner in the Airflow web UI to indicate an issue or misconfiguration. A similar mechanism could be used for this in the future.
Airflow executor metrics (open_slots, queued_tasks, running_tasks) will be updated to add a dimension/tag for each configured executor.
Note: the current metrics (with no executor name dimension/tag) could either continue to be used for the default/environment executor alone, or could represent the sum across all executors (global state). I’m curious to hear input on this topic.
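For illustration only (the exact naming is the open question above), and assuming the existing airflow.stats.Stats interface with metric tags, the per-executor form might look like:

from airflow.stats import Stats

# Existing, global form (could remain the default executor's value or a sum):
Stats.gauge("executor.open_slots", 10)

# One possible per-executor form, using the executor name as a metric tag:
Stats.gauge("executor.open_slots", 10, tags={"executor": "CeleryExecutor"})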
Updates to the Airflow Executor documentation will be made in the Airflow version that the feature is fully “released” in. This documentation will describe how to enable and configure Airflow to use multiple executors, how to specify executor overrides in DAGs and for Airflow tasks, and how to monitor the executor metrics.
It may be desirable for users to run multiple instances of the same executor; this would be especially helpful for isolating tenants from each other at runtime in a multi-tenanted environment. Take the example of two Celery Executors: this will actually be partially possible as this proposal is currently designed. However, there are some blockers prohibiting first class support at the moment. For example, there is no convenient way to set global configuration for each of the two executors, since they will both read from the same sections and keys of Airflow configuration (e.g. AIRFLOW__CELERY__BROKER_URL). It is possible to provide different configuration at DAG-author time by leveraging the executor_config field on tasks, but this is cumbersome. This and other blockers need to be fleshed out before full support for duplicate/multi-tenant executors; that support will not be delivered by this AIP, but nothing in this AIP will be a one-way door blocking the future adoption of such a feature.
What defines this AIP as "done"?
For the initial phase of this AIP, the proposal will be “done” when the following in-scope requirements are complete:
- The Scheduler and Backfill jobs are able to run tasks with multiple executors.
- Users have the ability to configure executors at three levels of fidelity:
- A default executor for their entire Airflow environment (in essence, the previous/current behaviour of Airflow), unless overridden.
- A default executor for all tasks in a specific DAG, unless overridden
- An executor for a specific task in a DAG
- Executor CLI commands are supported for all configured executors in the environment.
- DAG and SLA callbacks are supported, running on the appropriate executor (task-level callbacks on the executor that ran the task; DAG-level callbacks on the default/environment executor).
- The feature is sufficiently described in Airflow documentation
Out of Scope
- This will not cover adapting the current system test architecture to support hybrid execution.
- UI related changes. One can currently view all parameters of a task instance via the Airflow web UI. This, along with logs, will suffice for now to be able to assess which executor ran a particular task. Future UI representations could be added to better visualize this, particularly in the graph view.
- First class support of duplicate executors, especially in a multi-tenanted manner.
15 Comments
Hussein Awala
I think it's a good idea to support multiple executors natively without the need to hardcode the combinations, but I'm wondering if we need to support multiple executors of the same type (for example, multiple Kubernetes executors with different pod_template or different Kubernetes connection, or different Celery cluster instead of creating multiple queues in the same cluster).
Jarek Potiuk
Actually, now that you mention it, Hussein Awala, the answer is - I think - yeah, that would be great to have ...
This is a - I think - great way of implementing multi-tenancy feature which I was talking about at the https://airflowsummit.org/sessions/2023/multi-tenancy-state-of-the-union/ talk.
When we talked about it with Vincent and Mateusz, I had the idea that we would need one more AIP to make it easier to map per-tenant DAGs to workloads. The initial idea was to use queues for that, but there were a few problems: you could not join queues and tenants, and we would have to figure out how to do the same for the K8S executor (and all others).
But your comment made me realise that Multiexecutor IS the way to go. We simply have to add (in the follow-up AIP) figure out how to map tenant to executor - and we are basically done.
I will take a closer look at that one, but I think it might be a perfect solution to complete the multi-tenancy work.
Niko Oliveira - what do you think about this? I am not sure if you would like to design and implement it from the beginning following this idea. I think it could be done in two stages: initially we have only executor types, but then we could extend it with the possibility of having multiple executors of a given type configured, as a generic solution?
Shubham Mehta
Completely agree with Hussein and Jarek here. FWIW, this came up in our internal discussions as well, and I think it is crucial to support multi-tenant deployments. As Airflow becomes more suitable for supporting different business units in one instance, there will be a need for multiple executors of the same type. I also think it is not a must for this AIP to cover the feature in its scope, but it SHOULD consider it as future work and should not make any design decisions that could result in conflicts later on.
Niko Oliveira
Thanks for the feedback folks! There is nothing stopping multiple executors of the same type from working at a basic level. In fact, the way this is designed supports that (and this already works in the PoC I have). BUT the tricky part comes with things like configuration for those executors, not how to run them. Currently executors read their configuration from the same place everything else does, airflow config. There is no easy way at the moment to specify multiple distinct sets of the same configs to each executor (at least not without some more paradigm breaking changes). But with multi-tenancy I think this will soon change, as tenants are likely to want their own individual configs.
So long story short: I think we're not walking through any one-way doors here (folks should yell if they think otherwise!) and once we have multi-tenancy support for other things like config, logs and metrics, the support for multi-tenant executors will fall into place. Until then, support won't be great (we should probably software limit it, in all honesty, but we could think about allowing it).
Jarek Potiuk
Niko Oliveira Shubham Mehta → that's cool, yep. I understand that configuration currently does NOT support per-tenant configuration, but for me that's the right solution - to add the option to have per-tenant configuration options as a generic solution, which should be rather easy. I am just about to start a draft AIP for the final stage of multi-tenancy and I think reliance on AIP-61 will be part of it.
Shubham Mehta
Not to digress too much from Executors, but just to close this out.
<multi-tenancy> Yes, I think we covered it during our Multi-tenancy discussions (https://docs.google.com/document/d/1n23h26p4_8F5-Cd0JGLPEnF3gumJ5hw3EpwUljz7HcE/edit#heading=h.s80zaabehf4y) - some configurations, like Core, would be Env-level and others, like Metrics, would be Tenant-level. Hybrid executor was not an option at that time, so we had kept it env-level. Overall, I think what you're suggesting is the right solution and we may need to restructure configs to have tenant-specific sections. </multi-tenancy>
Okay, back to Hybrid executors...!
Jens Scheffler
Thanks for the AIP - I am also looking forward to the implementation! I had also been thinking for a while about raising an AIP on this but always had it on my backlog list ... good that you have now made it!
My comments:
Looking forward to Voting, my +1 will be there!
Jarek Potiuk
My comments after reading it in detail (and starting with what Jens Scheffler wrote):
Thinking loud:
I was not sure about the choice of using AIRFLOW__CORE__EXECUTOR and a comma-separated list of executors, but I think it's the easiest approach for now. To make it work for tenants, we will likely have to convert our .ini format of cfg files into - likely - TOML, which has become the de-facto standard for Python, is mostly compatible with the ini format that ConfigParser recognizes, supports things like tables natively, and can be validated with JSON schema. But this should be the next step; after considering a few options, I think the comma-separated value is a pretty good choice for now.
Also I have another point:
One important aspect that we should consider is "core/parallelism". I THINK this one should be a per-executor value. Even with the hardcoded hybrid executors, this value often made no sense, because there is a difference between running 32 celery tasks, 16 celery tasks plus 16 kubernetes tasks, or 32 kubernetes tasks. So while we are adding multiple executors, I think we should limit the number of tasks per executor, not per whole Airflow deployment. Not sure how to configure it without more structured configuration though.
Niko Oliveira
Re config: Yeah, as we've discussed above, I think doing anything advanced like migrating the config specification and parsing in this AIP is a bit too much scope creep. Using the existing config in backwards compatible way allows us to increment forward and teach the core internals of Airflow to support multiple executors. We can then iterate on that in future AIPs that are more related to multi-tenancy/running many instances of the same executor.
Re: core/parallelism: "Not sure how to configure it without more structured configuration though." Yeah, this to me is similar to the above. I think it depends on an overhaul of our configuration. I think it's reasonable to ship a MVP version of hybrid execution without this until we have a more expressive way to configure Airflow. Thoughts?
Jarek Potiuk
> Re: core/parallelism: "Not sure how to configure it without more structured configuration though." Yeah, this to me is similar to the above. I think it depends on an overhaul of our configuration. I think it's reasonable to ship a MVP version of hybrid execution without this until we have a more expressive way to configure Airflow. Thoughts?
Agree
Daniel Standish
i think per-executor parallelism makes sense too. the problem of configuration is a solvable one. i'm sure we all could come up with a few ideas. i think we should probably not defer it, but rather sort it out before launch in 2.10.
e.g. maybe
AIRFLOW__CORE__EXECUTOR='{"CeleryExecutor": {"parallelism": 1000000}}'
maybe it should also be changed to something with a slightly better name, like max_tasks or something.
Amogh Desai
Thank you for the AIP, Niko Oliveira
It was a great read. I went through the comments and most of my questions have been answered there.
Few comments from me:
- some load on the scheduler here. Initialise an executor during its first usage, if you will
- page should be the executor used. Makes it easy in case of failures
- what are we doing to take care of fault tolerance in such scenarios? Do we, by any chance, run on the other configured executors?
Great read again, looking forward to the voting thread!
Niko Oliveira
Thanks for taking the time to read! Replied in kind to each bullet:
Daniel Standish
re this one
We might want to fail it in the dag processor but not at parse time because the config'd value might not be there locally when you're developing and you just want to parse your dag. We do something similar for "nonexistent pool" right now. And it raises a dag configuration warning but not a hard failure. You could plug in to that framework.
Kaxil Naik
Marking it for the Airflow 2.10 release, Niko Oliveira, to get it done in the next few weeks.