Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Flink currently allows state to be stored locally. However, the bookkeeping information for this local state is kept only in memory, so a process or node failure makes local recovery impossible. With FLIP-198, we can now store information in a process' working directory. If this directory is located on a volume that survives a process/node failure (e.g. a persistent volume), then a Flink process can recover this information across process/node failures. Therefore, we propose to persist the local state information in the working directory. Moreover, we propose to also persist the latest slot allocations in the working directory so that a process can recover its latest allocation state. This allows us to keep the cleanup of the local state information coupled to the lifetime of a slot allocation (as is currently the case).

Proposed Changes

In order to recover from process/node failures using a persistent volume, we need to persist the TaskStateSnapshot that is stored in the TaskLocalStateStoreImpl. For the sake of simplicity, and because we don't support cross-version recoveries, we propose to use Java serialization. In a future step, we should establish a proper serialization format for this data.
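As a rough illustration of the Java serialization approach, the following sketch writes a serializable object to a file and reads it back. The class `LocalSnapshotFiles`, its method names, and the file layout are hypothetical and not part of Flink; any `Serializable` value stands in for the TaskStateSnapshot here.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper for illustration only; not a Flink class. */
final class LocalSnapshotFiles {

    private LocalSnapshotFiles() {}

    /** Writes the given snapshot to {@code target} using Java serialization. */
    static void write(Path target, Serializable snapshot) throws IOException {
        Path parent = target.getParent();
        if (parent != null) {
            // Assumption: the file lives somewhere below the process' working directory.
            Files.createDirectories(parent);
        }
        try (OutputStream out = Files.newOutputStream(target);
                ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(snapshot);
        }
    }

    /** Reads a snapshot that was previously written with {@link #write}. */
    static Object read(Path source) throws IOException, ClassNotFoundException {
        try (InputStream in = Files.newInputStream(source);
                ObjectInputStream ois = new ObjectInputStream(in)) {
            return ois.readObject();
        }
    }
}
```

Because Java serialization ties the file contents to the class definitions, a file written this way can only be expected to be readable by the same Flink version, which matches the restriction stated above.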

Additionally, we need to persist the slot allocation information so that a TaskExecutor can determine, after a recovery, which slots have been allocated by which job. With this information in place, we don't need to change the cleanup logic for the local state, which is currently bound to the lifetime of the slot. Hence, upon recovery a TaskExecutor will try to offer its slots to the jobs that have allocated them. If it cannot offer the slots, the TaskExecutor will free them and thereby also clean up any local state. Any state that belongs to an allocation that could not be recovered should also be removed in order to avoid orphaned state; this also covers the case of corrupted files.
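The recovery decision described above can be sketched as a pure function. This is only an illustration under simplifying assumptions: allocation IDs and job IDs are plain strings, the `offerSlot` callback is a hypothetical stand-in for the actual slot offering, and `SlotRecoverySketch` is not a Flink class.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

/** Hypothetical sketch of the recovery decision; not a Flink class. */
final class SlotRecoverySketch {

    private SlotRecoverySketch() {}

    /** Outcome of a recovery attempt. */
    static final class Result {
        /** Allocation ID to job ID for slots that were offered successfully. */
        final Map<String, String> keptAllocations = new HashMap<>();
        /** Allocation IDs whose local state should be removed. */
        final Set<String> stateToRemove = new HashSet<>();
    }

    /**
     * @param persistedAllocations allocation ID to job ID, as recovered from the working directory
     * @param allocationsWithLocalState allocation IDs for which local state was found on disk
     * @param offerSlot hypothetical callback; returns true if the job accepted the slot
     */
    static Result recover(
            Map<String, String> persistedAllocations,
            Set<String> allocationsWithLocalState,
            BiPredicate<String, String> offerSlot) {
        Result result = new Result();
        for (Map.Entry<String, String> entry : persistedAllocations.entrySet()) {
            String allocationId = entry.getKey();
            String jobId = entry.getValue();
            if (offerSlot.test(allocationId, jobId)) {
                result.keptAllocations.put(allocationId, jobId);
            } else {
                // The slot is freed, which also releases its local state (if any).
                result.stateToRemove.add(allocationId);
            }
        }
        // Local state without a recovered allocation is orphaned and must be removed.
        for (String allocationId : allocationsWithLocalState) {
            if (!persistedAllocations.containsKey(allocationId)) {
                result.stateToRemove.add(allocationId);
            }
        }
        return result;
    }
}
```

Keeping the decision separate from the file system and RPC calls makes the cleanup rule easy to unit test: state is kept only for allocations that were both recovered and successfully offered.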

In order to make this feature work, we must no longer free any slots if the TaskExecutor shuts down ungracefully (e.g. due to a SIGTERM signal) because otherwise we won't be able to recover. It might be necessary to introduce a graceful shutdown of the TaskExecutor in order to support proper cleanup of resources.

Not fully written files

We need to add a mechanism to detect local files that were not fully written (e.g. due to a process crash) and to remove them. Functionally, this means that a TM cannot make use of its local information in this case and has to fall back to the globally persisted state information (remote reads for the state recovery). A simple approach is to require that components that store information locally handle read errors by removing the affected information.
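A minimal sketch of the simple approach, assuming the local information was written with Java serialization: any read error causes the file to be deleted and the caller to see "no local information", so that it can fall back to the globally persisted state. The class and method names are hypothetical and not part of Flink.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

/** Hypothetical reader that discards unreadable local files; not a Flink class. */
final class LenientLocalStateReader {

    private LenientLocalStateReader() {}

    /**
     * Returns the deserialized content of {@code file}, or an empty Optional if the file is
     * missing or cannot be read. Unreadable files are deleted.
     */
    static Optional<Object> readOrDiscard(Path file) {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        try (InputStream in = Files.newInputStream(file);
                ObjectInputStream ois = new ObjectInputStream(in)) {
            return Optional.ofNullable(ois.readObject());
        } catch (IOException | ClassNotFoundException | RuntimeException e) {
            // The streams are already closed here, so the broken file can be removed.
            try {
                Files.deleteIfExists(file);
            } catch (IOException ignored) {
                // Best effort: a later cleanup pass can retry the deletion.
            }
            return Optional.empty();
        }
    }
}
```

A caller would treat an empty result as "no usable local state" and restore from the remote checkpoint instead.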

A file that was not fully written will most likely, but not always, cause deserialization errors. A more robust approach would be to store a checksum file (e.g. CRC32) alongside every local file, which can then be used to check for corruption. A leaner approach would be to store the checksums as part of the serialization format.
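The checksum-file variant could look roughly like the following sketch, which uses `java.util.zip.CRC32` from the JDK. The `.crc32` suffix, the hexadecimal encoding of the checksum, and the class `ChecksumSidecar` are assumptions made for illustration and are not prescribed by this proposal.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;

/** Hypothetical checksum-file helper; not a Flink class. */
final class ChecksumSidecar {

    private ChecksumSidecar() {}

    /** Assumption: the checksum is stored next to the file with a ".crc32" suffix. */
    static Path sidecarOf(Path file) {
        return file.resolveSibling(file.getFileName() + ".crc32");
    }

    static long crc32(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data);
        return crc.getValue();
    }

    /** Writes the payload first and its checksum second. */
    static void writeWithChecksum(Path file, byte[] payload) throws IOException {
        Files.write(file, payload);
        String hex = Long.toHexString(crc32(payload));
        Files.write(sidecarOf(file), hex.getBytes(StandardCharsets.UTF_8));
    }

    /** Returns true only if both files exist and the stored checksum matches the payload. */
    static boolean isIntact(Path file) {
        try {
            byte[] payload = Files.readAllBytes(file);
            byte[] stored = Files.readAllBytes(sidecarOf(file));
            String hex = new String(stored, StandardCharsets.UTF_8).trim();
            return Long.parseLong(hex, 16) == crc32(payload);
        } catch (IOException | NumberFormatException e) {
            // A missing or unparsable checksum file is treated like a corrupted file.
            return false;
        }
    }
}
```

Writing the checksum after the payload means that a crash between the two writes leaves a file without a matching checksum, which `isIntact` reports as not intact.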

For the first version, I propose to rely on the deserialization logic to catch files that were not fully written and to ignore their contents. As a follow-up, this can be hardened by introducing proper checksums.

Compatibility, Deprecation, and Migration Plan

  • This FLIP should not affect any other feature since it will only be active if state.backend.local-recovery has been enabled. If this is the case, then this change will incur more writes to the working directory (upon slot allocations and checkpoints).

Test Plan

The changes need to be properly covered by unit tests and integration test (IT) cases.

Rejected Alternatives

Not binding the local state to the slot lifecycle. This would then require another mechanism to clean up potentially orphaned local state.