You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 14 Next »

This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Rescaling a job comes with some downtime that is caused by stopping the job and restarting it from its most-recent state with a changed JobVertex parallelism configuration. 

Any data that was processed between the most-recent checkpoint and the moment the rescaling was started needs to be reprocessed causing an additional delay until new data is processed. We could reduce the amount of redundantly processed data by delaying the rescale operation until a new checkpoint was completed successfully.

Currently, rescaling might get triggered in the following cases:

  • The available resources changed (i.e. slots/TMs were added or removed from the cluster).
  • The job’s resource requirements changed.

The cooldown phase following the rescaling works like a grace period to prevent an immediate subsequent rescaling. Any triggering event that is caught while the scheduler is in the cooldown phase will be collected. The corresponding rescale operation will be triggered after the cooldown phase is over (see FLIP-322).

Scenarios

The following diagrams illustrates different scenarios:

Resource change after cooldown with completed checkpoint

Resource change in cooldown with failed checkpoint

Resource change events are accumulated until rescaling is triggered (current behavior)

Failed checkpoints trigger rescaling with delay

Public Interfaces

This change affects the scheduler implementations and, therefore, doesn't affect public API.

The following new configuration parameters are introduced:

  • jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count
  • jobmanager.adaptive-scheduler.max-delay-for-scale-trigger

These parameters will be described in more detail in the following section about Proposed Changes.

Proposed Changes

New internal interface RescaleManager

Currently, rescaling is controlled by two components: The AdaptiveScheduler (where the RescalingController lives) and the AdaptiveScheduler's Executing state (where rescaling events are handled).

This FLIP suggests combining the logic into a new internal component RescaleManager to organize responsibilities in a better way and improve testability.

RescaleManager
/** The {@code RescaleManager} decides on whether rescaling should happen or not. */
public interface RescaleManager {

   /** Is called if the environment changed in a way that a rescaling could be considered. */
   void onChange();

   /**
    * Is called when any previous observed environment changes shall be verified possibly
    * triggering a rescale operation.
    */
   void onTrigger();
}

The RescaleManager component will live in the AdaptiveScheduler's Executing state. The responsibilities of each class will change in the following way:

  • AdaptiveScheduler: Provides knowledge of the current environment.
  • Executing: Handles state changes for the AdaptiveScheduler
  • RescaleManager: Handles rescale decision logic (maintaining the different phases like cooldown phase and collecting change events)

New internal interface CheckpointStatsListener

The completion of a checkpoint is monitored by the CheckpointStatsTracker. The CheckpointStatsTracker gets notified if a checkpoint was successfully completed, i.e. all its data was persisted and its metadata was written to the CompletedCheckpointStore. At this point, the checkpoint becomes available to job restores and, therefore, can be utilized for restarts during a rescale operation. The CheckpointStatsTracker is also notified about failed checkpoints. This allows the rescale logic to handle errors.

The CheckpointStatsTracker which currently lives in the DefaultExecutionGraphFactory. The instance will be moved out into the scheduler. A new interface CheckpointStatsListener is introduced that is implemented by AdaptiveScheduler und used within the CheckpointStatsTracker:

CheckpointLifecycleListener
/** An interface that allows listening on the checkpoint lifecycle events within the {@link CheckpointStatsTracker}. */
public interface CheckpointStatsListener {

   /** Called when a checkpoint was completed successfully. */
   default void onCompletedCheckpoint() {}

   /** Called when a checkpoint failed. */
   default void onFailedCheckpoint() {}
}

The CheckpointStatsTracker will utilize the interface if provided. No listener being provided will result in no events being forwarded (used by the SchedulerBase implementations).

New Configuration Parameters

Rescale after certain amount of failed Checkpoints

Checkpoints can fail but we only want to trigger rescaling in case of a successfully completed checkpoint. But we also don't want to delay the rescale for too long in case of failed checkpoints (e.g. the failure could be caused by insufficient resources which might require a rescale). 

The trade-off for the user is to not rescale too early in case of a failed checkpoint (because the next checkpoint could succeed resulting in a proper rescale without reprocessing data) or too late if too many checkpoints failed. In that case, rescaling might be triggered but the recovered state is way older causing more data to be reprocessed.

jobmanager.adaptive-scheduler.rescale-on-failed-checkpoints-count
@Documentation.Section({
    Documentation.Sections.EXPERT_SCHEDULING,
    Documentation.Sections.ALL_JOB_MANAGER
})
public static final ConfigOption<Integer> SCHEDULER_SCALE_ON_FAILED_CHECKPOINTS_COUNT =
        key("jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count")
                .intType()
                .defaultValue(2)
                .withDescription(
                        Description.builder()
                                .text(
                                        "The number of consecutive failed checkpoints that will initiate rescaling.")
                                .build());

The default will be set to 1 to allow immediate rescaling even in case of failure.

Rescale after a certain amount of time

We need a fallback for when checkpoints are delayed for too long in general. This also applies if checkpointing is deactivated. In that case, rescaling needs to be triggered based on processing time. The following configuration parameter is introduced to address this scenario: jobmanager.adaptive-scheduler.max-delay-for-scale-trigger

The default value should be relative to other configuration parameters:

  • Checkpointing disabled: The delay should be set to 0ms resulting in an immediate rescaling operation (pre-FLIP-461 behavior).
  • Checkpointing enabled: The delay should be set to the checkpointing interval multiplied by the newly introduced configuration parameter j.a.scale-on-failed-checkpoints-count incremented by 1. This allows the j.a.scale-on-failed-checkpoints-count to take effect.

Compatibility, Deprecation, and Migration Plan

  • No changes happen for Flink jobs that do not utilize the AdaptiveScheduler
  • No changes happen for Flink jobs that do utilize the AdaptiveScheduler but have checkpointing disabled (due to the default value of j.a.max-delay-for-scale-trigger being 0ms)
  • Rescaling will be delayed for Flink jobs that do utilize the AdaptiveScheduler and have checkpointing enabled. Users will be informed via release notes. Documentation happens in Flink's configuration documentation.

Test Plan

  • The contracts of the different internal components can be covered by unit tests.
  • ITCase for covering the rescaling with checkpointing being enabled should be added.

Rejected Alternatives

No rejected alternatives so far.

  • No labels