Status

State: Draft
Discussion Thread:
Vote Thread:
Vote Result Thread:
Progress Tracking (PR/GitHub Project/Issue Label):
Date Created: 2025.12.04
Version Released:
Authors: Stefan Wang

Summary

This AIP proposes adding reconnection/resumption capability to operators that submit long-running external jobs (Databricks, EMR, Snowflake, dbt Cloud, etc.). When infrastructure disruptions occur (worker crashes, pod evictions), operators can preserve the external job ID and reconnect on retry instead of restarting the job from scratch. This eliminates wasted computation and reduces costs while following the established deferral pattern.

Key Benefits:

  • Cost Savings: Avoid restarting 3-hour jobs that are 90% complete
  • Faster Delivery: Resume from where jobs left off instead of starting over
  • Disruption Ready: Infrastructure failures don't waste user work
  • Familiar Pattern: Mirrors existing deferral mechanism (TaskDeferred → TaskCheckpointed)

Motivation

Current Behavior and Limitations

Problem: Wasted Computation from Infrastructure Disruptions

When operators submit long-running external jobs and monitor their completion, infrastructure disruptions force operators to cancel healthy jobs and restart from scratch.

Example: The 3-Hour Databricks Job

class DatabricksOperator(BaseOperator):
    def execute(self, context):
        self.job_id = self.submit_databricks_job()
        return self.poll_until_complete(self.job_id)

    def on_kill(self):
        # No context available - must always cancel
        if self.job_id:
            self.cancel_databricks_job(self.job_id)


Timeline:

  1. Task submits 3-hour Spark job to Databricks
  2. Job runs for 2.5 hours (83% complete, healthy)
  3. K8s node maintenance evicts Airflow worker pod
  4. on_kill() is called → cancels healthy Databricks job
  5. Retry starts → submits fresh 3-hour job

Waste: 2.5 hours computation + cluster costs + delayed delivery
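For illustration, the waste can be quantified with a back-of-the-envelope cost model. The hourly rate below is an assumption for the sketch, not real pricing:

```python
# Hypothetical cost model for the disrupted 3-hour job above.
# RATE_PER_HOUR is an illustrative assumption, not real pricing.
RATE_PER_HOUR = 40.0           # assumed cluster cost, USD/hour
JOB_HOURS = 3.0                # total job duration
HOURS_BEFORE_DISRUPTION = 2.5  # progress lost when the worker died

# Current behavior: cancel the healthy job and rerun all 3 hours.
cost_restart = (HOURS_BEFORE_DISRUPTION + JOB_HOURS) * RATE_PER_HOUR

# Resumable behavior: the external job keeps running; only 3 hours are billed.
cost_resume = JOB_HOURS * RATE_PER_HOUR

# The savings equal exactly the computation that would have been thrown away.
savings = cost_restart - cost_resume
```

Under these assumed numbers the restart path costs 220 USD against 120 USD for the resumable path; the 100 USD difference is precisely the 2.5 wasted hours.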

The job itself was healthy; only the Airflow worker was disrupted.



Considerations

What change do you propose to make?

Problem Visualization

Current Behavior (Wasteful):

Proposed Behavior (Resumable):

Core Concept

Extend the existing deferral pattern with a checkpointing pattern:

  • Deferral (TaskDeferred): Operator explicitly waits for external trigger (async by design)
  • Checkpointing (TaskCheckpointed): Platform transparently preserves state during disruptions (resilience by design)

These patterns are complementary and work together.

Implementation: Following the Deferral Pattern

The implementation mirrors the existing deferral mechanism:

# Existing: Deferral
raise TaskDeferred(trigger=..., method_name='execute_complete')
# → TaskInstanceState.DEFERRED
# → Scheduler calls execute_complete() when trigger fires

# New: Checkpointing
raise TaskCheckpointed(remote_job_id='job-123', method_name='resume_job')
# → TaskInstanceState.CHECKPOINTED
# → Scheduler calls resume_job() on retry

Architecture Comparison:

Framework Changes

New Exception (mirrors TaskDeferred):

# In airflow/exceptions.py
from dataclasses import dataclass
from typing import Any

@dataclass
class TaskCheckpointed(BaseException):
    """
    Signal to checkpoint remote job state for reconnection.

    Similar to TaskDeferred but for disruption resilience.
    """

    remote_job_id: str
    method_name: str
    kwargs: dict[str, Any] | None = None

New Task State (mirrors DEFERRED):

class TaskInstanceState(str, enum.Enum):
    ...
    DEFERRED = "deferred"
    CHECKPOINTED = "checkpointed"  # NEW

BaseOperator Helper (mirrors defer()):

class BaseOperator:
    def checkpoint(
        self,
        *,
        remote_job_id: str,
        method_name: str,
        kwargs: dict[str, Any] | None = None,
    ) -> NoReturn:
        """
        Preserve remote job for reconnection.

        Similar to defer() but called from on_kill().
        """
        from airflow.exceptions import TaskCheckpointed

        raise TaskCheckpointed(
            remote_job_id=remote_job_id,
            method_name=method_name,
            kwargs=kwargs,
        )

Flow Through System (mirrors deferral flow)

Deferral Flow (existing):

execute() → raise TaskDeferred → task_runner catches →
SUPERVISOR_COMMS.send(DeferTask) → Execution API →
DB: state=DEFERRED, next_method → Triggerer monitors →
Scheduler calls execute_complete()

Checkpoint Flow (new, parallel design):

on_kill(context) → raise TaskCheckpointed → task_runner catches →
SUPERVISOR_COMMS.send(CheckpointTask) → Execution API →
DB: state=CHECKPOINTED, next_method, remote_job_id →
Scheduler calls resume_job(context, remote_job_id)

Operator Implementation

Path 1: Resumable-Only (Synchronous Polling)

Before (current, wasteful):

class DatabricksOperator(BaseOperator):
    def execute(self, context):
        self.job_id = self.submit_databricks_job()
        return self.poll_until_complete(self.job_id)

    def on_kill(self):
        if self.job_id:
            self.cancel_databricks_job(self.job_id)  # Always cancels

After (resumable):

class DatabricksOperator(BaseOperator):
    def execute(self, context):
        job_id = self.submit_databricks_job()
        self._current_job_id = job_id
        return self.poll_until_complete(job_id)

    def on_kill(self, execution_context):
        if execution_context and execution_context.category == INFRASTRUCTURE:
            # Infrastructure disruption - preserve job
            self.checkpoint(
                remote_job_id=self._current_job_id,
                method_name='resume_job',
            )
        else:
            # Timeout or user action - cancel
            self.cancel_databricks_job(self._current_job_id)

    def resume_job(self, context, remote_job_id):
        """Called on retry - reconnect to the existing job."""
        status = self.get_job_status(remote_job_id)
        if status in ["RUNNING", "PENDING"]:
            return self.poll_until_complete(remote_job_id)
        elif status == "SUCCESS":
            return self.get_job_results(remote_job_id)
        else:
            # Job failed/cancelled - start fresh
            return self.execute(context)
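The resume_job() branching can be exercised locally by stubbing out the external job API. The class below is a test double, not part of the proposal; its hook methods fake the Databricks calls so each of the three branches can be checked:

```python
class FakeResumableOperator:
    """Stub mirroring the resume_job() branching above; the external
    job API is faked so each branch can be exercised locally."""

    def __init__(self, status):
        self._status = status  # canned answer from the fake status API

    def get_job_status(self, job_id):
        return self._status

    def poll_until_complete(self, job_id):
        return f"polled:{job_id}"

    def get_job_results(self, job_id):
        return f"results:{job_id}"

    def execute(self, context):
        return "restarted"  # stands in for a fresh submission

    def resume_job(self, context, remote_job_id):
        status = self.get_job_status(remote_job_id)
        if status in ["RUNNING", "PENDING"]:
            return self.poll_until_complete(remote_job_id)
        elif status == "SUCCESS":
            return self.get_job_results(remote_job_id)
        else:
            return self.execute(context)

# Each status from the external system takes a different branch.
still_running = FakeResumableOperator("RUNNING").resume_job({}, "job-1")
already_done = FakeResumableOperator("SUCCESS").resume_job({}, "job-1")
was_cancelled = FakeResumableOperator("CANCELLED").resume_job({}, "job-1")
```

The SUCCESS branch is worth noting: the external job can finish while the Airflow worker is down, in which case the retry only needs to fetch results rather than monitor anything.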

Path 2: Deferral + Resumable (Recommended)

Combines async efficiency (deferral) with disruption resilience (checkpointing):

class DatabricksOperator(BaseOperator):
    """Full example: async monitoring + disruption resilience."""

    def __init__(self, deferrable: bool = True, **kwargs):
        super().__init__(**kwargs)
        self.deferrable = deferrable
        self._current_job_id = None

    def execute(self, context):
        """Submit job and start monitoring (PRE-DEFER PHASE - can be disrupted)."""
        job_id = self.submit_databricks_job()
        self._current_job_id = job_id

        if self.deferrable:
            # Async: free the worker slot
            self.defer(
                trigger=DatabricksJobTrigger(job_id=job_id),
                method_name='execute_complete',
            )
            # DEFERRED PHASE: worker freed, no disruption possible here
        else:
            # Sync: poll until complete
            return self.poll_until_complete(job_id)

    def execute_complete(self, context, event):
        """Called when the trigger fires - the job is ALREADY COMPLETE (POST-DEFER PHASE).

        If disrupted here, infrastructure auto-retry handles it (AIP-XX).
        No checkpoint needed - the job is done; just re-fetch the results.
        """
        job_id = event['job_id']
        if event.get('status') == 'SUCCESS':
            return self.get_job_results(job_id)
        else:
            raise AirflowException(f"Job {job_id} failed")

    def on_kill(self, execution_context):
        """Smart cleanup - only checkpoint during the PRE-DEFER phase."""
        if not self._current_job_id:
            return

        if execution_context and execution_context.category == INFRASTRUCTURE:
            # Infrastructure disruption during the PRE-DEFER phase:
            # preserve the job for reconnection.
            # Note: execute_complete() disruptions are handled by infra auto-retry.
            self.checkpoint(
                remote_job_id=self._current_job_id,
                method_name='resume_job',
            )
        else:
            # Timeout or user action - cancel
            self.cancel_databricks_job(self._current_job_id)

    def resume_job(self, context, remote_job_id):
        """Reconnect to the existing job after a PRE-DEFER disruption."""
        status = self.get_job_status(remote_job_id)

        if status in ["RUNNING", "PENDING"]:
            self._current_job_id = remote_job_id
            # Resume in the original mode
            if self.deferrable:
                # Re-defer to continue async monitoring
                self.defer(
                    trigger=DatabricksJobTrigger(job_id=remote_job_id),
                    method_name='execute_complete',
                )
            else:
                return self.poll_until_complete(remote_job_id)
        elif status == "SUCCESS":
            # The job completed while the worker was down!
            return self.get_job_results(remote_job_id)
        else:
            # Job failed - start fresh
            return self.execute(context)


What problem does it solve?

  1. Eliminates Wasted Computation: External jobs complete even when Airflow workers are disrupted
  2. Reduces Cloud Costs: Avoid restarting expensive compute jobs (Spark, ML training, etc.)
  3. Improves Reliability: Infrastructure issues don't cause work loss
  4. Better User Experience: Long-running jobs become disruption-ready

Why is it needed?

 Scenario 1: Individual Users – Long-Running External Jobs

Users: Data engineers running ETL, ML engineers training models, analysts running queries

Common Problem Pattern: Long-running external jobs (Databricks, SageMaker, Snowflake, EMR) get cancelled and restarted when Airflow workers are disrupted, even though the external jobs are healthy and progressing normally.

Typical Pattern:

  • Job reaches 90%+ complete (ETL: 1h 50min / ML: 7.5h)
  • Worker disrupted (pod eviction, heartbeat timeout)
  • Operator cancels healthy external job → Restart from 0%
  • Time impact: Missed SLAs, delayed deployments, cascading failures

With Resumable Operators:

  • Worker disrupted → Job ID preserved, external job continues running
  • New worker reconnects → Resumes monitoring from 90%+
  • Result: Jobs complete on time, $0 wasted compute

Scenario 2: Platform-Wide – Aggregate Cost Impact and Operational Load

User: Morgan manages Data Processing platform serving 100+ data teams

Scale Problem: Even if individual job waste seems small, it compounds across the platform:

  • ~20% of long-running jobs hit infrastructure disruptions
  • Each disruption wastes compute resources and delays delivery
  • Platform-wide, this adds up to a large amount of avoidable cost
  • With resumable operators: happier users, fewer support tickets

Are there any downsides to this change?

Minimal:

  • Adds new CHECKPOINTED state (similar to existing DEFERRED state)
  • External job systems must provide status APIs (most already do)
  • Operators must implement resume_job() method (opt-in)

Mitigations:

  • Pattern mirrors existing deferral, familiar to maintainers
  • Backward compatible (operators without checkpointing work unchanged)
  • Documentation with migration examples for provider maintainers

Which users are affected by the change?

Positively Affected:

  • Users of long-running external jobs (Databricks, EMR, Snowflake, SageMaker, etc.)
  • Platform teams managing infrastructure (pod/node maintenance no longer disruptive)
  • Finance teams monitoring cloud costs (reduced wasted compute)

Not Affected:

  • Users of operators that don't implement checkpointing
  • Short-running tasks (< 1 minute) where restarts are cheap

Affected Operators

This pattern has the potential to improve any operator that submits external jobs, e.g.:

  • Databricks: DatabricksSubmitRunOperator, DatabricksRunNowOperator
  • EMR: EmrAddStepsOperator, EmrCreateJobFlowOperator
  • Snowflake: SnowflakeOperator (long-running queries)
  • dbt Cloud: DbtCloudRunJobOperator
  • Dataproc: DataprocSubmitJobOperator
  • SageMaker: SageMakerTrainingOperator
  • Glue: GlueJobOperator

How are users affected by the change? (e.g. DB upgrade required?)

For End Users (DAG Authors)

No changes required. This is an operator-level enhancement that works transparently:

# Before: works but wasteful on disruptions
databricks_task = DatabricksSubmitRunOperator(
    task_id='process_data',
    ...
)

# After: same code, but now disruption-ready
databricks_task = DatabricksSubmitRunOperator(
    task_id='process_data',
    ...
)

For Provider Maintainers

Opt-in enhancement by implementing resume_job():

# Step 1: Track the job ID
def execute(self, context):
    job_id = self.submit_external_job()
    self._current_job_id = job_id  # Track it
    return self.poll_until_complete(job_id)

# Step 2: Smart on_kill()
def on_kill(self, execution_context):
    if execution_context and execution_context.category == INFRASTRUCTURE:
        self.checkpoint(
            remote_job_id=self._current_job_id,
            method_name='resume_job',
        )
    else:
        self.cancel_external_job(self._current_job_id)

# Step 3: Implement the resume logic
def resume_job(self, context, remote_job_id):
    status = self.get_job_status(remote_job_id)
    if status in ["RUNNING", "PENDING"]:
        return self.poll_until_complete(remote_job_id)
    elif status == "SUCCESS":
        return self.get_job_results(remote_job_id)
    else:
        return self.execute(context)
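The Step 2 decision can be checked in isolation with a minimal stand-in for the proposed execution context. INFRASTRUCTURE and the ExecutionContext shape are assumptions taken from this proposal, not an existing Airflow API:

```python
from dataclasses import dataclass

INFRASTRUCTURE = "infrastructure"  # assumed category constant from this proposal

@dataclass
class ExecutionContext:
    """Hypothetical context passed to the proposed on_kill() signature."""
    category: str

def decide_on_kill(execution_context, job_id):
    """Mirror of Step 2's decision: checkpoint on infrastructure
    disruption, cancel on timeout/user action (or missing context)."""
    if execution_context and execution_context.category == INFRASTRUCTURE:
        return ("checkpoint", job_id)
    return ("cancel", job_id)

infra = decide_on_kill(ExecutionContext(category=INFRASTRUCTURE), "job-9")
timeout = decide_on_kill(ExecutionContext(category="timeout"), "job-9")
legacy = decide_on_kill(None, "job-9")  # old callers passing no context
```

The `None` case matters for backward compatibility: callers that never pass an execution context fall through to the cancel path, matching today's behavior.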

What is the level of migration effort (manual and automated) needed for the users to adapt to the breaking changes? (especially in context of Airflow 3)

None, N/A

  • All changes are opt-in at operator level
  • Existing operators work unchanged
  • New state and exception follow established patterns

Other considerations?


What defines this AIP as "done"?

  1. TaskCheckpointed exception implemented
  2. CHECKPOINTED state added to TaskInstanceState
  3. BaseOperator.checkpoint() helper implemented
  4. Task runner catches and handles TaskCheckpointed
  5. Execution API processes checkpoint requests (reusing deferral code path)
  6. Scheduler calls resume_job() on retry (reusing deferral mechanism)
  7. Documentation: operator implementation guide
  8. An Example External Job Operator updated as reference implementation
  9. Tests for checkpoint flow end-to-end


1 Comment

  1. How is this different from having an operator that defers to a trigger?