Status

State: Draft

Discussion Thread: https://lists.apache.org/thread/zxpbhwb0nb28st4q9ddnf2900d828xsm

Vote Thread:

Vote Result Thread:

Progress Tracking (PR/GitHub Project/Issue Label):

Date Created: 17.12.2024 07:36

Version Released:

Authors: David Blain

Short summary

Allow Triggerers to yield multiple events in a single trigger run, enabling true streaming-style workflows for async operators (e.g., HttpOperator/MSGraphAsyncOperator). This reduces repeated context switching ("ping-pong" between scheduler → worker → triggerer) and allows efficient in-trigger pagination and lazy task expansion.

Motivation (why)

  • Many async operators implement pagination by deferring to triggers and then reentering the operator repeatedly. Even with start_from_trigger, the current system processes only the first TriggerEvent yielded; additional pages require repeated defer/resume cycles.

  • Repeated context switches create scheduler/worker/triggerer overhead and slow throughput for high-volume paginated APIs.

  • A streaming-capable triggerer can yield many events in one run; letting Airflow process those events (without full defer/resume for each page) produces substantial performance gains and allows lazy expansion of downstream tasks as pages arrive.

Goals

  1. Allow triggers to yield many events during a single run and have those events processed incrementally by the operator runtime/scheduler without repeated operator deferral cycles.

  2. Allow operators that know how to consume events to expand mapped tasks or XCom-driven iterables lazily as events arrive.

  3. Preserve serializability guarantees (no unserializable callables in trigger args).

  4. Be backward-compatible: existing triggers/operators continue to work.

Non-goals

  • Making arbitrary XComs iterable across the cluster. (That requires additional XCom semantics; out of scope.)

  • Allowing triggers to carry non-serializable callables/closures. Triggers must remain serializable.


Demo



High-level design

Key idea

Extend the triggerer-to-scheduler event path so that a single trigger run may produce a stream of events which the scheduler can incrementally deliver to the operator context (worker/scheduler) and allow the operator to handle each event without a full defer/resume roundtrip per page.
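The key idea can be sketched with a minimal, self-contained example. Note that `PagedApiTrigger`, `_fetch_page`, and the local `TriggerEvent` stand-in are all hypothetical names for illustration; a real trigger would subclass `airflow.triggers.base.BaseTrigger` and implement `serialize()`:

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator


# Minimal stand-in for Airflow's TriggerEvent so this sketch runs standalone.
@dataclass
class TriggerEvent:
    payload: Any


class PagedApiTrigger:
    """Hypothetical trigger that paginates inside run() and yields one
    event per page, instead of deferring back to the worker per page."""

    def __init__(self, url: str, page_size: int = 100):
        self.url = url
        self.page_size = page_size

    async def _fetch_page(self, page: int) -> list:
        # Placeholder for an async HTTP call; returns empty when exhausted.
        data = list(range(250))
        start = page * self.page_size
        return data[start:start + self.page_size]

    async def run(self) -> AsyncIterator[TriggerEvent]:
        page = 0
        while True:
            rows = await self._fetch_page(page)
            if not rows:
                break
            # One yield per page: the whole stream happens in one trigger run.
            yield TriggerEvent({"page": page, "rows": rows})
            page += 1


async def collect():
    return [e.payload["page"] async for e in PagedApiTrigger("https://example/api").run()]


print(asyncio.run(collect()))  # [0, 1, 2]  (pages of 100/100/50 rows)
```

With today's semantics only the first of these yields would be processed; the proposal is to deliver all of them incrementally.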

Components & responsibilities

  • Triggerer: can yield multiple TriggerEvents during run(). It still must be serializable. If the trigger performs pagination, it should do that inside its run() loop and yield each page as an event.

  • Scheduler: must accept and process multiple events from the same trigger run. Instead of discarding all but the first yielded event, the scheduler accepts every event and binds it to the operator’s next_method handler incrementally.

  • Operator / Worker:

    • Operators supporting streaming implement a handle_trigger_events(events: Sequence[TriggerEvent], context) callback (or extend next_method to accept a batch/stream), which may:

      • process events and optionally expand mapped tasks or create XComs,

      • decide whether the trigger should continue producing (e.g., request more pages), though pagination itself remains under the trigger's control inside run().

    • Operators still raise TaskDeferred to start the trigger if they cannot proceed synchronously.

  • Start-from-trigger: still used; when present, the initial worker-run step is skipped. The AIP assumes start_from_trigger exists and is used (Airflow 3.0+).

API changes / additions

  1. Triggerer run behavior: no signature change, but scheduler processing semantics change: treat yield as a stream of events, not just the first.

  2. Scheduler/Triggerer protocol:

    • Add an event envelope with fields {trigger_run_id, sequence_index, payload, is_last} for each yielded event. This helps ordering and safe incremental processing.

  3. Operator callback:

    • Extend next_method(self, event: TriggerEvent | list[TriggerEvent], context) to accept either single or batched events, or add an optional handle_trigger_events(self, events, context) method.

    • For backward compatibility, if operator implements the old next_method(event, context), the scheduler will call it for each event individually.

  4. DAG parsing checks: if start_from_trigger is enabled and start_trigger_args contains a non-serializable callable, raise at parse time (this check already exists).
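As a sketch, the proposed envelope could look like the following. The field names are taken from this AIP; the concrete class and wire format are implementation details still to be decided:

```python
from dataclasses import dataclass
from typing import Any


# Sketch of the proposed event envelope; illustrative only.
@dataclass(frozen=True)
class TriggerEventEnvelope:
    trigger_run_id: str
    sequence_index: int
    payload: Any
    is_last: bool = False


# Events from one trigger run could arrive out of order; sequence_index
# restores the order in which the trigger yielded them.
received = [
    TriggerEventEnvelope("run-1", 1, {"page": 1}),
    TriggerEventEnvelope("run-1", 0, {"page": 0}),
    TriggerEventEnvelope("run-1", 2, {"page": 2}, is_last=True),
]
ordered = sorted(received, key=lambda e: e.sequence_index)
print([e.payload["page"] for e in ordered])  # [0, 1, 2]
```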

Ordering & consistency concerns

  • Use sequence_index to process events in order from the same trigger run.

  • If operator processing of an event fails, the scheduler should:

    • surface the failure (normal task retry semantics) and

    • stop accepting further events for that run until retry/resume behavior is resolved.

  • If the trigger signals is_last, scheduler can mark the trigger-run completed.
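A minimal illustration of these three rules (`Envelope` and `process_stream` are hypothetical names, not existing Airflow APIs):

```python
from collections import namedtuple

Envelope = namedtuple("Envelope", "sequence_index payload is_last")


def process_stream(envelopes, handle):
    """Illustrative scheduler-side loop: deliver events strictly by
    sequence_index; on a handler failure, stop accepting further events
    for this trigger run so normal task retry semantics can take over."""
    for env in sorted(envelopes, key=lambda e: e.sequence_index):
        try:
            handle(env.payload)
        except Exception:
            return "stopped"      # surface the failure, halt this run's stream
        if env.is_last:
            return "completed"    # trigger-run can be marked done
    return "open"                 # stream still producing


# A handler that fails on the second event:
seen = []

def handler(payload):
    if payload == "page-1":
        raise RuntimeError("mid-stream failure")
    seen.append(payload)


events = [Envelope(0, "page-0", False), Envelope(1, "page-1", False), Envelope(2, "page-2", True)]
print(process_stream(events, handler), seen)  # stopped ['page-0']
```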

Security / serializability

  • Triggers and all trigger args must remain serializable (no lambdas/closures).

  • DAG parsing validates start_trigger_args types and raises early if a non-serializable callable is present (existing behavior).
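An illustrative version of such a parse-time guard. The real check lives in Airflow's DAG parsing/serialization code and does not use plain JSON; this sketch only shows the intent:

```python
import json


def validate_start_trigger_args(trigger_kwargs: dict) -> None:
    """Hypothetical parse-time guard: reject callables and anything that
    cannot be serialized, so the trigger can cross the worker/triggerer
    process boundary."""
    for key, value in trigger_kwargs.items():
        if callable(value):
            raise TypeError(
                f"trigger arg {key!r} is a callable; triggers must stay serializable"
            )
        try:
            json.dumps(value)  # stand-in for Airflow's own serialization check
        except TypeError as exc:
            raise TypeError(f"trigger arg {key!r} is not serializable") from exc


validate_start_trigger_args({"url": "https://example", "page_size": 100})  # ok
try:
    validate_start_trigger_args({"transform": lambda x: x})
except TypeError as exc:
    print(exc)  # prints the TypeError message for the callable arg
```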

Backward compatibility / migration

  • Existing triggers/operators continue to work: scheduler defaults to single-event processing if operator doesn’t opt-in.

  • Operators that want streaming implement the new batched handle_trigger_events or accept repeated next_method calls. Provide an adapter shim that calls an operator’s single-event next_method repeatedly if the operator hasn’t implemented streaming (so no breakage).

  • Update common async operators (MSGraphAsyncOperator, HttpAsyncOperator) to implement in-trigger pagination and yield many TriggerEvents.
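The adapter shim idea can be sketched as follows. `deliver` and the operator classes are hypothetical; in real Airflow, `next_method` is the *name* of the method to resume, modeled here directly as a callback for brevity:

```python
def deliver(operator, events, context):
    """Hypothetical shim: streaming operators get the whole batch; legacy
    operators get one next_method call per event, so nothing breaks."""
    batched = getattr(operator, "handle_trigger_events", None)
    if batched is not None:
        batched(events, context)
    else:
        for event in events:
            operator.next_method(event, context)


class LegacyOperator:
    def __init__(self):
        self.calls = []

    def next_method(self, event, context):
        self.calls.append(event)


class StreamingOperator:
    def __init__(self):
        self.batches = []

    def handle_trigger_events(self, events, context):
        self.batches.append(list(events))


legacy, streaming = LegacyOperator(), StreamingOperator()
deliver(legacy, ["e0", "e1"], context={})
deliver(streaming, ["e0", "e1"], context={})
print(legacy.calls, streaming.batches)  # ['e0', 'e1'] [['e0', 'e1']]
```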

Tests & acceptance criteria

  • Unit tests for:

    • multiple-event processing ordering,

    • failure during mid-stream event handling,

    • serializability check during DAG parse.

  • Integration tests:

    • MSGraphAsyncOperator yielding 10+ pages compared against the old approach, measured via scheduler/worker/triggerer traces (assert fewer context-switch cycles).

  • Performance benchmarks demonstrating reduced scheduler <-> worker <-> triggerer hops.

Example operator migration notes

  • Refactor operator to:

    • Move pagination into the trigger run() loop,

    • Yield each page as a TriggerEvent envelope,

    • Ensure start_trigger_args are serializable,

    • Implement handle_trigger_events to consume events, create XComs, and expand tasks lazily.
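A skeleton of what the consuming side of such a migrated operator might look like (the class and its behavior are illustrative only; per-page XCom writes and lazy task expansion are shown as simple collection):

```python
class MigratedGraphOperatorSketch:
    """Hypothetical migrated operator: pagination lives in the trigger,
    the operator only consumes page events as they arrive."""

    def __init__(self):
        self.rows = []
        self.pages_seen = 0

    def handle_trigger_events(self, events, context):
        for event in events:
            self.pages_seen += 1
            # Per page, a real operator could push an XCom or lazily
            # expand downstream mapped tasks; here we just collect rows.
            self.rows.extend(event["rows"])


op = MigratedGraphOperatorSketch()
op.handle_trigger_events([{"rows": [1, 2]}, {"rows": [3]}], context={})
print(op.pages_seen, op.rows)  # 2 [1, 2, 3]
```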

Open questions / future work

  • Native iterable XComs (separate AIP).

  • Fine-grained backpressure between triggerer and scheduler (if extremely high event rates).

  • Event batching strategies (time/size) to trade off latency vs. scheduler invocation overhead.


Sequence diagrams

Below are two diagrams:

  • Current behavior (with start_from_trigger, only the first yielded event is processed, leading to repeated defer/resume cycles).
  • Proposed behavior (trigger yields many events in one run; scheduler forwards them incrementally to the operator; fewer context switches).



11 Comments

  1. When I was reading the proposal in the devlist I thought I understood it, but re-reading the AIP I have some questions again. Maybe I did not understand the use case fully, or the description drifts into solution space too early and I am lost.

    (1) From the problem description I understand you have a large XCom and want to drive thousands of mapped tasks. You see a large overhead in the DB from mapping/creating them and then updating them during processing. Correct?

    (2) If I understand (1) correctly, then even with node-local mapping/execution you would still like to track all mapped tasks from the UI, correct? In that case I do not understand how the DB load would be reduced, except that you save round-trips of scheduler→worker. All results (and, on a crash, a failover to another worker) should still be recorded on each mapped task?

    (3) If I was wrong in (1) and you want task-local mapping without tracking all mappings in the DB, then you "just" want to run some mapped processing within one large task. I think you are free to implement whatever iterator operator you like, but if it fails in between, only one atomic state is recorded in the DB and visible in the UI, and the scheduler does not know how far the iteration got. So I assume you still want to have the mapping in the UI?

    (4) A consequence of (2) would be that a kind of "mini scheduler" would be needed on the worker to track the progress of individual mapped tasks (avoiding round-trips to the central scheduler). And of course this would need DB access to track/add all tasks... which is actually one thing we want/need to get rid of in Airflow 3 (sad)

    (5) Another consequence of (2) is that you have challenges in case of failover (the worker just dies: who takes over? What about the progress?), and your task mapping is mostly limited to the capacity of one node. How would another scheduler take over and, given a partially expanded local result, know where to resume?

    Can you maybe add a diagram (UML communication flow or so?) to have a better understanding of the current flow and the future/target communication flow that you want to achieve?

    Also the problem description introduces with UI problems - how do you think the change will resolve these?

    You describe a lot about async. Is the desired feature mainly async polling for thousands of mapped async external tasks, or is it "real work" that is computed e.g. on a Celery worker with concurrency (=sync) that is to be tracked? Or a universal feature (sync+async)? (I am asking because long term there was also a demand to track results from external sub-workflows in Airflow, like Cosmos with dbt, such that the external orchestrator runs and just reports further details to Airflow while Airflow delegates the details to something more native; I believe we have no AIP for this?)

  2. Hello Jens, thank you for your elaborate response.

    The proposal I've put down is indeed already oriented towards the solution I came up with, but that doesn't mean it has to be that way; I just used it to describe the problem we wanted to solve and how we solved it with what was already available in Airflow 2.

    (1) Yes you are correct.

    (2) We personally don't need the tracking of the individual tasks within the UI, but I think Jarek and Ash would have liked to see a more generic solution in which we have both: local execution of all mapped tasks but also the ability to track the progress/state visually within the UI, so that you don't lose that aspect in Airflow when using this alternative approach, which is understandable from a product standpoint. I don't know how this could be accomplished efficiently without overloading the database, unless you have a distributed Postgres behind the scenes with partitioned tables, but maybe there is another possible solution.
        
    (3) Yes, in our case we don't have the need to individually track the progress of all mapped tasks, so it's an all or nothing, comparable with a for loop within a PythonOperator.  An improvement I haven't implemented yet in case of failure, is to track the already executed mapped tasks through XCom, so that when we would re-run that same iterated operator task again, it would only re-run the failed ones, instead of just re-running them all.  I think this feature is technically feasible.
        
    (4) That would be the ideal solution, and I think that's also what Jarek was after, but it would mean we have 2 types of schedulers: the global one and a specialized micro/local scheduler per worker, which by default would always delegate to the global one unless you want the alternative local execution. In the solution I came up with, the local scheduling is done within the IterableOperator, which of course means code duplication for the scheduling aspect, but that problem will always exist even if we introduce the notion of a local scheduler. The latter would be a better approach, as you could introduce multiple local schedulers with different implementations.
        
    (5) That I described in point 3: I think with the current solution we could keep track of it through a list of mapped indices stored in an XCom; as an XCom is linked to a DagRun, we could retrieve it when re-running the task. Of course XComs also need DB access, unless we have alternative XCom backend providers in the future. We for example use a Kubernetes PVC XCom backend, so XComs aren't a real problem in our case.


    I mention the async aspect a lot because we use the MSGraphAsyncOperator a lot, which is a deferrable operator, as the msgraph core library from Microsoft is implemented as async only. That implementation also gives you the opportunity to apply the producer/consumer pattern and avoids blocking workers while waiting for responses, which in the case of paged results can be very handy (I also started a PR to do the same with the GenericTransfer operator), as you can distribute the load between the workers and triggerers. The solution I've implemented and proposed here is universal: we use the IterableOperator for async operations (IO related) as well as for real multithreaded computing/processing tasks written in pure Python using the PythonOperator. The current implementation doesn't support @task decorated methods yet, nor task groups; the latter would be more challenging to implement with the current solution, so that's why a micro/local scheduler would maybe be a better approach.

    If you don't do mapped task tracking, you don't have the database/scheduling problem, the UI performance problem is a side effect of the former one.

    I'll try to draw a diagram to visually represent the alternative flow.

    1. Okay, yeah, so if you don't want to have tasks tracked in the UI, then you could build all of it into a custom Operator without the need to touch anything in the scheduler. Such a custom operator could just maintain an (in-memory or somehow serialized) list to digest and make the task list async. Maybe, if you want it re-usable, it can be a base class of an operator that you inherit for the specific function. But this is also too deep into solution space already.

      Just as feedback on (3): please be aware that the current model is that if a task has failed and is restarted, all XCom is purged. There is no versioning on XCom (currently) and the purging happens before the next attempt starts.

      1. Hello Jens, with the new PR, everything will work as is already the case with the classic partial/expand mechanism, except the task unmapping won't be done by the operator itself but by the scheduler. This is a "huge" design change but will allow us to support iterable XComs (e.g. streaming), as already explained in the updated description of the AIP above. This means there won't be a need for a custom IterableOperator anymore. As suggested by Jarek Potiuk in the past, it was best to wait until Airflow 3 was released, hence why I've now started implementing it. I've also already discussed it a bit with Ash Berlin-Taylor, so now we will have to see how we continue on this, as I'm sure there will still be some design changes needed. From a user point of view nothing changes; it's just how the partial/expand is done under the hood that changes, plus the support for deferred iterable XComs, which can be handy when you have operators returning large paged results, leading to faster execution and less memory usage. I've already tested this solution in our Airflow 3 devcontainer and it works.

  3. Just to clarify. I do not want to have a "mini" or "remote" scheduler at all. I think that with the Airflow 3 architecture, a LOT (if not all) of the concerns that lead to this proposal will be alleviated. With certain optimisations, when you mark all mapped task instances of the same task (and more generally, for example, all tasks belonging to a task group) to be executed on the same machine, you can achieve pretty much the same optimisations you are after, while not introducing a new "scheduling construct", i.e. a kind of sub-task within a single Airflow task. I think this is what the proposal boils down to, and I still think it can be done better from the product perspective.

    I think a lot of the overhead that is experienced now will be much, much less in the new architecture. That is one of the main goals of Airflow 3: to get rid of the DB connection overload and unnecessary excessive communication, and to allow for certain optimisations on the DB side by "concentrating" DB access in a few central components. Simply speaking, the performance gains you see with Airflow 2 will be largely gone in Airflow 3 because of the architecture. And I would not even consider elaborating too much on this proposal before we see Airflow 3 in action, see how much improvement you can get there, and most importantly, whether similar improvements cannot be achieved by optimising some of the existing flows. For example I can imagine, when we use some kind of "task affinity", that the communication Task API → Workers might be heavily optimized (batched, eventually) and achieve performance pretty much comparable to iterating over mapped operators in the same task. If we know that mapped tasks are all going to be executed on a single machine, it is completely no problem (just a bit of engineering effort) to optimize things away:

    • send all xcom values as single message
    • heartbeat only the "worker instance" not individual tasks within the worker instance
    • batch sending status updates back to Task SDK
    • and so on

    All that could be done at the level of Task SDK API and Airflow code, without impacting the fact that "individual task instances" keep their individual task instance status, can be monitored and individually cleaned /interacted with from the UI etc. 


    For me this proposal is ONLY about optimising performance, and we are **just** doing it for Airflow 3 - and IMHO we should build on top of those optimizations rather than looking at the current Airflow 2 performance issues. This is already past. Let's look into the future.

  4. Hello Jarek, then I think it's best to wait until those optimisations in Airflow 3 are done before continuing on this AIP, imho? Are there already branches/PRs where we can follow the work being done there? Because that really interests me.

    1. Main (smile). AIP-72 is the main indicator.

  5. Hi David,

    I've also considered that a large volume of XComs could become a performance bottleneck. One idea I had was to implement a custom local XCom backend as a simple and efficient solution. For example, using something like RocksDB as a low-level embedded key-value store for the XCom backend could help eliminate roundtrips to the metadata database and reduce the load caused by storing or setting large amounts of XCom data.

    Hope this idea helps!

    1. Hello Zhe, your idea would also be a good one to explore, but the problem I'm trying to solve here is not how the XComs are handled/persisted, but how the scheduler interacts with the XComs to expand tasks, avoiding the need to know all XCom values (and thus their length) before being able to expand tasks upon them, so that we get a producer/consumer or streaming behavior. The current implementation doesn't support (deferred) iterable XComs; I've created a new PR which implements this, but I'm sure a lot will have to change to make this into a definitive solution.

  6. Hi David Blain, after re-reading from "a longer time ago" and also having some use cases in mind, I am now 50% with you that a "Lazy Mapping" or "Deferred Operator Mapping" might be useful. I am not sure if these are compatible, but I see the following use cases:

    • We had a medium-sized mapped task group with ~6 tasks where DAG authors attempted to map via a list that had 2000 elements. That makes 12000 tasks in a DAG run, and because the list was passed in as DagRun.conf, the scheduler died in OOM because it attempted to load 12k tasks with a cross product of the DagRun.conf into memory. Reducing the list to a couple of hundred reduced the problem footprint, but scheduling a very large mapped list still takes excessive time in the scheduler (especially in mapped task groups, as it needs to check many times in the loop for the next eligible tasks to schedule+queue, looping over the list many, many times).
      → So in this case I assume an incremental mapping (e.g. always map 50 instances; once they are done, map the next 50) would be helpful to reduce the latency of the first tasks landing as well as the scheduler overhead of looping over a big list of open tasks. This would solve our current problem that in mapped task groups we are not sure if we can get much above the default limit of 1024. But we have demand to make it rather 10k or 100k tasks in a very long and big flow.
    • One other Use Case we have seen is also a rather "Non Dag" case where actually we are lagging an option to implement a Loop in a Dag. Assume you make a model training and based on the results you need to decide if you run another training epoch. Basically you want to loop the ML pipeline until either you have good results or you give-up the experiment. Today you would need a driver Dag that controls a training Dag that calls in a loop.
      → How about if such mapping is "mis-used" (leveraged) to define the mapped task group as the loop base? You run one cycle and at the end of the group you check the results. If they are not good enough, you add one element to the mapping so that another set is scheduled. At the end, after N loops, you still have all tasks with logs and XCom, and you can still inspect the overall duration and results via the UI. The only "problem" is that the "map index" is mis-used as a kind of "loop iteration counter".
      That does not mean the mapping at the end needs to be made via a distinct XCom; there might be a generated task in the mapped group which implements an interface to check results and potentially add elements to the mapping as an increment.

    Jarek Potiuk, in the devlist you also had a "loud thought" about support for non-DAG-type workflows with loops. Would the second point be in the direction you were thinking about?

    So TLDR: do we maybe need to re-phrase the AIP as "Late Mapping", which can be used for "Lazy Mapping", "Incremental Batch Expansion", as well as "Mapping for Workflow Loops"?

    1. Hi Jens,

      Thanks for your feedback — I’ve made some further updates to the AIP in the meantime.

      Regarding your first point: I believe the issue with the DB "dying" is expected behavior, since DAG run config parameters are persisted directly in the Airflow metadata database. As far as I know, this ignores the XCom backend in Airflow 3 (as dagrun config isn't an XCom, unless that changed recently), which becomes problematic when trying to pass large payloads like a list of parameters.  Maybe DataSets or Assets in Airflow 3 could also be an option to solve this issue?

      To work around this, we pass the dag_id, task_id, and run_id of the triggering DAG to the config of the triggered DAG. Then, in the triggered DAG, we pull the XCom value from the triggering DAG using those identifiers — this leverages the XCom backend instead of persisting everything in the DB, which avoids overloading it.

      We ran into a similar issue in Airflow 2 when expanding around 10k mapped task instances, especially when rendering the UI, which often crashed the database. Airflow 3 seems to handle this better, though canceling a running DAG run with many mapped tasks still causes problems. That could be improved by having the scheduler perform more periodic checks (e.g. is DagRun not cancelled?) when executing mapped tasks, instead of the current fire-and-forget strategy.

      That being said, I’m not entirely sure this AIP directly addresses your issue — it seems more related to how DAG run configs are persisted rather than task expansion logic. Passing large config payloads via the DB will remain problematic regardless, and using XComs as an indirection (as we did) is likely the safer approach, even though it may feel like a workaround.

      On your second point, I believe it's a different use case that falls outside the scope of this AIP — unless I misunderstood. If you're referring to dynamically looping over a DAG run until a certain condition is met, that's more of a control-flow issue. However, if the loop happens within the DAG itself, then yes: with a deferred iterable XCom, you could implement a streaming/producer-consumer pattern where new values are pulled incrementally until your exit condition (e.g., "enough good results") is satisfied — which aligns with the goals of this proposal.

      Let me know if I misunderstood anything!

      Best regards,
      David