Status
Motivation
This AIP proposes an alternative way of expanding multiple XComs on operators without needing to know in advance how many tasks will be expanded, which is the case with streams or iterables.
That’s why I would also like to introduce the notion of streamable XComs. In the current Airflow implementation, only lists and dicts are supported as XCom collection types that can be expanded on (e.g. SchedulerDictOfListsExpandInput and SchedulerListOfDictsExpandInput).
I would like to add support for iterables (i.e. lazily evaluated collections), as this is what makes streamable XComs possible. Take the HttpOperator or the MSGraphAsyncOperator as an example: when those return paged results, an operator that depends on their output has to wait until all pages have been loaded before it can process them.
This has 4 disadvantages:
- performance, as downstream operators have to wait until all pages have been loaded before being able to start
- the iteration is done twice, as opposed to a streaming solution where both steps (i.e. tasks) could be done within the same iteration
- memory usage, as all pages have to be loaded into memory as a materialized XCom before the next operator can consume it
- as XComs have to be able to determine their length in advance, it's impossible to implement a filter functionality (analogous to map, zip or concat) in the current implementation, since filter dynamically affects the length of the returned XCom
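To make the notion of a streamable XCom more concrete, here is a minimal, purely illustrative sketch; the `fetch_page` callable and the `next_link` field are hypothetical and do not refer to an existing Airflow or MS Graph API. Instead of materializing all pages into a list before returning it as an XCom, an operator could return a lazy iterable that yields results page by page, so a downstream consumer can start as soon as the first page arrives.

```python
from collections.abc import Iterator
from typing import Any, Callable, Optional


def paged_results(fetch_page: Callable[[Optional[str]], Optional[dict]]) -> Iterator[Any]:
    """Yield items lazily, one page at a time, instead of building a full list.

    `fetch_page` is a hypothetical callable returning a page dict with a
    `value` list and an optional `next_link`, or None when exhausted.
    """
    page = fetch_page(None)
    while page is not None:
        yield from page["value"]
        page = fetch_page(page.get("next_link"))
```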
Proposal
Possible workaround?
You could argue that you could write a PythonOperator or a task-decorated method in which you loop over the multiple inputs from the XCom and pass each one as an argument to an operator or even a hook. While the latter would be a valuable solution, the first one wouldn’t, as it’s bad practice to execute an operator from within a PythonOperator; see the discussion about this topic on the devlist.
There is already a safeguard implemented for this which checks whether an operator is executed from a PythonOperator, and if so, logs a warning stating that an operator cannot be called outside of a TaskInstance. In the future this will probably become prohibited and will raise an AirflowException instead.
But let’s hypothetically assume this would still be allowed: how would you loop the inputs from an XCom into an operator if that operator is deferrable? You would need to take multiple aspects into account.
First of all, you would need to catch the raised TaskDeferred exception, as this is how a deferrable operator works. The TaskDeferred exception contains the trigger associated with the deferred operator, which behind the scenes returns an async generator. This means you will have to cope with the asyncio event loop to be able to run the async trigger from your PythonOperator, as the latter isn’t executed in an async way. Maybe this is also a good moment to start thinking about natively supporting async methods in the PythonOperator without worrying about the event loop (e.g. a PythonTriggerer)?
Next to that, once you have managed to run the trigger, you will also have to check whether a next_method was specified, which has to be executed on the deferred operator once the trigger has completed.
And last but not least, the execution of the next_method could itself re-raise a TaskDeferred exception if the deferred operator implements the producer/consumer pattern, which means you’ll have to take recursion into account. For example, the MSGraphAsyncOperator implements the producer/consumer pattern in such a way that the worker triggers the request to the MS Graph API, but instead of blocking the worker while waiting for the response to arrive, it releases the worker and delegates the waiting to the triggerer, avoiding blocking workers unnecessarily. When the triggerer receives the response, it hands it back to the operator (i.e. the worker), so the worker was never blocked while awaiting the response.
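To illustrate how involved this workaround gets, here is a heavily simplified, hypothetical sketch of synchronously driving a single deferrable operator from inside a PythonOperator. It assumes the trigger yields exactly one event, ignores timeouts and the kwargs carried by TaskDeferred, and the `run_deferrable`/`_resume` helpers do not exist in Airflow; they only illustrate the steps described above.

```python
import asyncio

from airflow.exceptions import TaskDeferred


def run_deferrable(operator, context):
    """Hypothetical helper: run a deferrable operator to completion from sync code."""
    try:
        return operator.execute(context)
    except TaskDeferred as deferred:
        return _resume(operator, context, deferred)


def _resume(operator, context, deferred):
    async def _first_event():
        # trigger.run() is an async generator yielding TriggerEvent objects;
        # this sketch only consumes the first event it produces.
        async for event in deferred.trigger.run():
            return event

    event = asyncio.run(_first_event())
    next_method = getattr(operator, deferred.method_name)
    try:
        # The resumed method may defer again (producer/consumer pattern),
        # so we recurse until the operator finally returns a result.
        return next_method(context=context, event=event.payload)
    except TaskDeferred as deferred_again:
        return _resume(operator, context, deferred_again)
```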
That’s already a lot of technical challenges you have to solve if you want to execute a (deferrable) operator from within a loop in a PythonOperator.
Besides that, you might also want to introduce some multithreading to speed up the processing instead of just looping sequentially, unless you want sequential execution in the given input order, which is also an issue raised in this discussion. But there you will also have to be careful, because you can’t just execute a deferrable operator containing async code through a ThreadPoolExecutor.
What problem does it solve?
Faster execution of downstream operators when the initial operator returns a paged XCom result.
In the screenshot above, you can see the new lazy expandable task mapping implementation in Airflow 3. It demonstrates how the MSGraphAsyncOperator returns a deferred iterable (as an XCom), allowing downstream tasks like the SQLInsertRowsOperator to expand lazily as the scheduler (e.g. the TaskMap) iterates over the paged results.
This means the SQLInsertRowsOperator does not have to wait for all pages to be fetched before expanding into mapped tasks. Instead, the MSGraphAsyncOperator initially returns only the first page, and as the scheduler expands the mapped tasks, it fetches additional pages on demand.
This approach offers significant improvements over the traditional model, where the MSGraphAsyncOperator would have to fetch all pages upfront before expansion could begin, resulting in slower performance and higher memory usage.
Why is it needed?
Improves performance and significantly reduces waiting times.
Are there any downsides to this change?
With the current proposal, task expansion is fully managed by the scheduler using the TaskMap. Traditionally, this requires knowing the length of the XCom in advance in order to expand the mapped task. To support lazy XCom evaluation, however, the XCom must now be resolved by the scheduler rather than the operator. The key advantage of this change is that the scheduler no longer needs to know the total length of the XCom upfront, enabling partial or streaming-style task expansion. This means the scheduler won't create mapped task instances anymore, but fully unmapped task instances, which is why the XCom has to be resolved by the scheduler, as opposed to the original implementation. A downside in the current POC is that the evaluation of the deferred iterable XCom is done within the scheduler instead of the worker/triggerer, but maybe we can check with Ash Berlin-Taylor how this could be improved.
In this model, the scheduler initially expands only the first n task instances from the MappedOperator. The remaining task instances are expanded asynchronously by the TaskExpansionJobRunner, which allows new tasks to be created incrementally as more data becomes available. Meanwhile, the executor can begin running the already-expanded tasks immediately, enabling a producer-consumer style execution and improving overall throughput.
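A purely conceptual sketch of the incremental, producer/consumer-style expansion described above; the `create_task_instance` callable is hypothetical and stands in for whatever the scheduler or the proposed TaskExpansionJobRunner would actually do, so this is not the proposed implementation, only an illustration of the expansion loop.

```python
from itertools import islice
from typing import Any, Callable, Iterable


def expand_incrementally(
    xcom_iterable: Iterable[Any],
    create_task_instance: Callable[[int, Any], None],
    batch_size: int = 50,
) -> int:
    """Expand mapped task instances in batches while the upstream (deferred)
    iterable XCom is still producing values, so the executor can already run
    the instances created so far."""
    iterator = iter(xcom_iterable)
    map_index = 0
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        for value in batch:
            # Each value becomes one mapped task instance with its own map index.
            create_task_instance(map_index, value)
            map_index += 1
    return map_index  # total number of mapped task instances created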
This leads to a more dynamic and efficient execution model, especially useful for deferred or paginated iterables, where loading all data upfront could be slow, memory-intensive, or infeasible.
Which users are affected by the change?
None
How are users affected by the change? (e.g. DB upgrade required?)
None
What is the level of migration effort (manual and automated) needed for the users to adapt to the breaking changes? (especially in context of Airflow 3)
None, as this is just an alternative way of expanding XComs on an operator: instead of calling the existing partial method of the MappedOperator, you can now call the iterate method.
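A hypothetical DAG-level sketch of the difference: the `iterate` entry point below is the one proposed in this AIP (its name may still change), while only `partial(...).expand(...)` exists in released Airflow. BashOperator is used purely for illustration.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator  # in Airflow 3: airflow.providers.standard.operators.bash

with DAG(dag_id="lazy_mapping_example", start_date=datetime(2025, 1, 1), schedule=None):
    # Today: classic dynamic task mapping; the scheduler needs the full list
    # (and thus its length) before it can expand the mapped task.
    classic = BashOperator.partial(task_id="classic_echo").expand(
        bash_command=["echo 1", "echo 2", "echo 3"],
    )

    # With this AIP (hypothetical `iterate` entry point): expansion would accept
    # a lazily evaluated iterable XCom and happen incrementally as values arrive.
    # lazy = BashOperator.iterate(task_id="lazy_echo").expand(
    #     bash_command=upstream_task.output,  # a (deferred) iterable XCom
    # )
```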
11 Comments
Jens Scheffler
When I was reading the proposal on the devlist I thought I understood it, but re-reading the AIP I have some questions again. Maybe I did not understand the use case fully, or the description drifts into solution space too early and I am lost.
(1) From the problem description I understand you have a large XCom and want to drive thousands of mapped tasks. You see a large overhead in the DB from mapping/creating them and then updating them during processing. Correct?
(2) If I understand (1) correctly, then if you make the mapping/execution node-local you would still like to track all mapped tasks from the UI, correct? Then I would not understand how the DB load would be reduced, except that you save round-trips of scheduler→worker. All results (and, in a crash, a failover to another worker) should still be recorded on each mapped task?
(3) If I was wrong in (1) and you want task-local mapping without tracking all mappings in the DB, then you "just" want to run some mapped processing within one large task. I think you are free to implement whatever iterator operator you like - but if it fails in between, then only one atomic state is recorded in the DB and can be seen in the UI, and increments of how far the iteration went are not known to the scheduler. So I assume you still want to have the mapping in the UI?
(4) A consequence of (2) would be that a kind of "mini scheduler" would be needed on the worker to track the progress of individual mapped tasks (which would avoid round-trips to the central scheduler). And of course this would need DB access to track/add all tasks... and this is actually one thing we want/need to get rid of in Airflow 3.
(5) Another consequence of (2) would be that you have challenges in case of failover (the worker just dies - who is taking over? What about the progress?), and your task mapping is mostly limited to the capacity of one node. How would another scheduler take over and know, given a partially expanded local result, where to resume?
Can you maybe add a diagram (UML communication flow or so?) to have a better understanding of the current flow and the future/target communication flow that you want to achieve?
Also, the problem description mentions UI problems - how do you think the change will resolve these?
You describe a lot about async: is the desired feature mainly async polling for thousands of mapped async external tasks, or is it "real work" that is computed e.g. on a Celery worker with concurrency (=sync) and needs to be tracked? Or a universal feature (sync+async)? (I am asking because long term there was also a demand to track results from external sub-workflows in Airflow, like Cosmos with dbt, such that the external orchestrator runs and just reports further details to Airflow, with Airflow delegating the details to something more native - I just believe we have no AIP for this?)
David Blain
Hello Jens, thank you for your elaborate response.
The proposal I've put down is indeed already oriented towards the solution I came up with, but that doesn't mean it has to be that way; I just used it to describe what the problem was that we wanted to solve and how we solved it with what was already available in Airflow 2.
(1) Yes you are correct.
(2) We personally don't need the tracking of the individual tasks within the UI, but I think Jarek and Ash would have liked to see a more generic solution in which we have both: local execution of all mapped tasks, but also being able to track the progress/state visually within the UI, so that you don't lose that aspect in Airflow when using this alternative approach, which is understandable from a product standpoint. I don't know how this could be accomplished efficiently without overloading the database, unless you have a distributed Postgres behind the scenes with partitioned tables, but maybe there is also another possible solution to this.
(3) Yes, in our case we don't have the need to individually track the progress of all mapped tasks, so it's all or nothing, comparable to a for loop within a PythonOperator. An improvement I haven't implemented yet, in case of failure, is to track the already executed mapped tasks through XCom, so that when we re-run that same iterated operator task, it would only re-run the failed ones instead of re-running them all. I think this feature is technically feasible.
(4) That would be the ideal solution and I think that's also what Jarek was after, but it would mean we'd have two types of schedulers: the global one and a specialized micro/local scheduler per worker, which by default would always delegate to the global one unless you want the alternative local execution. In the solution I came up with, the local scheduling is done within the IterableOperator, which of course means code duplication for the scheduling aspect, but that problem will always exist even if we introduce the notion of a local scheduler. The latter would be a better approach, as you could introduce multiple local schedulers with different implementations.
(5) That I described in point 3: I think with the current solution we could keep track of it through a list of mapped indices stored in an XCom; as an XCom is linked to a DagRun, we could retrieve it when re-running the task. Of course XComs also need DB access, unless we get alternative XCom backend providers in the future; we for example use a Kubernetes PVC XCom backend, so XComs aren't a real problem in our case.
I mention the async aspect a lot because we use the MSGraphAsyncOperator heavily, which is a deferrable operator, as the msgraph core library from Microsoft is implemented as async only. That is why it's implemented that way, but it also gives you the opportunity to apply the producer/consumer pattern and avoid blocking workers while waiting for responses, which in the case of paged results can be very handy (I also started a PR to do the same with the GenericTransfer operator), as you can distribute the load between the workers and triggerers. The solution I've implemented and proposed here is universal, as we use the IterableOperator both for async (IO-related) operations and for real multithreaded computing/processing tasks written in pure Python using the PythonOperator. The current implementation doesn't support @task-decorated methods yet, nor task groups; the latter would be more challenging to implement with the current solution, which is why a micro/local scheduler might be a better approach.
If you don't do mapped task tracking, you don't have the database/scheduling problem, the UI performance problem is a side effect of the former one.
I'll try to draw a diagram to visually represent the alternative flow.
Jens Scheffler
Okay, yeah, so if you don't want to have tasks tracked in the UI, then you could build it all into a custom operator without the need to touch anything in the scheduler. A custom operator could just maintain an (in-memory or somehow serialized) list to digest and make the task list async. Maybe, if you want to have it re-usable, it can be a base class of an operator that you inherit from for the specific function. But this is also too deep into solution space already.
Just to give feedback on (3): please be aware that the current model is that if a task has failed and is restarted, all XCom is purged. There is no versioning on XCom (currently) and the purging is done before the next attempt starts.
David Blain
Hello Jens, with the new PR, everything will work as it already does with the classic partial/expand mechanism, except that the task unmapping won't be done by the operator itself but by the scheduler. This is a "huge" design change, but it will allow us to support iterable XComs (e.g. streaming), as already explained in the updated description of the AIP above. It also means there won't be a need for a custom IterableOperator anymore; as suggested by Jarek Potiuk in the past, it was best to wait until Airflow 3 was released, which is why I've now started implementing it. I've also already discussed it a bit with Ash Berlin-Taylor, so now we will have to see how we continue on this, as I'm sure there will still be some design changes needed. From a user's point of view nothing changes; it's just how the partial/expand is done under the hood that changes, plus the support for deferred iterable XComs, which can be handy when you have operators returning large paged results, leading to faster execution and less memory usage. I've already tested this solution in our Airflow 3 devcontainer and it works.
Jarek Potiuk
Just to clarify. I do not want to have a "mini" or "remote" scheduler at all. I think that with the Airflow 3 architecture, a LOT (if not all) of the concerns that led to this proposal will be alleviated, and with certain optimisations, when you mark all mapped task instances of the same task (and in a more general case, for example, all tasks belonging to a task group) to be executed on the same machine, you can achieve pretty much the same optimisations you are after, while not introducing a new "scheduling construct", i.e. a kind of sub-task within a single Airflow task. I think this is what the proposal boils down to, and I still think it can be done better from the product perspective.
I think a lot of the overhead that is experienced now will be much, much less in the new architecture. That is one of the main goals of Airflow 3 - to get rid of the DB connection overload and unnecessary excessive communication, and to allow for certain optimisations on the DB side of things by "concentrating" the DB access in a few central components. Simply speaking, the performance gains you see with Airflow 2 will be largely gone in Airflow 3 because of the architecture. And I would not even consider elaborating too much on this proposal before we see Airflow 3 in action, see how much improvement you can get there, and most importantly whether similar improvements cannot be achieved by optimising some of the existing flows. For example I can imagine - when we use some kind of "task affinity" - that the communication Task API → Workers might be heavily optimized, for example eventually batched, and achieve pretty much comparable performance to iterating over mapped operators in the same task. If we know that mapped tasks are all going to be executed on a single machine, it is no problem at all (just a bit of engineering effort) to optimize things away:
All that could be done at the level of the Task SDK API and Airflow code, without impacting the fact that "individual task instances" keep their individual task instance status and can be monitored and individually cleaned/interacted with from the UI, etc.
For me this proposal is ONLY about optimising performance, and we are **just** doing it for Airflow 3 - and IMHO we should build on top of those optimizations rather than looking at the current Airflow 2 performance issues. This is already past. Let's look into the future.
David Blain
Hello Jarek, then I think it's best to wait until those optimisations in Airflow 3 are done before continuing on this AIP, imho? Are there already branches/PRs where we can follow the work that's being done there? That really interests me.
Jarek Potiuk
Main. AIP-72 is the main indicator.
Zhe You Liu
Hi David,
I've also considered that a large volume of XComs could become a performance bottleneck. One idea I had was to implement a custom local XCom backend as a simple and efficient solution. For example, using something like RocksDB as a low-level embedded key-value store for the XCom backend could help eliminate roundtrips to the metadata database and reduce the load caused by storing or setting large amounts of XCom data.
Hope this idea helps!
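For reference, a minimal sketch of the kind of backend Zhe describes, following the Airflow 2.x custom XCom backend contract (serialize_value/deserialize_value on a BaseXCom subclass, enabled via the core.xcom_backend setting); Python's shelve module stands in here for an embedded store like RocksDB, and the exact hook points may differ in Airflow 3.

```python
import json
import shelve
import uuid

from airflow.models.xcom import BaseXCom

# Stand-in local key-value store; a RocksDB database would take this place.
STORE_PATH = "/tmp/xcom_kv_store"


class LocalKVXComBackend(BaseXCom):
    """Store XCom payloads in a local key-value store and keep only a small
    reference string in the metadata database."""

    @staticmethod
    def serialize_value(value, *, key=None, task_id=None, dag_id=None,
                        run_id=None, map_index=None, **kwargs):
        ref = f"kv://{uuid.uuid4()}"
        with shelve.open(STORE_PATH) as store:
            store[ref] = json.dumps(value)
        # Only the reference (not the payload) ends up in the Airflow DB.
        return BaseXCom.serialize_value(ref)

    @staticmethod
    def deserialize_value(result):
        ref = BaseXCom.deserialize_value(result)
        with shelve.open(STORE_PATH) as store:
            return json.loads(store[ref])
```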
David Blain
Hello Zhe, your idea would also be a good one to explore, but the problem I'm trying to solve here is not how the XComs are handled/persisted, but how the scheduler interacts with the XComs to expand tasks, avoiding the need to know all XCom values (and thus their length) in order to expand the tasks, so that we get producer/consumer or streaming behaviour. The current implementation doesn't support (deferred) iterable XComs; I've created a new PR which implements this, but I'm sure a lot will have to change to make this into a definitive solution.
Jens Scheffler
Hi David Blain, after re-reading this after a longer time and also having some use cases in mind, I am now 50% with you that a "Lazy Mapping" or "Deferred Operator Mapping" might be useful. I am not sure if they are compatible, but I see the following use cases:
→ So in this case I assume an incremental mapping (e.g. always map 50 instances; once they are done, map the next 50) would be helpful to reduce the latency of the first tasks landing as well as reduce the scheduler overhead of looping over a big list of open tasks. This would solve our current problem that with Mapped Task Groups we are not sure whether we can get far beyond the default limit of 1024. But we have demand to make it rather 10k or 100k tasks in a very long and big flow.
→ How about if such mapping is "mis-used" (leveraged) to make the Mapped Task Group the loop base? You run one cycle and at the end of the group you check the results. If they are not good enough, you add one element to the mapping so that another set is scheduled. At the end, after N loops, you still have all tasks with logs and XCom, and you can still inspect the overall duration and results via the UI. The only "problem" is that the "Map Index" is mis-used as a kind of "Loop Iteration Counter".
This also does not mean that the mapping at the end needs to be made via a distinct XCom; I assume there could be a generated task in the Mapped Group which implements an interface to check and potentially add elements to the mapping as an increment.
Jarek Potiuk, in the devlist you also had a "loud thought" about support for non-DAG-type workflows that have loops. Would the second point be in the direction you thought about?
So TL;DR, do we maybe need to re-phrase the AIP as "Late Mapping", which can be used for "Lazy Mapping", "Incremental Batch Expansion" as well as "Mapping for Workflow Loops"?
David Blain
Hi Jens,
Thanks for your feedback — I’ve made some further updates to the AIP in the meantime.
Regarding your first point: I believe the issue with the DB "dying" is expected behavior, since DAG run config parameters are persisted directly in the Airflow metadata database. As far as I know, this bypasses the XCom backend in Airflow 3 (as the dagrun config isn't an XCom, unless that changed recently), which becomes problematic when trying to pass large payloads like a list of parameters. Maybe Datasets or Assets in Airflow 3 could also be an option to solve this issue?
To work around this, we pass the dag_id, task_id and run_id of the triggering DAG to the config of the triggered DAG. Then, in the triggered DAG, we pull the XCom value from the triggering DAG using those identifiers; this leverages the XCom backend instead of persisting everything in the DB, which avoids overloading it.
We ran into a similar issue in Airflow 2 when expanding around 10k mapped task instances, especially when rendering the UI, which often crashed the database. Airflow 3 seems to handle this better, though cancelling a running DAG run with many mapped tasks still causes problems. That could be improved by having the scheduler perform more periodic checks (e.g. is the DagRun not cancelled?) when executing mapped tasks, instead of the current fire-and-forget strategy.
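A minimal sketch of that workaround; task and DAG names are illustrative, the import paths shown are the Airflow 2 ones (the standard provider paths apply in Airflow 3), and whether xcom_pull accepts a run_id argument (rather than requiring XCom.get_one or a custom helper) depends on the Airflow version, so treat the pull call as an assumption.

```python
from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# In the triggering DAG: pass only identifiers, not the (large) payload itself.
trigger_child = TriggerDagRunOperator(
    task_id="trigger_child",
    trigger_dag_id="child_dag",
    conf={
        "parent_dag_id": "{{ dag.dag_id }}",
        "parent_task_id": "produce_rows",  # task whose XCom holds the payload
        "parent_run_id": "{{ run_id }}",
    },
)


# In the triggered DAG: pull the payload through the XCom backend instead of conf.
@task
def load_rows(**context):
    conf = context["dag_run"].conf
    return context["ti"].xcom_pull(
        dag_id=conf["parent_dag_id"],
        task_ids=conf["parent_task_id"],
        run_id=conf["parent_run_id"],  # assumption: a run_id-aware xcom_pull
    )
```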
That being said, I’m not entirely sure this AIP directly addresses your issue — it seems more related to how DAG run configs are persisted rather than task expansion logic. Passing large config payloads via the DB will remain problematic regardless, and using XComs as an indirection (as we did) is likely the safer approach, even though it may feel like a workaround.
On your second point, I believe it's a different use case that falls outside the scope of this AIP — unless I misunderstood. If you're referring to dynamically looping over a DAG run until a certain condition is met, that's more of a control-flow issue. However, if the loop happens within the DAG itself, then yes: with a deferred iterable XCom, you could implement a streaming/producer-consumer pattern where new values are pulled incrementally until your exit condition (e.g., "enough good results") is satisfied — which aligns with the goals of this proposal.
Let me know if I misunderstood anything!
Best regards,
David