This proposal is a follow-up to FLIP-168: Speculative Execution For Batch Job. It aims to enable speculative execution to work with most sources, without changing the source connectors.
There are three types of sources: user-defined source functions, InputFormat sources, and the new sources introduced in FLIP-27. For user-defined source functions, speculative execution is supported without any additional work. This proposal therefore focuses on how to support speculative execution for the other two types: InputFormat sources and the new sources.
Supports Execution Attempt SourceEvent
A SourceEvent is an event passed between SourceReaders and SplitEnumerators. It is only needed by sources that have a custom event protocol between reader and enumerator. Among all the Flink built-in connectors, only the hybrid source uses SourceEvent.
If speculative execution of sources is enabled, one subtask/ExecutionVertex may have multiple execution attempts running at the same time. Therefore, the SplitEnumerator must be aware of which execution attempt it receives a SourceEvent from or sends a SourceEvent to. To support this, we introduce the following interface:
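The new interface could look roughly like the sketch below. This is a simplified illustration, not the exact Flink code: SourceEvent is a stand-in for Flink's actual class, and RecordingEnumerator is a hypothetical implementation used only to show the call pattern.

```java
// Simplified sketch: a SplitEnumerator that needs to receive SourceEvents per
// execution attempt implements this mix-in interface.
interface SourceEvent {}  // stand-in for Flink's SourceEvent

interface SupportsHandleExecutionAttemptSourceEvent {
    // Unlike SplitEnumerator#handleSourceEvent(int, SourceEvent), this variant
    // also carries the attempt number of the subtask that sent the event.
    void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent);
}

// Hypothetical enumerator that records which attempt sent the last event.
class RecordingEnumerator implements SupportsHandleExecutionAttemptSourceEvent {
    String lastSender;

    @Override
    public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
        lastSender = "subtask " + subtaskId + ", attempt " + attemptNumber;
    }
}
```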
Note: you do not need to care about these new interfaces if your source does not use SourceEvent.
Expose Execution Attempt ReaderInfo
Currently, the SplitEnumerator gets the ReaderInfo of subtasks from the SplitEnumeratorContext. However, in the speculative execution scenario, where one subtask can have multiple concurrent execution attempts, users may need to see the reader info of all the attempts, so that they know the readers' locations and can make better split assignment decisions. To support this, we introduce the interface method below:
The existing #registeredReaders() now returns the reader info of the earliest active attempt of each subtask.
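The relationship between the two methods can be sketched with the illustrative registry below. ReaderInfo here is a stand-in type, and ReaderRegistry is a hypothetical class, not Flink's actual SplitEnumeratorContext implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Stand-in for Flink's ReaderInfo, carrying only a location for illustration.
class ReaderInfo {
    final String location;
    ReaderInfo(String location) { this.location = location; }
}

// Hypothetical registry showing the contract of the two methods.
class ReaderRegistry {
    // subtask id -> (attempt number -> reader info)
    private final Map<Integer, Map<Integer, ReaderInfo>> readers = new HashMap<>();

    void register(int subtaskId, int attemptNumber, ReaderInfo info) {
        readers.computeIfAbsent(subtaskId, k -> new HashMap<>()).put(attemptNumber, info);
    }

    // New method: exposes the readers of all attempts of all subtasks.
    Map<Integer, Map<Integer, ReaderInfo>> registeredReadersOfAttempts() {
        return readers;
    }

    // Existing method: returns the reader of the earliest active attempt per subtask.
    Map<Integer, ReaderInfo> registeredReaders() {
        Map<Integer, ReaderInfo> earliest = new HashMap<>();
        readers.forEach((subtask, attempts) ->
                earliest.put(subtask, attempts.get(Collections.min(attempts.keySet()))));
        return earliest;
    }
}
```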
InputFormat sources support speculative execution
Currently, after a source execution finishes reading its assigned splits, it sends a split request to the JobMaster. The JobMaster calls ExecutionVertex#getNextInputSplit(host) to request a new split from the InputSplitAssigner of the ExecutionJobVertex.
A SpeculativeExecutionVertex can have multiple executions running at the same time. In order to support speculative execution of InputFormat sources, we must ensure that all executions under an ExecutionVertex process the same InputSplits.
To achieve this, the SpeculativeExecutionVertex stores all assigned splits in a list. When handling a split request, it first tries to fetch an unprocessed split from this list. Only when all splits in the list have already been consumed by the requesting execution does the SpeculativeExecutionVertex request a new InputSplit from the InputSplitAssigner and add it to the list.
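A minimal sketch of this bookkeeping is shown below. SpeculativeSplitPool is a hypothetical class, with strings standing in for InputSplits and a Supplier standing in for the InputSplitAssigner.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: every execution attempt of the vertex is served the
// same sequence of splits, in the same order.
class SpeculativeSplitPool {
    private final List<String> assignedSplits = new ArrayList<>(); // splits fetched so far
    private final Map<Integer, Integer> attemptCursor = new HashMap<>(); // attempt -> next index
    private final Supplier<String> inputSplitAssigner; // stands in for the InputSplitAssigner

    SpeculativeSplitPool(Supplier<String> inputSplitAssigner) {
        this.inputSplitAssigner = inputSplitAssigner;
    }

    String getNextInputSplit(int attemptNumber) {
        int cursor = attemptCursor.getOrDefault(attemptNumber, 0);
        if (cursor == assignedSplits.size()) {
            // Only when this attempt has consumed every cached split do we ask the
            // assigner for a new one, which is then cached for the other attempts.
            String newSplit = inputSplitAssigner.get();
            if (newSplit == null) {
                return null; // no more splits for this vertex
            }
            assignedSplits.add(newSplit);
        }
        attemptCursor.put(attemptNumber, cursor + 1);
        return assignedSplits.get(cursor);
    }
}
```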
New sources support speculative execution
In order to support speculative execution of new Sources introduced in FLIP-27, we need to ensure that all executions under an ExecutionVertex process the same InputSplits.
To achieve this goal, we need to do following work:
- Refactor OperatorCoordinator to be aware of different execution attempts of a subtask.
- Improve SourceCoordinator and SourceCoordinatorContext to correctly handle operator events, manage source splits, and react to execution attempt state changes.
- Enable SourceCoordinator and SourceCoordinatorContext to know whether they should support multiple concurrent execution attempts (i.e. whether speculative execution is enabled).
Before speculative execution was introduced, the OperatorCoordinator did not need to know which execution sent an OperatorEvent from the SourceOperator, because an ExecutionVertex could only have one Execution running at a time.
With speculative execution, a SpeculativeExecutionVertex can have multiple concurrent executions running at the same time, so the OperatorCoordinator has to distinguish which execution sent an event. For example, the source coordinator needs to know which execution sent a split request. Therefore, we add an attemptNumber parameter to #handleEventFromOperator.
The OperatorCoordinator also needs to be aware of the state of each attempt, to properly initialize or clean up its state. Therefore, the existing #subtaskFailed(...) and #subtaskReady(...) methods are refactored into #executionAttemptFailed(...) and #executionAttemptReady(...).
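The attempt-aware parts of the refactored coordinator might look roughly as follows. The signatures are simplified stand-ins (the real methods carry more parameters, such as a gateway in the ready callback), and LoggingCoordinator is a hypothetical implementation used only to illustrate the call pattern.

```java
// Stand-in for Flink's OperatorEvent.
interface OperatorEvent {}

// Simplified sketch of the attempt-aware OperatorCoordinator methods.
interface AttemptAwareOperatorCoordinator {
    // Carries the attempt number so the coordinator knows which execution sent the event.
    void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event);

    // Replaces subtaskFailed(...): only this attempt failed; other attempts may still run.
    void executionAttemptFailed(int subtask, int attemptNumber, Throwable reason);

    // Replaces subtaskReady(...): this attempt is ready to receive operator events.
    void executionAttemptReady(int subtask, int attemptNumber);
}

// Minimal recording implementation, used only to illustrate the call pattern.
class LoggingCoordinator implements AttemptAwareOperatorCoordinator {
    final java.util.List<String> calls = new java.util.ArrayList<>();

    public void handleEventFromOperator(int s, int a, OperatorEvent e) { calls.add("event:" + s + "/" + a); }
    public void executionAttemptFailed(int s, int a, Throwable r) { calls.add("failed:" + s + "/" + a); }
    public void executionAttemptReady(int s, int a) { calls.add("ready:" + s + "/" + a); }
}
```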
Improve SourceCoordinator and SourceCoordinatorContext
Currently, when the source coordinator receives a split request from a source operator, it forwards the request to the SplitEnumerator. When the SplitEnumerator assigns a new split, the assignment is recorded in the SourceCoordinatorContext, which sends an AddSplitEvent to the SourceOperator as the response.
To support speculative execution and ensure that all executions of a subtask process the same SourceSplits, we now cache the assigned splits in the SplitAssignmentTracker and record whether the subtask has no more splits.
- When a new SplitAssignment is signaled by the SplitEnumerator, the new splits of a subtask are sent to all its currently registered readers.
- When a new reader is registered, all related cached splits are sent to the newly registered reader. If the subtask has no more splits, a noMoreSplits event is also sent to the reader.
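The caching behavior for a single subtask can be sketched as below. SubtaskSplitCache is a hypothetical class with strings standing in for splits and events; it is not Flink's actual SplitAssignmentTracker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-subtask cache: splits are remembered so that readers that
// register later (i.e. speculative attempts) receive exactly the same splits.
class SubtaskSplitCache {
    private final List<String> cachedSplits = new ArrayList<>();
    private boolean noMoreSplits = false;
    // attempt number -> events delivered to that attempt's reader
    private final Map<Integer, List<String>> readerInbox = new HashMap<>();

    // Called when the SplitEnumerator assigns new splits to this subtask:
    // cache them and forward them to every currently registered reader.
    void onSplitsAssigned(List<String> splits) {
        cachedSplits.addAll(splits);
        readerInbox.values().forEach(inbox -> inbox.addAll(splits));
    }

    void onNoMoreSplits() {
        noMoreSplits = true;
        readerInbox.values().forEach(inbox -> inbox.add("noMoreSplits"));
    }

    // Called when a (possibly speculative) reader attempt registers:
    // replay all cached splits, plus noMoreSplits if already signaled.
    void onReaderRegistered(int attemptNumber) {
        List<String> inbox = new ArrayList<>(cachedSplits);
        if (noMoreSplits) {
            inbox.add("noMoreSplits");
        }
        readerInbox.put(attemptNumber, inbox);
    }

    List<String> eventsSentTo(int attemptNumber) {
        return readerInbox.get(attemptNumber);
    }
}
```

Note how a reader that registers after the assignments sees the same event sequence as one that was registered from the start.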
Handling Execution Attempt Operator Event
The SourceCoordinator now handles operator events with awareness of the attempt that sent them. Below are the changes to make:
How to handle RequestSplitEvent
This logic can remain as is.
How to handle SourceEventWrapper
If the SourceCoordinator does not need to support concurrent execution attempts (i.e. speculative execution is not enabled), it forwards the event to SplitEnumerator#handleSourceEvent(int, SourceEvent).
Otherwise, if the enumerator implements SupportsHandleExecutionAttemptSourceEvent, the SpeculativeSourceCoordinator calls SupportsHandleExecutionAttemptSourceEvent#handleSourceEvent(int, int, SourceEvent).
Otherwise, the enumerator does not support handling SourceEvents at the execution attempt level, and an exception is thrown.
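The dispatch above can be sketched as follows, with stand-in types and a hypothetical SourceEventDispatcher class (the real logic lives inside the SourceCoordinator):

```java
// Stand-in types, simplified from the actual SourceCoordinator logic.
interface SourceEvent {}

interface SplitEnumerator {
    void handleSourceEvent(int subtaskId, SourceEvent event);
}

interface SupportsHandleExecutionAttemptSourceEvent {
    void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent event);
}

class SourceEventDispatcher {
    private final SplitEnumerator enumerator;
    private final boolean supportsConcurrentAttempts; // true iff speculative execution is enabled

    SourceEventDispatcher(SplitEnumerator enumerator, boolean supportsConcurrentAttempts) {
        this.enumerator = enumerator;
        this.supportsConcurrentAttempts = supportsConcurrentAttempts;
    }

    void dispatch(int subtaskId, int attemptNumber, SourceEvent event) {
        if (!supportsConcurrentAttempts) {
            // No speculative execution: the plain subtask-level method is enough.
            enumerator.handleSourceEvent(subtaskId, event);
        } else if (enumerator instanceof SupportsHandleExecutionAttemptSourceEvent) {
            ((SupportsHandleExecutionAttemptSourceEvent) enumerator)
                    .handleSourceEvent(subtaskId, attemptNumber, event);
        } else {
            // Enumerator cannot handle attempt-level SourceEvents: fail loudly.
            throw new IllegalStateException(
                    "The split enumerator does not support execution attempt SourceEvents");
        }
    }
}
```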
How to handle ReaderRegistrationEvent
A ReaderInfo should be created for the attempt's reader and be registered to the SourceCoordinatorContext. If there are cached splits, the splits will be sent to the newly registered reader. If the subtask has no more splits, a noMoreSplits event will also be sent to the reader.
How to handle ReportedWatermarkEvent
ReportedWatermarkEvent was introduced in Flink 1.15 to support watermark alignment in streaming mode. Speculative execution is only enabled in batch mode. Therefore, the SpeculativeSourceCoordinator throws an exception if it receives a ReportedWatermarkEvent.
Reacting to Execution Attempt State Changes
When an execution attempt becomes ready, the SourceCoordinator is notified and a SubtaskGateway is set up for it in the SourceCoordinatorContext.
Note that this happens in two cases:
- when an ExecutionVertex is newly created or reset, because its original attempt will be created at this time.
- when a speculative execution attempt is created. The SpeculativeScheduler is responsible for triggering this.
When an execution attempt fails, the SourceCoordinator will be notified and the corresponding SubtaskGateway will be removed.
Whether to support concurrent execution attempts
The SourceCoordinatorContext needs to know whether concurrent execution attempts should be supported (i.e. whether speculative execution is enabled), so that it and the SourceCoordinator can choose the proper logic to execute.
SourceCoordinatorProvider is the provider of SourceCoordinator. It has to know whether concurrent execution attempts should be supported and pass this information to the SourceCoordinatorContext it creates.
OperatorCoordinatorHolder also needs to be reworked to create a coordinator context with an isSpeculationEnabled flag.
SpeculativeExecutionJobVertex, which was introduced in FLIP-168: Speculative Execution For Batch Job, creates its OperatorCoordinatorHolder instances with the isSpeculationEnabled flag set to true.
Fault-tolerant processing flow
After a source operator becomes ready to receive events, subtaskAttemptReady is called, and the SpeculativeSourceCoordinator stores the mapping from execution attempt to SubtaskGateway in the SpeculativeSourceCoordinatorContext.
The source operator then registers its reader with the coordinator, and the SpeculativeSourceCoordinator stores the mapping from execution attempt to source reader in the SpeculativeSourceCoordinatorContext.
If an execution goes through a failover, subtaskAttemptFailed is called, and the SpeculativeSourceCoordinator clears the information about this execution, including its source reader and SubtaskGateway.
If all the current executions of the execution vertex have failed, subtaskReset is called; the SpeculativeSourceCoordinator then clears all information about these executions and adds their splits back to the source's split enumerator.
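The flow above can be sketched with the bookkeeping class below. CoordinatorBookkeeping is a hypothetical class with strings standing in for splits; it is not Flink's actual implementation, but it shows the difference between an attempt-level failure and a full vertex reset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical coordinator-side bookkeeping for one source.
class CoordinatorBookkeeping {
    // subtask -> attempts that currently have a gateway/reader registered
    private final Map<Integer, Set<Integer>> activeAttempts = new HashMap<>();
    // subtask -> splits assigned to it so far
    private final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    // splits returned to the split enumerator after a full reset
    final List<String> splitsAddedBack = new ArrayList<>();

    void subtaskAttemptReady(int subtask, int attempt) {
        activeAttempts.computeIfAbsent(subtask, k -> new HashSet<>()).add(attempt);
    }

    void recordAssignment(int subtask, String split) {
        assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
    }

    // A single attempt failed: drop only its gateway/reader information.
    void subtaskAttemptFailed(int subtask, int attempt) {
        Set<Integer> attempts = activeAttempts.get(subtask);
        if (attempts != null) {
            attempts.remove(attempt);
        }
    }

    // All current attempts of the subtask failed: clear everything and
    // return the subtask's splits to the split enumerator.
    void subtaskReset(int subtask) {
        activeAttempts.remove(subtask);
        List<String> splits = assignedSplits.remove(subtask);
        if (splits != null) {
            splitsAddedBack.addAll(splits);
        }
    }

    boolean hasActiveAttempts(int subtask) {
        Set<Integer> attempts = activeAttempts.get(subtask);
        return attempts != null && !attempts.isEmpty();
    }
}
```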
The subtaskReady/subtaskFailed methods should be renamed to subtaskAttemptReady/subtaskAttemptFailed, since they operate at the Execution level, while subtaskReset still operates at the ExecutionVertex level.
No support for HybridSource
HybridSource is mainly used in streaming jobs, while speculative execution only works with batch jobs. Therefore, not supporting HybridSource is not a big problem.
User-defined source functions must not be affected by their speculative instances
As mentioned in the limitations section of FLIP-168, user-defined source functions must not be affected by their speculative instances, and vice versa. Otherwise, speculative execution should not be enabled.
Compatibility, Deprecation, and Migration Plan
Speculative execution of sources is enabled if and only if speculative execution itself is enabled, which entails that Flink's default behavior will not change.
Test Plan
- The changes will be covered by unit and IT cases.
- Test the functionality in a real Standalone/Yarn/Kubernetes cluster.
Rejected Alternatives
No rejected alternatives yet.