Status

State: Abandoned in favour of AIP-40: Deferrable ("Async") Operators
Discussion Thread: apache-airflow.slack.com - sig-async-operators
JIRA

AIRFLOW-6395

Created

2019-12-29

WIP STATUS

  • This AIP is a work in progress to gather requirements and scope, together with sketches of possible implementation details

Motivation

For long-running tasks (e.g. jobs on cloud providers), operators and sensors often poll for task status and/or task outputs to determine the success or failure of a task.  These task monitoring processes are often blocking operations that can incur various problems, including:

  • blocking wait operations that needlessly occupy a worker

    • limited concurrency on local executor

    • wasted resources on distributed executors

  • db-sync operations for rescheduling

  • passing XCom task-ID data

To enable the use of various non-blocking async options for hooks, sensors and operators, an async ecosystem is required, in particular an async event loop (executor), a task scheduler, and associated asyncio libraries for db-connections etc.  Along with that, various ways to enhance existing blocking code with async options are required.

One possibility to explore is to first add an option for an AsyncExecutor that can be used like the LocalExecutor.  The goal of an initial POC is to enable a Sensor and/or an Operator to use async methods for blocking operations.  For example, when a blocking process is polling for status information from a remote service (cloud operator), the process might invoke a `time.sleep` call between polling periods.  For this AIP to work, any of those `time.sleep` calls should have an option to be replaced with an `asyncio.sleep` call; maybe something like:

import asyncio
import os
import time

async def delay(pause, use_async=None):
    if use_async is None:
        # Environment variables are strings, so parse the flag explicitly
        # (a raw os.environ.get value like "false" would otherwise be truthy).
        use_async = os.environ.get('AIRFLOW_USE_ASYNC', '').lower() in ('1', 'true')
    if use_async:
        await asyncio.sleep(pause)
    else:
        # Blocking call inside a coroutine; a better pattern would offload it, e.g.
        # await asyncio.get_running_loop().run_in_executor(None, time.sleep, pause)
        time.sleep(pause)
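To illustrate the payoff, a self-contained sketch (with a simplified non-blocking `delay` inlined here) shows many such waits sharing one event loop, so the total wall time stays close to a single pause rather than the sum of all pauses:

```python
import asyncio
import time

async def delay(pause):
    # Non-blocking sleep: yields control back to the event loop.
    await asyncio.sleep(pause)

async def poll_many(n, pause):
    # Launch n "polling" waits concurrently; the coroutines overlap
    # on one event loop instead of running back to back.
    await asyncio.gather(*(delay(pause) for _ in range(n)))

start = time.monotonic()
asyncio.run(poll_many(100, 0.1))
elapsed = time.monotonic() - start
# elapsed is far below the ~10 seconds that 100 sequential sleeps would take
print(elapsed < 1.0)  # True
```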

See also:

This AIP is different from the following, but they share similar goals of optimizing concurrency and performance.

Related work on operators that can be stateful and reschedule job status pokes (like sensor-rescheduling)

Related work specifically invoking an event loop (asyncio):

  • WIP for dask in https://github.com/apache/airflow/pull/6984
    • very early days in this WIP, may change substantially depending on tests and additional refactoring
    • note that it might already support all operators due to dask-distributed process management
    • the operators are not required to use asyncio/coroutine implementations to work
    • maybe think of it like asyncio to spawn multiprocessing jobs
  • https://github.com/apache/airflow/pull/5788
    • POC for LocalExecutor to use a worker with asyncio to launch command processes
    • note that processes are spawned and whatever they run cannot yield all the way back up to the main event loop (AFAIK) 
  • Note that asyncio requires an event loop and this programming model is very different from most of the subprocesses used for existing executors, hooks, sensors and operators
    • See also https://pymotw.com/3/asyncio/executors.html
      • asyncio can spawn threads and processes to run blocking tasks using concurrent.futures for thread/process pools
      • AFAIK, the spawned threads and processes cannot yield all the way back to the main loop, they will occupy CPU/RAM resources to run blocking operations
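The concurrent.futures bridge described above can be sketched as follows. Note that while the event loop can await the wrapped future, the blocking function still occupies a worker thread (and its CPU/RAM) for its full duration, which is the limitation noted in the bullets:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_poll():
    # Stands in for a blocking status check (e.g. a synchronous HTTP call).
    time.sleep(0.2)
    return "SUCCEEDED"

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        # run_in_executor wraps the blocking call in a future the loop can
        # await, but each thread is tied up until time.sleep returns.
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_poll) for _ in range(4))
        )
    return results

print(asyncio.run(main()))  # ['SUCCEEDED', 'SUCCEEDED', 'SUCCEEDED', 'SUCCEEDED']
```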

Tangential work

Considerations

Is there anything special to consider about this AIP? Downsides? Difficulty in implementation or rollout etc? 

What change do you propose to make?

For higher concurrency in some areas of interaction with cloud providers (e.g. AWS services), using asyncio patterns might improve performance, especially for blocking operations like polling for task status on external systems.

  • start with a small set of functionality to enable an asyncio event loop that can execute tasks with high concurrency for non-blocking operations
    • it should be a small, isolated way to configure and use an asyncio ecosystem in Airflow
    • it might require a special package-install option to add asyncio-compatible libs
    • it might require an example config file specific to asyncio settings
  • Consider https://github.com/aio-libs/aiobotocore as an alternative or supplement to botocore/boto3
    • be wary of compatibility and maintenance issues between the async version and the regular versions
    • be wary of API throttle limits (use async semaphores to limit concurrency?)
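The semaphore idea above can be sketched as follows; `check_job` is a hypothetical stand-in for an async API call (e.g. one made via aiobotocore), not a real client method:

```python
import asyncio

async def check_job(job_id):
    # Hypothetical stand-in for an async status request to a cloud API.
    await asyncio.sleep(0.01)
    return (job_id, "RUNNING")

async def poll_all(job_ids, max_in_flight=10):
    # An asyncio.Semaphore caps the number of in-flight requests so a burst
    # of concurrent coroutines does not trip the provider's throttle limits.
    sem = asyncio.Semaphore(max_in_flight)

    async def limited(job_id):
        async with sem:
            return await check_job(job_id)

    return await asyncio.gather(*(limited(j) for j in job_ids))

statuses = asyncio.run(poll_all(range(50)))
print(len(statuses))  # 50
```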

Consider options for async event loops, in addition to the asyncio module, e.g.

  • https://distributed.dask.org/en/latest/asynchronous.html
    • The DaskExecutor has an optional init param to use a Client instance, which could be instantiated with async behavior, but it might require additional async wrapping on it that can await the client. Hence, I need to understand how the Airflow scheduler spins up one or more executors to schedule a DAG run, and whether or not a DAG might have a property that indicates that it wants to use async, so that the scheduler can try to use an async executor.
    • The `DaskExecutor.execute_async` could use a distributed client with an async event loop enabled, rather than async futures that might spawn new processes.  It could manage an event loop for async execution, but it's not clear how it would work with tasks (hooks, sensors, operators) that are not specifically designed for asyncio behavior.

Consider options for async-db drivers, e.g.

Async options to watch the DAG-BAG

What problem does it solve?

  • High concurrency with coroutines
    • optimal use of single cores
    • advantages of coroutines:
      • in-memory state for hook/sensor/operator (less db-overheads?)
      • explicit breakpoints for blocking operations
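The coroutine advantages above can be sketched with a toy sensor whose poke count and last status live entirely in local variables; `get_status` is a hypothetical stand-in for an async call to a remote service, and each `await` is an explicit breakpoint where control returns to the event loop:

```python
import asyncio

async def sensor(job_id, statuses):
    # In-memory state: poke count and last status are local variables,
    # with no db round-trip needed between pokes.
    pokes = 0
    status = None
    it = iter(statuses)

    async def get_status():
        # Hypothetical stand-in for an async status call to a remote service.
        await asyncio.sleep(0)  # explicit breakpoint: yields to the event loop
        return next(it)

    while status != "SUCCEEDED":
        status = await get_status()
        pokes += 1
    return job_id, pokes

result = asyncio.run(sensor("job-1", ["PENDING", "RUNNING", "SUCCEEDED"]))
print(result)  # ('job-1', 3)
```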

Why is it needed?

  • An async ecosystem
    • async event loop
    • async compatible libraries
  • async loop(s) obtained or started, as necessary, in various places:
    • airflow/jobs might need a new `AsyncJob`
    • anything that uses `ExecutorLoader.get_default_executor()` where it could return an async executor might need an async loop; whether the loop comes from some global scope or from the executor itself is not entirely clear
      • `airflow.models.dag.DAG.run`
      • `airflow.models.dagbag.DagBag.executor` could be an async executor
      • `airflow.www.views.Airflow.run` could get an async executor
      • `airflow.cli.commands.task_command._run_task_by_executor` likewise
  • Clear documentation and examples of how to configure Airflow for optimal async executor(s)
    • including warnings about compatible db backends
    • include notes about how to configure various concurrency settings
      • note what concurrency settings are ignored and which ones are applied
    • in the first instance, the config settings could be very specific to an AsyncExecutor
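As an illustration of AsyncExecutor-specific settings, a dedicated config section might look like the fragment below; every key shown is hypothetical and does not exist in Airflow's configuration:

```ini
# Hypothetical airflow.cfg fragment -- none of these keys exist in Airflow.
[async_executor]
# Cap on concurrently running task coroutines on the event loop
# (would supersede worker-count settings that the async loop ignores).
max_concurrent_tasks = 1000
# Size of the async db-connection pool (requires an async-compatible driver).
db_pool_size = 20
```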

Are there any downsides to this change?

  • Creating an easy solution for backward compatible systems
    • Easily running blocking code in an AsyncExecutor
      • adding warnings when async is an option?
    • Refactoring base classes to enable async options?
      • ensuring async is not the default behavior?
  • A potential breaking change is the deprecation/removal of `airflow.executors.base_executor.BaseExecutor.execute_async`, because it should not be assumed that all executors can implement async tasks.  The addition of an `AsyncExecutor` may call for an additional base class that is not entirely compatible with the API already defined in `BaseExecutor`; the existing base class should not be assumed to capture all possible executor base classes.  If a new `AsyncExecutor` must inherit from the existing (or a modified) base class, it could simply override a common `execute` method with an async version, unless declaring `async def execute` significantly changes the inheritance properties of the method (note that the existing base class defines `execute_async` without any `async def` declaration).
    • e.g. the `CeleryExecutor` has to throw an exception for `execute_async`
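The inheritance question above can be sketched with toy classes (names are illustrative, not Airflow's actual API): overriding a common `execute` with `async def` is legal Python, but it changes the call contract, since callers now receive a coroutine that must be awaited.

```python
import asyncio

class BaseExecutorSketch:
    # Hypothetical common API; not Airflow's actual BaseExecutor.
    def execute(self, command):
        raise NotImplementedError

class SyncExecutorSketch(BaseExecutorSketch):
    def execute(self, command):
        return f"ran {command}"

class AsyncExecutorSketch(BaseExecutorSketch):
    # Same method name, but calling it now returns a coroutine that the
    # caller must await -- the compatibility concern raised above.
    async def execute(self, command):
        await asyncio.sleep(0)
        return f"ran {command}"

print(SyncExecutorSketch().execute("task"))                # ran task
print(asyncio.run(AsyncExecutorSketch().execute("task")))  # ran task
```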

Which users are affected by the change?

  • Mostly backend execution code and configuration
  • Need to consider impacts on UI status indicators
    • may need async compatible db drivers

How are users affected by the change? (e.g. DB upgrade required?)

  • Added configuration options
  • Added documentation for an AsyncExecutor
  • Compatibility with synchronous code
    • A pathway to enable async systems

Other considerations?

  • Debugging
  • CI test suites with/without async

What defines this AIP as "done"?

  • When thousands of AWS Batch jobs can be launched and monitored using an AsyncExecutor on a single CPU core
    • the point is that an async executor should support high concurrency for long-running tasks with blocking operations
    • multiple cores might help with db-connection pooling for all the concurrent async-task-coroutines
  • When both async and sync executors share a common API with properties that allow callers to handle them appropriately
    • async executors might need to be used in the context of an existing event loop (or create a new one)
    • sync executors should not be able to submit tasks (jobs) on any async event loop
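The common-API criterion above could be expressed with a simple capability flag; this is a hypothetical sketch (neither the flag nor the classes are part of Airflow's actual API):

```python
import asyncio

class ExecutorSketch:
    # Hypothetical capability flag so callers can dispatch appropriately.
    is_async = False

class SyncExecutorSketch(ExecutorSketch):
    def execute(self, command):
        return f"sync:{command}"

class AsyncExecutorSketch(ExecutorSketch):
    is_async = True

    async def execute(self, command):
        # Would normally run inside an existing event loop.
        await asyncio.sleep(0)
        return f"async:{command}"

def submit(executor, command):
    # Callers inspect the flag instead of assuming every executor
    # supports an async submission path; only async executors ever
    # touch an event loop.
    if executor.is_async:
        return asyncio.run(executor.execute(command))
    return executor.execute(command)

print(submit(SyncExecutorSketch(), "task"))   # sync:task
print(submit(AsyncExecutorSketch(), "task"))  # async:task
```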


3 Comments

  1. Consider options for async-db drivers

    We also need to support other databases. MySQL and PostgreSQL are officially supported, but I know that some users also use MSSQL. The use of SQLAlchemy allows us to easily add support for other databases, e.g. Google Spanner.

  2. Async options to watch the DAG-BAG

    Please keep in mind that the DAG might change shape without the DAG file changing. For example, the DAG may be dynamic, with its shape pulled from some external system when it executes.

    Also, when it comes to performance, Ash wrote an excellent blog post profiling the application to see where time is being spent: https://www.astronomer.io/blog/profiling-the-airflow-scheduler/