
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the AdaptiveScheduler, rescaling is triggered after the available resources are verified in the Executing state. If the desired or sufficient resources are available, the scheduler transitions into the Restarting state. During the restart, the existing ExecutionGraph is canceled, and the AdaptiveScheduler ends up in the WaitingForResources state.

WaitingForResources then rechecks for the desired and sufficient resources. If the desired resources are not met, resource stabilization kicks in and introduces an additional waiting period. This second timeout is redundant because the decision to rescale was already made in the Executing state.

This proposal aims to align the resource control mechanisms of the AdaptiveScheduler's WaitingForResources and Executing states. This should lead to the following outcomes:

  • Cleaner code/removal of redundant code
  • Minimizing downtime during the rescaling operation
  • Improved user experience through the reorganization of available parameters

Comparing the control flows of both states

The following diagrams show the currently implemented control flow for the WaitingForResources and Executing states of the AdaptiveScheduler.

[Figure: WaitingForResources control flow]

[Figure: Executing control flow]

Comparison of control flows

Both states wait for the desired resources. Once the desired resources are available, the AdaptiveScheduler transitions to the subsequent state without delay.

Aside from this, there is also the concept of sufficient resources. In both states, if only sufficient resources are available, the scheduler remains in the current state for a configurable amount of time (WaitingForResources: jobmanager.adaptive-scheduler.resource-stabilization-timeout; Executing: jobmanager.adaptive-scheduler.scaling-interval.max). If sufficient resources are still available when the timeout is reached, the scheduler transitions to the subsequent state.

A difference in behavior between the two states is implemented for the case where sufficient resources are not met:

  • In the WaitingForResources state, the scheduler transitions to the CreatingExecutionGraph state when the jobmanager.adaptive-scheduler.resource-stabilization-timeout expires, and the ExecutionGraph creation then fails.
  • In the Executing state, the scheduler will not transition and will continue to run the job.
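To make the comparison concrete, the following is a minimal sketch of the pre-FLIP decision each state makes, written as a single pure function. All names (PreFlipState, PreFlipOutcome, PreFlipDecision) are hypothetical and do not exist in Flink; the timeout is reduced to a boolean flag, and the sketch assumes the Executing timeout is configured at all (scaling-interval.max has no default).

```java
// Hypothetical model of the pre-FLIP behaviour; names are illustrative, not Flink classes.
enum PreFlipState { WAITING_FOR_RESOURCES, EXECUTING }

enum PreFlipOutcome { TRANSITION, KEEP_WAITING, TRANSITION_AND_FAIL, KEEP_RUNNING }

final class PreFlipDecision {
    private PreFlipDecision() {}

    static PreFlipOutcome decide(
            PreFlipState state, boolean desired, boolean sufficient, boolean timeoutExpired) {
        if (desired) {
            return PreFlipOutcome.TRANSITION;      // both states: transition without delay
        }
        if (!timeoutExpired) {
            return PreFlipOutcome.KEEP_WAITING;    // still inside the configured timeout
        }
        if (sufficient) {
            return PreFlipOutcome.TRANSITION;      // timeout reached, sufficient resources still there
        }
        // Timeout reached without sufficient resources: the two states diverge.
        return state == PreFlipState.WAITING_FOR_RESOURCES
                ? PreFlipOutcome.TRANSITION_AND_FAIL   // CreatingExecutionGraph, then creation fails
                : PreFlipOutcome.KEEP_RUNNING;         // Executing keeps running the job
    }
}
```

Only the last branch differs between the two states, which is the asymmetry this FLIP sets out to handle in one place.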

Another difference between the two states is the definition of desired and sufficient resources: 

  • For WaitingForResources, desired and sufficient resources are determined by checking the absolute numbers: The JobGraph’s per-vertex parallelism defines the threshold for sufficient resources, whereas the per-vertex maximum parallelism defines the desired resources.
  • The Executing state defines sufficient and desired resources relative to the currently deployed ExecutionGraph, by comparing its parallelism with the number of slots available for the job.
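The difference between the absolute and the relative definitions can be illustrated with a small, simplified model. This is only a sketch under stated assumptions: all vertices share one slot-sharing group (so the job needs as many slots as its most parallel vertex), and the relative "sufficient" check is approximated as "the available slots would allow a higher parallelism for at least one vertex". JobVertexSpec and PreFlipResourceChecks are hypothetical names, not Flink classes.

```java
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical, simplified model (not Flink code). Assumes all vertices share one
// slot-sharing group, so the job needs as many slots as its most parallel vertex.
record JobVertexSpec(String name, int parallelism, int maxParallelism) {}

final class PreFlipResourceChecks {
    private PreFlipResourceChecks() {}

    private static int max(List<JobVertexSpec> vertices, ToIntFunction<JobVertexSpec> f) {
        return vertices.stream().mapToInt(f).max().orElse(0);
    }

    // WaitingForResources: absolute thresholds taken from the JobGraph.
    static boolean sufficientAbsolute(List<JobVertexSpec> jobGraph, int slots) {
        return slots >= max(jobGraph, JobVertexSpec::parallelism);
    }

    static boolean desiredAbsolute(List<JobVertexSpec> jobGraph, int slots) {
        return slots >= max(jobGraph, JobVertexSpec::maxParallelism);
    }

    // Executing: relative to the deployed ExecutionGraph; approximated here as
    // "the slots would allow a higher parallelism for some vertex".
    static boolean sufficientRelative(List<JobVertexSpec> deployed, int slots) {
        return deployed.stream()
                .anyMatch(v -> Math.min(slots, v.maxParallelism()) > v.parallelism());
    }
}
```

For a vertex with parallelism 2 and maximum parallelism 4, two slots count as sufficient under the absolute definition but not under the relative one, because the deployed graph already runs at parallelism 2.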

Public Interface

This change affects the scheduler implementations and doesn't affect public API.

The proposal introduces changes to the AdaptiveScheduler configuration parameters, listed in the "Aligning rescaling parameters" section below.

Proposed Changes

  • Alignment of AdaptiveScheduler timeouts and rescale triggering
  • Desired and sufficient resources definition alignment for the WaitingForResources and the Executing states
  • Make AdaptiveScheduler omit the WaitingForResources state when rescaling

Alignment of AdaptiveScheduler timeouts and rescale triggering

The stabilization and cooldown timeout logic is handled differently in the WaitingForResources and Executing states. Because this logic serves the same purpose in both states, we propose moving its handling to the RescaleManager (specifically to its default implementation, DefaultRescaleManager) and using the RescaleManager component in both the WaitingForResources and Executing states. The RescaleManager component was introduced in FLIP-461.

This proposal suggests renaming the RescaleManager component to StateTransitionManager (and the corresponding implementation to DefaultStateTransitionManager) to reflect its broader responsibility.

The DefaultStateTransitionManager can be implemented as a state machine having the following state transitions:

The existing implementation needs to be extended to allow for different paths to be executed if sufficient resources are not met at the end of the stabilization phase:

  • WaitingForResources: Triggers the AdaptiveScheduler state change anyway.
  • Executing: No AdaptiveScheduler state change occurs. The manager remains Idle, waiting for the next change event. Once the change event is received, the stabilization period restarts.
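A minimal sketch of the proposed extension point, assuming the state-specific behaviour is injected as a small policy object rather than branched on inside the manager. ManagerPhase, InsufficientResourcesPolicy, and StabilizationPhase are hypothetical names; the real DefaultStateTransitionManager has more phases (for example, the cooldown) and real timers, which are omitted here.

```java
// Hypothetical sketch (not Flink code): the branch taken when stabilization ends
// without sufficient resources is injected per AdaptiveScheduler state.
enum ManagerPhase { IDLE, STABILIZING, TRANSITIONING }

interface InsufficientResourcesPolicy {
    ManagerPhase onStabilizationEnd();

    // WaitingForResources: trigger the AdaptiveScheduler state change anyway.
    InsufficientResourcesPolicy WAITING_FOR_RESOURCES = () -> ManagerPhase.TRANSITIONING;

    // Executing: go back to Idle and wait for the next change event.
    InsufficientResourcesPolicy EXECUTING = () -> ManagerPhase.IDLE;
}

final class StabilizationPhase {
    private final InsufficientResourcesPolicy policy;
    private ManagerPhase phase = ManagerPhase.IDLE;

    StabilizationPhase(InsufficientResourcesPolicy policy) {
        this.policy = policy;
    }

    ManagerPhase phase() {
        return phase;
    }

    /** A change event while Idle (re)starts the stabilization period. */
    void onChangeEvent() {
        if (phase == ManagerPhase.IDLE) {
            phase = ManagerPhase.STABILIZING;
        }
    }

    /** Invoked when the stabilization timeout fires. */
    void onStabilizationTimeout(boolean sufficientResources) {
        if (phase != ManagerPhase.STABILIZING) {
            return;
        }
        phase = sufficientResources ? ManagerPhase.TRANSITIONING : policy.onStabilizationEnd();
    }
}
```

Keeping the difference in a single injected policy means the rest of the phase logic can be shared verbatim between the two AdaptiveScheduler states.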

This extension enables using the DefaultStateTransitionManager in both the WaitingForResources and Executing states. 

The DefaultStateTransitionManager will be configured differently for these states: the cooldown period is set to 0ms for the WaitingForResources state, whereas the resource wait timeout is disabled for the Executing state.
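The two configurations could be expressed as factory methods on a small settings holder. This is a sketch only: TransitionManagerSettings and its fields are hypothetical, and the values in the usage note are the defaults listed in this FLIP.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical settings holder (not a Flink class).
record TransitionManagerSettings(
        Duration cooldown, Duration stabilizationTimeout, Optional<Duration> resourceWaitTimeout) {

    /** WaitingForResources: no cooldown; the resource wait timeout may be set or disabled. */
    static TransitionManagerSettings forWaitingForResources(
            Duration stabilizationTimeout, Optional<Duration> resourceWaitTimeout) {
        return new TransitionManagerSettings(Duration.ZERO, stabilizationTimeout, resourceWaitTimeout);
    }

    /** Executing: cooldown after rescaling; the resource wait timeout is disabled. */
    static TransitionManagerSettings forExecuting(Duration cooldown, Duration stabilizationTimeout) {
        return new TransitionManagerSettings(cooldown, stabilizationTimeout, Optional.empty());
    }
}
```

With the proposed defaults, WaitingForResources would use forWaitingForResources(Duration.ofSeconds(10), Optional.of(Duration.ofMinutes(5))) and Executing would use forExecuting(Duration.ofSeconds(30), Duration.ofSeconds(60)).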

The control flows for triggering a state transition based on the available resources are similar. The following subsections outline the steps to align these control flows and implement the proposed timeout handling.

Fix inconsistent timeout handling

The current implementation has some inconsistencies in how timeouts are handled for rescaling:

We suggest removing this behavior to achieve consistent timeout handling:

  • The cooldown period is always measured from the last rescale event (i.e., the Executing state's initialization).
  • The stabilization phase is always measured from the first change event or, if that change event was observed during the cooldown, from the end of the cooldown period.
  • If sufficient resources are unavailable at the end of the stabilization period, the timer is reset, and the next change event marks the beginning of a new stabilization period.
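The first two rules determine when a stabilization phase ends. The following is a minimal sketch of that arithmetic; RescaleTimeline is a hypothetical helper, not part of Flink.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper illustrating the proposed timeout rules (not Flink code).
final class RescaleTimeline {
    private RescaleTimeline() {}

    /** The cooldown is always measured from the last rescale (Executing state initialization). */
    static Instant cooldownEnd(Instant lastRescale, Duration cooldown) {
        return lastRescale.plus(cooldown);
    }

    /** Stabilization starts at the first change event, or at the end of the cooldown if the event fell into it. */
    static Instant stabilizationStart(Instant firstChangeEvent, Instant lastRescale, Duration cooldown) {
        Instant end = cooldownEnd(lastRescale, cooldown);
        return firstChangeEvent.isBefore(end) ? end : firstChangeEvent;
    }

    static Instant stabilizationDeadline(
            Instant firstChangeEvent, Instant lastRescale, Duration cooldown, Duration stabilizationTimeout) {
        return stabilizationStart(firstChangeEvent, lastRescale, cooldown).plus(stabilizationTimeout);
    }
}
```

With a rescale at t=0s, a 30s cooldown, and a 60s stabilization timeout, a change event at t=10s yields a deadline of t=90s, and a change event at t=45s yields t=105s. Under the third rule, if sufficient resources are missing at the deadline, the next change event is treated as a new first change event.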

Aligning the definition of sufficient and desired resources

As mentioned in the Motivation section above, the desired and sufficient resources are defined differently in the WaitingForResources and Executing states, which adds complexity to the code. We propose using the WaitingForResources definition for both states.

Additionally, in the Executing state, we will preserve a check that the new parallelism differs from the previous one to avoid unnecessary restarts.
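A sketch of the aligned definition combined with the extra Executing check, under the same simplifying assumption as before (one slot-sharing group, so each vertex can use up to the number of available slots). VertexBounds and AlignedResourceCheck are hypothetical; lowerBound corresponds to the JobGraph's per-vertex parallelism and upperBound to the per-vertex maximum parallelism.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch (not Flink code); assumes a single slot-sharing group.
record VertexBounds(String name, int lowerBound, int upperBound) {}

final class AlignedResourceCheck {
    private AlignedResourceCheck() {}

    static boolean sufficient(List<VertexBounds> vertices, int availableSlots) {
        return vertices.stream().allMatch(v -> availableSlots >= v.lowerBound());
    }

    static boolean desired(List<VertexBounds> vertices, int availableSlots) {
        return vertices.stream().allMatch(v -> availableSlots >= v.upperBound());
    }

    static int[] targetParallelism(List<VertexBounds> vertices, int availableSlots) {
        return vertices.stream().mapToInt(v -> Math.min(v.upperBound(), availableSlots)).toArray();
    }

    /** Executing only: skip the restart if the new parallelism equals the deployed one. */
    static boolean worthRescaling(List<VertexBounds> vertices, int availableSlots, int[] deployed) {
        return sufficient(vertices, availableSlots)
                && !Arrays.equals(targetParallelism(vertices, availableSlots), deployed);
    }
}
```

The same sufficient and desired checks serve both states; only worthRescaling adds the Executing-specific guard against restarts that would not change the parallelism.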

Unifying how timeouts and resources are handled will make the parameter jobmanager.adaptive-scheduler.min-parallelism-increase unnecessary. The same applies to the internal RescalingController interface and its two implementations, EnforceMinimalIncreaseRescalingController and EnforceParallelismChangeRescalingController.

We propose removing this legacy parameter and interface. To control the sensitivity of scaling operations, a combination of cooldown and stabilization timeouts should be used. The REST endpoint introduced in FLIP-291 should be used to control the definition of desired and/or sufficient resources.

Aligning rescaling parameters

Several existing rescaling-related parameters should be adapted to reflect the aligned mechanism:

Note: due to space limitations, the parameters do not include the jobmanager.adaptive-scheduler.* prefix.

WaitingForResources state

  • resource-stabilization-timeout
      Pre-FLIP: default 10s; triggered by a change event.
      Post-FLIP: renamed to submission.resource-stabilization-timeout.
  • resource-wait-timeout
      Pre-FLIP: default 5min; triggered by the state's initialization; can be disabled.
      Post-FLIP: renamed to submission.resource-wait-timeout.

Executing state

  • scaling-interval.min
      Pre-FLIP: default 30s; acts as a "cooldown": change events within this timeframe do not trigger a rescale; triggered by a change event.
      Post-FLIP: renamed to executing.cooldown-after-rescaling. The cooldown period will be disabled (i.e., 0ms) in the WaitingForResources state. Slight change in the Executing state: the timeout starts at state initialization.
  • scaling-interval.max
      Pre-FLIP: default n/a; timeout after which sufficient resources alone are enough to trigger rescaling.
      Post-FLIP: replaced with executing.resource-stabilization-timeout, with a default value of 60s.
  • min-parallelism-increase
      Pre-FLIP: default 1; the minimum increase in parallelism required for a job to scale up.
      Post-FLIP: deprecated and no longer used. The parameter becomes unnecessary because the cooldown period and the stabilization timeout already reduce the number of rescale operations.
  • max-delay-for-scale-trigger
      Pre-FLIP: default n/a; the maximum time the JobManager waits before evaluating previously observed events for rescaling. Introduced in Flink 2.0 (not yet released; FLIP-461).
      Post-FLIP: renamed to rescale-trigger.max-delay. A simple rename is enough because this parameter is only being introduced with Flink 2.0.
  • scale-on-failed-checkpoints-count
      Pre-FLIP: default 2; the number of failed checkpoints after which rescaling is triggered even without a completed checkpoint. Introduced in Flink 2.0 (not yet released; FLIP-461).
      Post-FLIP: renamed to rescale-trigger.max-checkpoint-failures. A simple rename is enough because this parameter is only being introduced with Flink 2.0.

For example, assume there is one slot in the entire cluster, and the job runs with parallelism 1. A new TaskManager with one slot is added, and a user sets the lower-bound parallelism to 2 and the upper bound to 3 through the REST endpoint. The two available slots satisfy the sufficient resources but not the desired ones, so the system rescales to parallelism 2 once the 60s executing.resource-stabilization-timeout expires (or the rescale is aligned with a checkpoint event, as specified in FLIP-461). Alternatively, if the user sets both the lower and the upper bound to 3, the rescale operation waits until three slots are available.
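The example can be replayed with a few lines of code. This sketch assumes a single vertex, a cooldown that has already elapsed, and no checkpoint alignment from FLIP-461; RescaleExample and its parameters are hypothetical.

```java
// Hypothetical walk-through of the example above (not Flink code). Assumes a single
// vertex, an elapsed cooldown, and no checkpoint alignment (FLIP-461).
final class RescaleExample {
    private RescaleExample() {}

    /** Returns the parallelism the job runs with after this evaluation. */
    static int decide(int availableSlots, int lower, int upper, int current,
                      long secondsSinceChange, long stabilizationTimeoutSeconds) {
        boolean desired = availableSlots >= upper;
        boolean sufficient = availableSlots >= lower;
        int target = Math.min(upper, availableSlots);
        if (!sufficient || target == current) {
            return current;   // nothing to do: keep running as-is
        }
        if (desired || secondsSinceChange >= stabilizationTimeoutSeconds) {
            return target;    // rescale now
        }
        return current;       // still stabilizing
    }
}
```

With two slots and bounds 2..3, decide(2, 2, 3, 1, 59, 60) still returns 1, while decide(2, 2, 3, 1, 60, 60) returns 2. With bounds 3..3, two slots are never sufficient, and the job only rescales once decide(3, 3, 3, 1, 0, 60) sees the third slot.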

Make AdaptiveScheduler omit the WaitingForResources state when rescaling

The current state transition diagram for the AdaptiveScheduler uses the Restarting state for rescaling:

This results in the stabilization phase being performed twice in scenarios where only sufficient resources are available. 

We propose introducing a new flag in the Restarting state that directs it to transition to CreatingExecutionGraph instead of WaitingForResources.
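A minimal sketch of how such a flag could select the follow-up state. RestartTarget, RestartingSketch, and onCancellationCompleted are hypothetical names; the assumption that non-rescale restarts (for example, after a failure) keep going through WaitingForResources is an illustration, not a statement about Flink's implementation.

```java
// Hypothetical sketch of the proposed flag; names are illustrative, not Flink's.
enum RestartTarget { WAITING_FOR_RESOURCES, CREATING_EXECUTION_GRAPH }

final class RestartingSketch {
    private final boolean skipWaitingForResources;

    RestartingSketch(boolean skipWaitingForResources) {
        this.skipWaitingForResources = skipWaitingForResources;
    }

    /** Rescale: resources were already checked in the Executing state. */
    static RestartingSketch forRescale() {
        return new RestartingSketch(true);
    }

    /** Assumed: other restarts still re-check resources. */
    static RestartingSketch forOtherRestart() {
        return new RestartingSketch(false);
    }

    /** Called once the old ExecutionGraph has been canceled. */
    RestartTarget onCancellationCompleted() {
        return skipWaitingForResources
                ? RestartTarget.CREATING_EXECUTION_GRAPH
                : RestartTarget.WAITING_FOR_RESOURCES;
    }
}
```

A flag keeps a single Restarting state instead of duplicating it, which is the trade-off discussed under Rejected Alternatives.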

Test Plan

We will use unit tests to verify the correctness of the introduced changes.

Compatibility, Deprecation, and Migration Plan

  • No changes happen for Flink jobs that do not utilize the AdaptiveScheduler.
  • The configuration jobmanager.adaptive-scheduler.resource-wait-timeout will be deprecated and replaced with jobmanager.adaptive-scheduler.submission.resource-wait-timeout. Flink will fall back to the old parameter if the old parameter is set explicitly, but no value is specified for the new version of the parameter.
  • The configuration jobmanager.adaptive-scheduler.resource-stabilization-timeout will be deprecated and replaced with jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout. Flink will fall back to the old parameter if the old parameter is set explicitly, but no value is specified for the new version of the parameter.
  • The configuration jobmanager.adaptive-scheduler.scaling-interval.min will be deprecated and replaced with jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling. Flink will fall back to the old parameter if the old parameter is set explicitly, but no value is specified for the new version of the parameter.
  • The configuration jobmanager.adaptive-scheduler.scaling-interval.max will be deprecated. A new configuration, jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout, will be introduced with a different default value. If the deprecated parameter is still set, a warning pointing to the new configuration will be logged.
  • Since jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout will have a default value (60s) while jobmanager.adaptive-scheduler.scaling-interval.max does not, this change will always force a scaling operation once the timeout expires if the new parallelism differs from the current one. 
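The fallback rule for the renamed keys (new key first, then the explicitly set old key, then the default) can be sketched in plain Java. RenamedOptionResolver is a hypothetical helper operating on a simple map; inside Flink, the ConfigOptions builder's deprecated-key support (for example, withDeprecatedKeys) would likely be the natural place for this.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical resolution logic for a renamed key (plain Java, not Flink's Configuration API).
final class RenamedOptionResolver {
    static final String OLD_STABILIZATION =
            "jobmanager.adaptive-scheduler.resource-stabilization-timeout";
    static final String NEW_STABILIZATION =
            "jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout";

    private RenamedOptionResolver() {}

    static Duration resolve(Map<String, Duration> userConfig, String newKey,
                            String deprecatedKey, Duration defaultValue) {
        if (userConfig.containsKey(newKey)) {
            return userConfig.get(newKey);          // the new key always wins
        }
        if (userConfig.containsKey(deprecatedKey)) {
            return userConfig.get(deprecatedKey);   // fall back to the explicitly set old key
        }
        return defaultValue;                        // neither key set
    }
}
```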

One can achieve the same behavior as not setting a value for jobmanager.adaptive-scheduler.scaling-interval.max by setting the same value for both the lower and upper bound parallelism using the REST endpoint introduced in FLIP-291.

Rejected Alternatives

To avoid performing the stabilization phase twice, we could introduce another AdaptiveScheduler state, Rescaling, which would share common functionality with the Restarting state. However, instead of transitioning to WaitingForResources, it would transition directly to CreatingExecutionGraph. The rationale is that resource control already occurs in the Executing state, making WaitingForResources redundant in this control flow. See the alternative state transition below:

However, introducing a new state would bring additional code complexity and state duplication. Additionally, a new JobStatus would probably need to be added.