
Status

Current state: Under Discussion

Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/201909.mbox/browser

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, triggering a checkpoint will cause a checkpoint barrier to flow from the sources through the DAG towards the sinks. This checkpoint barrier guarantees a consistent snapshot of the DAG at a given point in time.

For each operator with multiple input channels, an alignment phase starts as soon as the checkpoint barrier is received on one channel. Further input on that channel is then blocked until the checkpoint barriers from the other channels have been received.

This approach works fairly well for modest utilization but exhibits two main issues on a back-pressured pipeline:

  • Since the checkpoint barrier flows much slower through the back-pressured channels, the other channels and their upstream operators are effectively blocked during checkpointing.
  • The checkpoint barrier takes a long time to reach the sinks causing long checkpointing times. A longer checkpointing time in turn means that the checkpoint will be fairly outdated once done. Since a heavily utilized pipeline is inherently more fragile, we may run into a vicious cycle of late checkpoints, crash, recovery to a rather outdated checkpoint, more back pressure, and even later checkpoints, which would result in little to no progress in the application.


This FLIP proposes a way to perform checkpoints with a non-blocking alignment of checkpoint barriers. It provides the following benefits.

  • Upstream processes can continue to produce data, even if some operator still waits on a checkpoint barrier on a specific input channel.
  • Checkpointing times are heavily reduced across the execution graph, even for operators with a single input channel.
  • End-users will see more progress even in unstable environments as more up-to-date checkpoints will avoid too many recomputations.
  • Facilitate faster rescaling.


The key idea is to allow checkpoint barriers to be forwarded to downstream tasks before the synchronous part of the checkpointing has been conducted (see Fig. 1). To that end, we need to store in-flight data as part of the checkpoint, as described in greater detail in this FLIP.

Figure 1: Checkpoint barriers overtake in-flight records. Colored buffers need to be persisted together with the operator state.

Public Interfaces

  • To store in-flight data, we need to adjust the checkpoint format.

Proposed Changes

In this FLIP, we

  • Change the handling of checkpoint barriers, 
  • Propose an API for persistently storing in-flight data,
  • Sketch a first implementation that co-locates in-flight data with operator state,
  • Enhance the checkpoint format, 
  • Recover from the new checkpoint while also allowing new checkpoints to be taken during recovering to guarantee progress, and
  • Refine scaling of operators to match the changed checkpoint format.

We will describe the steps in more detail as follows.

Handling of checkpoint barriers

Whenever we receive checkpoint barriers, we forward them immediately to the output. To achieve a consistent checkpoint, we need to persist all data in the input and output buffers between the received and forwarded checkpoint barrier. In contrast to the current checkpoints, we not only “freeze” the current operator state during the synchronous checkpointing phase, but also “freeze” the channels (see Fig. 1).

For operators with one input channel, the checkpoint barrier directly overtakes all input data, triggers the synchronous parts of the checkpointing algorithm in the operator and is forwarded to the end of the output buffer. 
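The single-input case can be illustrated with a toy model. This is a sketch, not Flink code: the class `SingleInputOvertake`, its fields, and the use of strings as stand-ins for network buffers are hypothetical. The sketch only tracks the input side: the barrier jumps ahead of the queued input, the overtaken buffers are returned for persisting, and the barrier is appended to the end of the output.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of one input channel and one output queue (hypothetical, not Flink code). */
class SingleInputOvertake {
    static final String BARRIER = "BARRIER";

    final Deque<String> input = new ArrayDeque<>();
    final Deque<String> output = new ArrayDeque<>();

    /**
     * Called when a barrier arrives behind the buffers currently queued in the input.
     * Returns the input buffers the barrier overtakes; these must be persisted
     * together with the operator state.
     */
    List<String> onBarrier() {
        // The overtaken buffers stay queued for normal processing, we only copy them.
        List<String> toPersist = new ArrayList<>(input);
        // Forward the barrier immediately, to the end of the output queue.
        output.addLast(BARRIER);
        return toPersist;
    }
}
```

Because the overtaken input is processed only after the barrier has been forwarded, all results derived from it end up behind the barrier in the output.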

If an operator has multiple input channels, there are two options:

  • We start checkpointing directly when we receive the first checkpoint barrier. From then on, we may receive an arbitrarily large number of records on the other channels before seeing the respective checkpoint barriers. This can only happen if an upstream operator multiplies the number of records (like flatMap or join). In all other cases, the number of records is limited by the size of Flink’s network buffers.
  • We wait until we see the last checkpoint barrier and block the other input channels. In comparison to aligned checkpoints, we will block data flow for a shorter amount of time.


Here, the tradeoff is between the storage size for in-flight data on the one hand and the checkpoint latency and the time during which an input channel is blocked on the other hand. The current behavior can even be expressed in the same scheme: the operator waits for the checkpoint barrier to arrive and inserts the checkpoint barrier before the output.
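The first option (start checkpointing on the first barrier) can be sketched as follows. This is a toy model under stated assumptions: the class `UnalignedBarrierTracker` and its methods are hypothetical, records are plain strings, and actual processing is omitted. A record is persisted if it arrives while the checkpoint is running on a channel whose barrier has not been seen yet.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy tracker for a single checkpoint on a multi-input operator (hypothetical, not Flink code). */
class UnalignedBarrierTracker {
    private final int numChannels;
    private final Set<Integer> barrierSeen = new HashSet<>();
    private final List<String> persisted = new ArrayList<>();

    UnalignedBarrierTracker(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Registers a barrier; returns true if it is the first one, i.e. the checkpoint starts now. */
    boolean onBarrier(int channel) {
        boolean first = barrierSeen.isEmpty();
        barrierSeen.add(channel);
        return first;
    }

    /** No channel is blocked; records that precede a still-missing barrier are persisted. */
    void onRecord(int channel, String record) {
        boolean checkpointStarted = !barrierSeen.isEmpty();
        if (checkpointStarted && !barrierSeen.contains(channel)) {
            persisted.add(channel + ":" + record);
        }
    }

    /** The in-flight data is complete once every channel has delivered its barrier. */
    boolean isComplete() {
        return barrierSeen.size() == numChannels;
    }

    List<String> persisted() {
        return persisted;
    }
}
```

The size of `persisted` is exactly the storage cost of this option; the second option would instead block the channels that have already delivered their barrier.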

We want to implement all three versions to allow a fine-grained evaluation. Depending on the outcome, we will choose a default option and expose a (hidden) configuration option “checkpointing policy”:

  • ALIGNED (current behavior),
  • UNALIGNED_WITH_MAX_INFLIGHT_DATA (and secondary integer option “checkpointing max inflight data”),
  • UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA.

Note that during downscaling, the size limit of “max inflight data” might be temporarily exceeded during recovery (see recovery section).
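One possible reading of the three policies is sketched below. The enum constants are taken from the list above; everything else is an assumption, in particular the helper `InflightBudget.mayPersist`, the byte-based accounting, and the interpretation that UNALIGNED_WITH_MAX_INFLIGHT_DATA persists in-flight data only while a budget is not exceeded.

```java
/** Hypothetical mirror of the proposed "checkpointing policy" option. */
enum CheckpointingPolicy {
    ALIGNED,
    UNALIGNED_WITH_MAX_INFLIGHT_DATA,
    UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA
}

/** Hypothetical helper; not part of the proposal. */
class InflightBudget {
    /**
     * Decides whether the next buffer may be persisted as in-flight data.
     * If not, the operator would have to block the channel instead (assumption).
     */
    static boolean mayPersist(
            CheckpointingPolicy policy,
            long persistedBytes,
            long nextBufferBytes,
            long maxInflightBytes) {
        switch (policy) {
            case ALIGNED:
                // Current behavior: nothing is persisted, channels are blocked.
                return false;
            case UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA:
                return true;
            case UNALIGNED_WITH_MAX_INFLIGHT_DATA:
                // Persist only while the configured budget is respected.
                return persistedBytes + nextBufferBytes <= maxInflightBytes;
            default:
                throw new IllegalArgumentException("unknown policy " + policy);
        }
    }
}
```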

Persistently store in-flight data

To store in-flight data, this FLIP defines an API and a straight-forward implementation. We envision more sophisticated implementations in the future.

To achieve high performance, data is persisted directly by storing the containing network buffers. Records within the buffer remain serialized, so that the buffers can be directly passed to the storage service.

The API to store the buffers consists of three parts: one writing component, one reading component, and a factory component. Reading and writing will happen on the level of logical input and output, so that channels and record writers are merged in one file/state. During recovery, the channels are separated again.

The writer simply has one method to append a buffer to the persisted data. It is implicitly bound to an input gate or the output of the task, so that it knows to which logical input or output and to which task the data belongs.

PersistentInFlightDataStorer
interface PersistentInFlightDataStorer extends Closeable {
	/**
	 * Appends a given buffer to the persisting storage. 
	 * <p>This method may be blocking until the data is completely persisted.
	 * <p>If this method is non-blocking, implementers must make a defensive copy of the buffer.
	 */ 
	void append(Buffer buffer) throws IOException;
 
	/**
	 * Finalizes the storage for this particular input or output.
	 * <p>For non-blocking append, this method must ensure that all data has been 
	 * successfully persisted and indicate any error.
	 */
	@Override
	void close() throws IOException;
}
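The defensive-copy requirement for a non-blocking append can be illustrated with a simplified in-memory stand-in. The class `InMemoryStorer` is hypothetical and does not implement the interface above: it uses `byte[]` instead of Flink's `Buffer` and an unchecked exception instead of `IOException` to stay self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified in-memory stand-in for a storer (hypothetical; byte[] replaces Buffer). */
class InMemoryStorer implements AutoCloseable {
    private final List<byte[]> stored = new ArrayList<>();
    private boolean closed;

    /**
     * Non-blocking append: returns immediately, so the caller may recycle the array.
     * We therefore keep a copy instead of the caller's array.
     */
    void append(byte[] buffer) {
        if (closed) {
            throw new IllegalStateException("storer already closed");
        }
        stored.add(Arrays.copyOf(buffer, buffer.length));
    }

    /** Nothing is pending in this toy model, so closing only rejects further appends. */
    @Override
    public void close() {
        closed = true;
    }

    List<byte[]> stored() {
        return stored;
    }
}
```

Without the copy, a caller that recycles its buffer after `append` returns would silently corrupt the persisted data.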


The corresponding reader follows the usual pattern as in AsyncDataInput.

PersistentInFlightDataRetriever
interface PersistentInFlightDataRetriever extends AvailabilityProvider {
	/**
	 * Returns the next buffer if available. This method should be non-blocking.
	 */
	Optional<Buffer> pollNext();
}


Lastly, the corresponding factory, as well as all instances it creates, is owned by a specific task and is initialized during checkpointing.

PersistentInFlightDataStorage
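interface PersistentInFlightDataStorage {
	/** Creates the writer for the logical input or output identified by the given gate id. */
	PersistentInFlightDataStorer createStorer(int gateId);
	/** Creates the reader for the logical input or output identified by the given gate id. */
	PersistentInFlightDataRetriever createRetriever(int gateId);
}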

State-based storage

A first, straight-forward solution will store all in-flight data as part of the checkpoint state, separately for each logical input/output. Buffers are directly appended onto the respective state handle with a small header that identifies the channel/record writer, so that records spanning multiple buffers can be safely restored. 

Furthermore, the state containing the persisted data of an input/output should also contain a small header. The header consists of a version number identifying the used checkpoint format. For the first version (described above), the number is ‘1’, which gives us the option for backward compatible changes. See Figure 2 for an overview of all involved headers. Note that the current buffer format including record header remains untouched and is just shown for completeness.

Figure 2: The format of the state-based storage. The state header contains the version number and is written once per logical input / output. Each buffer is prefixed with the channel id for restoring the original assignment to channels.
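The layout of the state-based storage can be sketched with plain Java streams. The version number and the channel id prefix follow the description above; the exact field widths (4-byte integers), the explicit length field per buffer, and the class `StateBasedFormat` are assumptions made for this illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical encoder/decoder for the state of one logical input or output. */
class StateBasedFormat {
    static final int VERSION = 1;

    /** Writes the state header once, then every buffer prefixed with channel id and length. */
    static byte[] write(int[] channelIds, byte[][] buffers) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(VERSION);
            for (int i = 0; i < buffers.length; i++) {
                out.writeInt(channelIds[i]);     // buffer header: channel id
                out.writeInt(buffers[i].length); // assumed length field
                out.write(buffers[i]);           // serialized records, left untouched
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the state and separates the buffers per channel again, keeping their order. */
    static Map<Integer, List<byte[]>> read(byte[] state) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(state));
            int version = in.readInt();
            if (version != VERSION) {
                throw new IllegalStateException("unsupported format version " + version);
            }
            Map<Integer, List<byte[]>> perChannel = new LinkedHashMap<>();
            while (in.available() > 0) {
                int channelId = in.readInt();
                byte[] payload = new byte[in.readInt()];
                in.readFully(payload);
                perChannel.computeIfAbsent(channelId, k -> new ArrayList<>()).add(payload);
            }
            return perChannel;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Since the buffers of one channel are restored in their original order, a record that spans several buffers of that channel can be reassembled after recovery.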

Enhancing the checkpoints of operators

Depending on the implementation of the persistent storage, operators need to store the in-flight data or pointers to it into the operator state. In an operator chain, the head operator will store the input data and the tail operator will store the output data.

The actual logic for persisting data is encapsulated in the PersistentInFlightDataStorer and PersistentInFlightDataRetriever that should receive a state handle during initialization. Future versions could store the data in external systems and simply add pointers to the external systems in the state. In the following, we assume some kind of streaming storage with offsets.

For an input channel, the relevant offset is right after the checkpoint barrier. For multiple inputs, multiple offsets need to be saved. If we decide to store key groups individually for faster rescaling, the same offset will be stored for each key group.

Analogously, the relevant offset to be stored for output buffers is the one before the forwarded checkpoint barrier. Similarly to input channels, we may store output offset per record writer or even key group in the future.

Recover from the checkpoint with guaranteed progress

To recover from a checkpoint, the persistent channel needs to be consumed first before processing any additional inputs; that is, input buffers are first used to unspill data and only then can be used to process upstream data. Similarly, the output buffers are taken until the spilled data has been unspilled. Normal operation can be resumed even without all data being unspilled as long as no new input can overtake unspilled data.

To guarantee progress even during recovery, we need to implement three features:

  • Checkpoint barriers need to be processed even if the unspilling is not completed and all input buffers are taken. 
  • Ideally, data that has not been completely unspilled will not be written twice. Instead, only the offsets of the checkpointed operators are updated.
  • Data unspilling does not require the full data to be copied to the task; that is, persistent data is consumed/unspilled lazily as a stream.

Thus, in an environment with frequent crashes (under backpressure), progress can be made as soon as the operators themselves are recovered fast enough.
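The ordering rule during recovery (unspilled data first, consumed lazily) can be sketched as a toy input. The class `RecoveringInput` is hypothetical, buffers are plain strings, and the iterator stands in for a stream over the persisted data.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;

/** Toy input that drains recovered buffers before any new upstream data (hypothetical). */
class RecoveringInput {
    // Consumed lazily: the persisted data is never copied to the task as a whole.
    private final Iterator<String> spilled;
    private final Queue<String> upstream = new ArrayDeque<>();

    RecoveringInput(Iterator<String> spilled) {
        this.spilled = spilled;
    }

    /** New data from upstream is only queued, it cannot overtake spilled data. */
    void offerUpstream(String buffer) {
        upstream.add(buffer);
    }

    /** Non-blocking poll: spilled buffers first, then upstream buffers in arrival order. */
    Optional<String> pollNext() {
        if (spilled.hasNext()) {
            return Optional.of(spilled.next());
        }
        return Optional.ofNullable(upstream.poll());
    }

    /** True once all persisted in-flight data has been unspilled. */
    boolean isRecovered() {
        return !spilled.hasNext();
    }
}
```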

The first, state-based storage will not satisfy all requirements. Nevertheless, assuming that the in-flight data is rather small, we should still quickly come to the point where new output is produced.

Rescaling of operators

When rescaling operators with in-flight data, we need to be careful to satisfy the ordering guarantees. 

If data is keyed, rescaling means that certain key groups are reassigned to new task instances. These task instances need to read the checkpoint of the former owner of the key group and filter a) the relevant state (as is done already) and b) the relevant records from the persisted data.

Since rescaling is the rarest of the processes that touch the data (normal processing > checkpointing > recovery > rescaling), we opted for a non-optimized version of rescaling. When, after rescaling, two operator instances need to load data from the same state handle, they need to deserialize all records, apply the key selector, and determine the key group index to filter relevant records. We expect a rather small amount of data to be processed multiple times and think that storing the key group index inside the data would hurt performance for small records.
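The filtering step can be sketched as follows. The class `KeyGroupFilter` and its methods are hypothetical, records are assumed to be already deserialized, and the key group computation is simplified: Flink additionally applies a murmur hash to the key's hash code. The mapping from key group to subtask index is, to our understanding, the formula used by KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical filter that keeps only the persisted records owned by one subtask. */
class KeyGroupFilter {
    /** Simplified key group assignment (assumption: no additional murmur hashing). */
    static int keyGroupOf(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of the subtask that owns it. */
    static int operatorIndexOf(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Applies the key selector to every record and keeps those owned by subtaskIndex. */
    static <T> List<T> filter(
            List<T> records,
            Function<T, Object> keySelector,
            int maxParallelism,
            int parallelism,
            int subtaskIndex) {
        List<T> relevant = new ArrayList<>();
        for (T record : records) {
            int keyGroup = keyGroupOf(keySelector.apply(record), maxParallelism);
            if (operatorIndexOf(keyGroup, maxParallelism, parallelism) == subtaskIndex) {
                relevant.add(record);
            }
        }
        return relevant;
    }
}
```

Every new owner scans the full state handle, which is the accepted cost of not storing the key group index next to each record.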

For non-keyed data, rescaling semantics is unfortunately a bit fuzzy. For this FLIP, we assume that no data of a given input split can overtake prior data in processing on forward channels. Any fan out or reshuffling will already destroy that ordering guarantee, so we can disregard these cases in this FLIP. If we focus on forward channels, however, we quickly run into situations where the ordering is violated (see Fig. 3).

Figure 3: Violating ordering guarantees while rescaling of non-keyed data. Split S1 is moved from source instance I to instance II.
If the original operators are back-pressured (⏮), data from the new source instance can overtake the original data.


To solve these issues, we have three solutions:

  • Unaligned checkpoints on non-keyed, forwarded operators are disabled by default. If we do not have in-flight data, we arrive at the status quo.
  • We add an explicit toggle to force unaligned checkpoints on non-keyed operators for power users. That toggle enables unaligned checkpoints when ordering is not important or no rescaling from a checkpoint will occur.
  • We devise and implement FLIP-? (light-weight partition reassignment), which will provide a very fine-grained mechanism to transfer the ownership of input splits/key groups. Based on this mechanism, we can fully support unaligned checkpoints on non-keyed operators.


Problem dimensions: keyed/non-keyed, scale-in/out, upstream/downstream

Possible issues: multi-buffer records, multi-record buffers, ordering (new/old data), pressure

General algorithm:

  1. find relevant state
    1. what are the requirements? can we use any distribution as long as we can satisfy (3) (find correct channels)?
  2. filter out records/buffers
    1. after scaling out, state file can contain irrelevant records
  3. load data into the correct channels (InputChannel/SubPartition)
    1. this should resolve the multi-buffer record (MBR) issues
    2. solution: task IDs?
  4. ensure ordering
    1. solution: epochs?

Open questions

Avoiding double processing in downstream (if continuous spilling)

With continuous spilling, upstream saves all its output which includes what was already processed/buffered by the downstream.

Possible solutions:

  1. downstream loads upstream output buffers file and filters out what it has already processed; files are named deterministically and indexed
  2. same, but offset in the file is calculated based on buffer size
  3. downstream communicates its offset to upstream during snapshotting
  4. downstream communicates its offset to upstream during recovery

Resolved questions

Retaining subtask indices between restarts

This is needed to maintain correspondence between upstream output buffers and downstream input buffers.

Flink always uses zero-based indexes to:
1. pick subpartition to consume and bind it to input channel (see TaskDeploymentDescriptorFactory#createInputGateDeploymentDescriptors)
2. pick subpartition to output (ChannelSelector)
3. distribute key groups (see KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup, StateAssignmentOperation#assignStates, OperatorInstanceID)
and so on...

As long as the job isn't rescaled, these indices are the same.

Handling records spanning multiple buffers

A single record can end up in different tasks while checkpointing.

Therefore, 

without rescaling

  1. with fan-in topology, downstream needs to match incomplete record with the upstream; but this should be handled by having separate input channels already
  2. output buffers need to be sent to the right channel - we rely on stable channels ids, so this is NOT a problem

with rescaling

  1. when scaling-in upstream, buffers of incomplete records need to be matched
  2. when scaling-in downstream, there can be multiple incomplete records in input buffers (it can be a single input channel after recovery)

There are two options:

  1. recover from one logical channel atomically (output of upstream, input of downstream) and always on one (downstream?) side. Output of upstream should be blocked during recovery (my idea was negative credit while upstream holds more buffers than it should, which would be taken from upstream buffer pool)
  2. join records on-the-fly (sort by "channel-id" on sender and receiver; send a marker after all saved buffers of the channel were sent; this will allow downstream to switch to the next upstream (using credit-based flow control))

Decisions made

Ad-hoc vs continuous spilling

Ad-hoc POC showed much better efficiency so discard continuous spilling for now (2020-02-19).

Re-scaling

The first version can be without re-scaling support, at least for non-keyed exchanges (sources and non-sources).

However, we need to have a vision of how to implement it in the future.

(2020-02-19)

Whether to process in and out buffers of a single channel in a single task (downstream?) or not (upstream + downstream)

Single-channel pros:

  1. solves the problem of multi-buffer records (question)
  2. solves the problem of double-processing with cont. spilling (question)
  3. ...

Single-channel cons:

  1. tight coupling between upstream and downstream (naming should be consistent across all nodes)
  2. doesn't work with existing persistence mechanisms (question)

Tentatively chosen not to read upstream data in downstream because of efficiency (reading many small files) and easier implementation (2020-02-19).

Existing persistence mechanisms vs custom

Depends on:

  1. ad-hoc or cont. spilling (cont. spilling can require special handling because of higher volumes)
  2. single-task channel processing (isn't compatible with current implementation)
  3. re-scaling support (now or in future)

(assuming non-continuous spilling)

Conceptually, both channel state and operator state are part of a checkpoint.
So they must have the same: a) distribution, b) durability guarantees (and maybe other properties).

So it looks reasonable to reuse the existing mechanism of writing checkpointing data to a remote FS.
(CheckpointStreamWithResultProvider, CheckpointStreamFactory, etc.)

The returned handles need to be included in the task snapshot.

The disadvantage is that, if we don't rely on redistribution mechanisms for some reason (e.g. efficiency), it could be harder to implement the lookup. But this scenario seems to be unlikely.

(2020-02-20)

Compatibility, Deprecation, and Migration Plan

  • Make behavior optional. Disabled in first release.
  • Make behavior default in subsequent releases.
  • In documentation, clearly state that users of the checkpoint API can only use unaligned checkpoints if they do not require a consistent state across all operators.

Known Limitations

  • State size increase
    • Up to a couple of GB per task (especially painful on IO bound clusters)
    • Depends on the actual policy used (probably UNALIGNED_WITH_MAX_INFLIGHT_DATA is the more plausible default)
  • Longer and heavier recovery depending on the increased state size
    • Can potentially trigger death spiral after a first failure
    • More up-to-date checkpoints will most likely still be an improvement over the current checkpoint behavior during backpressure

Test Plan

  • Correctness tests with induced failures
  • Compare checkpoint times under backpressure with current state
  • Compare throughput under backpressure with current state
  • Compare progress under backpressure with frequently induced failures
  • Compare throughput with no checkpointing

Rejected Alternatives

There were several alternatives for specific design decisions:

Support of rescaling in non-keyed operators:

  • No guarantees for non-keyed data (Kafka way) was rejected because we knew some users (currently) depend on these guarantees.
  • Explicitly stating partitioning by users (reinterpret stream) would be a possible alternative for special cases where the current proposal fails.
  • No scaling for non-keyed data would make the work stealing of FLIP-27 hard to implement beyond the source.

For the first implementation of the persistence layer of in-flight data:

  • Writing onto DFS/S3 would introduce a new system dependency.
  • Implementing DFS on TaskManagers would require too much effort for now.
  • Using BookKeeper seems promising but would also add a new system dependency.






