FYI:
The original title of this FLIP was 'FLIP-495: Support AdaptiveScheduler record and query the rescale history'.
Since the design work on the query interface was split out into FLIP-487 during the discussion, the title was changed to:
'FLIP-495: Support AdaptiveScheduler record and store the rescale history'.
Motivation
Supporting the AdaptiveScheduler in recording and querying the rescale history (number of slots used, task parallelisms) is very useful for users:
- It lets users trace the history of rescales and makes rescale information more transparent.
- It provides users with information for tuning Adaptive Scheduler parameters.
Note: this FLIP was split from FLIP-487, and here is some discussion history related to the current FLIP for reference.
Design
Current core adaptive scheduler state transitions related to rescale
From the figure:
- The entire chain of rescale-related scheduler states typically involves several loops:
  - WaitingForResources -> CreatingExecutionGraph -> Executing -> StopWithSavepoint (error & restartable) -> Restarting -> WaitingForResources
  - CreatingExecutionGraph [-> WaitingForResources -> CreatingExecutionGraph] (optional loop) -> Executing (rescale triggers) -> Restarting -> CreatingExecutionGraph
- A rescale event may be triggered under the following circumstances, called 'rescale triggers' here:
  - updateJobResourceRequirement
  - Restart due to a recoverable failure (including a recoverable StopWithSavepoint failure)
  - newResourceAvailable
  - Initial submit
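The rescale triggers listed above could be modeled as a plain enum. The following is a minimal sketch with illustrative names; it is not the actual representation in the Flink code base:

```java
// Hypothetical enum for the four rescale triggers described above.
// The constant names are illustrative, not Flink identifiers.
enum RescaleTrigger {
    INITIAL_SUBMIT,                   // first scheduling of the job
    UPDATE_JOB_RESOURCE_REQUIREMENT,  // REST updateJobResourceRequirements call
    RECOVERABLE_FAILURE_RESTART,      // restart after a recoverable failure,
                                      // including a restartable StopWithSavepoint failure
    NEW_RESOURCE_AVAILABLE            // new slots became available
}
```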
The related abstraction description
Treat each rescale change as a rescale event.
Rescale Event:
Generally, we consider the following four expected situations to trigger a rescale:
- Init-Submit
- updateJobResourceRequirement
- Restart due to recoverable failure
- newResourceAvailable
At a high level, a rescale mainly involves the following set of scheduler states:
- WaitingForResources
- CreatingExecutionGraph
- Executing
- Restarting
Specifically, when a StopWithSavepoint fails due to a recoverable exception, the job may also undergo the rescale process. When the job is first run, it also goes through the above core states.
Rescale event terminal state
The transition of Scheduler State and rescale TerminalState
A rescale will move to a terminal state together with termination reason information:
- COMPLETED
  The AdaptiveScheduler has successfully completed the rescale.
- FAILED
  The current rescale event has failed due to an error.
- IGNORED
  If there is an ongoing rescale event that has not yet terminated and external resources are explicitly specified, the current rescale's terminal state is forcibly set to IGNORED. IGNORED is also used when the rescale event ultimately does not perform any rescaling for the job.
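As a rough sketch, the three terminal states could be represented by a plain enum (the FLIP proposes a dedicated TerminalState class in the Internal Interfaces section; this illustration only mirrors the semantics described above):

```java
// Hypothetical terminal states of a rescale event, mirroring the text above.
enum RescaleTerminalState {
    COMPLETED, // the AdaptiveScheduler successfully completed the rescale
    FAILED,    // the rescale event failed due to an error
    IGNORED    // superseded by explicit external resources, or the event
               // ultimately performed no rescaling for the job
}
```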
The main scenarios where Rescale switches to terminated
When there is a rescale in a non-terminated status:
- Upon receiving updateJobResourceRequirements, the current rescale is forcibly set to IGNORED and a new rescale is generated to record subsequent rescale information.
- When a StateTransition jumps to the Stabilized phase and there is no difference between the new and original resource quantities, the phase re-enters the idling stage. At this point the current rescale attempt ends because no exception or external condition change caused a parallelism adjustment, so the current rescale is set to the IGNORED terminal state.
- When the AdaptiveScheduler methods goToCancelling, goToFailing, or goToFinished are invoked, the entire job is preparing to terminate, so the rescale terminal state is marked as IGNORED.
- If the job encounters a recoverable exception triggering a restart, the current rescale terminal state is switched to IGNORED and a new rescale is generated to record subsequent rescale information.
- If the AdaptiveScheduler attempts to determine parallelism from the available slots but the number of slots is less than the sufficient-slots requirement, a slot-insufficiency exception is thrown. In this case the rescale is marked as FAILED, indicating that it ended due to insufficient resources.
- When the rescale succeeds (i.e., the AdaptiveScheduler state transitions to Executing), the rescale terminal state is marked as COMPLETED, indicating a successful rescale.
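The scenarios above amount to a mapping from scheduler events to terminal states. The helper below is purely illustrative: the event names are assumptions for this sketch, while the resulting states follow the text.

```java
// Illustrative decision helper for the termination scenarios above.
// The event names are assumptions; only the resulting states follow the FLIP text.
final class RescaleTermination {
    enum Outcome { COMPLETED, FAILED, IGNORED }

    static Outcome onEvent(String event) {
        switch (event) {
            case "updateJobResourceRequirements": // superseded by a new requirement
            case "stabilizedWithoutChange":       // no parallelism change needed
            case "jobTerminating":                // goToCancelling/Failing/Finished
            case "recoverableFailureRestart":     // a fresh rescale event is opened
                return Outcome.IGNORED;
            case "insufficientSlots":             // below the sufficient-slots requirement
                return Outcome.FAILED;
            case "enteredExecuting":              // the rescale took effect
                return Outcome.COMPLETED;
            default:
                throw new IllegalArgumentException(event);
        }
    }
}
```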
Rescale ID & ResourceRequirements request
- ResourceRequirementsID: Each rescale requirement corresponds to a ResourceRequirementsID.
  - A new value (UUID) is generated for the ResourceRequirementsID when the job is first started and each time an update-resource-requirements request is received from the REST API.
  - It represents the ID of the desired resource requirement. In theory, each time updateJobResourceRequirements is called, the desired resource requirement is assigned a new value, so the ResourceRequirementsID is also updated with a new value.
- RescaleAttemptId: For the same rescale requirement (ResourceRequirementsID), multiple rescale attempts may occur.
  - We treat each rescale under the current ResourceRequirementsID as a rescale attempt.
  - For each rescale attempt on the same rescale requirement (ResourceRequirementsID), the rescale attempt id increments by 1.
  - A rescale attempt is a rescale event. The slight difference is that the rescale attempt id represents the total number of rescale events that occurred while the JobResourceRequirements corresponding to the ResourceRequirementsID were the desired resource requirement.
  - A failed attempt refers to a failed rescale event. It does not mean that the expected parallelism was not reached, but rather that the minimum available parallelism was not met or some other exception occurred.
- Based on these points, whenever a rescale attempt happens:
  - For example, assume the current resource requirements are resourceRequirementsID1 and the RescaleAttemptId within the resourceRequirementsID1 scope is 1.
  - When a rescaling attempt starts at timestamp 10000, the RescaleAttemptId becomes 2.
  - After that rescale succeeds, assume a new rescale is caused by a failure at timestamp 20000; the RescaleAttemptId becomes 3.
  - When the scheduler then receives new resource requirements, resourceRequirementsID2, the next rescale attempt is triggered and the RescaleAttemptId becomes 1 again.
- RescaleUUID
  Additionally, to distinguish it from the two IDs above and to mark the independence of each rescale operation, we introduce a UUID-type descriptor for each rescale, called RescaleUUID. Every rescale uses a new UUID as its RescaleUUID.
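A minimal sketch of how the three identifiers could evolve together, following the rules above. The class and method names are assumptions for illustration, not Flink internals:

```java
import java.util.UUID;

// Illustrative bookkeeping of the three rescale identifiers described above.
final class RescaleIds {
    UUID resourceRequirementsId = UUID.randomUUID(); // one per resource requirement
    int rescaleAttemptId = 0;  // scoped to the current ResourceRequirementsID
    UUID rescaleUuid;          // fresh UUID per rescale event

    // A new requirement (initial submit or updateJobResourceRequirements)
    // gets a new ResourceRequirementsID and resets the attempt counter.
    void onNewResourceRequirements() {
        resourceRequirementsId = UUID.randomUUID();
        rescaleAttemptId = 0;
    }

    // Every rescale attempt under the current requirement increments the
    // attempt id by 1 and draws a brand-new RescaleUUID.
    int onRescaleAttempt() {
        rescaleUuid = UUID.randomUUID();
        return ++rescaleAttemptId;
    }
}
```

This reproduces the walk-through above: two attempts under the first requirement yield attempt ids 1 and 2, and a new requirement restarts the counter at 1.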
The diagram below illustrates the relationships and constraints between these three properties.
NOTE:
If the draft that includes the use of IDs in the description section is adopted, the considerations are as follows:
- It makes it easier to understand certain relationships between Flink and external requests.
- The current ID design does not need to consider High Availability (HA) mechanisms at this stage.
The information to record in a rescale event
Rescale ID information.
As mentioned in 'Rescale ID & ResourceRequirements request':
- ResourceRequirementsID
- RescaleUUID
- RescaleAttemptId
Job Vertices Rescaled Information
- Job vertex ID
- Vertex name
- The id of the slot sharing group where the job vertex is located
- The previous parallelism of the job vertex before the current rescale
- The acquired parallelism of the job vertex
- The desired parallelism of the job vertex
- The sufficient parallelism of the job vertex
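The items above might be carried by a record shaped roughly like the following; the field names are illustrative stand-ins for the VertexParallelismRescale class proposed later in this FLIP:

```java
// Illustrative shape of the per-vertex rescale information listed above.
record VertexParallelismInfo(
        String jobVertexId,        // job vertex ID
        String vertexName,         // vertex name
        String slotSharingGroupId, // slot sharing group of the vertex
        int previousParallelism,   // parallelism before the current rescale
        int acquiredParallelism,   // parallelism actually obtained
        int desiredParallelism,    // upper bound
        int sufficientParallelism  // lower bound
) {}
```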
Slots / Resources Rescaled Information
During a complete rescale event (one rescaling cycle), there are four different resource allocation concepts:
- Current resources used before the next rescale (current slots): All resources held by the job before rescaling, supporting the current execution.
- Desired resources: The ideal resource allocation we aim for if sufficient task slots are available (essentially the upper limit of job parallelism).
- Sufficient resources: The minimum amount of resources required for the job to run (the lower bound of job parallelism, i.e., the resources required in the parallelism request).
- Acquired/Reserved resources: The actual resources used by the job after a successful rescale.
The first two define the resource configuration used for rescaling decisions.
The latter two represent the actual applied resource configuration.
It is important to note that the latter two may not necessarily match the resource configurations considered when deciding to rescale.
For example, the desired resources may have been available when rescaling was triggered, but task slots could be lost during the rescale deployment process, leading to unexpected results.
So the slot-related information should contain the following items:
- The slot sharing group id
- The slot sharing group name
- The required resource profile of the slot sharing group
- The acquired resource profile of the slot sharing group
- The desired slot number of the slot sharing group
- The sufficient slot number of the slot sharing group
- The previous slot number of the slot sharing group
- The acquired slot number of the slot sharing group
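Analogously, the slot-sharing-group items could be sketched as a record with a small sanity check. The names are illustrative, not the proposed SlotSharingGroupRescale API, and resource profiles are simplified to strings for this sketch:

```java
// Illustrative shape of the per-slot-sharing-group rescale information above.
record SlotSharingGroupInfo(
        String groupId,
        String groupName,
        String requiredResourceProfile, // simplified to a string here
        String acquiredResourceProfile, // simplified to a string here
        int desiredSlots,    // upper bound used in the rescale decision
        int sufficientSlots, // lower bound required to run
        int previousSlots,   // slots held before the rescale
        int acquiredSlots    // slots actually reserved after the rescale
) {
    // Fewer than 'sufficient' slots would have made the rescale FAIL,
    // so a COMPLETED rescale should satisfy this lower bound.
    boolean satisfiesLowerBound() {
        return acquiredSlots >= sufficientSlots;
    }
}
```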
Other information
- The terminated-reason type of the rescale when it reaches a terminal state
- The terminal state representing the current rescale's terminal stage
- The trigger description that drove the rescale event:
- updateJobResourceRequirement
- Restart due to recoverable failure
- newResourceAvailable
- initial submit
- The scheduler states history information during the rescale event
- State name
- The timestamp on entering the state
- The timestamp on leaving the state
- The duration of the scheduler state
- It can be derived from the entering and leaving timestamps of the corresponding scheduler state.
- The errors, with timestamps, that occurred within the state span
- The trigger timestamp of the Rescale event
- The end timestamp of the Rescale event
- The duration of the rescale event.
- It can be derived from the trigger and end timestamps of the rescale event.
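The duration bookkeeping above is straightforward: a span's duration is derived from its two timestamps rather than stored separately. A minimal sketch, with illustrative names (the FLIP proposes a SchedulerStateSpan class for this):

```java
// Illustrative scheduler-state span: the duration is derived from the
// enter and leave timestamps rather than stored as a separate field.
record SchedulerStateSpan(String stateName, long enteredAt, long leftAt) {
    long durationMillis() {
        return leftAt - enteredAt;
    }
}
```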
About rescale events storage
Introduce a parameter 'web.adaptive-scheduler.rescale-history.size' to control the number of recent rescale events to be retained.
- Memory + Disk
- Keep the rescale events specified by 'web.adaptive-scheduler.rescale-history.size' in the AdaptiveScheduler, like the ExceptionHistory, in memory and on disk.
- After a job failure, a backup remains available on disk, though it may be unusable in some cases, such as when the JobManager moves after an HA failover.
- i.e. following what's done for the ExecutionGraphInfoStore for now
Proposed Changes
Public Interfaces
How to enable the feature?
Introduce a config option to control the max rescale size to record/hold:
- web.adaptive-scheduler.rescale-history.size
- Default value: 0
- Type: integer
- Description: The maximum number of rescale history entries per job whose scheduler is the AdaptiveScheduler. When the configured value is less than 1, this feature is disabled.
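The enablement rule can be sketched as follows; the option key and default come from the FLIP, while the helper class itself is purely illustrative:

```java
// Illustrative helper for the enablement rule above: values below 1
// disable the rescale-history feature.
final class RescaleHistoryConfig {
    static final String KEY = "web.adaptive-scheduler.rescale-history.size";
    static final int DEFAULT = 0; // disabled by default

    static boolean isEnabled(int configuredSize) {
        return configuredSize >= 1;
    }
}
```

With the default of 0, the feature stays off until a user explicitly sets a positive history size.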
Internal Interfaces
How to represent a rescale event?
Introduce the IdEpoch class to represent the rescale id related information
Introduce the VertexParallelismRescale class to represent a job vertex rescale information.
Introduce the SlotSharingGroupRescale class to represent a slot sharing group's rescale information.
Introduce the SchedulerStateSpan class to represent a scheduler state information during a rescale.
Introduce the TriggerCause class to represent the trigger cause of a rescale.
Introduce the TerminalState class to represent the terminated state of the current rescale.
Introduce the TerminatedReason class to represent the rescale terminated reason.
Introduce the Rescale class to represent a rescale.
How to hold/maintain the rescale history?
Introduce a RescaleTimeline in the AdaptiveScheduler to hold/maintain the rescale history.
Introduce a RescaleTimeline getter method in StateWithExecutionGraph.Context/StateWithoutExecutionGraph.Context so that AdaptiveScheduler states can conveniently call the RescaleTimeline to record rescale information.
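A bounded history like this could be kept in a simple ring-buffer-style structure, analogous to how the exception history is kept. Only the RescaleTimeline name comes from the FLIP; the internals below are an assumption:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative bounded rescale history: keeps at most maxSize events,
// evicting the oldest, and does nothing when the feature is disabled.
final class RescaleTimeline<E> {
    private final int maxSize;
    private final Deque<E> history = new ArrayDeque<>();

    RescaleTimeline(int maxSize) {
        this.maxSize = maxSize;
    }

    // Append a terminated rescale event, evicting the oldest beyond the limit.
    void add(E rescaleEvent) {
        if (maxSize < 1) {
            return; // feature disabled, per the config option above
        }
        if (history.size() == maxSize) {
            history.removeFirst();
        }
        history.addLast(rescaleEvent);
    }

    int size() {
        return history.size();
    }
}
```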
How to process rescale storage?
Introduce rescale-history-related snapshot classes (similar to CheckpointStatsSnapshot) in ExecutionGraphInfo. In this way, storage and access follow the existing processing logic of ExecutionGraphInfo in the ExecutionGraphInfoStore.
ExecutionGraphInfo can be modified to include the rescale history (alongside the exception history). The data can then be saved in the ExecutionGraphInfoStore. All we have to do is store the rescale history in the AdaptiveScheduler together with an object that holds the information of the current rescale process. When the current rescale process terminates, an immutable rescale snapshot of the event is created and saved into the rescale history.
Compatibility, Deprecation, and Migration Plan
- N/A
Test Plan
Add the corresponding test cases.
The draft PR for POC
https://github.com/RocMarshal/flink/pull/7
Rejected Alternatives
Rescale event terminal state
This rescale status is aligned with the Adaptive Scheduler's status, making it easier to understand; it could also be called SchedulerState.
The transition of RescaleStatus/SchedulerState and TerminalState
- Created: Initial scheduler state when starting at the first scheduling.
- CreatingExecutionGraph (the corresponding adaptive scheduler state during a rescale event)
- WaitingForResources (the corresponding adaptive scheduler state during a rescale event)
- Executing (the corresponding adaptive scheduler state during a rescale event)
- Restarting (the corresponding adaptive scheduler state during a rescale event)
A rescale will move to a terminal state together with termination reason information:
- COMPLETED: It means that the AdaptiveScheduler has successfully completed a rescale.
- FAILED: It means that the current rescale event has failed due to an error.
- IGNORED: If there is an ongoing rescale event that has not yet terminated and external resources are explicitly specified, the current rescale's terminal state is forcibly set to IGNORED. IGNORED is also used when the rescale event ultimately does not perform any rescaling for the job.
The main scenarios where Rescale switches to terminated
When there is a rescale in a non-terminated status:
- Upon receiving updateJobResourceRequirements, the current rescale is forcibly set to IGNORED and a new rescale is generated to record subsequent rescale information.
- When a StateTransition jumps to the Stabilized phase and there is no difference between the new and original resource quantities, the phase re-enters the idling stage. At this point the current rescale attempt ends because no exception or external condition change caused a parallelism adjustment, so the current rescale is set to the IGNORED terminal state.
- When the AdaptiveScheduler methods goToCancelling, goToFailing, or goToFinished are invoked, the entire job is preparing to terminate, so the rescale terminal state is marked as IGNORED.
- If the job encounters a recoverable exception triggering a restart, the current rescale terminal state is switched to IGNORED and a new rescale is generated to record subsequent rescale information.
- If the AdaptiveScheduler attempts to determine parallelism from the available slots but the number of slots is less than the sufficient-slots requirement, a slot-insufficiency exception is thrown. In this case the rescale is marked as FAILED, indicating that it ended due to insufficient resources.
- When the rescale succeeds (i.e., the AdaptiveScheduler state transitions to Executing), the rescale terminal state is marked as COMPLETED, indicating a successful rescale.
About rescale events storage
The remaining 3 candidate options for storing the rescale events:
Option 1 – (Memory + HDFS)
Keep the rescale events specified by 'web.adaptive-scheduler.rescale-history-max' in the AdaptiveScheduler, like the ExceptionHistory, in memory. When a job fails to submit or reaches a terminal state, store the most recent events as an archived JSON in an HDFS location where the history server can retrieve them. This approach not only supports fast queries but also ensures data availability when the job terminates normally or abnormally, or after an HA failover, and the HistoryServer can access it when it is available.
Option 2 – (Memory)
Keep the rescale events specified by 'web.adaptive-scheduler.rescale-history-max' in the AdaptiveScheduler, like the ExceptionHistory, in memory. Unfortunately, this approach may lead to data loss when the job terminates normally or abnormally. However, it reduces the complexity of user configuration to some extent.
Option 3 – Move the logic into FLIP-360 or a new separate FLIP.
This is still a good choice, which can reduce the workload of the current FLIP. However, we would need to consider how to show the archived rescale history stored in the JobResultStore in FLIP-360. If we adopted this option, we could keep the limited records in memory (as in Option 2) and ignore the external storage of the records.
Rescale ID & ResourceRequirements request
If we wished to introduce a long-type global rescale ID named GlobalRescaleID and expected this GlobalRescaleID to remain continuous after failover, the anticipated complete contextual description would be as follows:
- ResourceRequirementsID: Each rescale requirement corresponds to a ResourceRequirementsID.
  - A new value (UUID) is generated for the ResourceRequirementsID when the job is first started and each time an update-resource-requirements request is received from the REST API.
  - It represents the ID of the desired resource requirement. In theory, each time updateJobResourceRequirements is called, the desired resource requirement is assigned a new value, so the ResourceRequirementsID is also updated with a new value.
- RescaleAttemptId: For the same rescale requirement (ResourceRequirementsID), multiple rescale attempts may occur.
  - We treat each rescale under the current ResourceRequirementsID as a rescale attempt.
  - For each rescale attempt on the same rescale requirement (ResourceRequirementsID), the rescale attempt id increments by 1.
  - A rescale attempt is a rescale event. The slight difference is that the rescale attempt id represents the total number of rescale events that occurred while the JobResourceRequirements corresponding to the ResourceRequirementsID were the desired resource requirement.
  - A failed attempt refers to a failed rescale event. It does not mean that the expected parallelism was not reached, but rather that the minimum available parallelism was not met or some other exception occurred.
  - Perhaps the field here could be named RescaleAttemptId to make it more intuitive and easier to understand.
- GlobalRescaleID: Similar to the checkpoint ID, it is of type long, incrementing, and can be recovered after a previous failure or state restart. It represents the ID of the rescale event; one GlobalRescaleID corresponds to one rescale event.
- Based on these points, whenever a rescale attempt happens, the GlobalRescaleID increments globally. For example:
  - Assume the current resource requirements are resourceRequirementsID1, GlobalRescaleID=1 (globally), and the RescaleAttemptId within the resourceRequirementsID1 scope is 1.
  - When a rescaling attempt starts at timestamp 10000, the RescaleAttemptId becomes 2 and the GlobalRescaleID becomes 2.
  - After that rescale succeeds, assume a new rescale is caused by a failure at timestamp 20000; the RescaleAttemptId becomes 3 and the GlobalRescaleID becomes 3.
  - When the scheduler then receives new resource requirements, resourceRequirementsID2, the next rescale attempt is triggered; the RescaleAttemptId becomes 1 and the GlobalRescaleID becomes 4.
- RescaleUUID: Additionally, to distinguish it from the two IDs above and to mark the independence of each rescale operation, we introduce a UUID-type descriptor for each rescale, called RescaleUUID. Every rescale uses a new UUID as its RescaleUUID.
The diagram below illustrates the relationships and constraints between these three properties.