Status
Motivation
Apache Airflow is primarily designed for time-based and dependency-based scheduling of workflows. However, modern data architectures often require near real-time processing and the ability to react to events from various sources, such as message queues. This proposal aims to introduce native event-driven capabilities to Airflow, allowing users to create workflows that can be triggered by external events, thus enabling more responsive data pipelines.
Proposal
Note. In this AIP we use “asset” to mean what Airflow 2.10 calls a “dataset”, because “dataset” is renamed to “asset” in AIP-73.
Today in Airflow, you can build event-based workflows using external task sensors, sensors/deferrable operators, the REST API, or datasets (to be renamed assets). Scheduling using assets has recently gained popularity given its efficient execution, monitoring support, and API capabilities. An Airflow asset is a logical grouping of data. Upstream producer tasks can update assets, and asset updates contribute to scheduling downstream consumer DAGs.
Example:
```python
from airflow.datasets import Asset

with DAG(...):
    MyOperator(
        # this task updates example.csv
        outlets=[Asset("s3://dataset-bucket/example.csv")],
        ...,
    )

with DAG(
    # this DAG should be run when example.csv is updated
    schedule=[Asset("s3://dataset-bucket/example.csv")],
    ...,
):
    ...
```
In this example, the first DAG sends an event (or updates) the asset, and the second DAG is scheduled upon asset update.
However, as illustrated above, updating the asset is the user's responsibility. In the example the user does it from a DAG, but other techniques, such as the “Create dataset event” REST API, are also available to update an asset. All of these techniques work, but they require the user to set up a pipeline in order to update the asset.
Ideally, there should be an end-to-end solution in Airflow to trigger DAGs based on external events such as:
- A message has been pushed to a message queue such as Google Pub/Sub, Amazon SQS, Azure Service Bus, Apache Kafka, ...
- A file has been created in a storage service
- A database has been updated
Design
The goal is to build a solution in Airflow to automatically update assets based on external events. This scheduling falls into two categories:
- Poll based event-driven scheduling
- Push based event-driven scheduling
Only poll based event-driven scheduling is considered as part of this AIP. Some investigation has been done on push based event-driven scheduling without leading to a satisfying solution. Therefore, it has been decided to focus on the poll based approach while continuing to investigate the push based approach. If it is later decided to move forward with push based event-driven scheduling (this proposal or another), it will be part of another AIP.
Poll based event-driven scheduling
Airflow constantly monitors the state of an external resource and updates the asset whenever the external resource reaches a given state. To achieve this, the plan is to leverage Airflow Triggers. Triggers are small, asynchronous pieces of Python code whose job is to poll an external resource's state. Today, triggers are used exclusively for deferrable operators, but the goal here is to also use them to update assets based on external conditions.
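Conceptually, a trigger's `run` method is an async generator: it polls an external resource and yields an event when a condition is met. Below is a self-contained sketch of that pattern; it deliberately avoids Airflow imports, and `poll_resource` is a hypothetical stand-in, not an Airflow API:

```python
import asyncio


async def watch_resource(poll_resource, interval: float = 0.01):
    """Poll until the resource reports a message, then yield one event.

    Mirrors the shape of a trigger's async-generator loop: sleep between
    polls so the event loop can run many such watchers concurrently.
    """
    while True:
        message = poll_resource()
        if message is not None:
            yield {"payload": message}  # the event a trigger would emit
            return
        await asyncio.sleep(interval)


# Usage: a fake resource that has no message for the first two polls.
messages = iter([None, None, "hello"])


async def main():
    async for event in watch_resource(lambda: next(messages)):
        return event


result = asyncio.run(main())
```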
Today you could achieve this by using continuous scheduling with one deferrable operator in the continuously scheduled DAG. The purpose of this solution is to achieve the same result without needing a DAG that is continuously scheduled just to run the deferrable operator.
DAG author experience
Below is an example of DAG triggered when a new message is pushed to an Amazon SQS queue.
```python
trigger = SqsSensorTrigger(sqs_queue="<my_queue>")
asset = Asset("<my_queue>", watchers=[trigger])

with DAG(
    dag_id=DAG_ID,
    schedule=asset,
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
):
    empty_task = EmptyOperator(task_id="empty_task")
    chain(empty_task)
```
Sequence
Below is a simplified version of a sequence diagram describing what is going on in Airflow when DAGs such as above are present in an Airflow environment.
Avoid infinite scheduling
The current trigger implementation is perfectly suited for sensors and deferrable operators but is not compatible with DAG scheduling. The reason is that most triggers wait for an external resource to reach a given state. Examples:
- Wait for a file to exist in a storage service
- Wait for a job to be in a success state
- Wait for a row to be present in a database
Scheduling upon these conditions would lead to infinite scheduling because, once the condition is reached, it is very likely to remain true for quite some time. Example: if a DAG is scheduled to run when a specific job reaches a success state, the job will remain in that state once it succeeds, so scheduling a DAG on that condition leads to infinite scheduling. Another example: `S3KeyTrigger` checks whether a given file is present in an S3 bucket. Once this file is created, `S3KeyTrigger` will always exit successfully, since the condition “is file X present in bucket Y” is true. The consequence would be to keep triggering any DAG scheduled on that trigger every time the triggerer executes the trigger (this interval is usually configurable in the trigger implementation).
To avoid this infinite scheduling loop, we only want to fire an event if we have not already done so since the watched resource last changed. For example, if an S3 file is updated, and we have not fired an event since that file's update time, then we fire the event. As a result, we want to reuse the concept of triggers, but not the current implementation, which is specific to deferrable operators and sensors. Therefore, there are two options:
- Make each trigger implementation specific to either deferrable operators/sensors or scheduling
- Introduce a new method in `BaseTrigger` (e.g. `schedule`) similar to the existing method `run`: `schedule` would be used to make scheduling decisions and `run` would be used to make defer decisions
We should mention that some triggers are compatible with both use cases. For example, `SqsSensorTrigger` can be used to defer a task but also to schedule a DAG, since it waits for a new message in a queue, polls it when one arrives, and then deletes it. For this reason, among the two options listed above, we might want to choose the first one.
Some other parameters could also be added to give the DAG author control over how often a DAG can be scheduled based on these events. See more in the section “Additional considerations (future work)”.
Out of scope
Items below are out of scope of this AIP. However, I decided to keep them in this document because they might be useful to initiate a conversation if someone decides to work on one of these items once this AIP is completed.
Additional considerations (future work)
Airflow is not designed to handle hundreds of events per second; the optimizations below would allow users to tune the behaviour while keeping Airflow scheduling performant:
- Configurable trigger behaviour: Allow DAG authors to specify whether they want the DAG to be triggered:
- For every event
- At a specified interval, processing batches of events
- When a certain number of events have accumulated
- Batch processing: Support batching of events
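As an illustration of the kind of control envisioned here, a count-based firing policy (the third option above) might look like the following sketch. None of these names are Airflow APIs; they are invented for illustration:

```python
class BatchPolicy:
    """Hypothetical sketch: schedule one DAG run per batch of events."""

    def __init__(self, batch_size: int):
        self.batch_size = batch_size
        self.pending = 0  # events accumulated since the last DAG run

    def on_event(self) -> bool:
        """Return True when enough events accumulated to schedule a DAG run."""
        self.pending += 1
        if self.pending >= self.batch_size:
            self.pending = 0
            return True
        return False
```

A per-event policy is then simply `BatchPolicy(1)`, and an interval-based policy would replace the counter with a timer.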
Some other features could also be built on top of this AIP, for example archiving and replaying events. However, these are not part of this AIP and are considered follow-up items.
Push based event-driven scheduling
As opposed to the poll based event scheduling, the push based event scheduling consists of an event sent from an external system to Airflow whenever this external system detects a change/activity. Examples:
- A user signed up to an external system
- A remote job has been successfully executed
- A file has been created in a storage service
- An OpenLineage event has been emitted
These events are fired by the external service (AWS, Google, ...) as notifications. The external service needs to be configured to send such events/notifications to the create asset event API endpoint of the Airflow environment. On the Airflow side, this API endpoint needs to handle arbitrary incoming requests and convert them into valid create asset event API requests.
To achieve this, here are the main changes we need to introduce in Airflow:
- Add an optional mapping mechanism in the create asset event API endpoint between the user (credentials used to call the create asset event API) and a method to convert the request coming from the external service to a valid create asset event API request. Example:
- Step 1. As a user, I configure my cloud provider to send an event whenever a remote job has been successfully executed. As part of this configuration, I specify the create asset event API endpoint of my Airflow environment as the HTTP target. I also configure the event to use specific credentials I created in Airflow for this event (e.g. remote_job_success_user). When this step is done, whenever my remote job is successfully executed, my cloud provider will send an HTTP request to the create asset event API endpoint of my Airflow environment.
- Step 2. As a user, I associate a function with the user I used to call my Airflow environment (e.g. remote_job_success_user). This function would be responsible for converting the HTTP request sent by my cloud provider when a remote job is successfully executed into a valid create asset event API request. This function can be written by the users themselves or taken from functions available in providers (these functions do not exist today in Airflow providers). How this association between a user and a function will be made is to be determined during the implementation. We can think of using the Airflow CLI to help the user make such an association, but this decision will be delayed to the implementation.
Note. Users might want to handle this conversion themselves by using Lambda or any other computing layer between the event and the Airflow environment. This is possible today and will remain possible.
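The mapping function in step 2 could look like the following sketch. The incoming payload shape and all field names are invented for illustration; the real request body of the create asset event API may differ:

```python
def convert_job_success_event(provider_payload: dict) -> dict:
    """Convert a (hypothetical) 'remote job succeeded' notification into a
    create-asset-event request body.

    The input shape is an assumption made for illustration, not a real
    cloud-provider payload.
    """
    detail = provider_payload["detail"]
    return {
        # asset URI identifying the remote job in Airflow (illustrative scheme)
        "asset_uri": f"remote-job://{detail['jobName']}",
        # pass useful context through the event's extra field
        "extra": {"status": detail["status"]},
    }


# Usage: an incoming notification from the cloud provider...
body = convert_job_success_event(
    {"detail": {"jobName": "nightly-etl", "status": "SUCCEEDED"}}
)
# ...is mapped to a request body Airflow can accept.
```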
Sequence
Below is a simplified version of a sequence diagram describing what is going on in Airflow when DAGs such as the above are present in an Airflow environment.
Considerations
What problem does it solve?
- Lack of native support for event-driven workflows in Airflow
- Difficulty in integrating Airflow with real-time data sources and message queues
- Inability to trigger DAGs based on external events efficiently
Which users are affected by the change?
Only users who want to use this new feature. This change does not break any existing feature.
How are users affected by the change? (e.g. DB upgrade required?)
A DB upgrade is required since some modifications to the DB are needed. These changes only involve the creation of new DB tables:
- New table to record association between assets and triggers
- New table to record event receivers
- New table to record association between event receivers and assets
What is the level of migration effort (manual and automated) needed for the users to adapt to the breaking changes? (especially in context of Airflow 3)
The migration effort is relatively low for existing workflows, as this proposal introduces new features without breaking existing functionality. However:
- DAG authors who want to leverage event-driven capabilities will need to modify their DAGs to use the new classes and related concepts
What defines this AIP as "done"?
Poll based event-driven scheduling, as described in this AIP, is implemented in Airflow.
14 Comments
Vincent BECK
Example of implementation of event receiver:
Jarek Potiuk
And just a general comment here Vincent BECK - I am not **entirely against** adding a new "entity type" to Airflow in terms of external API - it just feels wrong to have to do it, rather than plug it better into the existing API. Generally for APIs it's always the question of who adapts to whom - should the caller adapt to the API of the callee or the other way round? My gut feeling is that we should have the callee exposing ONE API, and maybe add "external" mappers to map the caller API to the "callee" API. Should Airflow understand all the different ways it can be called? Or should the calling entity adapt to the way Airflow expects it? I think adding a middle layer that can be set up to map it is a solution, but it should not necessarily change the authentication and endpoints exposed by Airflow.
Vincent BECK
I totally understand your point - thanks again for sharing it - and it makes sense. I can see that many people have concerns about the push based mechanism because of this new endpoint. I kind of like your solution; it could be a good compromise. My plan is to update this AIP to address all the comments (from Confluence and the dev email list), and I'll heavily modify the push-based mechanism to adopt your idea.
Kaxil Naik
Yeah, the middle ground for me is really Event/Triggers for message queues for cloud providers: GCP PubSub, AWS SQS.
As a customer, if you are on cloud, you will use one of those (Pub/Sub, Kinesis / SQS, or Azure Message Bus). For enterprises, Kafka or Redis should cover more than 80% of cases.
Vincent BECK
I think these use cases should be covered by the poll approach? Though you are right in the fact that this is the major use case and I should mention it in the AIP. I'll update it.
Kaxil Naik
One last comment: I know this talks about external events, but could we extend this to internal events too? These should probably be even easier, and valuable. Events like:
DagCompletionEvent
TaskCompletionEvent
This will allow real dag dependencies on scheduling without using things like ExternalTaskSensors or Datasets. This will help non-data workflows
Vincent BECK
Today it is already possible to trigger DAGs based on these 2 events using datasets. Do you think it is still valuable to implement these 2 events? To me it feels unnecessary because, as I said, assets are a perfect candidate for that. It feels like a duplicate to me. If we decide to implement such events, I definitely think they should leverage assets underneath. Implementing `DagCompletionEvent` and `TaskCompletionEvent` from scratch would result in implementing a duplicated version of asset scheduling.
Kaxil Naik
Yeah asset is a good approach for Data workflows but it is a proxy for non-data workflows.
I would think of `DagCompletionEvent` and `TaskCompletionEvent` as very simple, just a few lines of code to check whether the task has completed or not. Tbh you can just reuse `WorkflowTrigger` or `TaskStateTrigger` etc. if we can schedule on "Triggers" or Events.
Vincent BECK
I agree with that approach then. This should be easily achievable
Jens Scheffler
Thanks for the update/rework, now I really LIKE it (it would have been great to have this 2 years ago; in the past we implemented a custom plugin with a Flask endpoint to do the same / compensate for the missing feature).
Elad Kalif
I think we should have a limitation section.
There are limits that may come from this AIP specifically and there are limits that may come from what Airflow can handle in general (scheduler-wise).
For example, reading this AIP I get that this is not fully event driven (at least not in the sense of triggering a DagRun per message), but there is no explanation of what we can handle. Where do we draw the line between what is acceptable and what is not? If a message/file lands every 1 min? 3 min? Is that OK? I am hoping to see some kind of guidelines around what we hope to deliver here, which we can later convert into the docs.
Vincent BECK
That's a very good point. I tried to address it in the section "Additional considerations (future work)"; I can expand that section and add more details if you think it is not enough.
Elad Kalif
I think it's best to explicitly mention the limits
we can also differentiate between limits coming from this AIP specifically and limits that may come from other Airflow components (for example scheduler handling)
We can also mention what we hope to support and then benchmark it with relevant testing when the feature is ready.
Vikram Koka
I really like the overall updates.
I am comfortable with the Poll based approach defined above.
I am still struggling with the Push based approach.
I understand the "mapping / conversion function" approach to eliminate the need for an additional component, but still not comfortable with this, because of how much this could grow.
Can be convinced, but not sure.