
Status

State: Completed
Discussion Thread:
Vote Thread: https://lists.apache.org/thread/mkdlskz6tb0rbw36vglh54kfghl69kxs
Vote Result Thread: https://lists.apache.org/thread/kjj0x3h3w44yb5gvyr3wksm8dsfg1cos
Progress Tracking (PR/GitHub Project/Issue Label):
Date Created:
Version Released: 2.10
Authors: Niko Oliveira


Motivation

With the completion of AIP-51 - Executor Decoupling, it is now easier than ever to create new Airflow executors. Each executor has its own set of pros and cons, often trade-offs between latency, isolation, and compute efficiency, among other properties. Currently, only one executor can be configured for an entire Airflow environment. This means users must pick one executor, with its specific set of pros and cons, that attempts to best fit the entire set of tasks for the environment. This often involves compromising for some set of tasks which do not work well with that particular executor.

This design describes a proposal to allow supporting multiple executors concurrently in a single Airflow environment. This allows users to make better use of the strengths of all the available executors and avoid their weaknesses, using a specific executor for the set of tasks where its particular merits and benefits make the most sense.

Users will be able to configure a default executor for their entire environment (today’s standard) and, if they wish, a specific executor for each DAG or even for individual tasks.

Considerations


What problem does it solve?

To set the context, there are three main types of Airflow executors (at the time of writing):

  • Local Executors: Airflow tasks are executed on the same host that the executor (i.e. the scheduler) is running on. E.g.: LocalExecutor
    • Very easy to use, fast, very low latency, and few requirements for setup.
    • But also limited in capabilities and shares resources with the Airflow scheduler.
  • Remote Queued/Batched Executors: Airflow tasks are sent to a central queue where remote workers pull tasks to execute. Often workers are persistent and run multiple tasks at once. E.g.: CeleryExecutor
    • More robust since workers are decoupled from the scheduler process; workers can be large hosts that churn through many tasks (often in parallel), which is cost effective, and latency can be relatively low since some workers can be kept running at all times to take tasks immediately from the queue.
    • Shared workers have the noisy-neighbour problem, with tasks competing for resources or for how the environment/system is configured, and can be expensive if your workload is not constant: you may have idle workers sitting unused (or you have to manage scaling them up and down).
  • Remote Containerized Executors: Airflow tasks are executed ad hoc inside containers/pods. Each task is isolated in its own environment. E.g.: KubernetesExecutor or AwsEcsExecutor
    • Each Airflow task is isolated to one container, so there is no noisy-neighbour problem; the execution environment can be customized for specific tasks (system libs, binaries, dependencies, amount of resources, etc.); and it is cost effective since the workers are only alive for the duration of the task.
    • There is startup latency since the container or pod needs to start; it can be expensive if you’re running many short/small tasks; and while there are no workers to manage, you must manage something like a Kubernetes environment.


Perhaps you have a small subset of tasks that need to run with a very low latency SLA but use very minimal resources (e.g. simple fire-and-forget tasks); for those you could use a local type executor. Or perhaps you have a larger burst of those low-latency-SLA tasks that need a bit more resources; you could use a queue-based executor with a large worker waiting to take tasks, scaling up workers if required. The last portion of your tasks may need a very specific execution environment or cannot be run alongside other tasks, but you don’t mind paying the container startup time (no strict latency targets); for these you could use a containerized executor.

Currently in Airflow, you have to choose just one executor that does its best to satisfy all those use cases, but it will often not excel at all of them.

Hybrid executors solve this problem by allowing multiple executors to be used concurrently, each for the tasks that it is best suited for.


Why is it needed?

There are currently two “hardcoded” hybrid executors, the LocalKubernetesExecutor and the CeleryKubernetesExecutor.
Their implementation is not native or intrinsic to core Airflow; the solution is hacked together. These hybrid executors make use of the queue field on task instances to indicate and persist which sub-executor to run on, which is a misuse of this field and makes it impossible to use it for its intended purpose when using hybrid executors.
Also, executors such as these require hand-crafting a new “concrete” class for each permutation of possible combinations of executors. This is untenable as more executors are created and is a source of coupling of sorts; it would lead to tens or hundreds of classes to support all possible future combinations. Bespoke coding effort should not be required to use any combination of executors.
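
For illustration, this is roughly how the existing CeleryKubernetesExecutor repurposes the queue field today: a task whose queue matches the configured “kubernetes queue” (by default "kubernetes") is routed to the KubernetesExecutor, while everything else goes to Celery, so the field can no longer act as a real Celery queue name for those tasks. A minimal, illustrative snippet:

from airflow.operators.bash import BashOperator

# With CeleryKubernetesExecutor, "queue" acts as a routing flag rather than an actual Celery queue.
run_in_pod = BashOperator(
    task_id="run_in_pod",
    bash_command="echo 'runs via the Kubernetes sub-executor'",
    queue="kubernetes",  # routes this task to the KubernetesExecutor sub-executor
)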

However, the existence and usage of these hardcoded hybrid executors is indeed evidence that shows a desire for such a natively supported feature from the community. Together they form ~20% of the executors used in production by our users (according to the 2023 Airflow survey). Some anonymous user anecdotes that describe the value achieved from hybrid execution include:

  • “CeleryKubernetes to accommodate large/spiky workloads”
  • “We changed it from Celery to CeleryKubernetes to open to possibility we can run the DAG in an isolated pod.”
  • “Changed CeleryExecutor to CeleryKubernetesExecutor to run some heavy and long tasks in isolated environment”
  • “switched from Kubernetes to CeleryKubernetes as delays due to pod spin-up time is unacceptable for some tasks/workflows”


Many of these fall into the categories discussed in the section “What problem does it solve?” above.


What change do you propose to make?

Configuring multiple executors

The first user touch point to the hybrid executor feature will be the configuration of multiple executors. The current executor configuration is documented here.

A short name alias can be used to configure core Airflow executors:

export AIRFLOW__CORE__EXECUTOR='LocalExecutor'

Or a module path to an executor class:

export AIRFLOW__CORE__EXECUTOR='airflow.executors.local_executor.LocalExecutor'

To maintain backwards compatibility and to avoid introducing new Airflow configuration, the same configuration option will be used for multiple executors, using a comma-separated list notation. The first executor in the list (either on its own or along with other executors) will behave the same as Airflow does today: it is the default executor for the environment. Any Airflow task or DAG that does not specify/override a specific executor will use this environment-level executor. All other executors in the list will be initialized and ready to run tasks if specified on an Airflow task or DAG. If an executor is not specified in this configuration list, it cannot be used to run tasks.

A notation will also be added to allow users to specify a short name alias for custom executors along with the module path. This gives users an easier way to specify their executor in Task and DAG configuration, rather than using the full module path or class name. Some examples:

export AIRFLOW__CORE__EXECUTOR='LocalExecutor'
export AIRFLOW__CORE__EXECUTOR='LocalExecutor,CeleryExecutor'
export AIRFLOW__CORE__EXECUTOR='LocalExecutor,CeleryExecutor,KubernetesExecutor'
export AIRFLOW__CORE__EXECUTOR='KubernetesExecutor,my.custom.module.ExecutorClass'
export AIRFLOW__CORE__EXECUTOR='LocalExecutor,my.custom.module.ExecutorClass:ShortName'
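
Once an alias is registered this way, DAG authors can reference it just like a core executor name (the executor parameter itself is described in the next section). A brief illustrative example, assuming the ShortName alias configured above:

@task(executor="ShortName")
def my_custom_executor_task():
    print("runs on my.custom.module.ExecutorClass")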

New field on the TaskInstance DB table, ORM object, and BaseOperator

A new field, executor, will be added to the BaseOperator/TaskInstance models (type str). This will represent the executor that the user wishes to run the task on. This new field will be persisted to the Airflow database as a new column on the TaskInstance table. This will allow, for example, future retries to be executed on the same executor that the Airflow task originally ran with, for consistency.
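
A minimal sketch of what the corresponding schema change might look like (illustrative only, not the actual Airflow migration; the column length and nullability shown here are assumptions):

from alembic import op
import sqlalchemy as sa

def upgrade():
    # Hypothetical migration adding the new executor column to the task_instance table.
    op.add_column("task_instance", sa.Column("executor", sa.String(length=1000), nullable=True))

def downgrade():
    op.drop_column("task_instance", "executor")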

To specify an executor for a task/Operator, the user will make use of the new executor parameter:

from airflow.decorators import task

@task(executor="LocalExecutor")
def hello_world():
    print("hello world!")

or

from airflow.operators.bash import BashOperator

BashOperator(
    task_id="hello_world",
    executor="LocalExecutor",
    bash_command="echo 'hello world!'",
)


To specify an executor for an entire DAG, the existing Airflow mechanism of default arguments is reused. These are a group of keyword arguments passed to the DAG definition, which are then applied to every task in that DAG (unless explicitly overridden by a specific task):

from airflow import DAG
from airflow.decorators import task

@task
def hello_world():
    print("hello world!")

@task
def hello_world_again():
    print("hello world again!")

with DAG(
    dag_id="hello_worlds",
    default_args={"executor": "AwsEcsExecutor"},  # Applies to all tasks in the DAG
    schedule_interval=None,
) as dag:
    # All tasks will use the executor from default args automatically
    hw = hello_world()
    hw_again = hello_world_again()

IMPORTANT NOTE: task instances that run on the default/environment executor (i.e. with no specific override provided) will not persist the executor in the same way. This is to maintain compatibility with the current Airflow behaviour, where it is assumed that any task can run on any currently configured executor (see section “How are users affected by the change?”). So if a task/DAG does not specifically configure an executor, its tasks will run on whatever the currently configured default/environment executor is (just as today). However, if the user intentionally marks a task for a particular executor (at the DAG level or the task level), this choice is persisted and the task will then only run on that executor (when it is configured and available).

Providing Configuration Overrides to Executors

Hybrid executors will also make use of the existing executor_config field to allow users to pass through configuration/runtime options to their configured executor per task or per DAG. This config is passed as a dictionary of key/value pairs to a given Operator or in the default arguments of a DAG, similar to the executor field above. The executor config will be passed to the explicitly specified executor if one exists, otherwise the default/environment level executor.
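
For illustration, a hedged example of how this could look for a task pinned to the KubernetesExecutor, reusing the pod_override convention the KubernetesExecutor already supports for executor_config (the executor parameter and its interaction with executor_config are part of this proposal; the pod_override structure is existing behaviour):

from airflow.operators.bash import BashOperator
from kubernetes.client import models as k8s

BashOperator(
    task_id="heavy_task",
    executor="KubernetesExecutor",
    # Passed through to the executor the task runs on (here: KubernetesExecutor).
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(requests={"memory": "2Gi"}),
                    )
                ]
            )
        )
    },
    bash_command="echo 'heavy work'",
)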

Scheduler Job Runner and Backfill job runner changes

The Scheduler and Backfill job runners are the main components that need modification to support multiple executors. The changes will be focused on enabling/configuring multiple executors per job runner and iterating over those executors for many of the scheduling decisions.

Care will need to be taken during development not to slow down the scheduler loop with excessive DB queries. Focus will be put into testing the impact of either using separate queries per executor (e.g. for tasks ready to be executed) or a single large query for all executors, then locally determining within the scheduler code which tasks should be sent to which executor.
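
A rough sketch of the per-executor iteration the scheduler loop might perform (illustrative pseudocode only, not the actual scheduler implementation; helper names such as ready_task_instances are hypothetical):

# Illustrative pseudocode of one possible shape of the multi-executor scheduling step.
for executor in executors:
    # Option A: one query per executor; Option B would run one large query and split locally.
    task_instances = ready_task_instances(executor=executor, limit=executor.slots_available)
    for ti in task_instances:
        executor.queue_task_instance(ti)  # hand the task to its designated executor
    executor.heartbeat()  # let each executor sync/launch its queued work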

Executor Loader

Changes to the Executor Loader module will handle the upgrades to the core.executors config for parsing, loading/importing/initializing multiple executors; as well as the short name aliases.
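
A minimal sketch of how the comma-separated value with optional short-name aliases might be parsed (illustrative only; the real ExecutorLoader changes will differ in detail):

from __future__ import annotations

# Illustrative parser for values like "LocalExecutor,my.custom.module.ExecutorClass:ShortName".
def parse_executor_names(value: str) -> list[tuple[str | None, str]]:
    """Return (alias, core name or module path) pairs; the first entry is the environment default."""
    executors: list[tuple[str | None, str]] = []
    for item in value.split(","):
        item = item.strip()
        if ":" in item:
            module_path, alias = item.rsplit(":", 1)
            executors.append((alias, module_path))
        else:
            executors.append((None, item))
    return executors

# parse_executor_names("LocalExecutor,my.custom.module.ExecutorClass:ShortName")
# -> [(None, "LocalExecutor"), ("ShortName", "my.custom.module.ExecutorClass")]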

DAG and SLA callbacks

Possible modifications to DAG success/failure and SLA callbacks may be required. For the new task-based SLA callbacks (being developed as part of AIP-57) it will be possible to determine the executor to use, since the TI context will be passed to the callback. However, for a DAG-level callback it is not as clear which executor should be used, since many different executors may be used across the tasks of a DAG. A safe proposal is to use the default/environment executor for DAG SLA callbacks.

Similarly, success/failure callbacks at the Airflow task level will use the executor that ran the task, and DAG-level success/failure callbacks will use the default/environment executor.

The existing, and soon to be deprecated, SLA callback mechanism will not be upgraded to support multiple executors (beyond ensuring that the existing behaviour continues to work as normal).

Are there any downsides to this change?

The change should be fully backwards compatible: if users are not interested in using this new feature, there will be no behaviour change to their Airflow environments (see section “How are users affected by the change?”). This should mitigate most downsides.

Other downsides include the increase in complexity to the scheduling/backfilling code within Airflow. Care will need to be taken to not introduce any egregious latency increases to scheduling decisions within Airflow.


Which users are affected by the change?

No specific users are targeted by this proposal, but a couple of user roles that will have touch points with this proposal include:

  • DAG authors: These users will now have the ability to configure which executor runs their DAGs and individual tasks.
  • Administrators: Users who have access to modify Airflow config will now have the ability to configure multiple executors for their environment.


How are users affected by the change? (e.g. DB upgrade required?)

To maintain backwards compatibility, the user experience will not change unless the user intentionally configures multiple executors. If the user is interested in using this feature, they will have to specify multiple executors in their Airflow config. Once configured, users may specify which executors they’d like to run their DAGs and tasks on. Only then will the behaviour of Airflow change and affect the users. Tasks will run as usual, with no experience change in that regard (other than which executor executes each task, if configured).

A DB upgrade is required to add the new field which will track the executor to be used for an individual task instance.


Other considerations?

Warning logging will be added to the scheduler and backfill job runners to notify users if we arrive in a state where tasks are set to run on a particular executor but that executor is not configured/enabled. However, future UI-based warnings may be warranted. There is precedent for this in several other failure states (e.g. missing scheduler heartbeats, DAGs failing to parse, etc.) which display a dialogue/banner in the Airflow web UI to indicate an issue or misconfiguration. A similar mechanism could be used for this in the future.

Airflow executor metrics (open_slots, queued_tasks, running_tasks) will be updated to add a dimension/tag for each configured executor.
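
For example, the existing executor gauges could be emitted once per configured executor (illustrative naming only; the exact metric names are an implementation detail still to be settled):

executor.open_slots     ->  executor.open_slots.<executor_name>
executor.queued_tasks   ->  executor.queued_tasks.<executor_name>
executor.running_tasks  ->  executor.running_tasks.<executor_name>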

Note: The current metrics (with no executor name dimension/tag) could continue to be used for the default/environment executor alone, or could be used to represent the sum of all executors (global state). I’m curious to hear input on this topic.

Updates to the Airflow Executor documentation will be made in the Airflow version that the feature is fully “released” in. This documentation will describe how to enable and configure Airflow to use multiple executors, how to specify executor overrides in DAGs and for Airflow tasks, and how to monitor the executor metrics.

It may be desirable for users to run multiple instances of the same executor, which would be especially helpful for isolating tenants from each other at runtime in a multi-tenanted environment. Take the example of two Celery Executors: this will actually be partially possible as this proposal is currently designed; however, there are some blockers prohibiting first-class support for this at the moment. For example, there is no convenient way to set global configuration for each of these two executors, since they will both read from the same sections and keys of the Airflow configuration (e.g. AIRFLOW__CELERY__BROKER_URL). It is possible to provide different configuration at DAG-author time by leveraging the executor_config field on tasks, but this is cumbersome. This and other blockers need to be fleshed out before full support for duplicate/multi-tenant executors; it will not be delivered by this AIP, but nothing in this AIP will be a one-way door blocking the future adoption of such a feature.



What defines this AIP as "done"?

For the initial phase of this AIP, the proposal will be “done” when the following in-scope requirements are complete:

  1. The Scheduler and Backfill jobs are able to run tasks with multiple executors.
  2. Users have the ability to configure executors at three levels of fidelity:
    1. A default executor for their entire Airflow environment (in essence, the previous/current behaviour of Airflow), unless overridden.
    2. A default executor for all tasks in a specific DAG, unless overridden.
    3. An executor for a specific task in a DAG.
  3. Executor CLI commands are supported for all configured executors in the environment.
  4. DAG and SLA callbacks are supported and run on the executor that the task ran on.
  5. The feature is sufficiently described in the Airflow documentation.

Out of Scope

  1. This will not cover adapting the current system test architecture to support hybrid execution.
  2. UI related changes. One can currently view all parameters of a task instance via the Airflow web UI. This, along with logs, will suffice for now to be able to assess which executor ran a particular task. Future UI representations could be added to better visualize this, particularly in the graph view.
  3. First class support of duplicate executors, especially in a multi-tenanted manner.