Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/2d06gny1n84vp2216w38j8gy20pqhhkd

JIRA: https://issues.apache.org/jira/browse/FLINK-39267

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Checkpoint configuration (checkpointInterval, checkpointTimeout) is currently immutable after job submission. This creates three key operational problems for long-running streaming jobs:

1. Static configuration cannot adapt to variable workloads. Checkpoint duration fluctuates with traffic patterns. A generous timeout masks genuine hangs; a tight timeout causes unnecessary failures during load spikes. There is no way to tune this at runtime.

2. Cascading checkpoint failures cannot be resolved without restart. When checkpoints begin timing out due to transient issues (storage slowdowns, GC pauses), operators must restart the entire job to adjust configuration, which causes data reprocessing delays and potential downstream impact.

3. Near-complete checkpoints are wasted on timeout. A checkpoint that has persisted 95% of state but exceeds the timeout is discarded entirely: all I/O is wasted, and the next attempt starts from scratch. For large-state jobs (hundreds of GBs), this creates a cascading failure loop where the job never successfully checkpoints.

Flink already has precedents for runtime dynamic adjustment: setIsProcessingBacklog() (FLIP-309) dynamically switches checkpoint intervals, and PUT /jobs/:jobid/resource-requirements dynamically updates parallelism with persistence. This FLIP extends the same principle to checkpoint configuration.

Public Interfaces

REST API

PATCH /jobs/:jobid/checkpoints/configuration

Request body (all fields optional, only provided fields are updated):


{ "checkpointInterval": 60000, "checkpointTimeout": 600000 }

| Field | Type | Description |
|---|---|---|
| checkpointInterval | long (ms) | Base checkpoint interval. Must be > 0. |
| checkpointTimeout | long (ms) | Maximum checkpoint duration. Must be > 0. Applies to both in-flight and subsequent checkpoints. |

Response: 200 OK on success, 400 for invalid params, 404 if job not found, 409 if another update is in progress.
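As an illustration of how a client might call the proposed endpoint, the following sketch builds the PATCH request with the JDK's built-in HTTP client types. It is not part of the proposal: the class name, the helper methods, the local REST address and the all-zero job ID are hypothetical placeholders, and the request is only constructed, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class CheckpointConfigPatchExample {

    /** Builds the JSON body; a null argument means "leave this field unchanged". */
    static String buildBody(Long checkpointIntervalMs, Long checkpointTimeoutMs) {
        StringBuilder json = new StringBuilder("{");
        if (checkpointIntervalMs != null) {
            requirePositive("checkpointInterval", checkpointIntervalMs);
            json.append("\"checkpointInterval\":").append(checkpointIntervalMs);
        }
        if (checkpointTimeoutMs != null) {
            requirePositive("checkpointTimeout", checkpointTimeoutMs);
            if (json.length() > 1) {
                json.append(',');
            }
            json.append("\"checkpointTimeout\":").append(checkpointTimeoutMs);
        }
        return json.append('}').toString();
    }

    /** Mirrors the "Must be > 0" rule from the field table on the client side. */
    private static void requirePositive(String field, long value) {
        if (value <= 0) {
            throw new IllegalArgumentException(field + " must be > 0, got " + value);
        }
    }

    /** Builds (but does not send) the PATCH request for the proposed endpoint. */
    static HttpRequest buildRequest(String restBaseUrl, String jobId, String body) {
        return HttpRequest.newBuilder()
                .uri(URI.create(restBaseUrl + "/jobs/" + jobId + "/checkpoints/configuration"))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder address and job ID; only the timeout is updated (15 minutes).
        String body = buildBody(null, 900_000L);
        HttpRequest request =
                buildRequest("http://localhost:8081", "00000000000000000000000000000000", body);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body);
        // To actually send it:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

Omitting a field from the body leaves the corresponding setting untouched, which matches the "only provided fields are updated" semantics above.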

The existing GET /jobs/:jobid/checkpoints/config endpoint will be extended to return the effective (dynamically updated) values.

 

Proposed Changes


Phase 1 focuses on two parameters: checkpointInterval and checkpointTimeout. Both fields in CheckpointCoordinator change from final to volatile.

The design has three pillars: in-flight timeout modification, persistence across failover, and interval rescheduling.

In-Flight Timeout Modification

When checkpointTimeout is updated, the change applies immediately to all pending checkpoints by rescheduling their canceller timers:

  1. For each PendingCheckpoint, compute remainingTimeout = newTimeout - elapsed.
  2. If remainingTimeout > 0: cancel old CheckpointCanceller, schedule new one with remainingTimeout, call resetCancellerHandle().
  3. If remainingTimeout <= 0: abort the checkpoint immediately.
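The three steps above can be sketched as a small, self-contained simulation. This is not the actual CheckpointCoordinator code: PendingCheckpointStub, applyNewTimeout and the plain ScheduledExecutorService timer are hypothetical stand-ins chosen to keep the example runnable without Flink on the classpath.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class CancellerRescheduleSketch {

    /** Hypothetical stand-in for PendingCheckpoint; not the real Flink class. */
    static final class PendingCheckpointStub {
        final long triggerTimestampMs;
        ScheduledFuture<?> cancellerHandle;
        boolean aborted;

        PendingCheckpointStub(long triggerTimestampMs) {
            this.triggerTimestampMs = triggerTimestampMs;
        }
    }

    private final Object lock = new Object();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    /** Step 1: remainingTimeout = newTimeout - elapsed. */
    static long remainingTimeout(long newTimeoutMs, long triggerTimestampMs, long nowMs) {
        long elapsedMs = nowMs - triggerTimestampMs;
        return newTimeoutMs - elapsedMs;
    }

    /** Steps 2 and 3, performed under a single lock as in the proposal. */
    void applyNewTimeout(PendingCheckpointStub cp, long newTimeoutMs, long nowMs) {
        synchronized (lock) {
            long remainingMs = remainingTimeout(newTimeoutMs, cp.triggerTimestampMs, nowMs);
            if (remainingMs > 0) {
                // Cancel the old canceller and schedule a new one for the remaining time.
                if (cp.cancellerHandle != null) {
                    cp.cancellerHandle.cancel(false);
                }
                cp.cancellerHandle =
                        timer.schedule(() -> abort(cp), remainingMs, TimeUnit.MILLISECONDS);
            } else {
                // The new timeout has already elapsed: abort immediately.
                abort(cp);
            }
        }
    }

    void abort(PendingCheckpointStub cp) {
        synchronized (lock) {
            if (cp.cancellerHandle != null) {
                cp.cancellerHandle.cancel(false);
            }
            cp.aborted = true;
        }
    }

    void shutdown() {
        timer.shutdownNow();
    }

    public static void main(String[] args) {
        CancellerRescheduleSketch coordinator = new CancellerRescheduleSketch();
        PendingCheckpointStub cp = new PendingCheckpointStub(0L);
        long nowMs = 590_000L; // 9min50s after the checkpoint was triggered

        // Extending the timeout to 15 minutes leaves 310000 ms (5min10s).
        System.out.println(remainingTimeout(900_000L, cp.triggerTimestampMs, nowMs));
        coordinator.applyNewTimeout(cp, 900_000L, nowMs);
        System.out.println(cp.aborted); // false: the checkpoint keeps running

        // Reducing the timeout to 5 minutes gives a negative remainder, so it aborts.
        coordinator.applyNewTimeout(cp, 300_000L, nowMs);
        System.out.println(cp.aborted); // true
        coordinator.shutdown();
    }
}
```

The sketch passes the current time in explicitly so the arithmetic is deterministic; a real implementation would read the clock inside the lock.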

Example: A checkpoint is 95% complete with 10 seconds left before timeout (9min50s elapsed). An SRE extends the timeout from 10min to 15min: the old canceller is cancelled, the new canceller is scheduled to fire in 5min10s, and the checkpoint completes at 10min2s ✓. All I/O work is preserved.

Thread safety: All operations are within the existing synchronized(lock) block. The CheckpointCanceller.run() already has an idempotent isDisposed() guard, making cancel-reschedule race-free.

Interval Rescheduling

When checkpointInterval is decreased, the periodic trigger is rescheduled to fire sooner, following the same pattern as setIsProcessingBacklog().
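The text does not spell out how the new trigger time is computed. One plausible reading, shown here purely as an assumption, is that the next trigger is measured from the last trigger time using the new interval and fires immediately if that point has already passed. The class and method names are hypothetical.

```java
public class IntervalRescheduleSketch {

    /**
     * Assumed rule (not stated in the proposal): the next trigger fires at
     * lastTrigger + newInterval, or immediately if that moment is already in the past.
     */
    static long delayUntilNextTrigger(long lastTriggerMs, long newIntervalMs, long nowMs) {
        if (newIntervalMs <= 0) {
            throw new IllegalArgumentException("checkpointInterval must be > 0");
        }
        return Math.max(0L, lastTriggerMs + newIntervalMs - nowMs);
    }

    public static void main(String[] args) {
        // Interval reduced to 60s, 20s after the last trigger: fire in 40000 ms.
        System.out.println(delayUntilNextTrigger(0L, 60_000L, 20_000L));
        // Interval reduced to 60s, 90s after the last trigger: fire right away (0 ms).
        System.out.println(delayUntilNextTrigger(0L, 60_000L, 90_000L));
    }
}
```

The computed delay would then be used to cancel and re-register the periodic trigger, analogous to the canceller rescheduling for timeouts.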

Persistence Across Failover

Dynamic changes are persisted to ExecutionPlanStore following the JobResourceRequirements pattern:

  • Storage: JobCheckpointingOverrides serialized into ExecutionPlan.getJobConfiguration() via InstantiationUtil, using key $internal.job-checkpoint-overrides.
  • Write path: DefaultExecutionPlanStore.putJobCheckpointingOverrides() — fetch-modify-write under synchronized(lock), identical to putJobResourceRequirements().
  • Recovery: On job startup / failover, read JobCheckpointingOverrides.readFromExecutionPlan() and apply to CheckpointCoordinator. No manual API re-invocation needed.
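To make the storage and recovery bullets concrete, the following sketch round-trips an overrides object through Java serialization under the key named above. It is a simplified model, not the proposed implementation: OverridesStub stands in for JobCheckpointingOverrides, a plain Map stands in for the job configuration, and raw ObjectOutputStream/ObjectInputStream calls stand in for InstantiationUtil.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class OverridesPersistenceSketch {

    static final String KEY = "$internal.job-checkpoint-overrides";

    /** Hypothetical stand-in for JobCheckpointingOverrides; null means "not overridden". */
    static final class OverridesStub implements Serializable {
        private static final long serialVersionUID = 1L;
        final Long checkpointIntervalMs;
        final Long checkpointTimeoutMs;

        OverridesStub(Long checkpointIntervalMs, Long checkpointTimeoutMs) {
            this.checkpointIntervalMs = checkpointIntervalMs;
            this.checkpointTimeoutMs = checkpointTimeoutMs;
        }
    }

    /** Write path: serialize the overrides and store them under the internal key. */
    static void writeTo(Map<String, byte[]> jobConfiguration, OverridesStub overrides) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(overrides);
            }
            jobConfiguration.put(KEY, bytes.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Recovery path: returns the stored overrides, or null if none were persisted. */
    static OverridesStub readFrom(Map<String, byte[]> jobConfiguration) {
        byte[] raw = jobConfiguration.get(KEY);
        if (raw == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return (OverridesStub) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> jobConfiguration = new HashMap<>();
        writeTo(jobConfiguration, new OverridesStub(null, 900_000L));

        // Simulated failover: only the persisted configuration survives.
        OverridesStub recovered = readFrom(jobConfiguration);
        System.out.println(recovered.checkpointIntervalMs); // null (interval not overridden)
        System.out.println(recovered.checkpointTimeoutMs);  // 900000
    }
}
```

A job that never called the API has no entry under the key, so recovery finds nothing to apply and the statically configured values remain in effect.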

| Failure Scenario | Behavior |
|---|---|
| API succeeds | Persisted AND applied in-memory |
| Crash after persist, before in-memory | Overrides re-applied on recovery; no loss |
| Crash before persist | Client receives error; no partial state |
| Concurrent API calls | Second request rejected with 409 |
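
The rejection of concurrent calls could be modelled with a simple in-progress flag, as in the sketch below. How the handler actually detects an in-flight update is not specified in this proposal, so the AtomicBoolean guard and the names used here are assumptions for illustration only.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class UpdateGuardSketch {

    private final AtomicBoolean updateInProgress = new AtomicBoolean(false);

    /**
     * Runs the persist-then-apply action unless another update is already running.
     * Returns the HTTP status the handler would answer with.
     */
    int handleUpdate(Runnable persistAndApply) {
        if (!updateInProgress.compareAndSet(false, true)) {
            return 409; // another update is in progress
        }
        try {
            persistAndApply.run();
            return 200;
        } finally {
            updateInProgress.set(false);
        }
    }

    public static void main(String[] args) {
        UpdateGuardSketch guard = new UpdateGuardSketch();
        int[] nested = new int[1];

        // A second request arriving while the first is still running is rejected.
        int first = guard.handleUpdate(() -> nested[0] = guard.handleUpdate(() -> { }));
        System.out.println(first);     // 200
        System.out.println(nested[0]); // 409

        // Once the first update has finished, new requests are accepted again.
        System.out.println(guard.handleUpdate(() -> { })); // 200
    }
}
```

If the action throws (for example because persisting failed), the flag is still cleared and the exception propagates, which corresponds to the "client receives error" row.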


Compatibility, Deprecation, and Migration Plan

  • Fully backward compatible: purely additive change, no existing APIs modified or removed.
  • No migration required: jobs not using the new API behave identically.

Test Plan

  • Unit tests: CheckpointCoordinator — verify interval/timeout update, periodic trigger reschedule, in-flight canceller reschedule, abort on reduced timeout. PendingCheckpoint — verify resetCancellerHandle() behavior. JobCheckpointingOverrides — serialization roundtrip. Handler — request validation.
  • Integration tests: Verify REST API updates take effect; verify in-flight checkpoint survives extended timeout; verify GET .../config reflects updated values; verify failover recovery (following UpdateJobResourceRequirementsRecoveryITCase pattern); verify concurrent update rejection.
  • E2E tests: curl-based verification against running cluster.

Rejected Alternatives

1. Generic job configuration API. A PATCH /jobs/:jobid/configuration accepting arbitrary key-values lacks type safety and domain-specific validation.

2. Timeout changes only for subsequent checkpoints. Fails to address the most critical use case: saving near-complete checkpoints from timeout. The canceller reschedule mechanism is low-cost and race-free.

3. Non-persistent (in-memory only) changes. SRE-initiated changes should not be silently lost on failover. The JobResourceRequirements persistence pattern is proven and directly reusable.

Relationship with FLIP-530 (Dynamic Job Configuration)

FLIP-530 provides a generic framework for runtime configuration updates but requires a job restart to apply changes and is limited to AdaptiveScheduler. FLIP-571 is complementary — it targets checkpoint parameters specifically, with two capabilities FLIP-530 cannot offer: no-restart updates and in-flight timeout rescue (rescheduling canceller timers to save near-complete checkpoints). When FLIP-530 matures, FLIP-571's endpoint could be integrated as a specialized handler within that framework.

Future Work

Additional parameters (minPauseBetweenCheckpoints, maxConcurrentCheckpoints, tolerableCheckpointFailureNumber, alignedCheckpointTimeout) may be supported in future iterations.

