Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/2d06gny1n84vp2216w38j8gy20pqhhkd
JIRA: https://issues.apache.org/jira/browse/FLINK-39267
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Checkpoint configuration (checkpointInterval, checkpointTimeout) is currently immutable after job submission. This creates three key operational problems for long-running streaming jobs:
1. Static configuration cannot adapt to variable workloads. Checkpoint duration fluctuates with traffic patterns. A generous timeout masks genuine hangs; a tight timeout causes unnecessary failures during load spikes. There is no way to tune this at runtime.
2. Cascading checkpoint failures cannot be resolved without a restart. When checkpoints begin timing out due to transient issues (storage slowdowns, GC pauses), SREs must restart the entire job to adjust the configuration, causing data reprocessing delays and potential downstream impact.
3. Near-complete checkpoints are wasted on timeout. A checkpoint that has persisted 95% of state but exceeds the timeout is entirely discarded — all I/O is wasted, and the next attempt starts from scratch. For large-state jobs (hundreds of GBs), this creates a cascading failure loop where the job never successfully checkpoints.
Flink already has precedents for runtime dynamic adjustment: setIsProcessingBacklog() (FLIP-309) dynamically switches checkpoint intervals, and PUT /jobs/:jobid/resource-requirements dynamically updates parallelism with persistence. This FLIP extends the same principle to checkpoint configuration.
Public Interfaces
REST API
PATCH /jobs/:jobid/checkpoints/configuration
Request body (all fields optional, only provided fields are updated):
{ "checkpointInterval": 60000, "checkpointTimeout": 600000 }
| Field | Type | Description |
|---|---|---|
| checkpointInterval | long (ms) | Base checkpoint interval. Must be > 0. |
| checkpointTimeout | long (ms) | Maximum checkpoint duration. Must be > 0. Applies to both in-flight and subsequent checkpoints. |
Response: 200 OK on success, 400 for invalid params, 404 if job not found, 409 if another update is in progress.
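To make the validation rules concrete, here is a minimal Java sketch of the request body and its 400-level checks. The class `CheckpointConfigPatch` and its `validate()` method are hypothetical illustrations, not the proposed REST handler; a `null` field models "not provided, leave unchanged", matching the partial-update semantics above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical body of PATCH /jobs/:jobid/checkpoints/configuration; null = field not provided. */
final class CheckpointConfigPatch {
    final Long checkpointInterval; // ms, optional
    final Long checkpointTimeout;  // ms, optional

    CheckpointConfigPatch(Long checkpointInterval, Long checkpointTimeout) {
        this.checkpointInterval = checkpointInterval;
        this.checkpointTimeout = checkpointTimeout;
    }

    /** Returns one message per invalid field; an empty list means the request passes validation. */
    List<String> validate() {
        List<String> errors = new ArrayList<>();
        // Absent fields are skipped: only provided values are checked and later applied.
        if (checkpointInterval != null && checkpointInterval <= 0) {
            errors.add("checkpointInterval must be > 0, got " + checkpointInterval);
        }
        if (checkpointTimeout != null && checkpointTimeout <= 0) {
            errors.add("checkpointTimeout must be > 0, got " + checkpointTimeout);
        }
        return errors;
    }
}
```

A handler would map a non-empty error list to 400; the 404 and 409 cases depend on job lookup and update coordination and are outside this sketch.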
The existing GET /jobs/:jobid/checkpoints/config endpoint will be extended to return the effective (dynamically updated) values.
Proposed Changes
Phase 1 focuses on two parameters: checkpointInterval and checkpointTimeout. Both fields in CheckpointCoordinator change from final to volatile.
The design has three pillars: in-flight timeout modification, persistence across failover, and interval rescheduling.
In-Flight Timeout Modification
When checkpointTimeout is updated, the change applies immediately to all pending checkpoints by rescheduling their canceller timers:
- For each PendingCheckpoint, compute remainingTimeout = newTimeout - elapsed.
- If remainingTimeout > 0: cancel the old CheckpointCanceller, schedule a new one with remainingTimeout, and call resetCancellerHandle().
- If remainingTimeout <= 0: abort the checkpoint immediately.
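The three steps above can be sketched with a plain `ScheduledExecutorService` standing in for Flink's checkpoint timer. `PendingCheckpointSketch` and `TimeoutRescheduler` are hypothetical stand-ins for `PendingCheckpoint` and `CheckpointCoordinator`; reassigning `cancellerHandle` stands in for the proposed `resetCancellerHandle()`, and `nowMs` is passed in explicitly only to keep the arithmetic testable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for PendingCheckpoint: only the fields the reschedule touches. */
final class PendingCheckpointSketch {
    final long checkpointId;
    final long triggerTimestampMs;
    ScheduledFuture<?> cancellerHandle; // guarded by TimeoutRescheduler's lock
    volatile boolean disposed;

    PendingCheckpointSketch(long checkpointId, long triggerTimestampMs) {
        this.checkpointId = checkpointId;
        this.triggerTimestampMs = triggerTimestampMs;
    }
}

/** Hypothetical stand-in for the timeout handling inside CheckpointCoordinator. */
final class TimeoutRescheduler {
    private final Object lock = new Object();
    private final ScheduledExecutorService timer;
    private final List<PendingCheckpointSketch> pending = new ArrayList<>();
    private volatile long checkpointTimeoutMs;

    TimeoutRescheduler(ScheduledExecutorService timer, long checkpointTimeoutMs) {
        this.timer = timer;
        this.checkpointTimeoutMs = checkpointTimeoutMs;
    }

    static long remainingTimeout(long newTimeoutMs, long elapsedMs) {
        return newTimeoutMs - elapsedMs;
    }

    void addPending(PendingCheckpointSketch cp) {
        synchronized (lock) {
            pending.add(cp);
            cp.cancellerHandle = scheduleCanceller(cp, checkpointTimeoutMs);
        }
    }

    /** Applies a new timeout to every pending checkpoint. */
    void updateTimeout(long newTimeoutMs, long nowMs) {
        synchronized (lock) {
            checkpointTimeoutMs = newTimeoutMs; // also used for subsequent checkpoints
            for (PendingCheckpointSketch cp : new ArrayList<>(pending)) { // copy: abort() mutates the list
                long remaining = remainingTimeout(newTimeoutMs, nowMs - cp.triggerTimestampMs);
                cp.cancellerHandle.cancel(false); // drop the old canceller in both branches
                if (remaining > 0) {
                    cp.cancellerHandle = scheduleCanceller(cp, remaining);
                } else {
                    abort(cp); // already past the new deadline
                }
            }
        }
    }

    private ScheduledFuture<?> scheduleCanceller(PendingCheckpointSketch cp, long delayMs) {
        return timer.schedule(() -> abort(cp), delayMs, TimeUnit.MILLISECONDS);
    }

    /** Idempotent: a canceller that fires after disposal is a no-op. */
    void abort(PendingCheckpointSketch cp) {
        synchronized (lock) {
            if (cp.disposed) {
                return;
            }
            cp.disposed = true;
            pending.remove(cp);
        }
    }

    int pendingCount() {
        synchronized (lock) {
            return pending.size();
        }
    }
}
```

Because the old handle is cancelled before either branch, an aborted checkpoint leaves no live canceller behind, and if an old canceller had already started running, the `disposed` check in `abort` keeps a second canceller from aborting twice.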
Example: a checkpoint at 95% completion is 10 seconds away from its 10min timeout (elapsed: 9min50s). An SRE extends the timeout to 15min → the old canceller is cancelled and a new one is scheduled to fire in 15min - 9min50s = 5min10s → the checkpoint completes at 10min2s ✓. All I/O work is preserved.
Thread safety: All operations are within the existing synchronized(lock) block. The CheckpointCanceller.run() already has an idempotent isDisposed() guard, making cancel-reschedule race-free.
Interval Rescheduling
When checkpointInterval is decreased, the periodic trigger is rescheduled to fire sooner — following the same pattern as setIsProcessingBacklog().
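One way to make a `volatile` interval take effect is a self-re-arming one-shot trigger, sketched below as an assumption; it is not necessarily how `CheckpointCoordinator` schedules its trigger today. A decrease cancels the pending trigger and re-arms it after `max(0, newInterval - timeSinceLastTrigger)`; an increase is picked up the next time the trigger re-arms itself. All class and method names are hypothetical.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical periodic trigger whose interval can be changed at runtime. */
final class PeriodicTriggerSketch {
    private final Object lock = new Object();
    private final ScheduledExecutorService timer;
    private final Runnable triggerCheckpoint;
    private volatile long checkpointIntervalMs;
    private ScheduledFuture<?> nextTrigger; // guarded by lock
    private long generation;                // guarded by lock; bumped on every arm()

    PeriodicTriggerSketch(ScheduledExecutorService timer, Runnable triggerCheckpoint, long intervalMs) {
        this.timer = timer;
        this.triggerCheckpoint = triggerCheckpoint;
        this.checkpointIntervalMs = intervalMs;
    }

    void start() {
        synchronized (lock) {
            arm(checkpointIntervalMs);
        }
    }

    /** Delay until the next trigger under a shorter interval, never negative. */
    static long delayAfterDecrease(long newIntervalMs, long sinceLastTriggerMs) {
        return Math.max(0L, newIntervalMs - sinceLastTriggerMs);
    }

    void updateInterval(long newIntervalMs, long sinceLastTriggerMs) {
        synchronized (lock) {
            boolean decreased = newIntervalMs < checkpointIntervalMs;
            checkpointIntervalMs = newIntervalMs;
            if (decreased && nextTrigger != null) {
                // cancel(false) does not stop a trigger that is already running;
                // the generation check in onTrigger keeps it from re-arming a second chain.
                nextTrigger.cancel(false);
                arm(delayAfterDecrease(newIntervalMs, sinceLastTriggerMs));
            }
        }
    }

    private void arm(long delayMs) {
        long myGeneration = ++generation;
        nextTrigger = timer.schedule(() -> onTrigger(myGeneration), delayMs, TimeUnit.MILLISECONDS);
    }

    private void onTrigger(long myGeneration) {
        triggerCheckpoint.run();
        synchronized (lock) {
            if (myGeneration == generation) { // superseded triggers do not re-arm
                arm(checkpointIntervalMs);    // reads the latest interval, so increases apply here
            }
        }
    }
}
```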
Persistence Across Failover
Dynamic changes are persisted to ExecutionPlanStore following the JobResourceRequirements pattern:
- Storage: JobCheckpointingOverrides is serialized into ExecutionPlan.getJobConfiguration() via InstantiationUtil, using the key $internal.job-checkpoint-overrides.
- Write path: DefaultExecutionPlanStore.putJobCheckpointingOverrides() performs a fetch-modify-write under synchronized(lock), identical to putJobResourceRequirements().
- Recovery: on job startup or failover, JobCheckpointingOverrides.readFromExecutionPlan() is read and applied to the CheckpointCoordinator. No manual API re-invocation is needed.
| Failure Scenario | Behavior |
|---|---|
| API succeeds | Persisted AND applied in-memory |
| Crash after persist, before in-memory | Overrides re-applied on recovery — no loss |
| Crash before persist | Client receives error — no partial state |
| Concurrent API calls | Second request rejected with 409 |
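The ordering and concurrency rows of the table can be sketched with a non-blocking guard: a second update arriving while one is in flight is rejected immediately instead of queued, and the in-memory apply runs only after the persist step returns. `CheckpointConfigUpdateGate` is a hypothetical name, and returning bare HTTP status codes is a simplification of a real REST handler.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical per-job guard for PATCH /jobs/:jobid/checkpoints/configuration. */
final class CheckpointConfigUpdateGate {
    private final AtomicBoolean updateInProgress = new AtomicBoolean(false);

    /**
     * Returns 200 on success or 409 if another update holds the gate.
     * If persist throws, the exception propagates and nothing is applied in memory.
     */
    int tryUpdate(Runnable persist, Runnable applyInMemory) {
        if (!updateInProgress.compareAndSet(false, true)) {
            return 409; // reject instead of queueing behind the running update
        }
        try {
            persist.run();       // durable write first (ExecutionPlanStore)
            applyInMemory.run(); // reached only after the write succeeded
            return 200;
        } finally {
            updateInProgress.set(false);
        }
    }
}
```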
Compatibility, Deprecation, and Migration Plan
- Fully backward compatible: purely additive change, no existing APIs modified or removed.
- No migration required: jobs not using the new API behave identically.
Test Plan
- Unit tests:
  - CheckpointCoordinator: verify interval/timeout update, periodic trigger reschedule, in-flight canceller reschedule, and abort on reduced timeout.
  - PendingCheckpoint: verify resetCancellerHandle() behavior.
  - JobCheckpointingOverrides: serialization round-trip.
  - Handler: request validation.
- Integration tests: verify REST API updates take effect; verify an in-flight checkpoint survives an extended timeout; verify GET .../config reflects updated values; verify failover recovery (following the UpdateJobResourceRequirementsRecoveryITCase pattern); verify concurrent update rejection.
- E2E tests: curl-based verification against a running cluster.
Rejected Alternatives
1. Generic job configuration API. A PATCH /jobs/:jobid/configuration accepting arbitrary key-values lacks type safety and domain-specific validation.
2. Timeout changes only for subsequent checkpoints. Fails to address the most critical use case — saving near-complete checkpoints from timeout. The canceller reschedule mechanism is low-cost and race-free.
3. Non-persistent (in-memory only) changes. SRE-initiated changes should not be silently lost on failover. The JobResourceRequirements persistence pattern is proven and directly reusable.
Relationship with FLIP-530 (Dynamic Job Configuration)
FLIP-530 provides a generic framework for runtime configuration updates but requires a job restart to apply changes and is limited to AdaptiveScheduler. FLIP-571 is complementary — it targets checkpoint parameters specifically, with two capabilities FLIP-530 cannot offer: no-restart updates and in-flight timeout rescue (rescheduling canceller timers to save near-complete checkpoints). When FLIP-530 matures, FLIP-571's endpoint could be integrated as a specialized handler within that framework.
Future Work
Additional parameters (minPauseBetweenCheckpoints, maxConcurrentCheckpoints, tolerableCheckpointFailureNumber, alignedCheckpointTimeout) may be supported in future iterations.
