Status
Motivation
Summary
This AIP proposes adding reconnection/resumption capability to operators that submit long-running external jobs (Databricks, EMR, Snowflake, dbt Cloud, etc.). When infrastructure disruptions occur (worker crashes, pod evictions), operators can preserve external job IDs and reconnect on retry instead of restarting from scratch. This eliminates wasted computation and reduces costs while following the established deferral pattern.
Key Benefits:
- Cost Savings: Avoid restarting 3-hour jobs that are 90% complete
- Faster Delivery: Resume from where jobs left off instead of starting over
- Disruption Ready: Infrastructure failures don't waste user work
- Familiar Pattern: Mirrors existing deferral mechanism (TaskDeferred → TaskCheckpointed)
Current Behavior and Limitations
Problem: Wasted Computation from Infrastructure Disruptions
When operators submit long-running external jobs and monitor their completion, infrastructure disruptions force operators to cancel healthy jobs and restart from scratch.
Example: The 3-Hour Databricks Job
class DatabricksOperator(BaseOperator):
    def execute(self, context):
        self.job_id = self.submit_databricks_job()
        return self.poll_until_complete(self.job_id)

    def on_kill(self):
        # No context - must always cancel
        if self.job_id:
            self.cancel_databricks_job(self.job_id)
Timeline:
- Task submits 3-hour Spark job to Databricks
- Job runs for 2.5 hours (83% complete, healthy)
- K8s node maintenance evicts Airflow worker pod
- on_kill() is called → cancels healthy Databricks job
- Retry starts → submits fresh 3-hour job
Waste: 2.5 hours computation + cluster costs + delayed delivery
The job itself was healthy; only the Airflow worker was disrupted.
Considerations
What change do you propose to make?
Problem Visualization
Current Behavior (Wasteful): disruption cancels the healthy external job, and the retry starts from scratch.
Proposed Behavior (Resumable): disruption preserves the job ID, and the retry reconnects to the still-running job.
Core Concept
Extend the existing deferral pattern with a checkpointing pattern:
- Deferral (TaskDeferred): Operator explicitly waits for external trigger (async by design)
- Checkpointing (TaskCheckpointed): Platform transparently preserves state during disruptions (resilience by design)
These patterns are complementary and work together.
Implementation: Following the Deferral Pattern
The implementation mirrors the existing deferral mechanism:
# Existing: Deferral
raise TaskDeferred(trigger=..., method_name='execute_complete')
  → TaskInstanceState.DEFERRED
  → Scheduler calls execute_complete() when trigger fires

# New: Checkpointing
raise TaskCheckpointed(remote_job_id='job-123', method_name='resume_job')
  → TaskInstanceState.CHECKPOINTED
  → Scheduler calls resume_job() on retry
Architecture Comparison:
Framework Changes
New Exception (mirrors TaskDeferred):
# In airflow/exceptions.py
@dataclass
class TaskCheckpointed(BaseException):
    """
    Signal to checkpoint remote job state for reconnection.

    Similar to TaskDeferred but for disruption resilience.
    """

    remote_job_id: str
    method_name: str
    kwargs: dict[str, Any] | None = None
New Task State (mirrors DEFERRED):
class TaskInstanceState(str, enum.Enum):
    ...
    DEFERRED = "deferred"
    CHECKPOINTED = "checkpointed"  # NEW
BaseOperator Helper (mirrors defer()):
class BaseOperator:
    def checkpoint(
        self,
        *,
        remote_job_id: str,
        method_name: str,
        kwargs: dict[str, Any] | None = None,
    ) -> NoReturn:
        """
        Preserve remote job for reconnection.

        Similar to defer() but called from on_kill().
        """
        from airflow.exceptions import TaskCheckpointed

        raise TaskCheckpointed(
            remote_job_id=remote_job_id,
            method_name=method_name,
            kwargs=kwargs,
        )
Flow Through System (mirrors deferral flow)
Deferral Flow (existing):
execute()
  → raise TaskDeferred
  → task_runner catches
  → SUPERVISOR_COMMS.send(DeferTask)
  → Execution API
  → DB: state=DEFERRED, next_method
  → Triggerer monitors
  → Scheduler calls execute_complete()
Checkpoint Flow (new, parallel design):
on_kill(context)
  → raise TaskCheckpointed
  → task_runner catches
  → SUPERVISOR_COMMS.send(CheckpointTask)
  → Execution API
  → DB: state=CHECKPOINTED, next_method, remote_job_id
  → Scheduler calls resume_job(context, remote_job_id)
Operator Implementation
Path 1: Resumable-Only (Synchronous Polling)
Before (current, wasteful):
class DatabricksOperator(BaseOperator):
    def execute(self, context):
        self.job_id = self.submit_databricks_job()
        return self.poll_until_complete(self.job_id)

    def on_kill(self):
        if self.job_id:
            self.cancel_databricks_job(self.job_id)  # Always cancels
After (resumable):
class DatabricksOperator(BaseOperator):
    def execute(self, context):
        job_id = self.submit_databricks_job()
        self._current_job_id = job_id
        return self.poll_until_complete(job_id)

    def on_kill(self, execution_context):
        if execution_context and execution_context.category == INFRASTRUCTURE:
            # Infrastructure disruption - preserve job
            self.checkpoint(
                remote_job_id=self._current_job_id,
                method_name='resume_job',
            )
        else:
            # Timeout or user action - cancel
            self.cancel_databricks_job(self._current_job_id)

    def resume_job(self, context, remote_job_id):
        """Called on retry - reconnect to existing job."""
        status = self.get_job_status(remote_job_id)

        if status in ["RUNNING", "PENDING"]:
            return self.poll_until_complete(remote_job_id)
        elif status == "SUCCESS":
            return self.get_job_results(remote_job_id)
        else:
            # Job failed/cancelled - start fresh
            return self.execute(context)
Path 2: Deferral + Resumable (Recommended)
Combines async efficiency (deferral) with disruption resilience (checkpointing):
class DatabricksOperator(BaseOperator):
    """Full example: Async monitoring + disruption resilience."""

    def __init__(self, deferrable: bool = True, **kwargs):
        super().__init__(**kwargs)
        self.deferrable = deferrable
        self._current_job_id = None

    def execute(self, context):
        """Submit job and start monitoring (PRE-DEFER PHASE - can be disrupted)."""
        job_id = self.submit_databricks_job()
        self._current_job_id = job_id

        if self.deferrable:
            # Async: free worker slot
            self.defer(
                trigger=DatabricksJobTrigger(job_id=job_id),
                method_name='execute_complete',
            )
            # DEFERRED PHASE: Worker freed, no disruption possible here
        else:
            # Sync: poll until complete
            return self.poll_until_complete(job_id)

    def execute_complete(self, context, event):
        """Called when trigger fires - job is ALREADY COMPLETE (POST-DEFER PHASE).

        If disrupted here, infrastructure auto-retry handles it (AIP-XX).
        No checkpoint needed - job is done, just re-fetch results.
        """
        job_id = event['job_id']

        if event.get('status') == 'SUCCESS':
            return self.get_job_results(job_id)
        else:
            raise AirflowException(f"Job {job_id} failed")

    def on_kill(self, execution_context):
        """Smart cleanup - only checkpoint during PRE-DEFER phase."""
        if not self._current_job_id:
            return

        if execution_context and execution_context.category == INFRASTRUCTURE:
            # Infrastructure disruption during PRE-DEFER phase:
            # preserve job for reconnection.
            # Note: execute_complete() disruptions handled by infra auto-retry
            self.checkpoint(
                remote_job_id=self._current_job_id,
                method_name='resume_job',
            )
        else:
            # Timeout or user action - cancel
            self.cancel_databricks_job(self._current_job_id)

    def resume_job(self, context, remote_job_id):
        """Reconnect to existing job after PRE-DEFER disruption."""
        status = self.get_job_status(remote_job_id)

        if status in ["RUNNING", "PENDING"]:
            self._current_job_id = remote_job_id
            # Resume in original mode
            if self.deferrable:
                # Re-defer to continue async monitoring
                self.defer(
                    trigger=DatabricksJobTrigger(job_id=remote_job_id),
                    method_name='execute_complete',
                )
            else:
                return self.poll_until_complete(remote_job_id)
        elif status == "SUCCESS":
            # Job completed while the worker was down!
            return self.get_job_results(remote_job_id)
        else:
            # Job failed - start fresh
            return self.execute(context)
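The `DatabricksJobTrigger` referenced above is assumed rather than defined. A minimal standalone sketch of such a polling trigger might look like the following, where the status API is a stub (a real trigger would call the Databricks hook) and `TriggerEvent` is a simplified stand-in for Airflow's `airflow.triggers.base.TriggerEvent`:

```python
import asyncio
from typing import AsyncIterator


class TriggerEvent:
    """Simplified stand-in for airflow.triggers.base.TriggerEvent."""

    def __init__(self, payload: dict):
        self.payload = payload


# Simulated status progression for the stubbed API.
STATUS_SEQUENCE = ["PENDING", "RUNNING", "SUCCESS"]


class DatabricksJobTrigger:
    """Polls an external job until it reaches a terminal state."""

    def __init__(self, job_id: str, poll_interval: float = 0.01):
        self.job_id = job_id
        self.poll_interval = poll_interval

    async def get_job_status(self) -> str:
        # Stub: a real trigger would hit the Databricks runs API here.
        return STATUS_SEQUENCE.pop(0)

    async def run(self) -> AsyncIterator[TriggerEvent]:
        while True:
            status = await self.get_job_status()
            if status in ("SUCCESS", "FAILED", "CANCELLED"):
                yield TriggerEvent({"job_id": self.job_id, "status": status})
                return
            await asyncio.sleep(self.poll_interval)


async def main() -> dict:
    trigger = DatabricksJobTrigger(job_id="job-123")
    async for event in trigger.run():
        return event.payload


result = asyncio.run(main())
print(result)  # {'job_id': 'job-123', 'status': 'SUCCESS'}
```

Because the trigger is keyed only by `job_id`, re-deferring after a checkpoint (as `resume_job()` does above) reattaches monitoring to the same external job without resubmitting it.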
What problem does it solve?
- Eliminates Wasted Computation: External jobs complete even when Airflow workers are disrupted
- Reduces Cloud Costs: Avoid restarting expensive compute jobs (Spark, ML training, etc.)
- Improves Reliability: Infrastructure issues don't cause work loss
- Better User Experience: Long-running jobs become disruption-ready
Why is it needed?
Scenario 1: Individual Users – Long-Running External Jobs
Users: Data engineers running ETL, ML engineers training models, analysts running queries
Common Problem Pattern: Long-running external jobs (Databricks, SageMaker, Snowflake, EMR) get cancelled and restarted when Airflow workers are disrupted, even though the external jobs are healthy and progressing normally.
Typical Pattern:
- Job reaches 90%+ complete (ETL: 1h 50min / ML: 7.5h)
- Worker disrupted (pod eviction, heartbeat timeout)
- Operator cancels healthy external job → Restart from 0%
- Time impact: Missed SLAs, delayed deployments, cascading failures
With Resumable Operators:
- Worker disrupted → Job ID preserved, external job continues running
- New worker reconnects → Resumes monitoring from 90%+
- Result: Jobs complete on time, $0 wasted compute
Scenario 2: Platform-Wide – Aggregate Cost Impact and Operational Load
User: Morgan manages Data Processing platform serving 100+ data teams
Scale Problem: Even if individual job waste seems small, it compounds across the platform:
- ~20% of long-running jobs hit infrastructure disruptions
- Each disruption wastes compute resources on work that must be redone
- Platform-wide, this adds up to a substantial amount of avoidable cost
- Resumable operators mean happier users and fewer support tickets
Are there any downsides to this change?
Minimal:
- Adds new CHECKPOINTED state (similar to existing DEFERRED state)
- External job systems must provide status APIs (most already do)
- Operators must implement resume_job() method (opt-in)
Mitigations:
- Pattern mirrors existing deferral, familiar to maintainers
- Backward compatible (operators without checkpointing work unchanged)
- Documentation with migration examples for provider maintainers
Which users are affected by the change?
Positively Affected:
- Users of long-running external jobs (Databricks, EMR, Snowflake, SageMaker, etc.)
- Platform teams managing infrastructure (pod/node maintenance no longer disruptive)
- Finance teams monitoring cloud costs (reduced wasted compute)
Not Affected:
- Users of operators that don't implement checkpointing
- Short-running tasks (< 1 minute) where restarts are cheap
Affected Operators
This pattern has the potential to improve any operator that submits external jobs, e.g.:
- Databricks: DatabricksSubmitRunOperator, DatabricksRunNowOperator
- EMR: EmrAddStepsOperator, EmrCreateJobFlowOperator
- Snowflake: SnowflakeOperator (long-running queries)
- dbt Cloud: DbtCloudRunJobOperator
- Dataproc: DataprocSubmitJobOperator
- SageMaker: SageMakerTrainingOperator
- Glue: GlueJobOperator
How are users affected by the change? (e.g. DB upgrade required?)
For End Users (DAG Authors)
No changes required. This is an operator-level enhancement that works transparently:
# Before: works but wasteful on disruptions
databricks_task = DatabricksSubmitRunOperator(
    task_id='process_data',
    ...
)

# After: same code, but now disruption-ready
databricks_task = DatabricksSubmitRunOperator(
    task_id='process_data',
    ...
)
For Provider Maintainers
Opt-in enhancement by implementing resume_job():
# Step 1: Track job ID
def execute(self, context):
    job_id = self.submit_external_job()
    self._current_job_id = job_id  # Track it
    return self.poll_until_complete(job_id)

# Step 2: Smart on_kill()
def on_kill(self, execution_context):
    if execution_context and execution_context.category == INFRASTRUCTURE:
        self.checkpoint(
            remote_job_id=self._current_job_id,
            method_name='resume_job',
        )
    else:
        self.cancel_external_job(self._current_job_id)

# Step 3: Implement resume logic
def resume_job(self, context, remote_job_id):
    status = self.get_job_status(remote_job_id)
    if status in ["RUNNING", "PENDING"]:
        return self.poll_until_complete(remote_job_id)
    elif status == "SUCCESS":
        return self.get_job_results(remote_job_id)
    else:
        return self.execute(context)
What is the level of migration effort (manual and automated) needed for the users to adapt to the breaking changes? (especially in context of Airflow 3)
None, N/A
- All changes are opt-in at operator level
- Existing operators work unchanged
- New state and exception follow established patterns
Other considerations?
What defines this AIP as "done"?
- TaskCheckpointed exception implemented
- CHECKPOINTED state added to TaskInstanceState
- BaseOperator.checkpoint() helper implemented
- Task runner catches and handles TaskCheckpointed
- Execution API processes checkpoint requests (reusing deferral code path)
- Scheduler calls resume_job() on retry (reusing deferral mechanism)
- Documentation: operator implementation guide
- An Example External Job Operator updated as reference implementation
- Tests for checkpoint flow end-to-end
1 Comment
Ash Berlin-Taylor
How is this different from having an operator that then defers to a trigger?