Status

State: Completed
Discussion Thread

https://lists.apache.org/thread/ztnfsqolow4v1zsv4pkpnxc1fk0hbf2p

Vote Thread
Vote Result Thread
Progress Tracking (PR/GitHub Project/Issue Label)

https://github.com/apache/airflow/pull/60268
Date Created

2025.12.08

Version Released

3.2.0

Authors

David Blain

Motivation

At our company, we strive to avoid custom code in Airflow as much as possible to improve maintainability. For years this meant favouring dedicated Airflow operators over Python operators.

However, in Airflow 3, as the number of deferrable operators in our DAGs continued to grow, we began facing severe performance issues, which forced us to re-evaluate that approach.

Initially we expected deferrable operators to improve performance for I/O-related tasks—such as REST API calls—because triggerers follow an async producer/consumer pattern. But in practice we discovered the opposite.

Why Deferrable Operators Became the Bottleneck

Deferrable operators and sensors delegate async work to triggerers. This is perfectly fine for lightweight work such as sensors polling a resource or waiting for messages on a queue. But in our case:

  • MSGraphAsyncOperator performs long-running async operations.
  • HttpOperator in deferrable mode can perform long-running HTTP interactions, especially if pagination is involved.
  • FTPOperator doesn't even have a deferrable mode, although an SFTPHookAsync exists.
  • Both MSGraphAsyncOperator and HttpOperator can return large payloads.
  • Triggerers must store yielded events directly into the Airflow metadata database.

Triggerers are not designed for sustained high-load async execution or large data transfers. Unlike Celery workers, triggerers scale poorly and quickly become the bottleneck.

Yielded events from triggers are stored directly in the Airflow metadata database because, unlike workers, triggers cannot leverage a custom XCom backend to offload large payloads, which can lead to increased database load and potential performance bottlenecks.

Dynamic task mapping with deferrable operators amplifies the problem even further, which AIP-88 only partially solves. Triggerers also cannot run on the Edge Executor, as they are still tightly coupled to the Airflow metadata database (possibly addressed in AIP-92).

Rethinking the approach: Async hooks + Python tasks

These limitations led us to reconsider calling async hooks directly from @task-decorated Python functions or PythonOperators, avoiding deferrable operators, and thus triggerers, entirely.

Operators are wrappers around hooks. Well-written operators should contain little logic and delegate the real work to their hooks, so why not call the hooks directly? This idea is also in line with what Bolke presented back in 2023.
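The relationship can be sketched with a toy hook and operator pair (entirely hypothetical names, not a real provider): a well-written operator adds nothing beyond delegation, so calling the hook directly from a task gives the same result.

```python
# Sketch of the operator/hook relationship with hypothetical classes.

class MyHook:
    """The hook does the real work."""

    def run(self, endpoint: str) -> str:
        return f"response from {endpoint}"


class MyOperator:
    """A well-written operator only delegates to its hook."""

    def __init__(self, endpoint: str):
        self.endpoint = endpoint

    def execute(self, context=None) -> str:
        # No extra logic here: the operator is a thin wrapper.
        return MyHook().run(self.endpoint)


# Calling the hook directly (e.g. from a @task-decorated function) yields
# exactly what the operator would have produced, without the extra layer.
direct = MyHook().run("reports/daily")
via_operator = MyOperator("reports/daily").execute()
```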

Advantages of this approach include:

  • No dynamic task mapping needed when iterating: just loop in Python, unless you really need to track each individual step, which comes at a cost.
  • Massive reduction in scheduler load.
  • No triggerers involved.
  • Async code can run on Edge Workers.
  • Celery workers scale far better than triggerers. By moving from deferrable operators (and thus triggerers) to async code on Celery workers, our performance issues on the triggerer disappeared and run times became much shorter, probably because the trigger mechanism also puts extra load on the scheduler.
  • For a single call, sync versus async makes little difference in performance. Async shines when the same I/O-bound operation has to be executed many times: while one task is waiting on an I/O resource, Python can already start another I/O call or process the response of an already completed one, which avoids idle waiting and makes better use of compute time.
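The last point can be demonstrated with plain asyncio, independent of Airflow: ten simulated I/O calls of 0.1 s each complete in roughly 0.1 s total when run concurrently, because the event loop switches between tasks while each one waits.

```python
import asyncio
import time


async def fake_io(i: int) -> int:
    # Stand-in for an I/O-bound call (e.g. an HTTP request).
    await asyncio.sleep(0.1)
    return i


async def main() -> float:
    start = time.perf_counter()
    # Launch 10 calls concurrently; total wall time is ~0.1 s, not ~1 s,
    # because the event loop runs other tasks while each awaits its I/O.
    results = await asyncio.gather(*(fake_io(i) for i in range(10)))
    assert results == list(range(10))
    return time.perf_counter() - start


elapsed = asyncio.run(main())
```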


But why would we need native support for async in the PythonOperator?

Why not just use asyncio.run() in a regular, non-async Python function?

Well yes, in Airflow 2 that would have worked without issue, as there was no Task SDK and thus no comms supervisor. In Airflow 3 the story is different: user code no longer has direct access to the Airflow database, which is exactly one of the things Airflow 3 set out to avoid because it was dangerous. Every retrieval of, say, a connection or a variable now goes through the comms supervisor, which underneath makes a REST call to the API server, which in its turn fetches the data from the database; this perfectly isolates user code from the Airflow database. Doing so in Airflow 3 therefore leads to a NotImplementedError: in a sync operator the CommsDecoder uses the synchronous send method for its API calls, but in an asynchronous context it tries to use the asend method, which exists but is not implemented yet. This problem is also resolved in the PR associated with this AIP. Since the CommsDecoder can now be used in both contexts, some safeguards had to be implemented to make sure it works in both cases, something a DAG author shouldn't have to be bothered with.

Also, allowing the PythonOperator to execute synchronous as well as asynchronous code out of the box, without worrying about the event loop, simply makes DAGs easier to write and nicer to read. You can now decorate an Airflow task as async, which makes the intent immediately clear at a higher level, and await async functions directly inside decorated tasks.
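The ergonomic difference can be illustrated without any Airflow machinery (the fetch coroutine below is a stand-in for an async hook call):

```python
import asyncio


async def fetch(url: str) -> str:
    # Stand-in for an async hook call such as an HTTP request.
    await asyncio.sleep(0)
    return f"data from {url}"


# Before: a sync task body has to manage the event loop itself (boilerplate).
def sync_task() -> str:
    return asyncio.run(fetch("https://example.com"))


# After: the task function is simply declared async and awaits the hook;
# with native async support, the operator runs the coroutine for you.
async def async_task() -> str:
    return await fetch("https://example.com")


result_before = sync_task()
result_after = asyncio.run(async_task())  # the operator would do this part
```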

Concrete Example: Async SFTP Downloads

Below is an example comparing the download of ~17,000 XML files and their loading into our data warehouse. A single Celery worker can orchestrate many concurrent downloads using asyncio. A semaphore (here used internally by the SFTPClientPool) protects the SFTP server from being overloaded.

Benchmark results:

Approach                                         | Environment  | Time
-------------------------------------------------|--------------|-----------
Mapped SFTPOperator                              | production   | 3h 25m 55s
PythonOperator + SFTPHook                        | local laptop | 1h 21m 09s
Async Python task + SFTPHookAsync (without pool) | local laptop | 8m 29s
Async Python task + SFTPClientPool               | production   | 3m 32s


Example with async @task decorated python task and async hook:

@task(show_return_value_in_logs=False)
async def load_xml_files(files):
    import asyncio
    import logging
    from io import BytesIO
    from os import cpu_count

    from more_itertools import chunked
    from tenacity import retry, stop_after_attempt, wait_fixed

    from airflow.providers.sftp.hooks.sftp import SFTPHookAsync

    logging.info("number of files: %d", len(files))

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
    async def download_file(file):
        logging.debug("downloading: %s", file)
        buffer = BytesIO()
        ssh_conn = await SFTPHookAsync(sftp_conn_id=sftp_conn)._get_conn()
        async with ssh_conn.start_sftp_client() as sftp:
            async with sftp.open(file, encoding=xml_encoding) as remote_file:
                data = await remote_file.read()
                buffer.write(data.encode(xml_encoding))
                buffer.seek(0)
                return buffer

    for batch in chunked(files, cpu_count() * 2):
        tasks = [asyncio.create_task(download_file(f)) for f in batch]

        # Wait for this batch to finish before starting the next
        for task in asyncio.as_completed(tasks):
            result = await task
            # Do something with result or accumulate it and return it as an XCom


Example with async @task decorated python task and async hook pool:

@task(show_return_value_in_logs=False)
async def load_xml_files(files):
    import asyncio
    import logging
    from io import BytesIO
    from more_itertools import chunked
    from os import cpu_count
    from tenacity import retry, stop_after_attempt, wait_fixed

    from airflow.providers.sftp.hooks.sftp import SFTPClientPool

    logging.info("number of files: %d", len(files))

    async with SFTPClientPool(sftp_conn_id=sftp_conn, pool_size=cpu_count()) as pool:
        @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
        async def download_file(file):
            async with pool.get_sftp_client() as sftp:
                logging.debug("downloading: %s", file)
                buffer = BytesIO()
                async with sftp.open(file, encoding=xml_encoding) as remote_file:
                    data = await remote_file.read()
                    buffer.write(data.encode(xml_encoding))
                    buffer.seek(0)
                return buffer

        for batch in chunked(files, cpu_count() * 2):
            tasks = [asyncio.create_task(download_file(f)) for f in batch]

            # Wait for this batch to finish before starting the next
            for task in asyncio.as_completed(tasks):
                result = await task
                # Do something with result or accumulate it and return it as an XCom

Of course, in the example above we went a bit further by introducing an SFTPClientPool, which not only limits concurrency through an asyncio Semaphore but also reuses SFTP connections. Avoiding the recreation of a connection per asyncio task greatly improves performance, something that is impossible to achieve with dynamically mapped tasks.
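The idea behind such a pool can be sketched in plain asyncio (an illustrative AsyncClientPool, not the actual SFTPClientPool API): a semaphore bounds how many clients are checked out at once, while idle clients are reused instead of reconnecting for every task.

```python
import asyncio
from contextlib import asynccontextmanager


class AsyncClientPool:
    """Illustrative pool: bounds concurrency with a semaphore and reuses
    clients instead of opening a new connection per asyncio task."""

    def __init__(self, factory, pool_size: int):
        self._factory = factory
        self._sem = asyncio.Semaphore(pool_size)
        self._idle: list = []
        self.created = 0  # how many real connections were opened

    @asynccontextmanager
    async def get_client(self):
        async with self._sem:  # limit concurrent checkouts
            if self._idle:
                client = self._idle.pop()  # reuse an existing connection
            else:
                client = await self._factory()
                self.created += 1
            try:
                yield client
            finally:
                self._idle.append(client)  # return to the pool for reuse


async def demo() -> int:
    async def connect():
        await asyncio.sleep(0)  # stand-in for an SFTP handshake
        return object()

    pool = AsyncClientPool(connect, pool_size=4)

    async def job(_):
        async with pool.get_client():
            await asyncio.sleep(0.01)  # stand-in for a file download

    # 20 jobs, but never more than 4 connections are ever opened.
    await asyncio.gather(*(job(i) for i in range(20)))
    return pool.created


connections_opened = asyncio.run(demo())
```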

Example with async @task decorated python task and msgraph hook with pagination:

@task(
    retries=5,
    retry_delay=60,
    max_active_tis_per_dag=1,
    retry_exponential_backoff=False,
    show_return_value_in_logs=False,
)
async def get_json_data(url: str):
    from airflow.providers.microsoft.msgraph.hooks.msgraph import KiotaRequestAdapterHook

    hook = KiotaRequestAdapterHook.get_hook(
        conn_id=MSGRAPH_CONN_ID, hook_params={"api_version": "beta"}
    )
    return await hook.paginated_run(
        url=url,
        query_parameters={"$format": "application/json", "$top": 10000},
    )


Another example is the SmtpHook: the hook supports both synchronous and asynchronous operations, but there is no deferrable EmailOperator:

@task(task_id="bcc_recipients")
async def send_mail(
    recipients: list[str],
    subject: str,
    html_content: str,
    conn_id: str = "smtp",
):
    import asyncio
    import logging
    from os import cpu_count

    from more_itertools import chunked

    from airflow.providers.smtp.hooks.smtp import SmtpHook
    from airflow.sdk import get_current_context

    context = get_current_context()

    # Create a Jinja environment (same as Airflow uses)
    jinja_env = context["dag"].get_template_env(force_sandboxed=False)

    # Render the subject (title)
    rendered_subject = jinja_env.from_string(subject).render(context)

    logging.debug("rendered_subject: %s", rendered_subject)

    # Render the HTML template file (Airflow finds it via template_searchpath)
    rendered_html = jinja_env.get_template(html_content).render(context)

    logging.debug("rendered_html: %s", rendered_html)

    async def send_email_smtp(to: str):
        # Send the email
        with SmtpHook(smtp_conn_id=conn_id) as hook:
            await hook.asend_email_smtp(
                to=to,
                subject=rendered_subject,
                html_content=rendered_html,
                conn_id=conn_id,
            )

    for batch in chunked(recipients, cpu_count() * 2):
        tasks = [asyncio.create_task(send_email_smtp(recipient)) for recipient in batch]

        # Wait for this batch to finish before starting the next
        for task in asyncio.as_completed(tasks):
            await task


Conclusion

Using async hooks inside async Python tasks provides better performance, scalability, and flexibility, and avoids reliance on triggerers entirely. This hybrid approach, "async where it matters, operators where they make sense", may represent the future of high-performance Airflow data-processing workloads.

So what I wanted to achieve here for Airflow is:
- make using asynchronous code in Airflow just as easy as using synchronous code
- avoid boilerplate code for running asynchronous work from synchronous code
- solve the CommsDecoder issue so it supports both synchronous and asynchronous calls

Considerations

In my opinion there are no special considerations to be made, except that the new functionality (e.g. natively supporting async code in the PythonOperator) will only be available in new Airflow versions.

What change do you propose to make?

Also allow the PythonOperator (and thus the @task decorator) to execute async Python code.
What problem does it solve?

It allows DAG authors to natively use async Python code, and thus async hooks, in the PythonOperator, which in some cases offers much better functionality than the deferrable operator, especially when the same operation has to be performed multiple times.

Why is it needed?

It allows Airflow DAG authors to natively use async Python code, and thus async hooks, in the PythonOperator without having to handle the event loop themselves or go through a deferrable operator, which can be limited in the functionality it offers and performs poorly in some use cases.

Are there any downsides to this change?

No

Which users are affected by the change?

None

How are users affected by the change? (e.g. DB upgrade required?)

No

Other considerations?


What defines this AIP as "done"?

When this PR is merged: https://github.com/apache/airflow/pull/59087

5 Comments

  1. I like the proposal very much! Lightweight and good to use.

    I fear the tradeoffs might need to be added to the documentation but in my view this should not hold-off having this feature:

    • People might be confused by the different execution options, so some guidance is needed describing which way (sync, deferred, async) to use.
    • If iterations are made inside a task itself and something breaks in between, you have no chance to partly recover; re-running means re-running everything. But comparing a 3h mapped execution with 3m async, I assume it is just a disclaimer: when it only takes 3 minutes, just re-run everything.
      (Which might lead to the point Jarek Potiuk raised on the devlist in the other thread, that a cool feature would be an option to expose progress in a longer-running task... but that is another AIP in my view, independent of this one.)
  2. David Blain You should update the status of the AIP

    1. Done, I also moved it to Completed AIPs, but I don't really know if there was a vote thread; I know Jarek Potiuk posted/asked about voting for this AIP, but I don't think there was much response to that.

      1. You can ask for lazy consensus retroactively now. That would be best to keep track of it on the devlist, David Blain.

  3. Interesting.

    You are making a strong case for supporting async within the PythonOperator pattern over Deferrable Operators. 

    What I am missing is when should users be using Deferrable Operators instead? 

    Also, are you advocating deprecating Deferrable Operators entirely? I am not opposed to it, but definitely something I am curious about your viewpoint here.