Discussion thread: <link to mailing list DISCUSS thread>




Recovery time for jobs with large state increases significantly in host-failure scenarios.
This is problematic because of two reasons:

a) causes unavailability -- incase of jobs that use Samza for network-monitoring, this implies periods of vulnerability,
b) causes backlog buildup in case of jobs with high input QPS, requiring scaleup (then scale-down) or causes increased catch-up time


JIRA-2018 has brought state-restore improvements to Samza, cutting time to restore state from kafka change log by 50-65%.
See JIRA-2018 for details.
However, even with this change state-restore times for stateful jobs is relatively high (e.g., 20 minutes for a 50 GB store). 
The motivation for this SEP is to create and leverage backup containers in Samza to create hot replicas of active-containers' state. 

This will have two benefits: 
a. Reduction in restore time for stateful job. 
b. Restore time will become independent of the state-size of the application.

Proposed Changes

Proposed changes are structured into four main parts:

1. Implementing Hot-standby tasks and containers,  

2. Allocation of active and standby containers in Samza-YARN,

3. Implementing orchestration for failover for Samza-YARN,

4. Implementing allocation and orchestration for failover for Standalone.

1. Implement Hot-standby tasks

We propose enriching Samza to assign each TaskInstance a role – active or State-Standby. 
tasks are as they exist today. 

State-Standby tasks function to simply continually consume state from Kafka changelog and restore it locally on disk.

NOTE: Other desirable roles can also be added for tasks, and this capability can be used for other Samza enhancements. 

2. Allocation of active and standby containers in Samza-YARN
We propose the following:

Enrich the JobModel to include both active and State-Standby tasks. 
Enrich the locality mappings to include task to host mappings, and deprecate the existing container to host mapping.
In the absence of dynamic-work-assignment and auto-sizing in YARN, containers can only have either all-active or all-standby tasks, more on this below. 

We add a new StandbyAwareContainerAllocator in YARN to maintain the following invariants, when allocating tasks (and hence containers to hosts)

Invariant-1: An active and its corresponding standby task/container should not be on the same host.

Invariant-2: An active task/container should be started only after its corresponding standby has been successfully stopped.

The standby containers are assigned YARN containerIDs just like active containers. The hot-standby containers work with and benefit with host-affinity just

like active containers. So, an application’s lifecycle is tied into both the active and standby containers, i.e., a job is considered deployed only when all its

containers have been successfully deployed. Failures of either type of container carry equal impact to the job’s lifecycle. 

The JobModelManager which reads the container.count from config and creates the JobModel, can then read the flag, based on which will

  1. Generate ContainerModels for the standby containers, and,

  2. populate the activeContainerlocalityMapping and the standbyContainerLocalityMapping in the JobModel.

Note that, the user could specify to use more than one standby tasks for each active task using the job.hotstandby.replicationcount config, which the JobModelManager will 

factor-in when computing the JobModel.

An existing job that adopts this feature, will have preferred-hosts for its existing stateful active containers. Such jobs, will not however have a preferred-host for

standby containers on the very first deploy.

Allocation proceeds as follows:

  1. The ContainerProcessManager issues allocation requests for all active and standby containers, using the existing preferredHost-over-anyHost semantics.

  2. The ContainerAllocator is responsible for assigning and starting actual containers on the  allocated containers returned by YARN’s RM.
    When trying to match a container resource request to an allocated resource, the
    ContainerAllocator, ensures that corresponding active and standby containers are not on the same host (we call this the same-host-check)

    Invariant: An active and its corresponding standby task/container should not be on the same host.

Note that, on the first-deploy the standby containers will not have any PreferredHosts in the locality mapping and hence all requests for them will be AnyHost requests.

3. Implementing orchestration for failover for Samza-YARN

The ContainerProcessManager is notified of container failure using a onContainerStopper(containerID) callback.

There are two distinct failure scenarios -- a) failure of an active container or, b) failure of a standby container.  

The ContainerProcessManager can differentiate these scenarios using the JobModel.

  1. Failure of an active container

The following steps detail the AM flow in this scenario. See Figure for timing diagrams.

  1. The ContainerProcessManager will request a container allocation on the same host on which the active container was last seen. If the same host is granted, container will be re-spawned -- problem solved, return.

If a container on the same host is not granted because the host is either out of capacity or unreachable due to a network partition or dead. In this case the resource request will “expire.”

  1. The ContainerProcessManager will then use the localityMappings to determine the standby host (the host of the corresponding standby container).

  2. It will delete the localityMapping of the standby container, by writing to the metadataStore
    (using localityManager.writeMapping).

  3. It will update the localityMapping of the active container to point to the standby host.

  4. It will stop the standby container using nmClientAsync.stopContainerAsync

    1. This is an async-call.

    2. This is required because standby and active containers cannot be started simultaneously on the same host (since only one of them can lock the statedir).

  5. The ContainerProcessManager will then make a PreferredHost request to place the active container on the standby host.

    1. This is an async-call.

  6. Once the ContainerProcessManager receives callbacks signalling the completion of both step 5 and step 6, it will then make a nmClientAsync.startContainer to start the active container on the standby host. Figures below show the detailed flow.
    To ensure Step 5 has completed, the ContainerAllocator/ContainerProcessManager will keep track of all container-stop requests it has issued and their progress.

  7. The ContainerProcessManager will get a callback onContainerStopper for the standby container, because of step 5, it will not find a hostMapping for the standby container, in which case it will make a AnyHost request to place the standby container on any host.

