Current state: ACCEPTED

Discussion thread

JIRA: SAMZA-2657



Samza currently backs up the local state of jobs to compacted Kafka topics called changelogs. For jobs with large local state, the restore can be very slow and can take multiple hours. As a result, a job that restarts due to hardware failure, regular maintenance such as patching and upgrades, or autoscaling events can be disrupted for a long time while its state is rebuilt.


The motivation for introducing Blob Store as a backend for backup and restore, in addition to Kafka changelogs, is to address the following issues:

  1. Long restore times: State restore from a Kafka-backed changelog topic can be slow because the restore proceeds one message at a time. With Blob Store as the state restore backend, the state store can be restored in bulk, reducing restore times from hours to the order of minutes.
  2. Automate job and cluster maintenance: Slow restore times also hinder the ability to automate job and cluster maintenance. Since restoring state from the changelog topic can take a long time, automated tasks like auto-scaling jobs, OS upgrades, hardware upgrades etc. need to be planned more carefully and cannot be completely automated. With a fast Blob Store based restore, it is possible to reduce the duration and impact of such maintenance tasks.
  3. Message and store size: Moving to a Blob Store backed storage would allow us to write messages of arbitrary size and expand the store to any size.


Background

The proposed design uses some RocksDB features to implement robust state backup and restore. Understanding the following RocksDB concepts helps in understanding the proposed solution.

RocksDB Store Composition:

RocksDB is the embedded key-value store that powers Samza's stateful stream processing. Writes first go to an in-memory buffer (memtable), which RocksDB flushes to disk when it fills up. RocksDB creates 2 types of files on the host:

  1. Mutable files: Files like MANIFEST, LOCK and CURRENT are mutable files created by RocksDB. These files contain metadata about the current state of the store. Additionally, Samza creates its own mutable files with further metadata about the state. These are: OFFSET-V2 and OFFSET.
  2. Immutable files: RocksDB creates durable copies of memtables in the form of SST files in the store directories. An SST file is never modified after it is written. However, SST files are often merged into larger (and possibly deduplicated) SST files, after which the original files are deleted.

More details about the schema of metadata as well as SST files created by RocksDB can be found in this wiki: RocksDB wiki.

RocksDB Checkpoint:

RocksDB checkpoint is a feature that takes a snapshot of the current state of the store in a separate directory. This is used to create a partial or full backup of the RocksDB state store. RocksDB creates a checkpoint by creating hard links to the immutable SST files in the new directory and copying over mutable metadata files like MANIFEST and CURRENT. We leverage the checkpoint feature to create a cheap and fast point-in-time backup of the state store. The delta between this backup and the previous backup (if one exists) is then uploaded to the Blob Store. This allows us to create an incremental backup of the state store.

A detailed description of RocksDB checkpoint process can be found here: Checkpoints in RocksDB.
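
The hard-link mechanism described above can be illustrated with plain JDK file APIs. This is only a sketch of the idea, not RocksDB's implementation or API: the class name HardLinkSnapshotSketch and the rule used to recognize mutable files are assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Illustrative only: mimics the hard-link idea behind a checkpoint with plain JDK calls. */
final class HardLinkSnapshotSketch {

  /** Assumption for this sketch: these names identify mutable metadata files. */
  static boolean isMutable(String fileName) {
    return fileName.startsWith("MANIFEST") || fileName.equals("CURRENT");
  }

  /** Fills snapshotDir with hard links to immutable files and copies of mutable files. */
  static void snapshot(Path storeDir, Path snapshotDir) {
    try {
      Files.createDirectories(snapshotDir);
      List<Path> files;
      try (Stream<Path> listing = Files.list(storeDir)) {
        files = listing.filter(Files::isRegularFile).collect(Collectors.toList());
      }
      for (Path file : files) {
        Path target = snapshotDir.resolve(file.getFileName());
        if (isMutable(file.getFileName().toString())) {
          // Mutable files may change later, so the snapshot needs its own copy.
          Files.copy(file, target, StandardCopyOption.REPLACE_EXISTING);
        } else {
          // A hard link shares the file's data blocks: no bytes are copied, and the
          // data stays reachable even if the store later deletes its own link.
          Files.createLink(target, file);
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because the SST files are only linked, the cost of the snapshot is roughly proportional to the number of files rather than to the size of the store.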

Proposed Changes

Design overview 

We propose a Blob Store based state backup and restore for Samza. The following diagram illustrates a high-level view of the solution:

Blob Store backed backup and restore for Samza

Note: Orange represents synchronous/blocking calls. Green represents asynchronous/non blocking operations. 

  • Step 1: A synchronous checkpoint call is made to RocksDB that creates an immutable checkpoint directory in the container filesystem (shown in step 1a). We also generate a checkpoint ID (last 6 digits of epoch timestamp in nanoseconds) and build an in-memory data structure called checkpoint to hold the checkpoint ID and relevant metadata.
  • Step 2: We write the checkpoint ID to the checkpoint directory. When the container restarts with host-affinity, we compare the checkpoint ID in the local checkpoint directory with the checkpoint ID in the checkpoint Kafka topic. If they are the same, the local checkpoint directory is the latest checkpoint, so we can skip remote restoration and use the local checkpoint instead.
  • Step 3 to step 6: We calculate the delta between this checkpoint directory and the previous checkpoint, and upload this delta to the Blob Store in parallel with processing. The Blob Store returns a blob ID for each upload. We then create a special blob in the Blob Store, called the Index blob, that holds these blob IDs (and other metadata). The schema of the Index blob is explained in the section Organization of snapshot in the Blob Store. Notice the filesRemoved and subDirectoriesRemoved sections of the schema: they are maintained for garbage collection and error recovery, as explained in detail in the section Post-Commit cleanup and Garbage Collection.
  • Step 6 and 7: We write the blob ID of the index blob to the checkpoint data structure, serialize this data structure and write the serialized checkpoint to the checkpoint topic. 
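
The local checkpoint check from step 2 can be sketched as follows. This is a minimal illustration, assuming the checkpoint ID is stored as text in a marker file. The file name CHECKPOINT-ID and the class and method names are hypothetical; the proposal does not specify them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Illustrative only: decides whether a local checkpoint directory can be reused after a restart. */
final class LocalCheckpointCheckSketch {
  // Hypothetical marker file name, chosen for this example.
  static final String CHECKPOINT_ID_FILE = "CHECKPOINT-ID";

  /** Step 2: record the checkpoint ID inside the checkpoint directory. */
  static void writeCheckpointId(Path checkpointDir, String checkpointId) {
    try {
      Files.write(checkpointDir.resolve(CHECKPOINT_ID_FILE),
          checkpointId.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** On restart: true only if the local ID matches the ID read from the checkpoint topic. */
  static boolean canReuseLocalCheckpoint(Path checkpointDir, String checkpointIdFromTopic) {
    Path marker = checkpointDir.resolve(CHECKPOINT_ID_FILE);
    if (checkpointIdFromTopic == null || !Files.isRegularFile(marker)) {
      return false; // nothing to compare against, so fall back to remote restore
    }
    try {
      String localId = new String(Files.readAllBytes(marker), StandardCharsets.UTF_8).trim();
      return localId.equals(checkpointIdFromTopic);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A mismatch or a missing marker file means the local directory is stale or incomplete, in which case the store would be restored from the Blob Store.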
Organization of snapshot in the Blob Store

Files are stored and organized in the Blob Store as immutable blobs. Each blob is assigned a unique blob ID by the Blob Store. The way we organize a checkpoint directory is as follows:

  1. All the files corresponding to a checkpoint are uploaded in parallel to the Blob Store. Blob Store returns a blob ID associated with every file. Note that large files (multiple GBs) can be chunked and uploaded, and can have one blob ID per chunk. Thus, a file has one or more blob IDs associated with it. 
  2. We organize and index the blobs associated with a checkpoint by creating an Index blob in the Blob Store that holds the file name, blob IDs and metadata of all the files associated with a checkpoint. The schema defined below shows the fields in the Index blob. Note: The filesRemoved section in the schema helps in garbage collection. Please see the section Post-Commit cleanup and Garbage Collection for details.
  3. Retrieving an Index blob is necessary and sufficient to rebuild a snapshot since the Index blob contains blob ID and metadata of every file associated with a checkpoint directory.   
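
The chunking described in item 1 (one file, several blobs, each with its own ID and offset) can be sketched as below. The in-memory map stands in for a remote Blob Store, and the names ChunkedUploadSketch and Chunk are hypothetical; a real upload would run asynchronously and stream the data.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Illustrative only: splits a file into blobs and reassembles it by chunk offset. */
final class ChunkedUploadSketch {

  /** A blob ID plus the offset of that chunk within the file. */
  static final class Chunk {
    final String blobId;
    final int offset;

    Chunk(String blobId, int offset) {
      this.blobId = blobId;
      this.offset = offset;
    }
  }

  // Stand-in for the remote blob store: blob ID -> blob content.
  private final Map<String, byte[]> blobs = new HashMap<>();

  /** Uploads the file in chunks of at most chunkSize bytes. An empty file yields no chunks. */
  List<Chunk> upload(byte[] file, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<Chunk> chunks = new ArrayList<>();
    for (int offset = 0; offset < file.length; offset += chunkSize) {
      int end = Math.min(file.length, offset + chunkSize);
      String blobId = UUID.randomUUID().toString();
      blobs.put(blobId, Arrays.copyOfRange(file, offset, end));
      chunks.add(new Chunk(blobId, offset));
    }
    return chunks;
  }

  /** Downloads all chunks and concatenates them in offset order, whatever order they are listed in. */
  byte[] download(List<Chunk> chunks) {
    List<Chunk> ordered = new ArrayList<>(chunks);
    ordered.sort(Comparator.comparingInt((Chunk c) -> c.offset));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (Chunk chunk : ordered) {
      byte[] data = blobs.get(chunk.blobId);
      out.write(data, 0, data.length);
    }
    return out.toByteArray();
  }
}
```

Storing the offset next to each blob ID is what lets the restore path write multi-part files completely and in the correct order.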

IndexBlob Schema
{
    "schemaVersion": "Version of current schema",
    "checkpointId": "Checkpoint ID",
    "createdTimeMs": "Created time in epoch milliseconds",
    "jobName": "Job name",
    "jobInstance": "Instance ID of the job",
    "taskName": "Name of the task",
    "storeName": "Store name",
    "dirIndex": {
        "filesPresent": [
            {
                "fileName": "Name of the file uploaded to Ambry",
                "blobs": [
                    {
                        "blobID": "Blob ID",
                        "offset": "File chunk offset [optional]"
                    }
                ],
                "mtime": "Last updated time in epoch milliseconds",
                "ctime": "Created time in epoch milliseconds",
                "sizeInBytes": "Size of the file",
                "crc": "crc32 checksum of the file",
                "attribs": "permission attributes of the file"
            }
        ],
        "filesRemoved": [
            {
                "fileName": "Name of the file to delete from Ambry",
                "blobs": [
                    {
                        "blobID": "Blob ID",
                        "offset": "File chunk offset [optional]"
                    }
                ]
            }
        ],
        "subDirectoriesPresent": [
            {
                "dirIndex": { ... }
            }
        ],
        "subDirectoriesRemoved": [
            {
                "dirIndex": { ... }
            }
        ]
    },
    "prevSnapshotIndexBlobId": "Blob ID of previous snapshot index"
}
Incremental backup and restore

As described in the previous section, we can retrieve content and metadata of a snapshot using the Index blob. We leverage the Index blob to perform incremental backup and restore, rather than creating a complete checkpoint or restore on every commit. 

On every container start/restart, we first look up the latest checkpoint ID in the checkpoint topic. We use this checkpoint ID to get the Index blob of the last checkpoint written to the Blob Store and cache it.

Before uploading a checkpoint directory to the Blob Store, we retrieve the cached Index blob and deserialize it. This gives us the content of the previously uploaded checkpoint. We then list the files in the local checkpoint directory to get its content and metadata, and compare the cached previous checkpoint with the current checkpoint. As described in the background section, we have two types of files in the checkpoint directory: mutable files and immutable files. For all the immutable files, we compare file names to determine whether a) a new file has been added, or b) a previously checkpointed file has been deleted. Newly added files are uploaded to the Blob Store. Deleted files are marked for deletion from the Blob Store and added to the filesRemoved section of the Index blob (see Post-Commit cleanup and Garbage Collection for details). Mutable files are always uploaded since they are typically only a few kilobytes.

At the end, using the blob IDs of the newly uploaded and deleted files, we create a new Index blob, serialize it and upload it. This concludes the incremental backup.
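
The name-based comparison described above can be sketched as follows. This is a simplified illustration that works on file names only; the class SnapshotDiffSketch is hypothetical. It also assumes that when a mutable file is re-uploaded, the blob of its previous copy is marked for removal, which the text above does not state explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

/** Illustrative only: name-based diff between the previous snapshot and the local checkpoint. */
final class SnapshotDiffSketch {
  final List<String> added = new ArrayList<>();    // files to upload
  final List<String> retained = new ArrayList<>(); // files already uploaded, blob IDs are reused
  final List<String> removed = new ArrayList<>();  // files to record under filesRemoved

  static SnapshotDiffSketch diff(Set<String> previousFiles, Set<String> localFiles,
      Predicate<String> isMutable) {
    SnapshotDiffSketch result = new SnapshotDiffSketch();
    for (String name : new TreeSet<>(localFiles)) {
      if (!previousFiles.contains(name)) {
        result.added.add(name);
      } else if (isMutable.test(name)) {
        // Mutable files are always uploaded again.
        // Assumption: the stale blob of the previous copy is marked for removal.
        result.added.add(name);
        result.removed.add(name);
      } else {
        // Immutable file with the same name: nothing to upload.
        result.retained.add(name);
      }
    }
    for (String name : new TreeSet<>(previousFiles)) {
      if (!localFiles.contains(name)) {
        result.removed.add(name); // deleted locally since the previous checkpoint
      }
    }
    return result;
  }
}
```

For example, with a previous snapshot of 000001.sst, 000002.sst and CURRENT, and a local directory of 000002.sst, 000003.sst and CURRENT, only 000003.sst and CURRENT are uploaded, while 000002.sst keeps its existing blob IDs.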

Incremental restore is the reverse of the incremental backup outlined above. We consider the checkpoint topic as the source of truth, and restore the target checkpoint directory by retrieving the Index blob ID from the checkpoint obtained from the checkpoint topic. We then retrieve the Index blob from the Blob Store. The Index blob contains lists called filesPresent and filesRemoved. We retrieve the blobs in the filesPresent list from the Blob Store and add them to the checkpoint directory. Similarly, we delete the files in the filesRemoved list from the local checkpoint directory. For files that exist in both the remote and local checkpoint directories, we compare their checksum, size and permission attributes to see if they are the same file. If a file’s content has changed since the remote checkpoint was created, we delete the local file and restore the file from the remote checkpoint.
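
The per-file comparison during restore can be sketched with the JDK's CRC32 class. This is a minimal illustration that compares size and checksum only (permission attributes are left out), and the class and method names are hypothetical.

```java
import java.util.zip.CRC32;

/** Illustrative only: checks whether a local file matches the metadata recorded for the remote copy. */
final class FileMatchSketch {

  /** CRC32 checksum of the given content, as an unsigned 32-bit value in a long. */
  static long crc32(byte[] content) {
    CRC32 crc = new CRC32();
    crc.update(content, 0, content.length);
    return crc.getValue();
  }

  /**
   * True if the local content has the size and checksum recorded in the Index blob.
   * Size is compared first because it is cheap and avoids computing a checksum on a mismatch.
   */
  static boolean sameAsRemote(byte[] localContent, long remoteSizeInBytes, long remoteCrc) {
    return localContent.length == remoteSizeInBytes && crc32(localContent) == remoteCrc;
  }
}
```

A file for which sameAsRemote returns false would be deleted locally and downloaded again from the Blob Store.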

Post-Commit cleanup and Garbage Collection

A commit can fail at any stage. The container may restart before a commit completes or before the post-commit cleanup phase is complete. If a commit fails midway, we may have already created blobs in the Blob Store but not yet made the checkpoint durable in the checkpoint topic. These blobs are part of a failed commit and are considered garbage that needs to be cleaned up. Additionally, if a job fails during the post-commit phase, we may not have deleted the blobs corresponding to the files deleted in the new checkpoint.

The proposed solution handles garbage collection automatically by leveraging blob TTLs and the REMOVE_TTL() functionality that we implement.

Blob Stores like Azure and AWS S3 allow every blob to have a TTL associated with it. Blobs are garbage collected at the end of the TTL. Additionally, we expose an API in the BlobStoreManager interface called REMOVE_TTL that updates a blob’s TTL so that it never expires. A second call to REMOVE_TTL on the same blob is a no-op. We use these facts to ensure garbage or untracked blobs are cleaned up in the Blob Store.

  1. As part of the commit, when we upload a file for checkpointing, we create a blob with a TTL of 30 days. This ensures that if the commit sequence fails at any step, the blobs are automatically garbage collected at the end of TTL. 
  2. Files are not deleted immediately and are kept around until the commit is complete in case they are relevant for rollback or otherwise. Rather, any files to be deleted are added to the filesRemoved section of the Index blob schema, as explained in the section Organization of snapshot in the Blob Store.
  3. Commit completes after the checkpoint ID is written to the checkpoint topic. At this point, we update all the blobs created in that commit sequence to never expire using a REMOVE_TTL() request to the Blob Store. We also send delete requests at the end of the commit phase.
  4. Whenever a job restores, we perform 2 operations as part of init operation:
    1. Send REMOVE_TTL to never expire all the blobs in the Index blob. 
    2. Send delete requests for all the blobs in the filesRemoved and subDirectoriesRemoved sections of the Index blob.
  5. Step 4 ensures that if a job fails in the post-commit phase, the blobs of the committed checkpoint are made permanent before their TTL expires (so they are not garbage collected), and that blobs marked for deletion are not left behind as garbage in the Blob Store. Both operations have no effect if they have already completed successfully.
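
The TTL-based protocol above can be sketched with a small in-memory model. This is an illustration under simplifying assumptions, not the proposed implementation: the class TtlBlobStoreSketch is hypothetical, time is a manually advanced clock, and removeTtl or delete on a missing blob is treated as a no-op rather than completing exceptionally with DeletedException.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: in-memory model of blobs with a TTL and an idempotent removeTtl. */
final class TtlBlobStoreSketch {
  static final long THIRTY_DAYS_MS = 30L * 24 * 60 * 60 * 1000;
  private static final long NEVER = Long.MAX_VALUE;

  private final Map<String, Long> expiryByBlobId = new HashMap<>();
  private long nowMs = 0;
  private int nextId = 0;

  /** Uploads a blob that expires after ttlMs unless its TTL is removed first. */
  String put(long ttlMs) {
    String blobId = "blob-" + nextId++;
    expiryByBlobId.put(blobId, nowMs + ttlMs);
    return blobId;
  }

  /** Makes the blob permanent. Calling it again, or on a missing blob, changes nothing. */
  void removeTtl(String blobId) {
    if (expiryByBlobId.containsKey(blobId)) {
      expiryByBlobId.put(blobId, NEVER);
    }
  }

  /** Deletes the blob. Deleting an already deleted blob is a no-op in this sketch. */
  void delete(String blobId) {
    expiryByBlobId.remove(blobId);
  }

  /** Advances the clock and garbage collects every blob whose TTL has expired. */
  void advanceClock(long elapsedMs) {
    nowMs += elapsedMs;
    expiryByBlobId.values().removeIf(expiry -> expiry <= nowMs);
  }

  boolean exists(String blobId) {
    return expiryByBlobId.containsKey(blobId);
  }
}
```

In this model, a blob uploaded by a commit that never reaches the checkpoint topic simply expires after 30 days, while a blob whose commit completed survives because removeTtl was called on it. Repeating removeTtl or delete during job init is harmless, which is what makes step 4 safe to run on every restore.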

Public Interfaces

BlobStoreManager Interface
/**
 * Provides interface for common blob store operations: GET, PUT and DELETE
 */
public interface BlobStoreManager {
  /**
   * Initialize underlying blob store client, if necessary.
   */
  void init();

  /**
   * Non-blocking PUT call to remote blob store with supplied metadata
   * @param inputStream InputStream to read the file
   * @param metadata user supplied {@link Metadata} of the request
   * @return a future containing the blob ID of the uploaded blob if the upload is successful.
   */
  CompletionStage<String> put(InputStream inputStream, Metadata metadata);

  /**
   * Non-blocking GET call to remote blob store
   * @param id Blob ID of the blob to get
   * @param outputStream OutputStream to write the downloaded blob
   * @param metadata User supplied {@link Metadata} of the request
   * @return A future that completes when all the chunks are downloaded and written successfully to the OutputStream.
   *         The future completes exceptionally with DeletedException if the blob has already been deleted.
   */
  CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata);

  /**
   * Non-blocking call to mark a blob for deletion in the remote blob store
   * @param id Blob ID of the blob to delete
   * @param metadata User supplied {@link Metadata} of the request
   * @return A future that completes when the blob is successfully deleted from the blob store.
   *         The future completes exceptionally with DeletedException if the blob has already been deleted.
   *         This exception is caught and ignored by the caller of the delete method during initial cleanup
   *         and SnapshotIndex read.
   */
  CompletionStage<Void> delete(String id, Metadata metadata);

  /**
   * Non-blocking call to remove the Time-To-Live (TTL) for a blob and make it permanent.
   * @param blobId Blob ID of blob to remove TTL for.
   * @param metadata User supplied {@link Metadata} of the request
   * @return a future that completes when the TTL for the blob is removed.
   *         The future completes exceptionally with DeletedException if the blob has already been deleted.
   */
  CompletionStage<Void> removeTTL(String blobId, Metadata metadata);

  /**
   * Cleanly close resources like blob store client
   */
  void close();
}

/**
 * Factory to create instances of {@link StateBackendAdmin} that needs to be implemented for every
 * state backend
 */
public interface BlobStoreAdminFactory {
  /**
   * Returns an instance of {@link StateBackendAdmin}
   * @param config job configuration
   * @param jobModel Job Model
   */
  StateBackendAdmin getStateBackendAdmin(Config config, JobModel jobModel);
}

/**
 * Factory to create instances of {@link BlobStoreManager} that needs to be implemented for every state backend
 */
public interface BlobStoreManagerFactory {
  /**
   * Returns an instance of {@link BlobStoreManager} for backup.
   * @param config job configuration.
   * @param backupExecutor executor service for backup operation.
   */
  BlobStoreManager getBackupBlobStoreManager(Config config, ExecutorService backupExecutor);

  /**
   * Returns an instance of {@link BlobStoreManager} for restore.
   * @param config job configuration.
   * @param restoreExecutor executor service for restore operation.
   */
  BlobStoreManager getRestoreBlobStoreManager(Config config, ExecutorService restoreExecutor);
}

/**
 * Representation of a directory in the blob store
 */
public class DirIndex {
  public static final String ROOT_DIR_NAME = "";
  private static final short SCHEMA_VERSION = 1;

  private final String dirName;

  private final List<FileIndex> filesPresent;
  private final List<FileIndex> filesRemoved;

  // Note: subDirsPresent can also have filesRemoved and subDirsRemoved within them.
  private final List<DirIndex> subDirsPresent;
  private final List<DirIndex> subDirsRemoved;
}

/**
 * Representation of a file in blob store
 */
public class FileIndex {
  private final String fileName;

  /**
   * Chunks of file uploaded to blob store as {@link FileBlob}s
   */
  private final List<FileBlob> fileBlobs;

  /**
   * Metadata (e.g. POSIX file attributes) associated with the file.
   */
  private final FileMetadata fileMetadata;

  /**
   * Checksum of the file for verifying integrity.
   */
  private final long checksum;
}

/**
 * Representation of a File in a Blob store
 */
public class FileBlob {

  private final String blobId;

  /**
   * Offset of this blob in the file. A file can be uploaded in multiple chunks, and can have
   * multiple blobs associated with it. Each blob then has its own ID and an offset in the file.
   */
  private final int offset;
}

/**
 * A {@link SnapshotIndex} contains all the information necessary for recreating the local store by
 * downloading its contents from the remote blob store. The {@link SnapshotIndex} is itself serialized
 * and stored as a blob in the remote store, and its blob id tracked in the Task checkpoint.
 */
public class SnapshotIndex {
  private static final short SCHEMA_VERSION = 1;

  private final long creationTimeMillis;

  /**
   * Metadata for a snapshot like job name, job Id, store name etc.
   */
  private final SnapshotMetadata snapshotMetadata;
  private final DirIndex dirIndex;

  /**
   * Blob ID of previous snapshot index blob. Tracked here to be cleaned up
   * in cleanup phase of commit lifecycle.
   */
  private final Optional<String> prevSnapshotIndexBlobId;
}

/**
 * Representation of the diff between a local directory and a remote directory contents.
 */
public class DirDiff {

  private final String dirName;

  /**
   * New files in this directory that need to be uploaded to the blob store.
   */
  private final List<File> filesAdded;

  /**
   * Files that have already been uploaded to the blob store in a previous snapshot and haven't changed.
   */
  private final List<FileIndex> filesRetained;

  /**
   * Files that have already been uploaded to the blob store in a previous snapshot and need to be removed.
   */
  private final List<FileIndex> filesRemoved;

  /**
   * Subdirectories of this directory that are not already present in the previous snapshot and all of their contents
   * need to be recursively added.
   */
  private final List<DirDiff> subDirsAdded;

  /**
   * Subdirectories of this directory that are already present in the previous snapshot, but whose contents
   * may have changed and may need to be recursively added or removed.
   */
  private final List<DirDiff> subDirsRetained;

  /**
   * Subdirectories that are already present in the previous snapshot, but don't exist in the local snapshot,
   * and hence all of their contents need to be recursively removed.
   */
  private final List<DirIndex> subDirsRemoved;
}

Implementation and Test Plan

Phase 1
  1. Implement Blob Store Snapshot structure: Directory, File, Snapshot index schemas. 
  2. Implement diff algorithm to calculate backup and restore delta.
  3. Implement Util classes that abstract Put/Get/Delete at File and Directory level.
Phase 2
  1. Implement backup and restore classes for Blob Store
  2. Implement and introduce metrics for backup and restore flows.
Test Scope and Test Scenarios
Flows to test
  1. Backup of a new store
  2. Restore on a new host
  3. Incremental backup/restore of a store
  4. Cleanup for a new container/job 
  5. Cleanup called after a failed container restart
  6. Cleanup called during commit sequence
  7. Partial failure of backup in different stages of commit lifecycle (commit, upload, persist, cleanup).
    1. Ensure that the partial state is cleaned up.
  8. Failure scenarios of restore in different stages of commit lifecycle (commit, upload, persist, cleanup).
    1. Ensure that a container restarting on the same host after partial failure can cleanup/recover and restore correctly (does not get stuck).
  9. Failure scenarios of blob store (missing index blobs, file blobs, retry etc.)
  10. Test that the cleanup/commit sequence during TaskInstance init works with transactional Kafka and blob store
End-to-end testing
  1. Test that checkpoint read/write version configurations are validated before job launch.
  2. Test upgrade and rollback compatibility for samza versions with and without blob store backend.
  3. How do we enforce that backup and restore managers aren't both enabled for the first deployment?

Backup manager
  1. init
    1. init with checkpoint V1
    2. init with no/null checkpoint
    3. Test init cleans up unused stores correctly.
  2. upload
    1. No previous checkpoint (first upload)
    2. Previous checkpoint passed during init (subsequent upload)
    3. Test upload handles logged / non-logged / persistent / durable stores correctly (document expectation here).
    4. Test upload calculates diff from previous checkpoint correctly (during initial start and during post-startup commits)
    5. Test upload returns snapshot blob id and records previous snapshot blob id in the snapshot correctly.
  3. cleanup
    1. Test Cleanup removes TTL of remote snapshot and associated files
    2. Test Cleanup deletes old remote snapshot
    3. Test Cleanup deletes files/subdirs to remove from current checkpoint
    4. Test Cleanup cleans stores removed from config
    5. Cleanup failed container/job restart

Restore manager
  1. init
    1. Test init fails for checkpoint V1.
    2. Test init works for no/null checkpoint.
    3. Test init returns blob store backend store scms if present in checkpoint.
    4. Test util method to get snapshot indexes from checkpoint.
    5. Test container fails to start with meaningful error message if init fails.
  2. restore
    1. Test that restore restores to the correct store directory depending on store type.
    2. Test that it ignores any files that are not present when upload is called (e.g. offset files).
    3. Test restore handles logged / non-logged / durable / persistent stores correctly.
    4. Test logic for checking if checkpoint directory is identical to remote snapshot.
    5. Test restore handles stores with missing SCM in checkpoint correctly.
    6. Test restore handles multiple stores correctly.
    7. Test restore always deletes main store dir.
    8. Test restore uses previous checkpoint directory if identical to remote snapshot.
    9. Test restore restores from remote snapshot if no previous checkpoint dir.
    10. Test restore restores from remote snapshot if checkpoint dir not identical to remote snapshot.
    11. Test restore recreates subdirs correctly.
    12. Test restore recreates recursive subdirs correctly
    13. Test restore creates empty files correctly.
    14. Test restore creates empty dirs correctly.
    15. Test restore creates empty sub-dirs / recursive subdirs correctly.
    16. Test restore restores multi-part file contents completely and in correct order.
    17. Test restore verifies checksum for files restored if enabled.

Util method to get snapshot indexes from checkpoint
  1. Test throws exception for checkpoint v1.
  2. Test no-op for null / empty checkpoint.
  3. Test works correctly for missing blob store backend factory entry.
  4. Test works correctly for missing blob store backend factory store entry.
  5. Test throws exception on sync and async blob store errors.
  6. Test gets the right blobid from remote store.
  7. Test returns the correct pair of scm and snapshot index.
  8. Test blocks once at the end for all futures instead of blocking for each store.

Concurrency and Retries
  1. Test CompletableFutureUtil methods.
  2. Test that all operations use an explicit and expected executor (no default executor).
  3. Verify future composition (allOf, toMap etc) and blocking (individual vs collected vs nonblocking) for all async methods.
  4. Verify that there is no blocking on caller threads. Document and justify exceptions (e.g. restore thread)
  5. Test BlobStoreManager Impl/BlobStoreUtil error handling and retries.
    1. Test CompletionException unwrapping to identify actual cause.
    2. Test callback order (par/seq dep graph) for all chained operations.
    3. Test async retriable exceptions are transformed correctly.
    4. Test/verify put / get / delete futures always complete (handle sync / async errors correctly).
    5. Test retries for get / put create new input / output streams.
    6. Test error handling for get (sync, future, callback errors).
  6. Test TaskInstance commit flow.
    1. Test async commit stage fails if upload/checkpoint write/cleanup fails.
    2. Verify all async stage operations execute on a separate threadpool.
    3. Test async commit succeeds and unblocks future commits if all async operations succeed.
    4. Test async commit stage fails if any async operations failed.
    5. Verify async commit stage operations are chained correctly.
    6. Test exceptions in async commit stage are propagated to next sync commit stage.
    7. Test sync commit fails if a previous async commit fails.
    8. Test commit skips if previous async commit in progress and < max delay.
    9. Test commit blocks if previous async commit in progress and > max delay
    10. Test that sync commit times out if previous async commit does not complete within max commit delay.
  7. Test BackupManager/RestoreManager flow
    1. Test all async stage operations execute on a separate threadpool.
    2. Verify/Test error propagation, handling and operation chaining (par/seq dep graph).
    3. Verify/Test timeouts for blocking operations. Document and justify blocking operations.
    4. Test handling of retriable / ignorable (410s) / unrecoverable errors.
    5. Verify/Test idempotency of cleanup / delete / ttl operations.
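
Two of the behaviors listed above, unwrapping a CompletionException to find the actual cause and blocking once for a collection of futures, can be sketched with the JDK's CompletableFuture. The class FutureUtilSketch and its method names are hypothetical and are not the CompletableFutureUtil methods referred to in the test plan.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

/** Illustrative only: helpers for composing futures and inspecting their failures. */
final class FutureUtilSketch {

  /** Strips CompletionException and ExecutionException wrappers to reach the actual cause. */
  static Throwable unwrap(Throwable error) {
    Throwable current = error;
    while ((current instanceof CompletionException || current instanceof ExecutionException)
        && current.getCause() != null) {
      current = current.getCause();
    }
    return current;
  }

  /**
   * Combines the futures into a single future of all results, so a caller can block once at the
   * end instead of once per future. The combined future fails if any input future fails.
   */
  static <T> CompletableFuture<List<T>> allResults(List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(ignored -> {
          List<T> results = new ArrayList<>();
          for (CompletableFuture<T> future : futures) {
            results.add(future.join()); // does not block: every future has already completed
          }
          return results;
        });
  }
}
```

A test for error propagation could complete one input future exceptionally, join the combined future, and assert that unwrap returns the original exception type rather than the CompletionException wrapper.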

Compatibility, Deprecation, and Migration Plan

The following migration plan is a guideline to help users migrate existing Samza jobs to Blob Store based backup and restore:

  1. Dual-commit: The config task.backup.factories accepts a list of factories. This can be used to enable dual commit for a subset of stores or jobs. With dual commit, the job performs every commit twice: one commit goes to the Kafka changelog topic, as is done today, and the other goes to the Blob Store based storage.
  2. Restore from the Blob Store: Once the backup to the Blob Store is verified, the restore can also be switched to the Blob Store. The restore target (Blob Store or Kafka changelog topic) is chosen through the config value task.restore.manager. The changes are backward compatible: if a restore fails, the job can be reverted to Kafka based restore by simply switching the configuration value.
  3. Disable dual-commit: After a few successful restores from the Blob Store, dual-commit can be disabled and backup and restore can be done entirely through the Blob Store. The config value task.commit.manager would enable commit and restore from the Blob Store alone. At this point, the changes are backward incompatible and a failure may require rollback. A rollback can be done by switching the configs task.restore.manager and task.commit.manager.
