Authors: Kostas Kloudas, Aljoscha Krettek, Konstantin Knauf, Yu Li
Current state: Under Discussion
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Although savepoints and checkpoints were created with different semantics and assumptions in mind, some features added along the way blurred the lines between them, so these assumptions no longer hold in all scenarios, with potential caveats for users. In addition, some of the semantics of the two features were never clearly defined, which has had negative consequences for users. This FLIP:
- discusses the relationship between checkpoints and savepoints,
- tries to define their semantics from a unified perspective, in which both checkpoints and savepoints are seen as state snapshots, and
- based on the above, proposes some additions that aim at reducing the risk of users shooting themselves in the foot.
The following matrix shows the current state of things with the different formats and features of checkpoints and savepoints. Legend:
- Yes: the feature is supported
- No: the feature is not supported
- Supported, but not in all cases
Checkpoints were created as a system-owned feature, and their lifecycle was expected to be entirely under system control. Savepoints, on the other hand, are a user-facing feature and are expected to be under user control. The addition of "Retained Checkpoints" (previously also known as externalized checkpoints, with retention) blurred this separation between user and system space, with negative consequences for users. One of them is that there may now be retained incremental checkpoints which, once they have been used to recover from a failure, are effectively treated as savepoints and can never be cleaned up: Flink does not clean them up, and the user cannot do so either, because he/she cannot know whether subsequent checkpoints still reference them.
Another shortcoming stemming from the not-so-clear semantics of checkpoints and savepoints is that, since savepoints call the same methods in the UDFs as checkpoints, they commit side-effects (e.g. when using exactly-once sinks), just as checkpoints do. Given this, and in order not to break end-to-end exactly-once semantics, savepoints are also considered for recovery, just like checkpoints. If this were not the case and a failure happened after a savepoint, the job would restart from the last successful checkpoint (which preceded the savepoint) and replay records whose side-effects the savepoint had already committed, leading to duplicates in the sink. But treating savepoints as checkpoints means that the user is not allowed to delete them, at least until the next checkpoint.
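The duplicate scenario can be checked with a toy model. This is a minimal sketch, not Flink code: SnapshotKind, CompletedSnapshot and SideEffectModel are hypothetical names, and it assumes that every completed snapshot commits all records before its source offset to an exactly-once sink, and that a restored job replays and re-commits everything after the offset it restored from.

```java
import java.util.List;

/** Snapshot kinds in the current model (hypothetical names). */
enum SnapshotKind { CHECKPOINT, SAVEPOINT }

/** A completed snapshot and the source offset it captured. */
final class CompletedSnapshot {
    final SnapshotKind kind;
    final long offset;

    CompletedSnapshot(SnapshotKind kind, long offset) {
        this.kind = kind;
        this.offset = offset;
    }
}

final class SideEffectModel {
    /**
     * Number of records the sink receives twice when the job fails after the given
     * snapshots and restores from the newest snapshot that recovery may use.
     */
    static long duplicatesAfterFailure(List<CompletedSnapshot> completed,
                                       boolean savepointsUsedForRecovery) {
        long committedUpTo = 0; // both kinds commit side-effects
        long restoreFrom = 0;   // no eligible snapshot: replay from the start
        for (CompletedSnapshot s : completed) {
            committedUpTo = Math.max(committedUpTo, s.offset);
            if (s.kind == SnapshotKind.CHECKPOINT || savepointsUsedForRecovery) {
                restoreFrom = Math.max(restoreFrom, s.offset);
            }
        }
        // Records in [restoreFrom, committedUpTo) were already committed and get replayed.
        return committedUpTo - restoreFrom;
    }
}
```

With a checkpoint at offset 100 followed by a savepoint at offset 150, ignoring the savepoint during recovery yields 50 duplicated records, while considering it yields none.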
The main change that this FLIP proposes is:
All state snapshots (checkpoints and savepoints) are simply SNAPSHOTS from Flink's perspective, and they are all eligible for RECOVERY.
Given the above:
- All state snapshots are considered for RECOVERY, so:
- the user cannot delete/move them on his/her own
- they all commit side-effects
- Deleting or moving a snapshot must be done by Flink
The user can also trigger user-induced snapshots, i.e. request through the command line that Flink take a snapshot. These are similar to the current savepoints, as they will not be automatically deleted by Flink, BUT:
- the user cannot delete/move them, as they are still snapshots and eligible for recovery.
- they are written to the common snapshot directory.
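The rules above can be summarized as a single Flink-side owner of all snapshots. The sketch below is illustrative only: Trigger, StoredSnapshot and SnapshotStore are hypothetical names, the directory layout is made up, and we assume a "keep the newest N" retention policy for system-triggered snapshots. It shows that recovery looks at every retained snapshot regardless of who triggered it, that user-induced snapshots are never removed automatically, and that everything lives under one snapshot directory.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Who triggered a snapshot (hypothetical). */
enum Trigger { SYSTEM, USER }

/** A completed snapshot; every snapshot lives under the common snapshot directory. */
final class StoredSnapshot {
    final long id;
    final Trigger trigger;
    final String path;

    StoredSnapshot(long id, Trigger trigger, String snapshotDir) {
        this.id = id;
        this.trigger = trigger;
        this.path = snapshotDir + "/snapshot-" + id; // hypothetical layout
    }
}

/** Hypothetical Flink-side store: the single owner of all snapshots of a job. */
final class SnapshotStore {
    private final String snapshotDir;
    private final int retainedSystemSnapshots;
    private final List<StoredSnapshot> snapshots = new ArrayList<>(); // ordered by id
    private long lastId = Long.MIN_VALUE;

    SnapshotStore(String snapshotDir, int retainedSystemSnapshots) {
        this.snapshotDir = snapshotDir;
        this.retainedSystemSnapshots = retainedSystemSnapshots;
    }

    /** Registers a completed snapshot and applies the assumed retention policy. */
    StoredSnapshot add(long id, Trigger trigger) {
        if (id <= lastId) throw new IllegalArgumentException("snapshot ids must increase");
        lastId = id;
        StoredSnapshot added = new StoredSnapshot(id, trigger, snapshotDir);
        snapshots.add(added);
        long system = snapshots.stream().filter(s -> s.trigger == Trigger.SYSTEM).count();
        // Drop the oldest SYSTEM snapshots beyond the limit; USER snapshots are never auto-deleted.
        for (int i = 0; i < snapshots.size() && system > retainedSystemSnapshots; ) {
            if (snapshots.get(i).trigger == Trigger.SYSTEM) {
                snapshots.remove(i);
                system--;
            } else {
                i++;
            }
        }
        return added;
    }

    /** Recovery considers every retained snapshot, regardless of who triggered it. */
    Optional<StoredSnapshot> latestForRecovery() {
        return snapshots.stream().max(Comparator.comparingLong(s -> s.id));
    }

    List<Long> retainedIds() {
        List<Long> ids = new ArrayList<>();
        for (StoredSnapshot s : snapshots) ids.add(s.id);
        return ids;
    }
}
```

For example, with one retained system snapshot, the sequence system 1, user 2, system 3 leaves snapshots 2 and 3, and recovery uses snapshot 3; a later user-induced snapshot 4 becomes the recovery point while snapshot 2 is still kept.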
The command for user-induced snapshots will allow the user to specify the FORMAT of the snapshot. Different formats will have different properties. For example, the UNIFIED format described in FLIP-41 will be usable across different state backends. The default format will be the one that the user has specified for the whole job (e.g. full, incremental, etc.). The formats and their properties can be described in the documentation in a simple matrix, similar to the one included above.
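Format selection for a user-induced snapshot boils down to "the requested format, else the job-wide default", plus checks driven by the format matrix. A minimal sketch with hypothetical names (SnapshotFormat, SnapshotFormats, and the backend strings); the rule that only UNIFIED may switch state backends is an assumption made here for illustration, since the text only states that UNIFIED can be used across backends.

```java
import java.util.Optional;

/** Formats mentioned in this FLIP; UNIFIED refers to the FLIP-41 format. */
enum SnapshotFormat { FULL, INCREMENTAL, UNIFIED }

final class SnapshotFormats {
    /** A user-induced snapshot uses the requested format, else the job-wide default. */
    static SnapshotFormat resolve(Optional<SnapshotFormat> requested, SnapshotFormat jobDefault) {
        return requested.orElse(jobDefault);
    }

    /** Assumption for illustration: only UNIFIED is portable across state backends. */
    static boolean portableAcrossStateBackends(SnapshotFormat format) {
        return format == SnapshotFormat.UNIFIED;
    }

    /** Rejects restoring a non-portable snapshot into a different state backend. */
    static void checkRestore(SnapshotFormat format, String fromBackend, String toBackend) {
        if (!fromBackend.equals(toBackend) && !portableAcrossStateBackends(format)) {
            throw new IllegalArgumentException(format + " snapshots cannot switch state backends");
        }
    }
}
```

Keeping the properties in one place like this is what would let the documentation matrix and the runtime checks stay in sync.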
Deleting a SNAPSHOT (system or user-induced) can only be done through Flink. Even for user-induced snapshots, the user has to ask Flink to safely delete a snapshot; otherwise, he/she risks corrupting the job's state. This calls for additional CLI commands, which for now can be DELETE and MOVE.
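Why deletion has to go through Flink is easiest to see with incremental snapshots, which share files. The sketch below is a hypothetical reference-counting registry (SnapshotFileRegistry and the file names are made up, and MOVE is not modeled): a DELETE only releases the files that no other retained snapshot still uses, which is exactly the information a user deleting directories by hand does not have.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical bookkeeping that lets Flink, not the user, delete snapshots safely. */
final class SnapshotFileRegistry {
    private final Map<Long, Set<String>> filesBySnapshot = new HashMap<>();
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Registers a snapshot; incremental snapshots may reuse files of earlier ones. */
    void register(long snapshotId, Set<String> files) {
        if (filesBySnapshot.containsKey(snapshotId)) {
            throw new IllegalArgumentException("snapshot " + snapshotId + " already registered");
        }
        filesBySnapshot.put(snapshotId, new TreeSet<>(files));
        for (String f : files) refCounts.merge(f, 1, Integer::sum);
    }

    /**
     * DELETE command: drops the snapshot and returns the files that are now
     * unreferenced and may be physically removed. Shared files are kept.
     */
    Set<String> delete(long snapshotId) {
        Set<String> files = filesBySnapshot.remove(snapshotId);
        if (files == null) throw new IllegalArgumentException("unknown snapshot " + snapshotId);
        Set<String> removable = new TreeSet<>();
        for (String f : files) {
            int remaining = refCounts.merge(f, -1, Integer::sum);
            if (remaining == 0) {
                refCounts.remove(f);
                removable.add(f);
            }
        }
        return removable;
    }
}
```

If snapshot 1 consists of a.sst and b.sst and snapshot 2 of b.sst and c.sst, deleting snapshot 1 frees only a.sst; b.sst is released once snapshot 2 is deleted as well.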
We can expose (preferably through REST) a higher-level, declarative API for the most common actions that use savepoints but whose goal is NOT the savepoint itself. This API will describe the desired end state of the job without exposing which snapshot format is used underneath. For example, the user will say "rescale to parallelism Y" instead of "stop the job with a full snapshot" followed by "restart the job from that snapshot with parallelism Y".
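One way to picture such an API is a planner that expands a declarative request into the imperative steps. The sketch below uses hypothetical types (PlanFormat, Step, StopWithSnapshot, RestartFromSnapshot, DeclarativePlanner), not an existing Flink or REST API. Following the examples in this FLIP, rescaling stops with a full snapshot, while restarting with the same parallelism (e.g. a framework upgrade) can stop quickly with an incremental snapshot, as in FLIP-45.

```java
import java.util.List;

/** Formats the planner may pick internally; the user never names them. */
enum PlanFormat { FULL, INCREMENTAL }

/** Imperative steps a declarative request expands into (hypothetical). */
interface Step {}

final class StopWithSnapshot implements Step {
    final PlanFormat format;

    StopWithSnapshot(PlanFormat format) {
        this.format = format;
    }
}

final class RestartFromSnapshot implements Step {
    final int parallelism;

    RestartFromSnapshot(int parallelism) {
        this.parallelism = parallelism;
    }
}

final class DeclarativePlanner {
    /** "Rescale to parallelism p": the FLIP's own example uses a full snapshot here. */
    static List<Step> rescale(int parallelism) {
        return List.of(new StopWithSnapshot(PlanFormat.FULL), new RestartFromSnapshot(parallelism));
    }

    /** "Restart with the same parallelism" (e.g. framework upgrade): a fast incremental stop. */
    static List<Step> restartWithSameParallelism(int currentParallelism) {
        return List.of(new StopWithSnapshot(PlanFormat.INCREMENTAL),
                new RestartFromSnapshot(currentParallelism));
    }
}
```

Because the format choice lives inside the planner, it can change as new formats become available without changing what the user asks for.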
Potentially, we could also expose some format transformations through an external tool, which would further increase the supported use cases without increasing the complexity of the code in master.
This proposal has the following benefits:
- The complexity of explaining savepoints/checkpoints and their corner cases disappears, as we can have a matrix with all available formats, their strong points, and their limitations.
- The user can mix and match snapshot formats based on his/her needs. For example, he/she can stop a job with an incremental user-induced snapshot, as described in FLIP-45, and use that to upgrade the cluster. This allows stopping the job quickly (incremental snapshot) even for jobs with large state, where taking a full snapshot could be prohibitively slow.
- Savepoints and checkpoints are bound to evolve together, as most of the discussion will be around the evolution of the formats, which are available to both.
Savepoints with no Side-Effects
According to this proposal:
- checkpoints and savepoints are different,
- savepoints commit no side-effects and we maintain two different, independent timelines: one for savepoints and one for checkpoints
- only checkpoints are used for recovery
- a special savepoint that commits side-effects is used only during graceful job termination.
This solution was discarded because:
- It introduces too much complexity to the code, e.g. performCheckpoint would need to know whether it is executing a checkpoint, a savepoint, or a synchronous savepoint, so that it knows whether it should commit side-effects. In addition, new methods would have to be introduced to UDFs and elsewhere to make explicit the places where side-effects are committed.
- It does not allow an initial version of FLIP-45, which would enable fast shutdown of a job in the case where we simply want to restart it later with the same parallelism, e.g. for a framework upgrade.
- Although more subtle and perhaps not important in most cases, in the scenario where savepoints are used for A/B testing, we may have the following sequence of events:
- take a savepoint from job execution A
- start job execution B, with the alternative job configuration, from that savepoint while leaving execution A running
- after a failure, execution A falls back to the last checkpoint (which is not our savepoint)
- due to a different order of events after the restart, execution A never again reaches the state captured in the savepoint
- as a result, our two pipelines have different histories.
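To make the first objection concrete, the toy sink below shows the kind of branch that every side-effecting operator would need under the rejected design. SnapshotType and TwoTimelineSink are hypothetical names and this is not Flink's sink API; pending records are simply committed in memory when a snapshot is allowed to commit side-effects.

```java
import java.util.ArrayList;
import java.util.List;

/** Snapshot types the rejected alternative would have to distinguish (hypothetical). */
enum SnapshotType { CHECKPOINT, SAVEPOINT, SYNC_SAVEPOINT }

/** Toy transactional sink: records stay pending until a snapshot commits them. */
final class TwoTimelineSink {
    private final List<String> pending = new ArrayList<>();
    final List<String> committed = new ArrayList<>();

    void invoke(String record) {
        pending.add(record);
    }

    /** Every side-effecting operator would need a branch like this one. */
    void onSnapshot(SnapshotType type) {
        switch (type) {
            case SAVEPOINT:
                // Savepoints live on their own timeline and must not commit anything.
                return;
            case CHECKPOINT:
            case SYNC_SAVEPOINT: // graceful termination is the only savepoint that commits
                committed.addAll(pending);
                pending.clear();
                return;
            default:
                throw new IllegalStateException("unknown snapshot type " + type);
        }
    }
}
```

Under the proposal adopted in this FLIP, the switch disappears: every snapshot commits side-effects, so operators need no knowledge of which kind of snapshot is running.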