...
The SplitEnumerator runs as a task with parallelism one. Downstream of the enumerator are the SourceReader tasks, which run in parallel. Communication goes through the regular data streams.
...
The critical parts here are the added complexity on the master (ExecutionGraph) and the checkpoints. Aligning checkpoints properly with RPC messages is possible when everything goes through the now single-threaded ExecutionGraph dispatcher thread, but supporting asynchronous checkpoint writing requires more complexity.
Option 3: Introduce an independent component named SourceCoordinator, Enumerator runs on the SourceCoordinator
The SourceCoordinator is an independent component, not a part of the ExecutionGraph. The SourceCoordinator could run on the JobMaster or as an independent process; the design imposes no restriction. Communication with the SourceCoordinator (Enumerator) goes through RPC. Split assignment through RPC supports a pull-based model: each SourceReader needs to register with the SourceCoordinator (whose address is in the TaskDeploymentDescriptor or is updated by the JobMaster through RPC) and then sends split requests with payload information.
Each job has at most one SourceCoordinator, which is started by the JobMaster. There might be several Enumerators in one job, since there might be several different sources; all Enumerators run on this SourceCoordinator.
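The pull-based protocol described above can be sketched as follows. This is only an illustrative, in-process stand-in: the class name PullBasedEnumeratorSketch and the methods registerReader and requestSplit are hypothetical, splits are plain strings, the request payload is omitted, and plain method calls stand in for the RPC.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;

/** Hypothetical stand-in for the enumerator side of the pull protocol. */
class PullBasedEnumeratorSketch {
    private final Queue<String> unassignedSplits = new ArrayDeque<>();
    private final Set<Integer> registeredReaders = new HashSet<>();

    PullBasedEnumeratorSketch(Iterable<String> splits) {
        for (String split : splits) {
            unassignedSplits.add(split);
        }
    }

    /** In the proposed design this would arrive over RPC when a SourceReader connects. */
    void registerReader(int subtaskIndex) {
        registeredReaders.add(subtaskIndex);
    }

    /** Hands out the next split to a registered reader, or empty if none is left. */
    Optional<String> requestSplit(int subtaskIndex) {
        if (!registeredReaders.contains(subtaskIndex)) {
            throw new IllegalStateException("Reader " + subtaskIndex + " is not registered");
        }
        return Optional.ofNullable(unassignedSplits.poll());
    }
}
```

A reader that has not registered is rejected, which mirrors the requirement that registration precedes any split request.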
Split assignment needs to satisfy the checkpointing mode semantics. The Enumerator has its own state (the split assignment), which is part of the global checkpoint. When a new checkpoint is triggered, the CheckpointCoordinator sends barriers to the SourceCoordinator first. The SourceCoordinator snapshots the state of all Enumerators and then sends barriers to the SourceReaders through RPC. Splits and barriers sent through RPC are delivered in FIFO order, so Flink can align split assignment with checkpoints naturally.
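To illustrate why FIFO delivery is enough for alignment, here is a minimal sketch with a single reader and an in-memory list standing in for the RPC channel. All names (FifoAlignmentSketch, Message, readerViewAtBarrier) are hypothetical and not part of any Flink API. The point is that the set of splits the reader has received when it sees barrier N equals the assignment state the enumerator snapshotted for checkpoint N.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FifoAlignmentSketch {
    /** One message on the hypothetical coordinator-to-reader channel. */
    static final class Message {
        final String split;      // non-null for a split assignment
        final long checkpointId; // meaningful only for a barrier

        private Message(String split, long checkpointId) {
            this.split = split;
            this.checkpointId = checkpointId;
        }

        static Message split(String split) { return new Message(split, -1L); }
        static Message barrier(long checkpointId) { return new Message(null, checkpointId); }
        boolean isBarrier() { return split == null; }
    }

    private final List<Message> channel = new ArrayList<>(); // append-only, read in order
    private final List<String> assignedSoFar = new ArrayList<>();
    private final Map<Long, List<String>> enumeratorSnapshots = new HashMap<>();

    void assignSplit(String split) {
        assignedSoFar.add(split);
        channel.add(Message.split(split));
    }

    /** Snapshot the enumerator state first, then send the barrier on the same channel. */
    void triggerCheckpoint(long checkpointId) {
        enumeratorSnapshots.put(checkpointId, new ArrayList<>(assignedSoFar));
        channel.add(Message.barrier(checkpointId));
    }

    List<String> enumeratorSnapshot(long checkpointId) {
        return enumeratorSnapshots.get(checkpointId);
    }

    /** Replays the channel in order and returns what the reader holds at the barrier. */
    List<String> readerViewAtBarrier(long checkpointId) {
        List<String> received = new ArrayList<>();
        for (Message m : channel) {
            if (!m.isBarrier()) {
                received.add(m.split);
            } else if (m.checkpointId == checkpointId) {
                return received;
            }
        }
        throw new IllegalArgumentException("No barrier for checkpoint " + checkpointId);
    }
}
```

If splits and barriers travelled on separate, unordered channels, the two views could diverge, which is the coordination problem the JobManager-based option has to solve explicitly.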
If the user specifies RestartAllStrategy as the failover strategy, Flink restarts all tasks and the SourceCoordinator when a task fails. All tasks and Enumerators are restarted and restored from the last successful checkpoint.
If the user specifies RestartPipelinedRegionStrategy as the failover strategy, things are a little more complicated. There is no failover region problem in this model, since there is no execution edge between Enumerator and SourceReader (the SourceCoordinator is not a part of the ExecutionGraph). We need to explain the failure cases separately.
When a SourceReader task fails, the JobMaster does not restart the SourceCoordinator or the Enumerators on it. The JobMaster cancels the other tasks in the same failover region as the failed task, as usual. Then the JobMaster notifies the Enumerator of the failure or cancellation of the SourceReader tasks (there might be multiple SourceReader tasks in the same failover region) and of which checkpoint version will be restored. The notification happens before the new tasks are restarted. Once the Enumerator is aware of the task failures, it restores the state related to the failed tasks from that specific checkpoint version. That means the SourceCoordinator needs to support partial restoring. The Enumerator also keeps a two-level map of SourceReader, checkpoint version, and split assignment in memory. This map helps to find the splits that should be reassigned or added back to the Enumerator. There could be different strategies for handling these failed splits. In some event-time based jobs, reassigning failed splits to other tasks may break the watermark semantics. After restoring the split assignment state, reconstructing the in-memory map, and handling the failed splits, the Enumerator returns an acknowledgement to the JobMaster, and the JobMaster then restarts the tasks of the failed region. A possible optimization is for the Enumerator to return the acknowledgement immediately without waiting for the restore to finish, so that scheduling the failed region's tasks and restoring the Enumerator can proceed at the same time. Another important point is that while the Enumerator is restoring, the other running SourceReaders should continue to work normally, including pulling their next split.
When the Enumerator or SourceCoordinator fails, if there is a write-ahead log available (mentioned below), the JobMaster would restart the Enumerator or SourceCoordinator but not the SourceReader tasks. After restarting, the Enumerator restores its state, replays the write-ahead log, and then starts working. In the meantime, the SourceReaders wait to reconnect; no more splits are assigned until they have re-registered successfully. The re-registration is necessary: after replaying the write-ahead log, the Enumerator and the SourceReaders must be aligned, because the Enumerator cannot be sure whether the last split assignments to each SourceReader succeeded. The reconnection information is updated by the JobMaster if needed (when the process has crashed). If there is no write-ahead log available, the failover falls back to global failover: all tasks and Enumerators are restarted and restored from the last successful checkpoint.
The CheckpointCoordinator should notify the Enumerator that a checkpoint has been completed, so that the Enumerator can prune the in-memory map and the write-ahead log.
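One possible shape for the two-level map and its pruning is sketched below. It is a sketch under assumptions, not the proposed implementation: the class and method names are hypothetical, splits are plain strings, and each assignment is keyed by the id of the first checkpoint whose state will contain it. Under that convention, restoring a failed reader from checkpoint R means reclaiming every split recorded under an id greater than R, and a completed checkpoint C makes all entries with id at most C prunable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SplitAssignmentTrackerSketch {
    // reader subtask index -> (checkpoint id -> splits first covered by that checkpoint)
    private final Map<Integer, TreeMap<Long, List<String>>> assignments = new HashMap<>();

    /** Records a split handed to a reader; checkpointId is the next checkpoint to be taken. */
    void recordAssignment(int reader, long checkpointId, String split) {
        assignments
                .computeIfAbsent(reader, r -> new TreeMap<>())
                .computeIfAbsent(checkpointId, c -> new ArrayList<>())
                .add(split);
    }

    /** Removes and returns the splits the failed reader loses when restoring from the given checkpoint. */
    List<String> splitsToReclaim(int failedReader, long restoredCheckpointId) {
        List<String> reclaimed = new ArrayList<>();
        TreeMap<Long, List<String>> perCheckpoint = assignments.get(failedReader);
        if (perCheckpoint == null) {
            return reclaimed;
        }
        // View of all entries with a checkpoint id strictly greater than the restored one.
        Map<Long, List<String>> notInRestoredState = perCheckpoint.tailMap(restoredCheckpointId, false);
        for (List<String> splits : notInRestoredState.values()) {
            reclaimed.addAll(splits);
        }
        notInRestoredState.clear(); // clearing the view removes the entries from the backing map
        return reclaimed;
    }

    /** Prunes everything that is already covered by a completed checkpoint. */
    void onCheckpointComplete(long completedCheckpointId) {
        for (TreeMap<Long, List<String>> perCheckpoint : assignments.values()) {
            perCheckpoint.headMap(completedCheckpointId, true).clear();
        }
    }
}
```

What to do with the reclaimed splits (return them to the pool, or hold them for the restarted reader to preserve watermark semantics) is the strategy question raised above and is deliberately left out of the sketch.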
Open Questions
In both cases, the enumerator is a point of failure that requires a restart of the entire dataflow.
To circumvent that, we probably need an additional mechanism, like a write-ahead log for split assignment.
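As a rough illustration of what such a write-ahead log would need to offer, the sketch below keeps the log in memory. A real log would have to be written to durable storage before the assignment is sent, which is not shown. The class name, the Entry type, and the three operations are assumptions made for this example; each entry is tagged with the id of the first checkpoint that will contain it.

```java
import java.util.ArrayList;
import java.util.List;

class SplitAssignmentLogSketch {
    /** One logged assignment. */
    static final class Entry {
        final long checkpointId;
        final int reader;
        final String split;

        Entry(long checkpointId, int reader, String split) {
            this.checkpointId = checkpointId;
            this.reader = reader;
            this.split = split;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    /** Appends an assignment; in a real log this would be persisted before sending the split. */
    void append(long checkpointId, int reader, String split) {
        entries.add(new Entry(checkpointId, reader, split));
    }

    /** Entries to replay on top of the state restored from the given checkpoint. */
    List<Entry> entriesAfter(long restoredCheckpointId) {
        List<Entry> result = new ArrayList<>();
        for (Entry e : entries) {
            if (e.checkpointId > restoredCheckpointId) {
                result.add(e);
            }
        }
        return result;
    }

    /** Drops entries that a completed checkpoint already covers. */
    void truncate(long completedCheckpointId) {
        entries.removeIf(e -> e.checkpointId <= completedCheckpointId);
    }
}
```

Replaying the log only tells the Enumerator what it intended to assign; it cannot tell whether the last assignments reached the readers, so a re-registration handshake with the SourceReaders would still be needed.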
...
Comparison between Options
Criterion | Enumerate on Task | Enumerate on JobManager | Enumerate on SourceCoordinator |
---|---|---|---|
Encapsulation of Enumerator | Encapsulation in separate Task | Additional complexity in ExecutionGraph | New component SourceCoordinator |
Network Stack Changes | Significant changes. Some are more clear, like reconnecting. Some seem to break abstractions, like notifying tasks of downstream failures. | No Changes necessary | No Changes necessary |
Scheduler / Failover Region | Minor changes | No changes necessary | Minor changes |
Checkpoint alignment | No changes necessary (splits are data messages, naturally align with barriers) | Careful coordination between split assignment and checkpoint triggering. Might be simple if both actions are run in the single-threaded ExecutionGraph thread. | No changes necessary (splits are through RPC, naturally align with barriers) |
Watermarks | No changes necessary (splits are data messages, watermarks naturally flow) | Watermarks would go through ExecutionGraph | Watermarks would go through RPC |
Checkpoint State | No additional mechanism (only regular task state) | Need to add support for asynchronous non-metadata state on the JobManager / ExecutionGraph | Need to add support for asynchronous state on the SourceCoordinator |
Supporting graceful Enumerator recovery (avoid full restarts) | Network reconnects (like above), plus write-ahead of split | Tracking split assignment between checkpoints, plus | Tracking split assignment between checkpoints, plus write-ahead of split assignment between checkpoints |
Personal opinion from Stephan: If we find an elegant way to abstract the network stack changes, I would lean towards running the Enumerator in a Task, not on the JobManager.
...