Status
Motivation
Current Behavior and Limitations
Traditionally, XCom is defined as “a mechanism that lets Tasks talk to each other”. However, XCom also has the potential to help persist and manage state within a task itself.
Currently, Apache Airflow automatically clears a task instance’s XCom data when it is retried. This behavior, while ensuring clean state for retry attempts, creates limitations:
- Loss of Internal Progress: Tasks that have internal checkpointing or progress tracking lose all intermediate state on retry, forcing restart from the beginning.
- Resource State Loss: Tasks cannot maintain state about allocated resources (compute instances, downstream job IDs, etc.) across retry attempts, leading to redundant expensive setup operations.
- No Recovery/Resume Capability: There's no way for tasks to resume from internal checkpoints when transient failures occur during long-running atomic operations.
- Poor User Experience: Users must implement external state management systems to work around this limitation, adding complexity to DAG authoring.
This proposal aims to extend the capacity of XCom by allowing a Task Instance’s XCom to persist through its retries, enabling users to build more resilient and efficient pipelines. This is particularly useful for tasks which are atomic (i.e. cannot be split into multiple tasks) and need to manage internal state or checkpoints.
Problem Examples
Consider scenarios where tasks have legitimate internal checkpoints or expensive setup operations:
- Large Dataset Processing: A task processing a million-record dataset where failure at record 999,999 of 1,000,000 forces restart from record 1.
- Resource Allocation: A task that allocates expensive cloud compute resources, fails during processing, and must reallocate the same resources on retry.
- External Job Tracking and Polling: A task triggers an external job, then polls status. Infrastructure failures cause retries to launch duplicate jobs instead of tracking the existing one. This causes unnecessary cost, and may even cause more severe consequences if the job in the external system is not idempotent.
In these cases, the current XCom clearing behavior forces redundant operations that could be avoided, and can even lead to more severe consequences when the retried operation is not idempotent.
If XCom could be persisted and made available on retry, these problems would have a straightforward solution. For example, the problem in the third scenario above can be avoided if the ID of the external job is persisted in XCom: each retry attempt can first check whether a job already exists and poll its status before triggering a new one.
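The trigger-or-resume pattern this enables can be sketched in plain Python. This is illustrative only: the dict stands in for XCom storage that survives retries, and trigger_external_job is a hypothetical stub, not a real Airflow or provider API.

```python
import itertools

# In-memory stand-in for XCom that survives retries (hypothetical sketch)
xcom_store = {}
_job_counter = itertools.count(1)


def trigger_external_job():
    """Stub: pretend to submit a job to an external system."""
    return f"job-{next(_job_counter)}"


def run_attempt():
    """One task attempt: reuse the persisted job ID instead of re-triggering."""
    job_id = xcom_store.get("external_job_id")
    if job_id is None:
        job_id = trigger_external_job()
        xcom_store["external_job_id"] = job_id  # persisted through retries
    return job_id


first = run_attempt()  # first attempt triggers a new job
retry = run_attempt()  # a retry reuses the same job instead of launching another
```

Because the job ID survives the retry, the external system only ever sees one job submission, which is exactly what makes this safe for non-idempotent external operations.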
Considerations
What change do you propose to make?
We propose adding a new optional boolean parameter persist_xcom_through_retry to the BaseOperator class that controls whether XCom data is preserved during task retries.
Implementation Details
- Parameter Default: False to maintain backward compatibility
- Scope:
- Only affects XCom clearing behavior when the task is eligible for retry
- Does not interfere with any manual XCom clearing.
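The scoping rules above can be sketched as a small gating function. This is a sketch of the intended decision logic only; the actual clearing code lives in Airflow's task-instance handling, and the function and class names here are hypothetical.

```python
def should_clear_xcom(task, eligible_for_retry: bool) -> bool:
    """Decide whether a task instance's XCom is cleared.

    Sketch of the proposed gating logic: XCom clearing is skipped only when
    the task is eligible for retry AND the (proposed) flag is set. All other
    clearing paths, including manual clearing, behave as they do today.
    """
    if not eligible_for_retry:
        return True  # non-retry clearing paths are unchanged
    return not getattr(task, "persist_xcom_through_retry", False)


class PersistingTask:
    persist_xcom_through_retry = True  # opts in to the proposed behavior


class DefaultTask:
    pass  # flag absent: defaults to False, i.e. current behavior
```

With the default, retries still clear XCom exactly as they do now, which is why the change is backward compatible.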
What problem does it solve?
Currently, users cannot make full use of XCom to manage state or checkpoints within a task. This causes inconvenience and wasted cost, and can have even more severe consequences when the operation is not idempotent.
Why is it needed?
- Better User Experience: Provides native support for stateful task recovery
- Cost Optimization: Reduces compute costs by avoiding redundant operations
- Improved Reliability: Enables intelligent retry strategies with checkpointing
Are there any downsides to this change?
None identified. Backward compatibility is fully preserved.
Which users are affected by the change?
No existing users are affected. The change simply provides a new option for users who need to manage state or checkpoints within a task.
How are users affected by the change? (e.g. DB upgrade required?)
Existing users are not affected; no DB upgrade is required.
Users who want this feature only need to set the new boolean parameter persist_xcom_through_retry (the default is False for backward compatibility).
What is the level of migration effort (manual and automated) needed for the users to adapt to the breaking changes? (especially in context of Airflow 3)
- Zero breaking changes, 100% backward compatible: the default value False preserves current behavior
- Gradual adoption: users can enable the parameter on specific tasks as needed
What defines this AIP as "done"?
- The boolean parameter persist_xcom_through_retry is added to BaseOperator and works properly
- Documentation is updated accordingly.
Appendix - Use Cases
Use Case-1: Large Dataset Processing with Checkpoints
Problem: Processing millions of records where failure late in the process forces a complete restart.
Benefit: Can process millions of records without losing progress on transient failures.
@task(persist_xcom_through_retry=True)
def process_large_dataset(**context):
    # Resume from the last checkpoint persisted by a previous attempt
    last_id = context["ti"].xcom_pull(key='last_processed_id') or 0
    processed_count = context["ti"].xcom_pull(key='processed_count') or 0
    for record in get_records_after(last_id):
        process_record(record)
        processed_count += 1
        # Checkpoint every 10,000 records
        if processed_count % 10000 == 0:
            context["ti"].xcom_push(key='last_processed_id', value=record.id)
            context["ti"].xcom_push(key='processed_count', value=processed_count)
Use Case-2: External Job Tracking and Polling
Problem: A task triggers an external job, then polls status. Infrastructure failures cause retries to launch duplicate jobs instead of tracking the existing one.
Benefit: Prevents duplicate external jobs and allows resumption of status polling for existing jobs after infrastructure failures.
Why not separate job Triggering and Polling into two steps?: Having them as two separate tasks may cause confusion and inconvenience in manual retries.
@task(persist_xcom_through_retry=True)
def trigger_and_wait_for_external_job(**context):
    job_id = context["ti"].xcom_pull(key='external_job_id')
    if not job_id:
        # Only trigger a new job if no previous attempt has started one
        job_id = trigger_external_job(payload=job_data)
        context["ti"].xcom_push(key='external_job_id', value=job_id)
    # Poll until completion
    while True:
        status = check_external_job_status(job_id)
        if status == 'completed':
            return get_external_job_result(job_id)
        elif status == 'failed':
            raise Exception(f"Job {job_id} failed")
        time.sleep(30)
Use Case-3: More Efficient API Integration
Problem: External API calls with quota tracking and rate limit state management.
Benefit: Better respects API rate limits across multiple Airflow task retries.
@task(persist_xcom_through_retry=True)
def sync_with_external_api(**context):
    remaining_quota = context["ti"].xcom_pull(key='remaining_quota') or 1000
    if remaining_quota < 100:
        remaining_quota += request_new_quota()
        context["ti"].xcom_push(key='remaining_quota', value=remaining_quota)
    results = make_api_calls(limit=remaining_quota)
    remaining_quota -= len(results)
    context["ti"].xcom_push(key='remaining_quota', value=remaining_quota)
    return results
Use Case-4: Resource Management and Cleanup
Problem: Allocated cloud resources need to be tracked across retries to prevent leaks.
Benefit: Expensive resource allocation happens once, and cleanup is handled correctly.
Why not separate resource allocation and actual usage into two steps?: Having them as two separate tasks may cause confusion and inconvenience when manually retrying the actual usage step.
@task(persist_xcom_through_retry=True)
def process_with_cloud_resources(**context):
    worker_ids = context["ti"].xcom_pull(key='allocated_workers') or []
    if not worker_ids:
        # Expensive allocation; wasteful to re-request workers on every retry
        worker_ids = allocate_cloud_workers(count=10)
        context["ti"].xcom_push(key='allocated_workers', value=worker_ids)
    try:
        result = process_data_with_workers(worker_ids)
        deallocate_cloud_workers(worker_ids)
        return result
    except Exception:
        # Deliberately skip deallocation so the retry can reuse the workers
        raise
Use Case-5: Adaptive Processing with Learning
Problem: Need to optimize retry behavior based on previous attempt failures and system conditions.
Benefit: Builds self-improving workflows that learn from failures and optimize retry strategies automatically.
@task(persist_xcom_through_retry=True)
def adaptive_batch_processor(**context):
    failed_attempts = context["ti"].xcom_pull(key='failed_attempts') or []
    # Adapt parameters based on previous failures
    if not failed_attempts:
        batch_size, timeout = 1000, 300  # Defaults
    else:
        last_attempt = failed_attempts[-1]
        last_error = last_attempt['error_type'].lower()
        batch_size = last_attempt['latest_params']['batch_size']
        timeout = last_attempt['latest_params']['timeout']
        if 'memory' in last_error:
            # Reduce batch size to reduce memory pressure
            batch_size = int(batch_size * 0.9)
        elif 'timed out' in last_error:
            # Only adjust the timeout
            timeout = timeout * 1.1
        else:
            # Conservative adjustment for unknown errors
            batch_size = int(batch_size * 0.95)
            timeout = timeout * 1.05
    try:
        result = process_data_batch(batch_size=batch_size, timeout=timeout)
        return result
    except Exception as e:
        failed_attempts.append(
            {
                'error_type': type(e).__name__,
                'latest_params': {'batch_size': batch_size, 'timeout': timeout},
            }
        )
        context["ti"].xcom_push(key='failed_attempts', value=failed_attempts)
        if len(failed_attempts) >= 5:
            raise Exception("Too many adaptive attempts failed")
        raise


1 Comment
Jens Scheffler
As with the responses on the devlist by Ash Berlin-Taylor and Jarek Potiuk, I'd assume a state storage is rather the solution space for the described use case.
I'd still think there would be demand for a "versioned XCom" backend that stores XCom per attempt as task output, just to have a history of differences per attempt - not to use it as a state store, but rather for compliance/tracking purposes.