Smart Sensors solve an obvious need - making it much less resource-intensive to run many mostly-idle Sensors at once - but they are a relatively limited feature, and not very extensible.
We need a way to bring these sorts of efficiency gains to any Operator/Task that has a large amount of idle time, and on top of this, an API extensible enough to support future event-driven styles of operation.
What change do you propose to make?
A new core concept is added to Airflow, called a Trigger. A Trigger is a small workload that is distinct from a Task; it is designed to run alongside many other Triggers in an asynchronous (asyncio) environment, and it fires events when its conditions are met.
The ability will be added for any Task/Operator to “defer” itself and resume at a later time, providing a Trigger as part of the deferral.
Once a task requests to be deferred, Airflow stores a small amount of state describing how it should be resumed (a method to re-enter at, and optional keyword arguments), and then puts the task in a new “deferred” state, with it no longer taking up a worker/executor slot.
When the trigger fires an event, Airflow re-schedules the task to run again, passing the state it provided when it deferred itself, plus the event that woke it up.
Triggers run in a new process called a triggerer (analogous to the scheduler), which fetches and runs the triggers inside itself (all at once, because they are required to be asynchronous). When a Task requests deferral based on a Trigger, the trigger is dynamically loaded into the triggerer and run; once there are no tasks left that are waiting on it, the trigger is stopped and unloaded.
Triggers have a relatively simple contract: a `serialize` method describing how to recreate them, and an asynchronous `run` method that yields control when they are idle. This allows them to coexist with thousands of other Triggers within one process.

Here’s an example of a basic, single-shot datetime trigger:
```python
import asyncio
import datetime
from typing import Any, Dict, Tuple

from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone


class DateTimeTrigger(BaseTrigger):
    def __init__(self, moment: datetime.datetime):
        super().__init__()
        self.moment = moment

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})

    async def run(self):
        while self.moment > timezone.utcnow():
            await asyncio.sleep(1)
        yield TriggerEvent(self.moment)
```
This has a `run()` method that sleeps (yielding control with `await`) until its time has been reached, and then fires a single event.
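Because `run()` is an async generator, triggers like the one above can be driven directly with asyncio. The toy harness below (not Airflow code; `CountdownTrigger` is a simplified stand-in that avoids Airflow's `BaseTrigger`/`TriggerEvent`/`timezone` imports) shows the single-shot behaviour:

```python
import asyncio
import datetime


class CountdownTrigger:
    """Simplified stand-in for DateTimeTrigger, using naive UTC time."""

    def __init__(self, moment):
        self.moment = moment

    async def run(self):
        # Sleep in short increments, yielding control to the loop each time
        while self.moment > datetime.datetime.utcnow():
            await asyncio.sleep(0.02)
        yield self.moment  # stand-in for TriggerEvent(self.moment)


async def wait_for(trigger):
    # Drive the async generator until its first (and only) event
    async for event in trigger.run():
        return event


moment = datetime.datetime.utcnow() + datetime.timedelta(milliseconds=100)
event = asyncio.run(wait_for(CountdownTrigger(moment)))
```

The `async for` loop is exactly how a triggerer would consume events from a real trigger's `run()` method.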
Triggers are stored in a new database table, `trigger`, with the following schema:

- `id` (bigint) - Autoincrementing single-column ID
- `classpath` (varchar) - The period-separated path to the Trigger class inside a module
- `kwargs` (json) - JSON-encoded keyword arguments to pass to the trigger’s constructor
- `created_date` (datetime) - When the trigger was created

In the first version, one instance of a trigger will be made per deferred task, but the schema and API contract are designed so that, in future, deduplication of triggers could be performed.
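Given that schema, rehydrating a trigger inside the triggerer is just a dynamic import of the stored classpath plus a constructor call with the stored kwargs. A sketch (the `rehydrate_trigger` helper name is illustrative, not Airflow API; a dict stands in for the database row):

```python
import importlib
import json


def rehydrate_trigger(row):
    """Recreate a Trigger instance from a `trigger` table row.

    `row` is assumed to expose the `classpath` and `kwargs` columns
    described above; this helper is a sketch, not real Airflow code.
    """
    module_path, _, class_name = row["classpath"].rpartition(".")
    trigger_class = getattr(importlib.import_module(module_path), class_name)
    # kwargs were JSON-encoded when the task deferred itself
    kwargs = json.loads(row["kwargs"])
    return trigger_class(**kwargs)
```

This is why `serialize()` must return a classpath and JSON-safe kwargs: it is the inverse of this reconstruction step.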
Triggers must yield back control via await or yield quickly, as otherwise the nature of Python's async implementation means they will block the whole event loop. We can ship a coarse detector inside Airflow that ensures this situation isn't happening, but we unfortunately can't tell exactly which trigger is doing it if it happens - Python's slow-async-callback detection is not customisable or accessible, so we will either have to manually force asyncio's debug mode to on (so it alerts users to long-running callbacks), or advise our users how to do this.
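For reference, asyncio's built-in debug mode mentioned above can be enabled per-loop; any callback or coroutine step that holds the loop longer than `loop.slow_callback_duration` (0.1 s by default) is reported to the `asyncio` logger. A small self-contained demonstration:

```python
import asyncio
import time


async def well_behaved():
    # Yields control back to the event loop promptly
    await asyncio.sleep(0.01)


async def badly_behaved():
    # Blocks the loop without awaiting; with debug mode on, asyncio logs
    # a warning like "Executing <Task ...> took 0.30 seconds" for this step
    time.sleep(0.3)


async def main():
    loop = asyncio.get_running_loop()
    loop.set_debug(True)                 # turn on slow-callback detection
    loop.slow_callback_duration = 0.2    # warning threshold, in seconds
    return await asyncio.gather(well_behaved(), badly_behaved())


results = asyncio.run(main())
```

As noted, the warning identifies the offending task but there is no public hook to customise the handling, which is why the proposal is limited to enabling this mode or documenting it for users.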
A separate triggerer process runs at all times, and would form a new continuously-running-process part of an Airflow installation. It contains an asyncio event loop where it can run thousands of triggers at once efficiently.
This process monitors the `trigger` table and makes sure that it is always running all the triggers defined in there - adding new ones that appear to the event loop, and removing ones whose tasks are completed - as well as continuously monitoring the triggers it is running.
When an event fires from a trigger, the triggerer looks up which tasks depend on that trigger, and moves those tasks to the scheduled state, as well as adding the event’s payload to their kwargs. It then disables the trigger and removes it from the event loop if that was the only task depending on it.
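A minimal sketch of this multiplexing, in plain asyncio: each trigger's `run()` is consumed as an async generator, and every yielded event is handed to a dispatch step. Here `IntervalTrigger` is hypothetical and a list append stands in for the real dispatch (which would move dependent task instances to "scheduled"):

```python
import asyncio


class IntervalTrigger:
    """Hypothetical trigger that fires `count` events, one every `delay` seconds."""

    def __init__(self, trigger_id, count, delay):
        self.trigger_id = trigger_id
        self.count = count
        self.delay = delay

    async def run(self):
        for i in range(self.count):
            await asyncio.sleep(self.delay)
            yield (self.trigger_id, i)  # stand-in for a TriggerEvent payload


async def consume(trigger, dispatch):
    # Drain one trigger's async generator; the real triggerer would move
    # dependent task instances to "scheduled" and record the event payload
    async for event in trigger.run():
        dispatch(event)


async def triggerer(triggers):
    # Run every trigger concurrently on one event loop
    fired = []
    await asyncio.gather(*(consume(t, fired.append) for t in triggers))
    return fired


events = asyncio.run(triggerer([IntervalTrigger("a", 2, 0.01),
                                IntervalTrigger("b", 1, 0.01)]))
```

Because the triggers only `await` and `yield`, one loop interleaves all of them; this is the property that lets a single triggerer run thousands of triggers at once.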
The triggerer is designed to be run in a highly-available (HA) manner; it should coexist with other instances of itself, running the same triggers, and deduplicating events when they fire off. In future, it should also be designed to run in a sharded/partitioned architecture, where the set of triggers is divided across replica sets of triggerers.
In order for a Task/Operator to be deferred, it should throw a new exception, `TaskDeferred`, from somewhere within its `execute()` method. This exception takes four arguments:

- `trigger`: An instance of a Trigger that the task should wait on before it resumes.
- `method_name`: The name of the method on the Operator to call when the task resumes, rather than `execute()`. This allows Operators to separate their logic into several steps, rather than cramming it all into one method and using lots of conditionals.
- `kwargs`: Optional keyword arguments to pass to that method when the task resumes.
- `timeout`: An optional timeout, after which the deferral expires and the task will be marked as `failed` if its trigger has not fired.

There will also be a convenience method provided on BaseOperator called `defer()`, so you could write the following code inside an Operator:
, so you could write the following code inside an Operator:
```python
def execute(self, context):
    # This fires the TaskDeferred exception and suspends execution at this point
    self.defer(trigger=DateTimeTrigger(moment=self.target_time), method_name="execute_complete")

def execute_complete(self, context, event):
    # The Operator comes back here when the trigger fires, due to "method_name"
    return
```
The state lifecycle of a deferred task is: `running` → `deferred` (while its trigger waits to fire) → `scheduled` (once the trigger fires) → `running` again, re-entering the Operator via the stored method name.
In order to implement deferral, the `_execute_task()` wrapper on TaskInstance will catch the exception and handle persisting the state to the database, as well as setting up the trigger to run. It will also look at the new columns when a task resumes, to see whether it needs to enter the operator via a different method and what keyword arguments to pass.
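This catch-and-resume logic can be sketched as follows; `TaskDeferred` mirrors the fields described in this AIP, while a plain dict stands in for the `task_instance` row (all class and helper names here are illustrative, not Airflow's real internals):

```python
import datetime


class TaskDeferred(Exception):
    """Stand-in for the new exception raised from execute()."""

    def __init__(self, trigger, method_name, kwargs=None, timeout=None):
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs or {}
        self.timeout = timeout


def run_task(operator, context, task_instance):
    """Illustrative equivalent of the _execute_task() wrapper."""
    # Resume into next_method if the task was previously deferred
    method = getattr(operator, task_instance.get("next_method") or "execute")
    try:
        return method(context, **(task_instance.get("next_kwargs") or {}))
    except TaskDeferred as deferral:
        # Persist how to resume, then mark the task "deferred"; the real
        # code would also insert a row into the trigger table here
        task_instance.update(
            state="deferred",
            next_method=deferral.method_name,
            next_kwargs=deferral.kwargs,
            trigger_timeout=deferral.timeout,
        )


class WaitOperator:
    def execute(self, context):
        raise TaskDeferred(trigger="<some trigger>",
                           method_name="execute_complete",
                           timeout=datetime.timedelta(hours=1))

    def execute_complete(self, context, event=None):
        return f"woken by {event}"


ti = {}
run_task(WaitOperator(), {}, ti)   # first run: the task defers
# ...later, after the trigger fires, the scheduler re-queues the task
# with the event payload merged into next_kwargs:
ti["next_kwargs"] = {"event": "2021-05-01T00:00:00"}
result = run_task(WaitOperator(), {}, ti)  # resumes via execute_complete
```

The second `run_task` call shows how the event payload that woke the task is passed back in via the stored keyword arguments.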
These changes mean four columns need to be added to the `task_instance` table:

- `trigger_id` (bigint) - A reference to the `trigger` table
- `trigger_timeout` (datetime) - The optional UTC time when this deferral expires and the task instance should fail
- `next_method` (varchar) - The name of the method to run on the Operator if it is not `execute`
- `next_kwargs` (json) - The keyword arguments to pass to that method

Additionally, a new possible value of `state` is added, “deferred”. The scheduler will be updated to understand this as a pending type of task state, and not to consider a DAG deadlocked when its tasks are all deferred.
It solves the problem where both Sensors and externally-dependent Operators spend most of their time idle while still occupying a worker slot.
Implementing this AIP will result in very significant efficiency gains in any Airflow installation that runs such mostly-idle tasks.
Smart Sensors attempt to solve this problem but require a specific implementation per Sensor and cannot be run in a highly-available manner. The "reschedule" mode of Sensors also tries to do this, but in a time-driven manner rather than an event-driven one, which leads to inefficiencies if you want a relatively low-latency response. Additionally, both of these are Sensor-specific and will not work with more general Operators/Tasks.
It adds one new process that needs to continuously run on any Airflow installation that uses deferrable operators.
It adds an implementation that is a superset of Smart Sensors and so duplicates some functionality.
Users who wish to use deferrable operators will have to run a new process. Nobody else will be affected in the short term apart from a database migration.
In the long term, if Airflow changes some of its core Sensors to be deferrable, all users will have to run the triggerer process, but any DAGs that use those sensors will get an immediate performance boost with no code changes.
Any existing Operator or Sensor could be ported to be deferrable underneath with no change to its DAG-facing API, which would allow relatively seamless upgrades.
A database migration is required for all users, and running a new long-running process is required for users who wish to use deferrable operators.
In order to validate the ideas presented, this has already been implemented in a prototype form to ensure all the changes are possible and co-exist well with Airflow.
The prototype branch can be seen in draft pull request 15389.
It provides one new async sensor, `AsyncTimeSensor`, which is a drop-in replacement for the existing `TimeSensor` but doesn’t take up a worker slot while it’s waiting.
Note that the prototype is rough, and would receive significant further work before it was landed to complete this AIP.
This proposal forms the basis of an ongoing ability to modify the core of Airflow to be more event-driven. Some potential changes this opens up in future - that are not part of the scope of this AIP, but which are worth considering in terms of general direction - are:
- Removing the `reschedule` mode on Sensors. This could now be achieved using a time-based trigger instead, removing the need for sensors to have execution modes.

Some alternate approaches were considered but ultimately excluded from consideration for this AIP:
When the deferral functionality is merged into the master branch.