
Motivation

Apache Flink's durability story is a mystery to many users. One of the most common recurring questions from users comes from not understanding the relationship between state, state backends, and snapshots. Some of this confusion can be abated with learning material. Still, the question is so pervasive that we believe Flink's user APIs should better communicate what the different components are responsible for.

Some frequent misconceptions we have heard over the past several years include:

1) "We use RocksDB because we don't need fault tolerance."
2) "We don't use RocksDB because we don't want to manage an external database."
3) Believing RocksDB is reading and writing directly with S3 or HDFS (vs. local disk)
4) Believing FsStateBackend spills to disk or has anything to do with the local filesystem
5) Pointing RocksDB at network-attached storage, believing that the state backend needs to be fault-tolerant

This question from the mailing list is very representative of where users are struggling [1]. Many of these questions were not from new users but from organizations already running Flink in production! The current state backend abstraction is too complex for many of our users. What all these questions have in common is a misunderstanding of the relationship between how data is stored locally on TMs and how checkpoints make that state durable.

This FLIP aims to actively help users by allowing them to reason about state backends separately from checkpoint durability. In the future, a state backend will only define where and how state is stored locally on the TM, while checkpoint storage defines where and how checkpoints are stored for recovery.

Naming

Flink ships with three state backends out of the box: MemoryStateBackend, FsStateBackend, and RocksDBStateBackend. MemoryStateBackend and FsStateBackend are named based on where they write out their checkpoints, yet both use the same in-memory data structure to store state locally. RocksDBStateBackend, on the other hand, is named based on where it stores data locally, while it snapshots to a durable filesystem. This leads to an additional common question about how to manage RocksDB: users often do not realize that RocksDB is embedded and does not need to be managed explicitly.

Configuration

The second issue is that the StateBackend interface is overloaded with too much functionality. It currently contains four methods.

public interface StateBackend extends java.io.Serializable {

  // ------------------------------------------------------------------------
  //  Checkpoint storage - the durable persistence of checkpoint data
  // ------------------------------------------------------------------------

  CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer);

  CheckpointStorage createCheckpointStorage(JobID jobId);

  // ------------------------------------------------------------------------
  //  Structure Backends
  // ------------------------------------------------------------------------
  <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
     Environment env,
     JobID jobID,
     String operatorIdentifier,
     TypeSerializer<K> keySerializer,
     int numberOfKeyGroups,
     KeyGroupRange keyGroupRange,
     TaskKvStateRegistry kvStateRegistry,
     TtlTimeProvider ttlTimeProvider,
     MetricGroup metricGroup,
     @Nonnull Collection<KeyedStateHandle> stateHandles,
     CloseableRegistry cancelStreamRegistry) throws Exception;
 
  OperatorStateBackend createOperatorStateBackend(
     Environment env,
     String operatorIdentifier,
     @Nonnull Collection<OperatorStateHandle> stateHandles,
     CloseableRegistry cancelStreamRegistry) throws Exception;
}


As you can see from the interface's comments, implementations are responsible for two separate and unrelated tasks: checkpoint storage and local state backends. Consider some of the ways state backends can be set in code today:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new MemoryStateBackend(1048));
env.setStateBackend(new MemoryStateBackend("s3://checkpoints", 1048));
 
env.setStateBackend(new FsStateBackend("s3://checkpoints", 1048));
 
RocksDBStateBackend rocksDB = new RocksDBStateBackend("s3://checkpoints", 1028);
rocksDB.setOptionsFactory(/** blah **/);
 
RocksDBStateBackend rocksDB = new RocksDBStateBackend(new FsStateBackend("s3://checkpoints", 1048));
rocksDB.setOptionsFactory(/** blah **/);
 
RocksDBStateBackend rocksDB = new RocksDBStateBackend(new MemoryStateBackend());
rocksDB.setOptionsFactory(/** blah **/);
 
env.setStateBackend(rocksDB);


Constructors contain arguments for configuring both the local state backend and checkpoint storage. It is not apparent to a new user in the above example that RocksDB will store data on local disk and checkpoint to S3; many believe RocksDB works with S3 directly.

Proposed Changes


Separate the concerns of local state storage from checkpoint storage in user-facing APIs and provide better names.

CheckpointStorage:


To better separate concerns, we will remove the checkpoint storage methods from the StateBackend interface and place them in a new interface, CheckpointStorage. The Flink runtime currently contains an internal interface called CheckpointStorage; it will be renamed to CheckpointStorageAccess.


/**
 * CheckpointStorage defines how checkpoint snapshots are persisted for fault tolerance.
 * Various implementations store their checkpoints in different fashions and have different requirements and
 * availability guarantees.
 *
 * <p>For example, JobManagerCheckpointStorage stores checkpoints in the memory of the JobManager.
 * It is lightweight and without additional dependencies but is not highly available
 * and only supports small state sizes. This checkpoint storage policy is convenient for
 * local testing and development.
 *
 * <p>FileSystemCheckpointStorage stores checkpoints in a filesystem. For systems like
 * HDFS, NFS Drives, S3, and GCS, this storage policy supports large state size,
 * in the magnitude of many terabytes while providing a highly available foundation
 * for stateful applications. This checkpoint storage policy is recommended for most
 * production deployments.
 */
public interface CheckpointStorage extends java.io.Serializable {

  CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer);

  CheckpointStorageAccess createCheckpointStorage(JobID jobId);
}

Flink will provide two default implementations: JobManagerCheckpointStorage and FileSystemCheckpointStorage. They will retain the same functionality as the corresponding methods in MemoryStateBackend and FsStateBackend: JobManagerCheckpointStorage is based on the existing MemoryBackendCheckpointStorage, and FileSystemCheckpointStorage is based on the existing FsCheckpointStorage. All existing configurations will be made available via the new public APIs. We will also provide a CheckpointStorageFactory to retain feature parity with existing functionality.


@PublicEvolving
public interface CheckpointStorageFactory<T extends CheckpointStorage> {

	T createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException, IOException;
}
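For illustration, a custom checkpoint storage could then be created from the configuration through this factory. The following is only a sketch: MyCheckpointStorage and the my.checkpoint-storage.path key are hypothetical and not part of this proposal.

public class MyCheckpointStorageFactory implements CheckpointStorageFactory<MyCheckpointStorage> {

    // Hypothetical configuration option, for illustration only.
    private static final ConfigOption<String> BASE_PATH =
        ConfigOptions.key("my.checkpoint-storage.path").stringType().noDefaultValue();

    @Override
    public MyCheckpointStorage createFromConfig(ReadableConfig config, ClassLoader classLoader)
            throws IllegalConfigurationException {
        String basePath = config.get(BASE_PATH);
        if (basePath == null) {
            throw new IllegalConfigurationException("my.checkpoint-storage.path must be set");
        }
        // MyCheckpointStorage is a user-defined CheckpointStorage implementation (not shown).
        return new MyCheckpointStorage(basePath);
    }
}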

New StateBackend User API:

To improve naming and general understanding, we will deprecate the classes MemoryStateBackend, FsStateBackend, and RocksDBStateBackend. We will instead introduce HashMapStateBackend and EmbeddedRocksDBStateBackend. To be clear, we are not changing any of the runtime data structures or characteristics; these are new user-facing API classes.

The advantage is that the names now clearly define where data lives at runtime, and we can remove the APIs for configuring checkpoint storage to make a clean break.


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
 
EmbeddedRocksDBStateBackend rocksDB = new EmbeddedRocksDBStateBackend();
rocksDB.setOptionsFactory(/** blah **/);
 
env.setStateBackend(rocksDB);
env.setDefaultSavepointDirectory("s3://savepoint");

env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("s3://checkpoints"));

// shortcut for env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("s3://checkpoints"));
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints");

The default state backend will be HashMapStateBackend and the default checkpoint storage will be JobManagerCheckpointStorage. This is equivalent to the semantics of MemoryStateBackend, which is the default today.
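For illustration, relying on these defaults should be equivalent to setting them explicitly with the new API (a minimal sketch using the classes proposed above):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Spelling out the proposed defaults; with no explicit configuration the job
// should behave the same way, mirroring today's default MemoryStateBackend semantics.
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());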

Compatibility, Deprecation, and Migration Plan


The three existing state backends - MemoryStateBackend, FsStateBackend, and RocksDBStateBackend - will be deprecated in favor of the new classes. In their JavaDocs and the release notes, we will guide users on migrating to the new API in a compatible way. Because we are using the same internal data structures, users will be able to migrate to the new API trivially, e.g., MemoryStateBackend should be replaced with HashMapStateBackend plus JobManagerCheckpointStorage.


Old                                              New
MemoryStateBackend()                             HashMapStateBackend() + JobManagerCheckpointStorage()
FsStateBackend()                                 HashMapStateBackend() + FileSystemCheckpointStorage()
RocksDBStateBackend(new MemoryStateBackend())    EmbeddedRocksDBStateBackend() + JobManagerCheckpointStorage()
RocksDBStateBackend(new FsStateBackend())        EmbeddedRocksDBStateBackend() + FileSystemCheckpointStorage()
MemoryStateBackend("file://path")                HashMapStateBackend() + JobManagerCheckpointStorage("file://path")


All flink-conf configurations will be duplicated for the new state backend or snapshot storage instance, whichever is appropriate. Again, no functionality will be added or dropped in this change. Existing flink-conf keys will also be specified as deprecated keys on the new state backends to ease migration.


In particular, the following configuration keys will be added (see the sketch after this list):


FileSystemCheckpointStorage:

  • state.snapshot.fs.memory-threshold
    • Deprecated key: state.backend.fs.memory-threshold
  • state.snapshot.fs.write-buffer-size
    • Deprecated key: state.backend.fs.write-buffer-size
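As a sketch only (the values below are illustrative), the new keys could be set programmatically on a Configuration just as they would be in flink-conf.yaml:

// Illustrative values; during migration both keys also fall back to their
// deprecated state.backend.fs.* counterparts.
Configuration config = new Configuration();
config.setString("state.snapshot.fs.memory-threshold", "20480");
config.setString("state.snapshot.fs.write-buffer-size", "4096");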


All deprecated StateBackends (Memory, Fs, RocksDB) will implement the CheckpointStorage interface to retain full backward compatibility. When a state backend implementing this interface is used, its checkpoint storage methods will take precedence over any other provided configuration. This way, all existing state backend implementations will continue to function as they do today, and users will not see any semantic changes.

While two methods will be removed from StateBackend, externally defined state backends will be able to migrate by merely adding `implements CheckpointStorage` to their implementations. Again, this will be documented in the release notes.
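A minimal sketch of such a migration follows. MyStateBackend is a hypothetical externally defined backend; it is declared abstract here only so that the unrelated StateBackend methods can be omitted from the sketch.

public abstract class MyStateBackend implements StateBackend, CheckpointStorage {

    // The two methods removed from StateBackend are now declared by CheckpointStorage,
    // so adding "implements CheckpointStorage" keeps the existing implementations valid.
    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) {
        // Resolve the pointer against this backend's own checkpoint layout (not shown).
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public CheckpointStorageAccess createCheckpointStorage(JobID jobId) {
        // Create the reader/writer for checkpoint data (not shown).
        throw new UnsupportedOperationException("sketch only");
    }
}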


 [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-Storage-Questions-td37919.html




