...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
interface SplitReader<RecordsT> extends Closeable { @Nullable RecordsT fetchNextRecords(Duration timeout) throws IOException; void wakeup(); } |
...
- Pipelined persistent channels: The contents of a channel are persistent between checkpoints. A receiving task requests the data "after checkpoint X". The data is pruned when checkpoint X+1 is completed.
When a reader fails, the recovered reader task can reconnect to the stream after the checkpoint and will get the previously assigned splits. Batch is a special case: if there are no checkpoints, the channel holds all data since the beginning.
- Pro: The "pipelined persistent channel" also has applications beyond the enumerator-to-reader connection.
- Con: Splits always go to the same reader and cannot be distributed across multiple readers upon recovery. Especially for batch programs, this may create bad stragglers during recovery.
- Reconnects and task notifications on failures: The enumerator task needs to remember the splits assigned to each result partition until the next checkpoint completes. The enumerator task would have to be notified of the failure of a downstream task and add the splits back to the enumerator. Recovered reader tasks would simply reconnect and get a new stream.
- Pro: Re-distribution of splits across all readers upon failure/recovery (no stragglers).
- Con: Breaks abstraction that separates task and network stack.
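The bookkeeping behind the "reconnects and task notifications" variant can be sketched as follows. This is a minimal illustration, not part of the proposal: the class `PendingAssignmentTracker` and its method names are hypothetical, readers are identified by a plain integer index, and the sketch assumes that a completed checkpoint covers every assignment made before the completion notification arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: remembers splits handed out since the last completed checkpoint. */
class PendingAssignmentTracker<SplitT> {

    // Splits that have not been handed to any reader yet.
    private final Deque<SplitT> unassigned = new ArrayDeque<>();

    // Splits assigned since the last completed checkpoint, keyed by reader index.
    private final Map<Integer, List<SplitT>> assignedSinceCheckpoint = new HashMap<>();

    PendingAssignmentTracker(List<SplitT> initialSplits) {
        unassigned.addAll(initialSplits);
    }

    /** Hands the next split to the given reader and remembers the assignment. */
    Optional<SplitT> assignNext(int readerIndex) {
        SplitT split = unassigned.poll();
        if (split == null) {
            return Optional.empty();
        }
        assignedSinceCheckpoint
                .computeIfAbsent(readerIndex, k -> new ArrayList<>())
                .add(split);
        return Optional.of(split);
    }

    /** Simplifying assumption: the completed checkpoint covers all assignments so far. */
    void onCheckpointComplete() {
        assignedSinceCheckpoint.clear();
    }

    /** Returns the failed reader's uncovered splits to the pool and reports them. */
    List<SplitT> onReaderFailed(int readerIndex) {
        List<SplitT> lost = assignedSinceCheckpoint.remove(readerIndex);
        if (lost == null) {
            return Collections.emptyList();
        }
        // Re-queue at the front, preserving order, so any reader can pick them up.
        for (int i = lost.size() - 1; i >= 0; i--) {
            unassigned.addFirst(lost.get(i));
        }
        return lost;
    }
}
```

Because the returned splits go back into the shared pool, any reader can pick them up afterwards, which is the re-distribution property listed as the "Pro" of this variant.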
Option 2: Enumerator on the JobManager
Similar to the current batch (DataSet) input split assigner, the SplitEnumerator code runs in the JobManager, as part of an ExecutionJobVertex. To support periodic split discovery, the enumerator has to be called periodically from an additional thread.
The readers request splits via an RPC message and the enumerator responds via RPC. RPC messages carry payload for information like location.
Extra care needs to be taken to align the split assignment messages with checkpoint barriers. If we start to support metadata-based watermarks (to handle event time consistently when dealing with collections of bounded splits), we need to support that through RPC as well and align it with the input split assignment.
The enumerator creates its own piece of checkpoint state when a checkpoint is triggered.
The critical parts here are the added complexity on the master (ExecutionGraph) and the checkpoints. Aligning checkpoints properly with RPC messages is possible when going through the now single-threaded ExecutionGraph dispatcher thread, but supporting asynchronous checkpoint writing requires more complexity. (TBD: explain more)
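The alignment argument can be illustrated with a small sketch: if split requests and checkpoint triggers are both executed on one thread, every snapshot sees a consistent view of the remaining splits. The class `SingleThreadedSplitCoordinator` and its methods are hypothetical stand-ins, not Flink APIs. Splits are plain strings, and a plain single-thread executor stands in for the ExecutionGraph dispatcher thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical coordinator: all access to the split queue runs on one thread. */
class SingleThreadedSplitCoordinator implements AutoCloseable {

    // Stand-in for the single-threaded dispatcher; tasks run in submission order.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Only touched from mainThread after construction.
    private final Deque<String> remaining = new ArrayDeque<>();

    SingleThreadedSplitCoordinator(List<String> splits) {
        remaining.addAll(splits);
    }

    /** Models a reader's RPC request for its next split. */
    CompletableFuture<Optional<String>> requestSplit() {
        return CompletableFuture.supplyAsync(
                () -> Optional.ofNullable(remaining.poll()), mainThread);
    }

    /** Models a checkpoint trigger: copies the not-yet-assigned splits. */
    CompletableFuture<List<String>> triggerCheckpoint() {
        return CompletableFuture.supplyAsync(
                () -> new ArrayList<>(remaining), mainThread);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}
```

The sketch only copies the state synchronously on the main thread. Writing that copy out asynchronously is the part the text above flags as needing additional complexity.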
Open Questions
In both cases, the enumerator is a point of failure that requires a restart of the entire dataflow.
To circumvent that, we probably need an additional mechanism, like a write-ahead log for split assignment.
...
Criterion | Enumerate on Task | Enumerate on JobManager |
---|---|---|
Encapsulation of Enumerator | Encapsulation in separate Task | Additional complexity in ExecutionGraph |
Network Stack Changes | Significant changes. Some are more clear, like reconnecting. Some seem to break abstractions, like notifying tasks of downstream failures. | No Changes necessary |
Scheduler / Failover Region | Minor changes | No changes necessary |
Checkpoint alignment | No changes necessary (splits are data messages, naturally align with barriers) | Careful coordination between split assignment and checkpoint triggering. Might be simple if both actions are run in the single-threaded ExecutionGraph thread. |
Watermarks | No changes necessary (splits are data messages, watermarks naturally flow) | Watermarks would go through ExecutionGraph |
Checkpoint State | No additional mechanism (only regular task state) | Need to add support for asynchronous non-metadata state on the JobManager / ExecutionGraph |
Supporting graceful Enumerator recovery (avoid full restarts) | Network reconnects (like above), plus write-ahead of split | Tracking split assignment between checkpoints, plus |
Personal opinion from Stephan: If we find an elegant way to abstract the network stack changes, I would lean towards running the Enumerator in a Task, not on the JobManager.
Core Public Interfaces
Source
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable { /** * Checks whether the source supports the given boundedness. * * <p>Some sources might only support either continuous unbounded streams, or * bounded streams. */ boolean supportsBoundedness(Boundedness boundedness); /** * Creates a new reader to read data from the splits it gets assigned. * The reader starts fresh and does not have any state to resume. */ SourceReader<T, SplitT> createReader(SourceContext ctx) throws IOException; /** * Creates a new SplitEnumerator for this source, starting a new input. */ SplitEnumerator<SplitT, EnumChkT> createEnumerator(Boundedness mode) throws IOException; /** * Restores an enumerator from a checkpoint. */ SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(Boundedness mode, EnumChkT checkpoint) throws IOException; // ------------------------------------------------------------------------ // serializers for the metadata // ------------------------------------------------------------------------ /** * Creates a serializer for the input splits. Splits are serialized when sending them * from enumerator to reader, and when checkpointing the reader's current state. */ SimpleVersionedSerializer<SplitT> getSplitSerializer(); /** * Creates the serializer for the {@link SplitEnumerator} checkpoint. * The serializer is used for the result of the {@link SplitEnumerator#snapshotState()} * method. */ SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer(); } /** * The boundedness of the source: "bounded" for the currently available data (batch style), * "continuous unbounded" for a continuous streaming style source. */ public enum Boundedness { /** * A bounded source processes the data that is currently available and will end after that. * * <p>When a source produces a bounded stream, the runtime may activate additional optimizations * that are suitable only for bounded input. Incorrectly producing unbounded data when the source * is set to produce a bounded stream will often result in programs that do not output any results * and may eventually fail due to runtime errors (out of memory or storage). */ BOUNDED, /** * A continuous unbounded source continuously processes all data as it comes. * * <p>The source may run forever (until the program is terminated) or might actually end at some point, * based on some source-specific conditions. Because that is not transparent to the runtime, * the runtime will use an execution mode for continuous unbounded streams whenever this mode * is chosen. */ CONTINUOUS_UNBOUNDED } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public interface SplitEnumerator<SplitT, CheckpointT> extends Closeable { /** * Returns true when the input is bounded and no more splits are available. * True means that the definite end of input has been reached, and is only possible * in bounded sources. */ boolean isEndOfInput(); /** * Returns the next split, if it is available. If nothing is currently available, this returns * an empty Optional. * More may be available later, if the {@link #isEndOfInput()} is false. */ Optional<SplitT> nextSplit(ReaderLocation reader); /** * Adds splits back to the enumerator. This happens when a reader failed and restarted, * and the splits assigned to that reader since the last checkpoint need to be made * available again. */ void addSplitsBack(List<SplitT> splits); /** * Checkpoints the state of this split enumerator. */ CheckpointT snapshotState(); /** * Called to close the enumerator, in case it holds on to any resources, like threads or * network connections. */ @Override void close() throws IOException; } public interface PeriodicSplitEnumerator<SplitT, CheckpointT> extends SplitEnumerator<SplitT, CheckpointT> { /** * Called periodically to discover further splits. * * @return Returns true if further splits were discovered, false if not. */ boolean discoverMoreSplits() throws IOException; /** * Continuous enumeration is only applicable to unbounded sources. */ @Override default boolean isEndOfInput() { return false; } } |
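To make the contract more concrete, here is a minimal sketch of a bounded enumerator over a fixed list of splits. It is an illustration under assumptions, not a reference implementation. `FixedListEnumerator` is a hypothetical name, splits are plain strings, the checkpoint type is simply the list of unassigned splits, and `ReaderLocation` is an empty placeholder because its shape is not defined in this section. The interface is repeated locally only so the sketch compiles on its own.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Placeholder: the real ReaderLocation is not specified in this section.
class ReaderLocation {}

// Local copy of the proposed interface, so the sketch is self-contained.
interface SplitEnumerator<SplitT, CheckpointT> extends Closeable {
    boolean isEndOfInput();
    Optional<SplitT> nextSplit(ReaderLocation reader);
    void addSplitsBack(List<SplitT> splits);
    CheckpointT snapshotState();
    @Override
    void close() throws IOException;
}

/** Hypothetical bounded enumerator that hands out a fixed list of splits in order. */
class FixedListEnumerator implements SplitEnumerator<String, List<String>> {

    private final Deque<String> pending = new ArrayDeque<>();

    FixedListEnumerator(List<String> splits) {
        pending.addAll(splits);
    }

    @Override
    public boolean isEndOfInput() {
        // Can flip back to false if splits are returned via addSplitsBack.
        return pending.isEmpty();
    }

    @Override
    public Optional<String> nextSplit(ReaderLocation reader) {
        // Ignores locality; a real enumerator might prefer splits close to the reader.
        return Optional.ofNullable(pending.poll());
    }

    @Override
    public void addSplitsBack(List<String> splits) {
        // Re-queue at the front, preserving the original order.
        for (int i = splits.size() - 1; i >= 0; i--) {
            pending.addFirst(splits.get(i));
        }
    }

    @Override
    public List<String> snapshotState() {
        // The checkpoint is the list of splits that are not yet assigned.
        return new ArrayList<>(pending);
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

Restoring from a checkpoint would amount to constructing a new `FixedListEnumerator` from the snapshot list, which mirrors the `restoreEnumerator` method on `Source`.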
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class StreamExecutionEnvironment { ... public <T> DataStream<T> continuousSource(Source<T, ?, ?> source) {...} public <T> DataStream<T> continuousSource(Source<T, ?, ?> source, TypeInformation<T> type) {...} public <T> DataStream<T> boundedSource(Source<T, ?, ?> source) {...} public <T> DataStream<T> boundedSource(Source<T, ?, ?> source, TypeInformation<T> type) {...} ... } |
...