
Motivation

Executors, as they are now, are quite pluggable/extensible, but there remain several places in the core Airflow logic (e.g. scheduler, backfill, settings/config, etc.) where the code is opinionated about specific Executors. In these areas the Executor interface/API is leaking into core Airflow code. However, core Airflow logic should not need to know the properties of any one specific Executor. It should be able to interact with a generic interface that all Executors implement. Put another way, someone adding a new Executor should not have to modify any core Airflow code/logic.

A concrete example helps illustrate the point:

airflow/airflow/configuration.py
```python
# Excerpt from a config-validation method of the Airflow config parser
is_executor_without_sqlite_support = self.get("core", "executor") not in (
    "DebugExecutor",
    "SequentialExecutor",
)
is_sqlite = "sqlite" in self.get("database", "sql_alchemy_conn")
if is_sqlite and is_executor_without_sqlite_support:
    raise AirflowConfigException(f"error: cannot use sqlite with the {self.get('core', 'executor')}")
if is_sqlite:
    ...  # further SQLite-specific checks (elided in this excerpt)
```

Here you can see code in configuration.py maintaining a hardcoded set of the specific Executors that support SQLite. If someone were to add a new Executor (e.g. as a plugin to Airflow) that supports SQLite, they would need to modify this piece of core Airflow code (and the other places where SQLite compatibility is checked). This is Executor coupling!

Considerations

What change do you propose to make?

Below is a set of coupling types/areas identified after a deep dive into the Airflow code base. Each includes a list of examples along with a proposal to remediate the coupling:

1) Local Executor Compatibility Checks

Executors that run locally have certain limitations and requirements, many of which are currently hardcoded in core Airflow code. To add a new Executor that runs locally, one would have to change this core Airflow code.

Examples

  1a) Whether or not to start the serve_logs sub-process, currently hardcoded to LocalExecutor and SequentialExecutor (code link).

  1b) When running in standalone mode, Airflow asserts that a local Executor is being used, currently hardcoded to LocalExecutor and SequentialExecutor (code link).

  1c) Local Executors need a copy of the Airflow configuration created in /tmp, currently hardcoded to LocalExecutor and SequentialExecutor (code link).

Proposal

Add a static method or attribute to the BaseExecutor class that child classes can override and that core Airflow code can then check.

There is a precedent already set with the supports_ad_hoc_ti_run attribute. Here you can see it currently being set on the KubernetesExecutor:

airflow/airflow/executors/kubernetes_executor.py
```python
class KubernetesExecutor(BaseExecutor):
    """Executor for Kubernetes"""

    supports_ad_hoc_ti_run: bool = True
```

An example usage of the supports_ad_hoc_ti_run attribute looks like the following:

airflow/airflow/www/views.py
```python
if not getattr(executor, "supports_ad_hoc_ti_run", False):
    msg = "Only works with the Celery, CeleryKubernetes or Kubernetes executors"
    return redirect_or_json(origin, msg, "error", 400)
```

Static methods and class attributes are convenient for this purpose because the Executor class does not need to be instantiated to query them. However, unlike abstract instance methods, their implementation is difficult, if not impossible, to enforce. This can be mitigated by providing an implementation or default value in the base class (matching the current assumptions in Airflow code) and by updating the existing Executors as part of this AIP.
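A minimal sketch of this pattern applied to examples 1a and 1b, assuming a hypothetical `is_local` class attribute (an illustrative name, not an existing Airflow API in the code discussed here). The classes are stand-ins for the real Executors: the base class supplies a default matching today's assumptions, local Executors override it, and core code reads the attribute from the class without instantiating it.

```python
from typing import Type


class BaseExecutor:
    """Stand-in for Airflow's BaseExecutor, reduced to capability flags."""

    # Hypothetical flag (illustrative name). The default mirrors the current
    # assumption in core code that an Executor is not local.
    is_local: bool = False


class LocalExecutor(BaseExecutor):
    is_local = True


class SequentialExecutor(BaseExecutor):
    is_local = True


class CeleryExecutor(BaseExecutor):
    pass  # Inherits the default: not a local Executor.


def should_start_serve_logs(executor_cls: Type[BaseExecutor]) -> bool:
    """Example 1a: ask the class instead of matching Executor names."""
    return executor_cls.is_local


def check_standalone_executor(executor_cls: Type[BaseExecutor]) -> None:
    """Example 1b: standalone mode requires a local Executor."""
    if not executor_cls.is_local:
        raise ValueError(f"Standalone mode requires a local Executor, got {executor_cls.__name__}")
```

Because both checks read a class attribute, neither needs an Executor instance, and a plugin Executor that sets `is_local = True` is handled without any change to core code.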

Note: while the above usage of supports_ad_hoc_ti_run is a good example of decoupled Executor logic, the message it returns does technically contain some Executor coupling because it mentions specific Executor names. It should be updated to something like the following:
```python
if not getattr(executor, "supports_ad_hoc_ti_run", False):
    msg = f"{executor.__class__.__name__} does not support ad hoc TI runs"
    return redirect_or_json(origin, msg, "error", 400)
```

2) Single Threaded Executors (SQLite support)

Some Executors, currently a subset of the local Executors, run in a single-threaded fashion and have certain limitations and requirements, many of which are hardcoded. Adding a new single-threaded Executor would require changes to core Airflow code.

Note: This coupling often shows up as SQLite compatibility checks, since SQLite does not support multiple connections.

Examples

  2a) SQLite check done in configuration.py (code link).

  2b) When running in standalone mode, SQLite compatibility is checked (code link).

  2c) Sensors in poke mode can block execution of DAGs when running with single-process Executors. This is currently hardcoded to DebugExecutor (although it should also include SequentialExecutor) (code link).

Proposal

Similar to the proposal for coupling 1, a static method or attribute for single-threaded compatibility can be added to the BaseExecutor interface, implemented by child classes, and then checked by core Airflow code instead of hardcoding specific Executors.
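Applied to example 2a, the SQLite check could look roughly like the sketch below. It assumes a hypothetical `is_single_threaded` class attribute (illustrative name) defaulting to `False`, which matches the current behaviour where any Executor not explicitly listed is treated as lacking SQLite support. The helper function and the exception class are local stand-ins, not the real config-parser method or `airflow.exceptions`.

```python
from typing import Type


class AirflowConfigException(Exception):
    """Local stand-in for Airflow's AirflowConfigException."""


class BaseExecutor:
    # Hypothetical flag (illustrative name): Executors are assumed to run
    # tasks concurrently unless they declare otherwise.
    is_single_threaded: bool = False


class SequentialExecutor(BaseExecutor):
    is_single_threaded = True


class DebugExecutor(BaseExecutor):
    is_single_threaded = True


class LocalExecutor(BaseExecutor):
    pass


def validate_sqlite_compatibility(executor_cls: Type[BaseExecutor], sql_alchemy_conn: str) -> None:
    """Reject SQLite unless the Executor declares itself single-threaded."""
    is_sqlite = "sqlite" in sql_alchemy_conn
    if is_sqlite and not executor_cls.is_single_threaded:
        raise AirflowConfigException(f"error: cannot use SQLite with the {executor_cls.__name__}")
```

The same flag could also replace the DebugExecutor special case in example 2c, so that any single-threaded Executor gets the same sensor handling.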


3) Misc Compatibility Checks

Coupling around other compatibility checks, similar to couplings 1 and 2, that does not fall into either of the above categories.

Examples

  3a) Some Executors support pickling and some do not. Both the backfill and scheduler jobs contain logic that checks this before submitting tasks to the Executor queue. This currently applies to DaskExecutor, LocalExecutor and SequentialExecutor (backfill code link, scheduler code link).

  3b) Sentry has a Celery integration. If the Executor is the CeleryExecutor, the Sentry Celery integration is imported and configured (code link).

Proposal

The above could be resolved in a similar way to couplings 1 and 2.
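For instance, examples 3a and 3b could be expressed as two more hypothetical class attributes, `supports_pickling` and `supports_sentry` (illustrative names and defaults). The sketch uses made-up Executor classes and only computes which action core code would take; it does not touch Airflow's real pickling or Sentry setup. The `donot_pickle` argument is modelled loosely on the backfill option of that name.

```python
from dataclasses import dataclass
from typing import Type


class BaseExecutor:
    # Hypothetical capability flags; defaults are illustrative.
    supports_pickling: bool = True
    supports_sentry: bool = False


class ExampleLocalExecutor(BaseExecutor):
    supports_pickling = False


class ExampleCeleryLikeExecutor(BaseExecutor):
    supports_sentry = True


@dataclass
class StartupPlan:
    pickle_dags: bool
    enable_sentry_celery_integration: bool


def plan_startup(executor_cls: Type[BaseExecutor], donot_pickle: bool = False) -> StartupPlan:
    """Decide Executor-dependent behaviour from flags, not Executor names."""
    return StartupPlan(
        # 3a: backfill/scheduler only pickle DAGs when the Executor supports it.
        pickle_dags=not donot_pickle and executor_cls.supports_pickling,
        # 3b: the Sentry Celery integration is only configured when relevant.
        enable_sentry_celery_integration=executor_cls.supports_sentry,
    )
```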

4) Executor Coupling in Logging

The FileTaskHandler logging class has a special branch for Kubernetes to fetch logs from the pod. (Note: this is currently hardcoded to KubernetesExecutor, so this branch will not run for the CeleryKubernetesExecutor or LocalKubernetesExecutor.)

Example

  4a) See the relevant Kubernetes code here.

Proposal

One viable approach is to add a get_task_logs(ti: TaskInstance | None) method to the BaseExecutor class, which Executor subclasses can implement. Other Executors, particularly containerized or cloud-based ones, could certainly make use of such a paradigm.
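A minimal sketch of how this might look, using the `get_task_logs` name from the proposal with a simplified return type (a list of log lines). `TaskInstance`, `ExamplePodExecutor` and `read_task_logs` are stand-ins, not Airflow's real classes or the real FileTaskHandler. The base implementation returns nothing, so the handler keeps its existing file-based path unless the Executor can supply logs itself.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TaskInstance:
    """Minimal stand-in for Airflow's TaskInstance."""

    dag_id: str
    task_id: str
    try_number: int


class BaseExecutor:
    def get_task_logs(self, ti: TaskInstance | None) -> list[str]:
        # Default: the Executor has no special log source.
        return []


class ExamplePodExecutor(BaseExecutor):
    """Hypothetical containerized Executor that can fetch logs itself."""

    def __init__(self, pod_logs: dict[tuple[str, str, int], list[str]]):
        self._pod_logs = pod_logs

    def get_task_logs(self, ti: TaskInstance | None) -> list[str]:
        if ti is None:
            return []
        return self._pod_logs.get((ti.dag_id, ti.task_id, ti.try_number), [])


def read_task_logs(executor: BaseExecutor, ti: TaskInstance, local_file_lines: list[str]) -> list[str]:
    """Handler logic: prefer Executor-supplied logs, else use the local file."""
    executor_logs = executor.get_task_logs(ti)
    return executor_logs or local_file_lines
```

With this shape the handler no longer checks for KubernetesExecutor by name, so hybrid Executors such as CeleryKubernetesExecutor could delegate to their Kubernetes side.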


5) Executor Specific CLI Commands

Some Executors have their own first-class CLI commands (now that’s coupling!) which set up or modify various components related to that Executor.

Examples

  5a) Celery Executor commands (code link).

  5b) Kubernetes Executor commands (code link).

  5c) Default CLI parser has hardcoded logic for Celery and Kubernetes Executors specifically (code link).

This is behaviour that Airflow has had for quite some time, but in retrospect it is a form of coupling that feels odd. That is not to say these CLI commands are not useful to our users. So while simply deprecating them (making the Airflow CLI unopinionated about Executor management) is possible, it does not seem like the right solution (though I would like to hear some thoughts on this!). If we remain committed to this paradigm, a clean and straightforward solution would be to extend the BaseExecutor interface with a pluggable mechanism that vends CLI GroupCommands and parsers. Executor subclasses would implement these methods where applicable, and the core Airflow CLI parser code would call them to fetch commands and parsers. The existing Executor CLI code would then be migrated from cli_parser to the respective Executor classes.

Pseudocode for the updated cli_parser logic that consumes the vended GroupCommands:

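updated cli_parser.py (pseudo code)
```python
# Existing code in cli_parser.py (CLICommand, GroupCommand and DAGS_COMMANDS
# are defined in that module)
from typing import List

from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader

...
airflow_commands: List[CLICommand] = [
    GroupCommand(
        name="dags",
        help="Manage DAGs",
        subcommands=DAGS_COMMANDS,
    ),
    ...
]

# New code: add the command groups vended by the configured Executor class.
# get_cli_group_commands() is the proposed BaseExecutor hook and is assumed to
# return a list of GroupCommands, hence extend() rather than append().
executor_cls, _ = ExecutorLoader.import_executor_cls(conf.get("core", "EXECUTOR"))
airflow_commands.extend(executor_cls.get_cli_group_commands())
...
```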
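To illustrate the Executor side of this mechanism without depending on Airflow, the following self-contained sketch uses the standard library's argparse. `CommandSpec`, `GroupSpec`, `ExampleQueueExecutor` and `build_parser` are hypothetical stand-ins; Airflow's real GroupCommand/ActionCommand types and parser wiring differ. The hook name `get_cli_group_commands` follows the pseudocode for cli_parser.py.

```python
import argparse
from typing import Callable, List, NamedTuple, Type


class CommandSpec(NamedTuple):
    """Simplified stand-in for an Airflow CLI command definition."""

    name: str
    help: str
    func: Callable[[argparse.Namespace], str]


class GroupSpec(NamedTuple):
    """Simplified stand-in for a GroupCommand with subcommands."""

    name: str
    help: str
    subcommands: List[CommandSpec]


class BaseExecutor:
    @classmethod
    def get_cli_group_commands(cls) -> List[GroupSpec]:
        # Default: an Executor vends no CLI commands of its own.
        return []


class ExampleQueueExecutor(BaseExecutor):
    """Hypothetical Executor that ships an 'example-queue worker' command."""

    @classmethod
    def get_cli_group_commands(cls) -> List[GroupSpec]:
        worker = CommandSpec("worker", "Start a worker", lambda args: "worker started")
        return [GroupSpec("example-queue", "Manage example-queue workers", [worker])]


def build_parser(executor_cls: Type[BaseExecutor]) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="airflow")
    subparsers = parser.add_subparsers(dest="command", required=True)
    # Core command groups are still defined by core Airflow.
    dags = subparsers.add_parser("dags", help="Manage DAGs")
    dags.set_defaults(func=lambda args: "dags")
    # Executor-vended groups are attached without naming any specific Executor.
    for group in executor_cls.get_cli_group_commands():
        group_parser = subparsers.add_parser(group.name, help=group.help)
        group_subparsers = group_parser.add_subparsers(dest="subcommand", required=True)
        for command in group.subcommands:
            command_parser = group_subparsers.add_parser(command.name, help=command.help)
            command_parser.set_defaults(func=command.func)
    return parser
```

With a different configured Executor, the "example-queue" group simply does not exist, which is the behaviour the hardcoded Celery and Kubernetes branches currently emulate by name.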


6) Misc Executor Coupling in UI

There are a few webpages in the Airflow UI which are specifically coupled to particular Executors.

Examples

  6a) Kubernetes YAML can be rendered in the UI for the KubernetesExecutor and CeleryKubernetesExecutor, at the /rendered-k8s URL (views code, associated TI code).

  6b) A warning popup is shown in the UI when using the SequentialExecutor (views code, associated dags.html code).

Proposal

One could imagine a pluggable mechanism, similar to coupling 5, for Executor subclasses to vend UI views to then be used in views.py. This would allow all Executors to have UI components without needing to modify core Airflow code. There are not many instances of this coupling, and it would be a more involved implementation that might not be used often. It is not as high priority as other coupling types, but should still be addressed. There are also projects underway to migrate the Airflow UI to a different framework, which would likely affect how this coupling is solved.

7) Specific Executors as Defaults

There are places in core Airflow code where specific Executors are the default value for some specific workflow or action.

Examples

  7a) The DAG.run() method, when local=True and no other Executor is provided, is hardcoded to use the LocalExecutor (code link).

Proposal

It is arguable that in this case, and in other cases like it, using a specific Executor as a default does not count as strict Executor coupling. No part of the Executor interface is being leaked, and this type of coupling passes the test of "if I wanted to add a new Executor, would I need to update this code?". In these cases I'm inclined to leave the defaults in place and simply add ways for users to override which Executor is used, where this is not already possible. In the case of example 7a, where the Executor can already be provided directly as a parameter, deprecating the local parameter and making the LocalExecutor the default value for the executor parameter would clean up that logic.
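A sketch of how example 7a might be cleaned up, written as a standalone helper rather than the real DAG.run() signature. `resolve_run_executor` and `get_configured_executor` are hypothetical names; the latter stands in for reading the configured Executor. Passing `local` still works but emits a DeprecationWarning, and an explicit `local=False` keeps its old meaning (use the configured Executor) until the parameter is removed.

```python
import warnings
from typing import Optional


class BaseExecutor:
    """Stand-in for Airflow's BaseExecutor."""


class LocalExecutor(BaseExecutor):
    pass


class SequentialExecutor(BaseExecutor):
    pass


def get_configured_executor() -> BaseExecutor:
    """Hypothetical stand-in for loading the [core] executor from config."""
    return SequentialExecutor()


def resolve_run_executor(
    executor: Optional[BaseExecutor] = None,
    local: Optional[bool] = None,
) -> BaseExecutor:
    """Choose the Executor for a DAG run; LocalExecutor is the new default."""
    if local is not None:
        warnings.warn(
            "The 'local' parameter is deprecated; pass executor=LocalExecutor() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
    if executor is not None:
        return executor
    if local is False:
        # Legacy behaviour kept during the deprecation period.
        return get_configured_executor()
    return LocalExecutor()
```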

What problem does it solve?

With this work it will be easier, safer, and cleaner to create new executors and manage existing executors. The changes will improve code quality for Executor logic and it will allow fully featured executors to be built by third parties as plugins.

Why is it needed?

The current implementation, which is leaking the Executor interface into core Airflow code, reduces the code quality of the Airflow project. Contributors are required to be aware that Executor logic is spread throughout the core Airflow codebase and that such code requires updating; this is brittle and prone to error.
It is also not possible, in all cases, to implement a full-featured Executor as a plugin without modifying core Airflow code.

Are there any downsides to this change?

This change should be backwards compatible, adding no new features and introducing no regressions in Executor behaviour. However, it will be a medium-to-large refactoring of some core Airflow code, which will need careful review and testing.

Which users are affected by the change?

End users of Airflow will not be affected. If this proposal is executed properly, they should have no awareness that anything has changed under the hood.

Power users, organizations, and contributors who are developing Executors (or features which affect Executors) will be positively affected, see the next section.

How are users affected by the change? (e.g. DB upgrade required?)

It will be easier and more straightforward for developers to create new Executors. No modifications to core Airflow code should be required, and an Executor can be delivered entirely through a plugin that implements the BaseExecutor interface (which will now contain all features of Executors).

Other considerations?

One consideration, with respect to coupling 6 specifically, is that Airflow is currently considering migrating to a new UI platform, moving from Flask-based views to React (see AIP-38 Modern Web Application). This may mean that remediation work for coupling 6 should be deferred until after that migration.

What defines this AIP as "done"?

The above proposals for coupling categories 1-7 are agreed upon, implemented, reviewed, and merged to apache/airflow:main.