Invariant: An active container should be started only after its corresponding backup has been stopped.

B. Failure of a standby container 

The ContainerProcessManager will request a container allocation on the same host on which the standby container was last seen.
If a container on the same host is
not granted, the ContainerProcessManager will make an AnyHost request, once that is granted,

the standby container will be started on the host if it passes Invariant1 above.

Additional metrics: There are an additional set of metrics to be added for standby containers. These include:

  1. #Active and #standby container failures

  2. #Active to standby container failovers

#Occurrences where an active container could not be failed to a standby.

4. Implementing allocation and orchestration for failover for Standalone

In Standalone, both active and standby tasks are evenly distributed amongst StreamProcessors at initialization time, while maintaining Invariant-1. 
This means unlike YARN, StreamProcessors have a mix of active and standby tasks. 

During failover, the leader re-evaluates the StreamProcessor-task assignment, such that, 

a. The tasks of the failed StreamProcessors are now allocated to the StreamProcessors which have the corresponding backup tasks, and 

b. The task-load is evenly distributed amongst the surviving StreamProcessors to the extent possible, while minimizing (ideally 0) the number of tasks that are not started on their backups. 

These changes will be incorporated in the ZkBasedJobCoordinator.

Therefore, the onus of sizing the resource requirements of the StreamProcessors in Standalone is on the job owners – they have to account for StreamProcessor failures 

when allocating memory and CPU at deployment time. 

Can the Samza-YARN failover behaviour be such that surviving containers have an even distribution of of active and standby tasks?
In theory yes, in practice, no because of the following limitations: 

  1. Adding/removing/activating tasks on a container requires a container restart. This means on failover, maintaining the even 
    distribution requires a restart of all container – unavailability. 
  2. Containers in YARN can only be statically sized. Because containers can be assigned additional tasks (active or standby) on a host failure, 
    this means all containers need to be over-sized.

Public Interfaces

This feature brings the following additional configs: 

job.hotstandby.enabled = true/false.

job.hotstandby.replicationcount = int

Implementation and Test Plan

  1. JobCoordinator modifications

    1. Add task-host mappings to coordstream/metadata

    2. Deprecate existing container-host mapping

    3. JobModel changes to populate/handle inactive tasks

  2. Inactive task implementation

    1. Add a task-role to the job model

    2. Add handling for creation of “active” and “inactive” tasks in Samza Container

    3. Formulate and add liveness metrics for inactive tasks

  3. Container Management - YARN

    1. Orchestration for handling active-container failover

    2. Orchestration for handling standby-container failover

    3. Formulate and add metrics to monitor failover behaviour

  4. StreamProcessor Management - Standalone

    1. Orchestration for task redistribution on failover

Stretch Plan

5. Dynamic work assignment

6. Enhancing 1c to enable active containers to mix-and-match active and standby tasks in YARN.

7. Enhance 3 to perform failover without stop-start in YARN.

Compatibility, Deprecation, and Migration Plan

The only deprecation this design brings it to deprecate the container-host locality mappings in the coordinator stream/metastore, and instead 

rely on task-container, and task-host mappings. 

Rejected Alternatives

Implementing standby containers as a specialized JVM 

In this option, the ContainerRunner for the hot-standby container does not create a SamzaContainer object. It simply runs a JVM process with a RestoreStorageManager (an enhanced TaskStorageManager), that created the required SysConsumer objects and reads from the continuously to perform putAll on store objects.

Pros: Lightweight as compared to a. -- hot-standby containers have less idle overhead than active containers.


No extensibility of this abstraction.

ContainerRunner needs to change to launch a SamzaContainer or a RestoreStorageManager depending on the container’s type.

RestoreStorageManagers will also need to support (in addition to the state-replication functionality):

a) Metrics reporting functionality,

b) Heartbeating functionality, and

c) Functionality to persist its host-to-container locality mapping,


  • Mehul A. Shah, Joseph M. Hellerstein and Eric Brewer Highly-Available, Fault-Tolerant, Parallel Dataflows , SIGMOD, June 2004.

  • Mehul A. Shah, Joseph M. Hellerstein, Sirish Chandrasekaran and Michael J. Franklin. Flux: An Adaptive Partitioning Operator for Continuous Query Systems, ICDE, March 2003.

  • Abadi, D.J., Ahmad, Y., Balazinska, M., Cetintemel, U., Cherniack, M., Hwang, J.H., Lindner, W., Maskey, A., Rasin, A., Ryvkina, E. and Tatbul, N., 2005, January. The design of the borealis stream processing engine. In Cidr 2005.

  • Abadi, Daniel J., et al. "Aurora: a new model and architecture for data stream management." the VLDB Journal 12.2 (2003): 120-139.

  • No labels