Status
Per conversation in both the dev list and the comments on this AIP, the preliminary plan is to break this AIP into two stages. The first will be to create a model and interface to persist state data. The second will be integrating this with "Asset watching" and event-driven scheduling. More information about each of these stages can be found in the sub-pages of this doc.
Motivation
Incremental processing is a very common pattern in Airflow use cases; in some pipelines it is strictly necessary. Although external event-driven scheduling is supported in Airflow today through AIP-82, incremental polling has not been easy to implement. In more traditional DAG authoring (Sensors, Operators, etc.), users are forced to use XCom or Variables for storing values (watermarks) for incremental processing, or to invent and implement their own solution. In the case of event-driven triggering, it's even more challenging for users to build a custom incremental processing implementation, due to the lack of XCom and Variable support.
There have been several attempts to store state within a child of the BaseEventTrigger to manage incremental processes, but none of these have proven to be effective or robust. This has seriously hampered the community's ability to build logic to monitor Assets such as object stores, SQL databases, and other non-queue/stream-based Assets.
For Triggers built for Asset "watching", it is helpful, if not essential, to persist some state.
Considerations
What change do you propose to make?
At a high level, we propose to make the following changes:
- Introducing a `StateVariable` model for persisting Asset/Trigger state metadata. This would be a general model, not limited to event-driven scheduling, allowing Airflow users to store and retrieve state from EventTrigger code, as well as from things like Operators, Sensors, etc.
- This model should be reusable and extendable in other Airflow components (Workers, Scheduler, etc.). The way it fetches and stores the data may also vary depending on the component.
- (Optional) Adding some sort of helper and/or updating BaseEventTrigger to include methods allowing users to store and retrieve values (watermarks).
- Create a pluggable backend, similar to the XCom model, to act as a state store for users not interested in using the “batteries included” model.
- Adding a UI component to surface `StateVariable`s.
What problem does it solve?
This solves one of the most glaring problems with building Triggers compatible with "Asset-watching", or event-driven scheduling: the inability to persist state. Without a model like StateVariable, it's nearly impossible to do things like monitor an S3 bucket for new files landing, or handle the addition/removal of rows in a SQL database. Despite being one of the most touted features of Airflow 3.0, the community has been very slow to develop and distribute Event Triggers to be used for "Asset-watching". This AIP aims to address this problem in an elegant and "Airflow-onic" way, using a "batteries included" model along with a pluggable backend.
Technical Details
Stage 1 - Building the generic StateVariable model
Building the async-aware StateVariable model would require minimal lift. Like the Variable model, it would be "unrelated" to other models, making its creation a bit easier. The model should be usable by various Airflow components beyond triggers; therefore, it should provide an abstraction layer that allows the way it fetches and stores data to vary by component.
This model contains two core fields: key and value. Below is pseudo-code for the StateVariable model.

```python
class StateVariable:
    """
    StateVariable allows various methods for fetching and storing values.
    """

    key: str
    value: str

    def set(self, key, value):
        # Using a DB connection as an example here; this method will vary
        # depending on the configured backend.
        db_backend.set_value(key=key, value=value)

    def get(self):
        return db_backend.get_value(key=self.key)
```
Despite their similarities, StateVariable will differ from Variable in a number of ways. These include:
- `StateVariable` will NOT use a traditional Secrets Backend. Instead, it will have its own type of backend.
- `StateVariable` will have its own UI component(s), and can be integrated more "natively" into a DAG/Asset Watcher.
- `StateVariable` will have a sort of "owner" field, something that `Variable` does not have.
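Given the fields described above (key, value, and an "owner"), the backing table could be as small as the following SQLAlchemy sketch. The table name, column sizes, and exact semantics of `owner` are illustrative assumptions, not a final schema.

```python
from sqlalchemy import Column, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class StateVariableModel(Base):
    """Illustrative table definition; names and types are not final."""

    __tablename__ = "state_variable"  # hypothetical table name

    key = Column(String(250), primary_key=True)
    value = Column(Text)  # serialized state, e.g. a watermark timestamp
    owner = Column(String(250))  # hypothetical: the trigger/task owning this state
```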
Stage 2 - Extending the backend of StateVariable
The state model will be reusable and extendable in other Airflow components, such as tasks, workers, the scheduler, etc. It should support not only the API but also the Task SDK for fetching and storing values across different components.
Stage 3 - Implement the state-enabled Trigger/Asset Watcher
Once the StateVariable model has been created, it can be used within a BaseEventTrigger just like Variable would be: using `StateVariable.get()` and `StateVariable.set(...)`. This pattern is quite intuitive for DAG authors.
It would require some sort of get and set logic to be implemented in a class that inherits from BaseEventTrigger in order to store and retrieve the watermark value. This could be used in a standalone manner, or be implemented in the form of a "helper" method on the BaseEventTrigger class.
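A rough sketch of what such a helper might look like. Both `BaseEventTrigger` and `StateVariable` below are minimal stand-ins for the real Airflow classes, and the `get_watermark`/`set_watermark` names are hypothetical, not a settled API.

```python
class StateVariable:
    """Stand-in for the proposed StateVariable model (in-memory here)."""

    _store: dict = {}

    @classmethod
    def get(cls, key, default=None):
        return cls._store.get(key, default)

    @classmethod
    def set(cls, key, value):
        cls._store[key] = value


class BaseEventTrigger:
    """Stand-in for Airflow's BaseEventTrigger, with hypothetical helpers."""

    state_key = None  # subclasses pick a key identifying their state

    def get_watermark(self, default=None):
        """Retrieve the last persisted watermark for this trigger."""
        return StateVariable.get(self.state_key, default)

    def set_watermark(self, value):
        """Persist a new watermark for this trigger."""
        StateVariable.set(self.state_key, value)
```

A concrete trigger would then only set `state_key` and call the helpers from its `run` method.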
Examples
“Watching” an S3 Bucket
One of the most common use-cases for event-driven scheduling will most likely be “watching” an object store for changes. Ideally, each time that an AssetWatcher’s Trigger runs, it will not re-scan the entire bucket. The workflow would look something like this:
- After the first Trigger run, a timestamp of this “scan” for data is recorded.
- On the second run, the Trigger retrieves the timestamp (watermark) from the completion of the last run and only scans the object store for files after that watermark.
- Trigger writes the watermark again.
- Process repeats for N Trigger runs following.
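The steps above can be sketched as a single incremental pass. Here `list_objects` is a hypothetical callable returning `(key, last_modified)` pairs, standing in for a real S3 client, and the watermark getter/setter would be backed by the proposed StateVariable model.

```python
from datetime import datetime, timezone


def incremental_scan(list_objects, get_watermark, set_watermark):
    """One incremental pass over an object store.

    list_objects: callable returning (key, last_modified) pairs; a
        stand-in for a real S3 listing call.
    get_watermark / set_watermark: callables that would be backed by
        the proposed StateVariable model.
    Returns only the objects that landed after the stored watermark.
    """
    watermark = get_watermark() or datetime.min.replace(tzinfo=timezone.utc)
    new_objects = [(key, ts) for key, ts in list_objects() if ts > watermark]
    if new_objects:
        # Advance the watermark to the newest object seen this scan.
        set_watermark(max(ts for _, ts in new_objects))
    return new_objects
```

Repeated calls then only ever surface objects newer than the last recorded watermark, which is exactly the workflow described in the steps above.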
Incremental Data in SQL Databases
Another common use-case for event-driven scheduling is “watching” tables in relational databases; we’ll use Postgres as an example. Outside of traditional CDC, it’s common for data teams to use an updated column in a Postgres database to upsert data. Using the StateVariable approach, Airflow users can use the following workflow. This should feel quite similar to the workflow for “watching” an S3 Bucket; that’s intentional, as we’re trying to implement a repeatable pattern.
- After the first Trigger run, a timestamp of this "scan" for data is recorded and stored to the `table_last_updated_at` StateVariable.
- On the second run, the Trigger retrieves the watermark (value) from the completion of the last run and only scans the updated column in the desired table for records with a timestamp greater than the watermark.
- Trigger writes the watermark again.
- Process repeats for N Trigger runs following.
Below is an example of a Trigger that executes a SQL query to incrementally read data, with a watermark. Implementing something like the `new_watermark` and `get_watermark` methods is not required; this just shows an example of how StateVariable can be used.

```python
import asyncio
from datetime import datetime

from airflow.sdk import StateVariable
from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class SQLIncrementalQueryTrigger(BaseEventTrigger):
    state_variable_name = "table_last_updated_at"

    ...

    @property
    def new_watermark(self):
        return datetime.now()

    def get_watermark(self):
        return StateVariable.get(self.state_variable_name)

    async def run(self):
        while True:
            # conn: a DB hook/connection; its setup is omitted here.
            # Only fetch rows that landed after the stored watermark.
            results = conn.get_records(
                "SELECT * FROM table WHERE updated_at > %s",
                parameters=(self.get_watermark(),),
            )
            if results:
                StateVariable.set(key=self.state_variable_name, value=self.new_watermark)
                yield TriggerEvent({"status": "success", "results": results})
                break
            else:
                await asyncio.sleep(self.poke_interval)
```
Why is it needed?
Allow for an Airflow-supported methodology for persisting state and unblocking the development of Event Triggers for Airflow users interested in "Asset-watching".
Are there any downsides to this change?
There is one theorized downside: Airflow users leveraging the StateVariable model as a sort of general key-value store outside of an Event Trigger. However, this is not overly concerning; it would be much more likely for Airflow users to abuse the Variable model instead. Along with this come the general downsides of added complexity, but those are minimal with this proposal.
Which users are affected by the change?
DAG Authors: these users will now have access to a tool that makes authoring Triggers, Sensors, and Tasks used to orchestrate incremental processes more accessible and unlocks the ability to further build out event-driven logic.
Deployment Managers: introducing a pattern with a pluggable backend may provide deployment managers an additional component of their Airflow “stack” to stand up and manage. This will also require a DB migration, which is outlined below.
How are users affected by the change? (e.g. DB upgrade required?)
Current users who don't use incremental event triggers are unaffected by this change. However, this new feature requires a DB migration, since a new model is being added. The only changes needed in this case would be the addition of a new table.
What is the level of migration effort (manual and automated) needed for the users to adapt to the breaking changes? (especially in context of Airflow 3)
There are no breaking changes included as part of this AIP. Adding the StateVariable model will require a DB migration, but it forces no breaking changes upon users.
Other considerations?
- This would be compatible with Asset partitions.
- For now, this sort of “state store” would be limited to Asset “watching”.
- Naming is going to be important, both for the state store and for the properties/methods used to store/retrieve state. Some options that have been thrown around include `State`, `StateVariable`, `ProcessState`, `Watermark`, and more.
- One of the biggest challenges that we're facing is how this is tied to an Asset. A Trigger defined for an `AssetWatcher` is in no way tied to the Asset itself. This disconnect makes the naming and implementation of this functionality challenging. Should this be something that is tied to an Asset? Should it be only applicable at the Trigger level? Overall, the goal of this AIP is to make event-driven scheduling more intuitive for Airflow users.
- One potential piece of future work is to add StateVariable to the Airflow UI, so users can see StateVariable values, similar to how Airflow Variables are presented today.
What defines this AIP as "done"?
This AIP will be considered "done" when the PR that creates the model and the pluggable backend is merged and the solution is well-documented.
Comments
Vincent BECK
I definitely think this is a limitation we should take care of. I also agree this is a major blocker to build additional asset watchers based on non queue based assets. This is a solution to the Infinite scheduling issue mentioned in AIP-82.
I think I like the overall direction of the implementation, having a built-in Airflow way of handling the state/watermark of an asset being watched. Some clarifications and implementation details need, I think, to be ironed out, but overall I think this is a great proposal. Happy to hear what others think.
Daniel Standish
You are calling it an asset watermark, but in the examples, I don't really see an asset involved? If an asset is not (or does not need to be) involved, then perhaps we should drop asset from the name?
Joffrey Bienvenu
I see two cases for which this AIP would be helpful:
I agree with Daniel:
ChangeStream can be set up to receive events from the entire database, or to receive events from a specific collection/table (via filters). Considering a table is an Asset: when ChangeStream is configured to receive all of a database's events, the Trigger (and its Watermark) are indeed not linked to an Asset. In the case of a file server, the Trigger could be configured to receive events from some (and not all) files living on the server (via filters?). If those files are parquet, and are just a chunk/partition of the table, then the Trigger is not on the Asset either.
Jake Roach
This is a good callout. We changed what the `Watermark` class looks like; now, it takes an `asset_name`, tying it more closely to an Asset.

Daniel Standish
But you don't do anything with the asset name. What's the point of putting it there? Asset name is also, last i checked, not the identifier of the asset.
Just thinking that if you want this to be part of assets, maybe more thought is needed here and cleaner integration.
If this is a new feature of assets, shouldn't the methods be on the asset class rather than e.g. the trigger?
Tzu-ping Chung may have a thought
Guangyang Li
It's initially called Asset Watermark because we think the main use case is as an asset watcher, like the following:

```python
trigger = SQLIncrementalQueryTrigger(asset_name, watermark_namespace, watermark_key)
asset = Asset("example_asset", watchers=[AssetWatcher(name="test_asset_watcher", trigger=trigger)])
```

While I also think the comments of Joffrey Bienvenu make sense, in that it does not necessarily need to be linked to an Asset (like when multiple assets call the same trigger, or an incremental SQL query uses multiple tables/assets).

Asset Watermark should not be coupled with Asset, as we are already thinking it can be expanded to sensors or operators in the future. And we need to remove the argument asset_name from the model. I will update the examples above.
Vincent BECK
How would you configure such a trigger? Event-driven scheduling works only with a poll mechanism today in Airflow: the trigger polls an external resource's status using an API/SDK. When the resource reaches a given status, the trigger fires an event, which updates the asset, which schedules the Dag. For any push mechanism (in other words, sending an event to Airflow from an external system, such as the example you provided), the Airflow API needs to be used, but that should not be taken into consideration in this AIP.
Joffrey Bienvenu
Indeed, by "receiving events", I was only talking about a pull/poll mechanism, sorry for the bad lingo. The idea is basically: a Trigger queries a system with some filters and receives, at every call, None or one thing corresponding to that filter.
Actually, I just wanted to comment the naming, "Asset watermark":
Imagine I have multiple collections in a MongoDB, or multiple folders in a file server. I have defined 20 of them as 20 different Assets, and scheduled one different DAG per Asset. I want to define only one Trigger which polls for the 20 collections/folders at the same time (e.g. via filters built in the underlying SDK), to avoid creating too many clients polling the same source. And I want the Trigger to trigger the corresponding Asset depending on what he receives. Something like:
Trigger
AssetWatcher1 (Trigger, only for Asset1-related events) → Asset 1 → DAG A
AssetWatcher2 (Trigger, only for Asset2-related events) → Asset 2 → DAG B
AssetWatcher3 (Trigger, only for Asset3-related events) → Asset 3 → DAG C
...
This is indeed not possible (yet) in Airflow (unless I missed something?). And yes, this is out of the AIP's scope (maybe a future improvement? Some kind of filter logic on AssetWatchers?). But IMO, a Trigger should not be linked to an Asset, because we could have Triggers decoupled from Assets. Furthermore, an Asset can watch for multiple Triggers: will all the Triggers have the same asset name? In the current implementation, won't it conflict in the database?
IMO: Watermarks don't need to be related to Assets. They are just related to Triggers
Vincent BECK
Correct, today any event fired by a trigger updates all associated assets. It is not possible to configure which asset to update from the event sent by the trigger. This could indeed be a nice improvement.
Jake Roach
For right now, I'm holding onto the `asset_watermark` naming convention. Yes, I think this is something that we'll want to circle back on. First, however, I'd like to validate whether or not this is something that should be tied to an Asset in any way. If it's something that is "independent" of any specific Asset, and is only tied to the Trigger, then we can adjust as needed.

Zhe You Liu
It’s great to see an “Airflow-native” approach to the incremental processing use case.
This reminds me of the feat(postgres): Add PostgresCDCEventTrigger with AssetWatcher integration #51056 PR, which aims to achieve incremental processing using Variables. However, introducing another pluggable storage backend would certainly be more robust and reliable. The proposal is clear to me, and the core implementation seems manageable, most of it can be built on top of existing Airflow features. Essentially, this is about providing new Dag-level APIs for Airflow users to implement incremental processing more effectively.
Perhaps it would be helpful to include a list of suitable CDC/CDF use cases in the proposal. This could encourage the community to contribute implementations of AssetWatermark for various providers after the core part is completed.
Hussein Awala
I’ve been working on a design for a state store in the triggerer (similar to what you are proposing), and I’d like to share my idea.
Below is an example:

```python
class SomeTrigger(BaseEventTrigger):
    async def run(self, state: dict[str, Any]) -> AsyncIterator[TriggerEvent]:
        """
        `state` is a dictionary loaded from the Airflow DB if the trigger was
        executed previously and yielded an event with a state. It represents
        the trigger's prior state.
        """
        starting_offset = state.get("offset", 0) if state else 0
        # Example: connect to an external system and listen from starting_offset
        result = "some_result"
        new_offset = "some_new_offset"
        yield TriggerEvent(payload={"result": result}, state={"offset": new_offset})
```

A few key points about this approach:
- The `state` parameter is only provided to `BaseEventTrigger` classes, and only if their `run` method accepts a `state` kwarg (to avoid breaking changes).
- Since the state is yielded with the event, it will only be stored in the state store if the event is processed. If there's a failure before event processing ("everything fails all the time"), the state won't be stored. In that case, the trigger restarts from the old state.
- The state is persisted in the DB within the same session used to register the asset change, so both actions are committed or rolled back together.
- Since our guarantee for triggers is at least once (not exactly once), the same applies to state: a new state may overwrite an old one, and all the data will be processed.
This solution could also be extended to normal triggers, since the state could be helpful for certain sensors in cron-based DAG runs.
What do you think?
Jake Roach
I definitely like where this is going. I know this is pseudo-code, but the one thing that we'll want to be able to handle is when a `TriggerEvent` is not `yield`ed. In fact, that's one of the most important parts of this AIP: how can I incrementally "poll" an Asset since the last trigger run, regardless of a `TriggerEvent` being fired?

Hussein Awala
I think we can support yielding a `TriggerEvent` without a payload to let Airflow update the state without triggering a DAG run. Since events are processed by the triggerer, it can simply update the state and keep the trigger running in that case.

Daniel Standish
We could, but why make it an event when you just want to update state? Does the trigger object already have access to XCom and Variable? If so, managing state would seem to be analogous. And I suspect we don't require a trigger to emit an event for dealing with those interfaces.
Hussein Awala
Not necessarily an event; it could work like this:
- `yield StateUpdate(<new state>)` → when we just want to update the state without creating an event
- `yield TriggerEvent(<payload>, <new state>)` → when we want to update the state once the event is created

Why do this? To keep state updates consistent with how events are created.
And why send the state in the event? To ensure the state is updated only if the event is created, which guarantees at-least-once delivery for users.
That said, I’m fine with any approach that ensures this guarantee, as long as it’s not too complicated for users (which is not the case in the solution proposed in this AIP, since the trigger/triggerer could fail either before yielding the event or before the event is processed).
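The two yield forms discussed above could be processed along these lines. Note that `StateUpdate` and the two-argument `TriggerEvent` are hypothetical APIs under discussion, not existing Airflow classes, and `apply_yielded` is an invented name for the triggerer-side handling.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class StateUpdate:
    """Hypothetical marker yielded to persist state without firing an event."""

    state: dict


@dataclass
class TriggerEvent:
    """Stand-in for a TriggerEvent extended with an optional state payload."""

    payload: dict
    state: Optional[dict] = None


def apply_yielded(item: Any, state_store: dict, events: list) -> None:
    """Sketch of how a triggerer loop might handle the two yield forms:
    persist state for both, but only record an event for TriggerEvent."""
    if isinstance(item, TriggerEvent):
        if item.state is not None:
            state_store.update(item.state)
        events.append(item.payload)
    elif isinstance(item, StateUpdate):
        state_store.update(item.state)
```

This keeps the at-least-once guarantee Hussein describes: the state only advances when the triggerer actually processes the yielded item.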
Guangyang Li
I do see some trade-offs here, and yielding StateUpdate does seem helpful. However, it sacrifices some of the flexibility of triggers if we don't expose a method for setting states. For example, in some uncommon cases, a single trigger may require multiple watermarks, in which case more granular state updates would be necessary. So I lean towards letting users update states in a dedicated method.
Jake Roach
As I think about it, I lean more and more towards a `TriggerWatermark`. I think the biggest reason here is that a Trigger inheriting from `BaseEventTrigger` is in no way tied to an Asset. Using `asset_name` doesn't really mean much in the current setup...

Jarek Potiuk
Following the work that Daniel Standish did on AIP-30, I think we should really split this one into two AIPs (or maybe stages of an AIP). The main difference vs the original AIP-30, however, is that we should have a generic state API, storage, and configuration that can serve different purposes. Looking at the upcoming AI-bound use cases, I think we have more and more need for some kind of async-aware state storage that can be used in a number of situations, and we should work out a common API that can serve all of those. Having a generic "primitive" API is something we can even expose to our users to use in ways we have not imagined yet.
So I think we should:
1) Implement a generic Async-aware state in Airflow that could be used by various components - I have a feeling we will need state in various places, not only asset watermarks in the near future. It's worth spending an extra time to have internal, generic API that can be used in different situations. We could (similarly to XCom) connect it to some backends eventually - starting with database - which should be the first stage.
2) This API should be independent of where it is run and how you get the data. It could run in the Worker, Triggerer, Dag Processor, and Scheduler: in the Scheduler it could use DB access, while the components behind the Task SDK should use the Task SDK. The important thing is that we have a reliable, robust enough API for state that allows the state to be tied to different entities (running trigger, parsed Dag, running task, etc.).
3) Then we could use that API to make an "Asset Watcher"-specific implementation. To modify Hussein Awala's proposal and respond to Jake Roach's concerns, we could have a dedicated async API that the trigger could call even if it is not yet yielding a TriggerEvent, say `await AirflowState.updateState("trigger", self.hash, state={"offset": new_offset})` or something similar.
I think very clearly we **need** to store state. Having a very simple, generic state storage, potentially much more configurable (say, in the future, backed not by the DB but by other types of storage, like we do with XCom), sounds like a good abstraction to have and apply to various cases.
Daniel Standish
Yeah I think I generally agree with Jarek Potiuk .
If you look at AIP-30, I struggled with these same questions, and did not really arrive at an answer. And no one else really showed interest in it at the time, and that's sorta why it died. That and I joined astronomer and had other priorities.
But, anyway, you can see in the original AIP (AIP-30: State persistence), I did not have a final interface proposed – I had options. If you look at solution #5, that was my "generic approach" — just arbitrary state store.
On the one hand, generic is good because people can do whatever they want, and it is the simplest approach. On the other hand, you could expose state storage as part of the interfaces of other objects (such as tasks, triggers, dags, or assets) and then it would be more convenient to use. But the tradeoff is that when you add it to existing interfaces, it's more complicated to sort out what the behavior should be in different scenarios (such as clearing), and you have more decisions like that to make.
Guangyang Li
Makes sense to me. I don't think this differs significantly from what was proposed above: a generic approach is exactly what we want to achieve. It's a good point to highlight that it needs to be async-aware and compatible with components other than triggers and sensors.
I think we can update the AIP slightly to reflect these stages. For now, I don’t believe it’s necessary to break it down into sub-AIPs given the current scope, though I’m open to doing so if the scope expands, like applying this to multiple components.
Jarek Potiuk
Yeah. I think stages and explaining the common API introduced here is enough.
Jake Roach
I think that all makes sense - thanks, Jarek Potiuk. I like the idea of a "staged AIP", with the first stage encompassing the changes needed to introduce a generic, async-aware state store, while the second stage implements this for Triggers/"Asset watching".
Vincent BECK
I agree too that this issue is not specific to triggers/assets; this is something we need across different components. Having one solution generic enough to solve this state persistence in general would be awesome.
Jake Roach
I'm in the process of creating a draft PR for `StateVariable`. Once that is complete, I'll share it with the community and we can move closer to a consensus/vote.

Jake Roach
Vikram Koka, curious to get your thoughts on this!