FYI:
The original title of this FLIP was 'FLIP-495: Support AdaptiveScheduler record and query the rescale history'.
Because the design of the query interface was split out into FLIP-487 during the discussion, the title of the FLIP was changed to:

'FLIP-495: Support AdaptiveScheduler record and store the rescale history'

Motivation

Support for recording and querying the rescale history of the AdaptiveScheduler (number of slots used, task parallelisms) is very useful for users.

  • It helps users trace the rescale history and makes rescale information more transparent.
  • It gives users the information they need to tune Adaptive Scheduler parameters.

Note: this FLIP was split from FLIP-487, and here is some discussion history related to the current FLIP for reference.

Design

Current core adaptive scheduler state transitions related to rescale

core fig

From the figure:

  1. The chain of rescale-related scheduler states typically involves the following loops:
    1. WaitingForResources -> CreatingExecutionGraph -> Executing -> StopWithSavepoint (error & restartable) -> Restarting -> WaitingForResources
    2. CreatingExecutionGraph [-> WaitingForResources -> CreatingExecutionGraph] (optional loop) -> Executing (rescale triggers) -> Restarting -> CreatingExecutionGraph
  2. A rescale event may be triggered under the following circumstances, called 'rescale triggers' here:
    1. updateJobResourceRequirement
    2. Restart due to a recoverable failure (including a recoverable StopWithSavepoint failure)
    3. newResourceAvailable
    4. Initial submission

Description of the related abstractions

We treat each rescale change as a rescale event.

Rescale Event:

Generally, we consider the following four expected situations to trigger a rescale:

  • Initial submission
  • updateJobResourceRequirement
  • Restart due to recoverable failure
  • newResourceAvailable

Put simply, a rescale mainly passes through the following scheduler states:

  • WaitingForResources
  • CreatingExecutionGraph
  • Executing
  • Restarting

In particular, when a StopWithSavepoint fails due to a recoverable exception, the job may also go through the rescale process. When the job runs for the first time, it also passes through the core states above.

Rescale event terminal state


The transition of Scheduler State and rescale TerminalState

A rescale eventually reaches a terminal state, recorded together with the terminated reason:

  • COMPLETED
    • The AdaptiveScheduler has successfully completed the rescale.

  • FAILED

    • The current rescale event failed due to an error.

  • IGNORED

    • If there is an ongoing rescale event that has not reached a terminal state and external resources are explicitly specified, the current rescale is forcibly set to IGNORED.

    • Or the rescale event ends without actually rescaling the job.


The main scenarios where Rescale switches to terminated

When there is a rescale that has not yet terminated:

  • Upon receiving updateJobResourceRequirements, the current rescale is forcibly set to IGNORED and a new rescale is created to record the subsequent rescale information.

  • When a StateTransition reaches the Stabilized phase and the new resource quantities do not differ from the original ones, it re-enters the idling stage. The current rescale attempt ends because neither an exception nor a change in external conditions caused a parallelism adjustment, so the current rescale is terminated as IGNORED.

  • When one of the AdaptiveScheduler methods goToCancelling, goToFailing, or goToFinished is invoked, the entire job is about to terminate, so the rescale is terminated as IGNORED.

  • If the job encounters a recoverable exception that triggers a restart, the current rescale is terminated as IGNORED and a new rescale is created to record the subsequent rescale information.

  • If the AdaptiveScheduler attempts to determine the parallelism from the available slots but the number of slots is below the sufficient-slots requirement, a slot insufficiency exception is thrown. In this case, the rescale is marked as FAILED, with a reason indicating that it ended due to insufficient resources.

  • When the rescale succeeds (i.e., the AdaptiveScheduler transitions to Executing), the rescale is marked as COMPLETED.


Rescale ID & ResourceRequirements request

  • ResourceRequirementsID: Each rescale requirement corresponds to a ResourceRequirementsID.
    • A new value (UUID) is generated for the ResourceRequirementsID when the job is first started and each time an update-resource-requirements request is received from the REST API.
      It identifies the desired resource requirement. In theory, each call to updateJobResourceRequirements assigns a new desired resource requirement,
      so the ResourceRequirementsID is updated with a new value as well.
  • RescaleAttemptId: For the same rescale requirement (ResourceRequirementsID), multiple rescale attempts may occur.
    We treat each rescale under the current ResourceRequirementsID as a rescale attempt.
    That is, for each rescale attempt on the same rescale requirement (ResourceRequirementsID), the RescaleAttemptId is incremented by 1.
    • A rescale attempt is a rescale event. The slight difference is that the RescaleAttemptId counts the rescale events that occurred while the JobResourceRequirements corresponding to the ResourceRequirementsID were set as the desired resource requirement.
    • A failed attempt refers to a failed rescale event. It does not mean that the desired parallelism was not reached, but rather that the minimum available parallelism was not met or some other exception occurred.
      Perhaps the field here could be named RescaleAttemptId to make it more intuitive and easier to understand.
  • Based on these points, whenever a rescale attempt happens:
    1. For example, assume the current resource requirements are identified by resourceRequirementsID1 and the RescaleAttemptId within the resourceRequirementsID1 scope is 1.
    2. When a rescale attempt starts at timestamp 10000, the RescaleAttemptId becomes 2.
    3. After that rescale succeeds, assume a new rescale is caused by a failure at timestamp 20000; the RescaleAttemptId becomes 3.
    4. Then the scheduler receives new resource requirements identified by resourceRequirementsID2 (new). The next rescale attempt is triggered, and its RescaleAttemptId is 1.
  • RescaleUUID
    • Additionally, to distinguish each rescale independently of the two IDs above, we introduce a UUID-type descriptor for each rescale, called RescaleUUID. Every rescale uses a new UUID as its RescaleUUID.
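
The numbering rules above can be illustrated with a small sketch. It is not the proposed implementation: the class RescaleIdSketch and its methods are hypothetical names, java.util.UUID stands in for Flink's AbstractID, and the attempt counter is assumed to restart at 1 for every new ResourceRequirementsID, as in the example.

```java
import java.util.UUID;

/** Hypothetical sketch of how the three rescale IDs could be assigned. */
class RescaleIdSketch {

    /** Immutable ID triple; UUID is an assumption standing in for AbstractID. */
    static final class Ids {
        final UUID resourceRequirementsId;
        final long rescaleAttemptId;
        final UUID rescaleUuid;

        Ids(UUID resourceRequirementsId, long rescaleAttemptId, UUID rescaleUuid) {
            this.resourceRequirementsId = resourceRequirementsId;
            this.rescaleAttemptId = rescaleAttemptId;
            this.rescaleUuid = rescaleUuid;
        }
    }

    private UUID resourceRequirementsId = UUID.randomUUID();
    private long lastAttemptId = 0;

    /** New resource requirements: new requirements ID, attempt counter restarts. */
    void onNewResourceRequirements() {
        resourceRequirementsId = UUID.randomUUID();
        lastAttemptId = 0;
    }

    /** Every rescale attempt: attempt ID grows by 1, RescaleUUID is always fresh. */
    Ids nextRescale() {
        lastAttemptId++;
        return new Ids(resourceRequirementsId, lastAttemptId, UUID.randomUUID());
    }

    public static void main(String[] args) {
        RescaleIdSketch sketch = new RescaleIdSketch();
        long a1 = sketch.nextRescale().rescaleAttemptId; // initial attempt
        long a2 = sketch.nextRescale().rescaleAttemptId; // rescale at timestamp 10000
        long a3 = sketch.nextRescale().rescaleAttemptId; // rescale after failure at 20000
        sketch.onNewResourceRequirements();              // resourceRequirementsID2 arrives
        long a4 = sketch.nextRescale().rescaleAttemptId;
        System.out.println(a1 + " " + a2 + " " + a3 + " " + a4); // prints: 1 2 3 1
    }
}
```

In this sketch the RescaleUUID is the only identifier that is unique on its own; the RescaleAttemptId is only meaningful together with its ResourceRequirementsID.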

The diagram below illustrates the relationships and constraints between these three properties.

source fig

NOTE:

If the draft that includes the use of IDs in the description section is adopted, the pros and cons are as follows:

  • It makes it easier to understand certain relationships between Flink and external requests.

  • The ID part does not need to consider mechanisms related to High Availability (HA) at this stage.

 

The information to record in a rescale event

Rescale ID information.

As mentioned in 'Rescale ID & ResourceRequirements request':

  • ResourceRequirementsID
  • RescaleUUID
  • RescaleAttemptId

Job Vertices Rescaled Information

  • Job Vertex ID
  • Vertex name
  • The ID of the slot sharing group that the job vertex belongs to.
  • The previous parallelism of the job vertex before the current rescale.

  • The acquired parallelism of the job vertex
  • The desired parallelism of the job vertex
  • The sufficient parallelism of the job vertex

Slots / Resources Rescaled Information

During a complete rescale event (one rescaling cycle), there are four different resource allocation concepts:

  • Current resources used before the next rescale (current slots): All resources held by the job before rescaling to keep it running.
  • Desired resources: The ideal resource allocation we aim for if sufficient task slots are available (essentially the upper limit of job parallelism).
  • Sufficient resources: The minimum amount of resources required for the job to run (the lower bound of job parallelism, i.e., the resources required in the parallelism request).
  • Acquired/Reserved resources: The actual resources used by the job after a successful rescale.

The first two define the resource configuration used for rescaling decisions.
The latter two represent the actual applied resource configuration.

It is important to note that the latter two may not necessarily match the resource configurations considered when deciding to rescale.
For example, the desired resources may have been available when rescaling was triggered, but task slots could be lost during the rescale deployment process, leading to unexpected results.

 

So, the slot-related part of the information should contain the following items.

  • The slot sharing group id
  • The slot sharing group name
  • The required resource profile of the slot sharing group
  • The acquired resource profile of the slot sharing group
  • The desired slot number of the slot sharing group
  • The sufficient slot number of the slot sharing group
  • The previous slot number of the slot sharing group
  • The acquired slot number of the slot sharing group

Other information

  • The terminated reason type of a rescale once it reaches a terminal state.
  • The terminal state of the rescale.
  • The trigger description, i.e. what drove the rescale event.
    • updateJobResourceRequirement
    • Restart due to recoverable failure
    • newResourceAvailable
    • initial submit
  • The scheduler states history information during the rescale event
    • State name
    • The timestamp on entering the state
    • The timestamp on leaving the state
    • The duration of the scheduler state
      • It can be derived from the enter and leave timestamps of the corresponding scheduler state.
    • The error, with its timestamp, that occurred within the state span
  • The trigger timestamp of the Rescale event
  • The end timestamp of the Rescale event
  • The duration of the rescale event.
    • It can be derived from the end and trigger timestamps of the rescale event.
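
Both durations above are plain differences of two timestamps. A minimal sketch, assuming millisecond timestamps and assuming that a missing timestamp (for example, a state that has not been left yet) yields no duration; the class and method names are hypothetical:

```java
/** Hypothetical helper showing how a duration could be derived from two timestamps. */
class SpanDurationSketch {

    /** Returns end - start in milliseconds, or null while either timestamp is unknown. */
    static Long duration(Long startTimestamp, Long endTimestamp) {
        if (startTimestamp == null || endTimestamp == null) {
            return null;
        }
        return endTimestamp - startTimestamp;
    }

    public static void main(String[] args) {
        System.out.println(duration(10000L, 20000L)); // prints: 10000
        System.out.println(duration(10000L, null));   // prints: null
    }
}
```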

About rescale events storage

Introduce a parameter 'web.adaptive-scheduler.rescale-history.size' to control the number of recent rescale events to be retained.

  • Memory + Disk
    • Keep up to 'web.adaptive-scheduler.rescale-history.size' rescale events in the AdaptiveScheduler, in memory and on disk, similar to the ExceptionHistory.
    • After a job failure, a backup remains available on disk, though it may be unusable in some cases, such as when the JobManager moves after an HA failover.
    • I.e., this follows what is currently done for the ExecutionGraphInfoStore.

Proposed Changes

Public Interfaces

How to enable the feature?


Introduce a config option to control the maximum number of rescale events to record/hold:

  • web.adaptive-scheduler.rescale-history.size
    • Default value: 0
    • Type: integer
    • Description: The maximum number of rescale history entries per job whose scheduler is the AdaptiveScheduler. When the configured value is less than 1, this feature is disabled.
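
The effect of this option can be sketched with a bounded in-memory buffer. This is only an illustration under assumptions: the class BoundedRescaleHistorySketch is hypothetical, the eviction of the oldest entry is an assumed policy for keeping the "recent" events, and the real storage would additionally go through the ExecutionGraphInfoStore.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical bounded history; a configured size below 1 disables recording. */
class BoundedRescaleHistorySketch<T> {
    private final int maxSize;
    private final Deque<T> entries = new ArrayDeque<>();

    /** maxSize plays the role of 'web.adaptive-scheduler.rescale-history.size'. */
    BoundedRescaleHistorySketch(int maxSize) {
        this.maxSize = maxSize;
    }

    boolean isEnabled() {
        return maxSize >= 1;
    }

    /** Adds an entry and evicts the oldest one once the limit is exceeded. */
    void add(T rescale) {
        if (!isEnabled()) {
            return;
        }
        entries.addLast(rescale);
        if (entries.size() > maxSize) {
            entries.removeFirst();
        }
    }

    /** Returns a copy of the retained entries, oldest first. */
    List<T> snapshot() {
        return new ArrayList<>(entries);
    }

    public static void main(String[] args) {
        BoundedRescaleHistorySketch<String> history = new BoundedRescaleHistorySketch<>(2);
        history.add("rescale-1");
        history.add("rescale-2");
        history.add("rescale-3");
        System.out.println(history.snapshot()); // prints: [rescale-2, rescale-3]
    }
}
```

With the default value 0, add() in this sketch is a no-op, which matches the "disabled when less than 1" rule.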

Internal Interfaces

How to represent a rescale event?

Introduce the IdEpoch class to represent the rescale id related information

IdEpoch
/** The class to represent the rescale id description in one resource requirements rescale. */
public class IdEpoch {
    private final AbstractID rescaleUuid;
    private final AbstractID resourceRequirementsId;
    private final long rescaleAttemptId;
    // Placeholders...
}

Introduce the VertexParallelismRescale class to represent the rescale information of a job vertex.

VertexParallelismRescale
/** The rescale information of a {@link org.apache.flink.runtime.jobgraph.JobVertex}. */
public class VertexParallelismRescale {
    private final JobVertexID jobVertexId;
    private String jobVertexName;
    private SlotSharingGroupId slotSharingGroupId;
    private Integer desiredParallelism;
    private Integer sufficientParallelism;
    @Nullable private Integer preRescaleParallelism;
    @Nullable private Integer postRescaleParallelism;
    // Placeholders...
}

Introduce the SlotSharingGroupRescale class to represent the rescale information of a slot sharing group.

SlotSharingGroupRescale
/**
 * The matching information of a requested {@link
 * org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup}.
 */
public class SlotSharingGroupRescale {
    private final SlotSharingGroupId slotSharingGroupId;
    private String slotSharingGroupName;
    private ResourceProfile requiredResourceProfile;
    private Integer desiredSlots;
    private Integer sufficientSlots;
    @Nullable private Integer preRescaleSlots;
    @Nullable private Integer postRescaleSlots;
    @Nullable private ResourceProfile acquiredResourceProfile;
    // Placeholders...
}

Introduce the SchedulerStateSpan class to represent the information of a scheduler state during a rescale.

SchedulerStateSpan
/** Utils class to record the information of a scheduler state. */
public class SchedulerStateSpan {
    private final String state;
    @Nullable private final Long inTimestamp;
    @Nullable private final Long outTimestamp;
    @Nullable private final Long duration;
    @Nullable private final String stringedException;
    // Placeholders...
}


Introduce the TriggerCause class to represent the trigger cause of a rescale.

TriggerCause
/** The cause that triggers a rescale. */
public enum TriggerCause {
    /** The first scheduling when the job starts. */
    INITIAL_SCHEDULE,
    /** Update job resource requirements. */
    UPDATE_REQUIREMENT,
    /** New resources available. */
    NEW_RESOURCE_AVAILABLE,
    /** Recoverable failover. */
    RECOVERABLE_FAILOVER
}

Introduce the TerminalState class to represent the terminal state of the current rescale.

TerminalState
public enum TerminalState {
    COMPLETED,
    FAILED,
    IGNORED;
}

Introduce the TerminatedReason class to represent the rescale terminated reason.

TerminatedReason
public enum TerminatedReason {
    SUCCEEDED(TerminalState.COMPLETED),
    EXCEPTION_OCCURRED(TerminalState.FAILED),
    RESOURCE_REQUIREMENTS_UPDATED(TerminalState.IGNORED),
    NO_SUFFICIENT_RESOURCES_OR_PARALLELISMS_CHANGE(TerminalState.IGNORED),
    JOB_FINISHED(TerminalState.IGNORED),
    JOB_FAILING(TerminalState.IGNORED),
    JOB_FAILOVER_RESTARTING(TerminalState.IGNORED),
    JOB_CANCELING(TerminalState.IGNORED);

    private final TerminalState terminalState;

    // Each reason is bound to exactly one terminal state.
    TerminatedReason(TerminalState terminalState) {
        this.terminalState = terminalState;
    }

    public TerminalState getTerminalState() {
        return terminalState;
    }
}


Introduce the Rescale class to represent a rescale.

Rescale
/** Rescale event. */
public class Rescale {
    private final IdEpoch idEpoch;
    private final boolean newResourceRequirement;
    // To hold the vertices & slots information during the rescale.
    private final Map<JobVertexID, VertexParallelismRescale> vertices;
    private final Map<SlotSharingGroupId, SlotSharingGroupRescale> slots;
    // To hold the scheduler states information during the rescale.
    private final List<SchedulerStateSpan> schedulerStates;

    private Long startTimestamp;
    private Long endTimestamp;
    private TriggerCause triggerCause;
    @Nullable private TerminalState terminalState;
    private TerminatedReason terminatedReason;
}

How to hold/maintain the rescale history?

Introduce a RescaleTimeline in the AdaptiveScheduler to hold/maintain the rescale history.

RescaleTimeline
/** The interface for updating the rescale timeline information. */
public interface RescaleTimeline {

    /**
     * Checks whether the timeline holds a terminated rescale or no rescale at all.
     *
     * @return <code>true</code> if so, <code>false</code> otherwise.
     */
    boolean inIdling();

    /**
     * Checks whether the timeline holds a non-terminated rescale.
     *
     * @return <code>true</code> if so, <code>false</code> otherwise.
     */
    boolean notTerminated();

    /**
     * Creates a new rescale and assigns it as the current rescale.
     *
     * @param newRescaleEpoch Whether the resource requirements of the rescale belong to a new
     *     epoch.
     * @return <code>true</code> if created successfully, <code>false</code> otherwise.
     */
    boolean newCurrentRescale(boolean newRescaleEpoch);

    @Nullable
    Rescale latestRescale(TerminalState terminalState);

    /**
     * Updates the current rescale.
     *
     * @param rescaleUpdater The action to update the current rescale.
     * @return <code>true</code> if updated successfully, <code>false</code> otherwise.
     */
    boolean updateCurrentRescale(RescaleUpdater rescaleUpdater);

    /** Rescale operation interface. */
    interface RescaleUpdater {
        void update(Rescale rescaleToUpdate);
    }
}
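
To make the intended call sequence more concrete, here is a minimal in-memory sketch of such a timeline. It is written under several assumptions that the interface above does not state: newCurrentRescale is assumed to refuse while a rescale is still running, a rescale is assumed to move into the history once an update sets its terminal state, and latestRescale is assumed to return the most recent rescale with the given terminal state. Rescale is reduced to two fields, and java.util.function.Consumer replaces RescaleUpdater.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory timeline; simplified stand-in types, not the proposed classes. */
class RescaleTimelineSketch {
    enum TerminalState { COMPLETED, FAILED, IGNORED }

    /** Simplified stand-in for the proposed Rescale class. */
    static final class Rescale {
        final boolean newResourceRequirement;
        TerminalState terminalState; // null while the rescale is still running

        Rescale(boolean newResourceRequirement) {
            this.newResourceRequirement = newResourceRequirement;
        }
    }

    private final List<Rescale> history = new ArrayList<>();
    private Rescale current;

    boolean inIdling() {
        return current == null || current.terminalState != null;
    }

    boolean notTerminated() {
        return !inIdling();
    }

    /** Assumption: a new rescale can only start once the previous one has terminated. */
    boolean newCurrentRescale(boolean newRescaleEpoch) {
        if (notTerminated()) {
            return false;
        }
        current = new Rescale(newRescaleEpoch);
        return true;
    }

    /** Applies the update; a rescale that became terminal is appended to the history. */
    boolean updateCurrentRescale(Consumer<Rescale> updater) {
        if (inIdling()) {
            return false;
        }
        updater.accept(current);
        if (current.terminalState != null) {
            history.add(current);
        }
        return true;
    }

    /** Returns the most recent terminated rescale with the given state, or null. */
    Rescale latestRescale(TerminalState state) {
        for (int i = history.size() - 1; i >= 0; i--) {
            if (history.get(i).terminalState == state) {
                return history.get(i);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        RescaleTimelineSketch timeline = new RescaleTimelineSketch();
        System.out.println(timeline.newCurrentRescale(true));  // prints: true
        System.out.println(timeline.newCurrentRescale(false)); // prints: false
        timeline.updateCurrentRescale(r -> r.terminalState = TerminalState.COMPLETED);
        System.out.println(timeline.inIdling());               // prints: true
    }
}
```

Under these assumptions, a caller that receives updateJobResourceRequirements would first mark the running rescale as IGNORED through updateCurrentRescale and only then call newCurrentRescale(true).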

Introduce a getter for the RescaleTimeline in StateWithExecutionGraph.Context and StateWithoutExecutionGraph.Context so that the AdaptiveScheduler states can conveniently record rescale information.

StateWithExecutionGraph.Context/StateWithoutExecutionGraph.Context
public interface StateWithExecutionGraph {
    // Old lines placeholder.

    interface Context extends StateTransitions.ToFinished {
        // Old lines placeholder.

        /**
         * Get the rescale timeline.
         *
         * @return The rescale timeline handler.
         */
        RescaleTimeline getRescaleTimeline();
    }
}

public interface StateWithoutExecutionGraph {
    // Old lines placeholder.

    interface Context extends StateTransitions.ToFinished {
        // Old lines placeholder.

        /**
         * Get the rescale timeline.
         *
         * @return The rescale timeline handler.
         */
        RescaleTimeline getRescaleTimeline();
    }
}

How to store the rescale history?

Introduce rescale-history snapshot classes (similar to CheckpointStatsSnapshot) in ExecutionGraphInfo. This way, storage and access follow the existing handling of ExecutionGraphInfo in the ExecutionGraphInfoStore.
ExecutionGraphInfo could be modified to include the rescale history (alongside the exception history). The data can then be saved in the ExecutionGraphInfoStore. All we have to do is keep, in the AdaptiveScheduler, the rescale history and an object that holds the information of the current rescale process. When the current rescale process terminates, an immutable snapshot of this event is created and saved in the rescale history.

RescalesStatsSnapshot
public class RescalesStatsSnapshot implements Serializable {
    private static final long serialVersionUID = 1L;

    private final List<Rescale> rescaleHistory;
    private final RescalesSummarySnapshot rescalesSummarySnapshot;

    // Placeholders...
}

Compatibility, Deprecation, and Migration Plan

  • N/A

Test Plan


Add the corresponding test cases.

The draft PR for POC

https://github.com/RocMarshal/flink/pull/7

Rejected Alternatives

Rescale event terminal state

This rescale status is aligned with the Adaptive Scheduler's state, which makes it easier to understand; it could also be called SchedulerState.


The transition of RescaleStatus/SchedulerState and TerminalState

  • Created: The initial scheduler state when the job is first scheduled.
  • CreatingExecutionGraph (i.e. the corresponding adaptive scheduler state during a rescale event)

  • WaitingForResources (i.e. the corresponding adaptive scheduler state during a rescale event)
  • Executing (i.e. the corresponding adaptive scheduler state during a rescale event)
  • Restarting (i.e. the corresponding adaptive scheduler state during a rescale event)

A rescale eventually reaches a terminal state, recorded together with the terminated reason:

  • COMPLETED
    • The AdaptiveScheduler has successfully completed the rescale.

  • FAILED

    • The current rescale event failed due to an error.

  • IGNORED

    • If there is an ongoing rescale event that has not reached a terminal state and external resources are explicitly specified, the current rescale is forcibly set to IGNORED.

    • Or the rescale event ends without actually rescaling the job.


The main scenarios where Rescale switches to terminated

When there is a rescale that has not yet terminated:

  • Upon receiving updateJobResourceRequirements, the current rescale is forcibly set to IGNORED and a new rescale is created to record the subsequent rescale information.

  • When a StateTransition reaches the Stabilized phase and the new resource quantities do not differ from the original ones, it re-enters the idling stage. The current rescale attempt ends because neither an exception nor a change in external conditions caused a parallelism adjustment, so the current rescale is terminated as IGNORED.

  • When one of the AdaptiveScheduler methods goToCancelling, goToFailing, or goToFinished is invoked, the entire job is about to terminate, so the rescale is terminated as IGNORED.

  • If the job encounters a recoverable exception that triggers a restart, the current rescale is terminated as IGNORED and a new rescale is created to record the subsequent rescale information.

  • If the AdaptiveScheduler attempts to determine the parallelism from the available slots but the number of slots is below the sufficient-slots requirement, a slot insufficiency exception is thrown. In this case, the rescale is marked as FAILED, with a reason indicating that it ended due to insufficient resources.

  • When the rescale succeeds (i.e., the AdaptiveScheduler transitions to Executing), the rescale is marked as COMPLETED.

About rescale events storage

The remaining 3 candidate options for storing the rescale events:

  • Option 1 – (Memory + HDFS)
    • Keep up to 'web.adaptive-scheduler.rescale-history-max' rescale events in the AdaptiveScheduler in memory, similar to the ExceptionHistory. When a job fails to submit or reaches a terminal state, store the most recent events as an archive JSON in an HDFS location from which the HistoryServer can retrieve them.
    • This approach not only supports fast queries but also keeps the data available when the job terminates normally or abnormally, or after an HA failover. The HistoryServer can access it whenever it is available.
  • Option 2 – (Memory)
    • Keep up to 'web.adaptive-scheduler.rescale-history-max' rescale events in the AdaptiveScheduler in memory, similar to the ExceptionHistory.
    • Unfortunately, this approach may lead to data loss when the job terminates normally or abnormally. However, it reduces the complexity of user configuration to some extent.
  • Option 3 – Move the logic into FLIP-360 or a new separate FLIP to implement it.
    • This is still a good choice, as it reduces the workload of the current FLIP. However, we need to consider how to show the archived rescale history that is stored in the JobResultStore in FLIP-360.
    • If we adopt this option, we could keep a limited number of records in memory (e.g. Option 2) and ignore external storage of the records.


Rescale ID & ResourceRequirements request

If we wish to introduce a long-type global rescale ID named GlobalRescaleID and expect this GlobalRescaleID to remain continuous after failover, the anticipated complete contextual description is as follows:

  • ResourceRequirementsID: Each rescale requirement corresponds to a ResourceRequirementsID.
    • A new value (UUID) is generated for the ResourceRequirementsID when the job is first started and each time an update-resource-requirements request is received from the REST API.
      It identifies the desired resource requirement. In theory, each call to updateJobResourceRequirements assigns a new desired resource requirement,
      so the ResourceRequirementsID is updated with a new value as well.
  • RescaleAttemptId: For the same rescale requirement (ResourceRequirementsID), multiple rescale attempts may occur.
    We treat each rescale under the current ResourceRequirementsID as a rescale attempt.
    That is, for each rescale attempt on the same rescale requirement (ResourceRequirementsID), the RescaleAttemptId is incremented by 1.
    • A rescale attempt is a rescale event. The slight difference is that the RescaleAttemptId counts the rescale events that occurred while the JobResourceRequirements corresponding to the ResourceRequirementsID were set as the desired resource requirement.
    • A failed attempt refers to a failed rescale event. It does not mean that the desired parallelism was not reached, but rather that the minimum available parallelism was not met or some other exception occurred.
      Perhaps the field here could be named RescaleAttemptId to make it more intuitive and easier to understand.
  • GlobalRescaleID
    • Similar to the checkpoint ID, it is of type long, incrementing, and can be recovered after a previous failure or state restart. It represents the ID of the rescale event. One GlobalRescaleID corresponds to one rescale event.
  • Based on these points, whenever a rescale attempt happens, the GlobalRescaleID is incremented globally.
    1. For example, assume the current resource requirements are identified by resourceRequirementsID1, the GlobalRescaleID is 1 (globally), and the RescaleAttemptId within the resourceRequirementsID1 scope is 1.
    2. When a rescale attempt starts at timestamp 10000, the RescaleAttemptId becomes 2 and the GlobalRescaleID becomes 2.
    3. After that rescale succeeds, assume a new rescale is caused by a failure at timestamp 20000; the RescaleAttemptId becomes 3 and the GlobalRescaleID becomes 3.
    4. Then the scheduler receives new resource requirements identified by resourceRequirementsID2 (new). The next rescale attempt is triggered; its RescaleAttemptId is 1 and the GlobalRescaleID is 4.
  • RescaleUUID
    • Additionally, to distinguish each rescale independently of the two IDs above, we introduce a UUID-type descriptor for each rescale, called RescaleUUID. Every rescale uses a new UUID as its RescaleUUID.

The diagram below illustrates the relationships and constraints between these three properties.

source fig

