Current state: [ UNDER DISCUSSION ]

Discussion thread<link to mailing list DISCUSS thread>

JIRA: SAMZA-1373 - Getting issue details... STATUS


Link to Presentation Slides: Samza on Azure


Originally, Samza could be run in distributed mode, only with the help of Yarn. Yarn was responsible for allocating resources for each physical process, and coordinating between them. Recently, Samza was released in embedded mode which enables you to use it as a library. Currently, the coordination services in the embedded version of Samza have been written using Zookeeper. The goal of this proposal is to write the same coordination primitives using services provided in Microsoft Azure, in order to make the coordination service pluggable.


With the 0.13.0 release, Samza introduced a flexible deployment model which enables you to run it in containerized environments, with resource managers other than YARN, or in the cloud with the proper coordination primitives. It also enables you to run Samza as a library, within your application. The motivation to run Samza Embedded on Azure is the following:

  • Its current dependency on Zookeeper increases our customers’ reliability on the infrastructure, and does not help with modularity. Zookeeper is tedious to maintain and does not help in componentization. 
  • Introducing a coordination service in Azure will help identify issues with the current Job Coordinator design, and validate the functionalities that Samza Embedded claims it provides. 
  • If incorporated with the EventHub connector for Brooklin, it will give us an end-to-end system running in Azure, giving more motivation to teams in Microsoft, to incorporate Samza in their existing systems. 
  • We will also get all the advantages of moving to the cloud infrastructure.

Proposed Changes

  • Implement the AzureJobCoordinator on top of current JobCoordinator.

  • Change the current Latch implementation to a distributed lock based implementation, and implement it for Zookeeper.

  • Implement the distributed Lock and Leader Election functionality with Lease Blobs in Azure. These are pluggable components. 

    • A blob in Azure storage is used for storing large amounts of unstructured data. 

    • A Lease Blob is an operation that establishes and manages a lock on a blob for write and delete operations. 

    • Acquiring a lease on a blob that has already been leased by someone results in failure. 

    • We will use this service to elect the leader when running in Azure as follows:

      • All the processors will try to acquire a lease on the same shared blob in Azure storage.

      • The processor that acquires the lease becomes the leader and automatically renews the lease at a constant time interval.

      • When the leader dies, it results in release of the lease, following which one of the worker processes will acquire a new lease on the blob. 

  • Azure does not have a watch/subscribe functionality in Azure storage. Therefore, any change in the list of processors in the system will be detected by monitoring each processor's heartbeats. This will happen as follows:
    • We will maintain the list of active processors in Azure Table Storage. 
    • Every entry in a table is uniquely identified by two of its components - PARTITION KEY and ROW KEY. 
    • The PARTITION KEY of a table can be considered as a group id. The ROW KEY identifies every entry in a group uniquely. 
    • In this design:
      • PARTITION KEY = Job Model Version
      • ROW KEY = Processor ID
    • Entries in the table will correspond to a ProcessorEntity, which consists of a liveness value and a boolean field to indicate whether it is the leader or not.
    • The processor table will maintain the current state of the system by actively maintaining the list of live processors. 
    • Every processor will constantly update its liveness value periodically to assert that it is alive. The leader will assume that the processor has died if it has not been modified(written to) since an allowed time (30 sec).

    • The following components will be stored on the lease blob:
      • Job Model (previous and current)
      • Job Model Version (previous and current)
      • Barrier State and Version
      • List of active processors
    • Any new processor will have their job model version as "UNASSIGNED". 
    • During rebalancing:
      • The leader will:
        • Generate new job model
        • Update job model, job model version, barrier state, list of active processors on the blob
        • Start barrier state listener
      • The worker will:
        • Detect change in job model version
        • Pause container, update its job model
        • Add a new row entry with the updated job model version after getting the new job model
        • Start barrier state listener
    • When barrier is reached:
      • The leader will:
        • Update the state on the blob
        • Stop barrier listener
      • The worker will:
        • Resume container
        • Stop barrier listener
        • Delete its unassigned row entry from the table
    • Polling functions:
      • Leader:
        • Renew lease
        • Check list of live processors
        • Check barrier state from table
      • Worker:
        • Heartbeat (5 sec)
        • Check leader aliveness
        • Check job model version updates
        • Check barrier state from blob
    • It should be noted that the leader is also a worker.
    • When leader dies:
      • Leader election triggered. Every processor tries to become the leader.
      • The elected leader will start rebalancing. 
    • TO DO: In case of any orphaned processors, the processor will detect it and take necessary action.

  • Implement an Azure Client class that provides a client side reference for access to the Azure Storage Account, Azure blob storage and Azure table storage.

  • Implement the checkpointing mechanism with Azure Storage.
  • Integrate all of this with the EventHubSystemProducer and EventHubSystemConsumer.

Public Interfaces

The following interfaces will be implemented for Azure:

  • Lock interface to replace current Latch interface

    public interface Lock {
    * Acquires the lock
     void lock();

    * Releases the lock
     void unlock();

  • JobCoordinator

    public class AzureJobCoordinator implements JobCoordinator {
    public void start() {}
    public void stop() {}
    public class BlobUtils {

    public boolean publishJobModel(String jobModelVersion, JobModel jobModel) {}
    public JobModel getJobModel(String jobModelVersion) {}
    public String getJobModelVersion() {}
    public boolean publishBarrierState(String state, String leaseId) {}
    public String getBarrierState() {}
    public boolean publishLiveProcessorList(List<String> processors, String leaseId) {}
    public List<String> getLiveProcessorList() {}
    public class TableUtils {

    public void addProcessorEntity(String jmVersion, String pid, boolean isLeader) {}
    public ProcessorEntity getEntity(String jmVersion, String pid) {{}
    public void updateHeartbeat(String jmVersion, String pid) {}
    public void updateIsLeader(String jmVersion, String pid, boolean isLeader) {}
    public void deleteProcessorEntity(String jmVersion, String pid) {}
    public Iterable<ProcessorEntity> getEntitiesWithPartition(String partitionKey) {}
    public Set<String> getActiveProcessorsList(AtomicReference<String> currentJMVersion) {}

    public class ProcessorEntity extends TableServiceEntity {
     private int liveness;
    private boolean isLeader;



  • Lock

    public class AzureLock implements Lock {}

  • LeaderElection

    public class AzureLeaderElector implements LeaderElector {
    LeaseBlobManager leaseBlobManager;

    public void setLeaderElectorListener(LeaderElectorListener listener) {

    * Try and acquire a lease on the shared blob.
    public void tryBecomeLeader() {

    * Release the lease
    public void resignLeadership() {

    public boolean amILeader() {
    return false;

    public class LeaseBlobManager {
      private CloudPageBlob leaseBlob;

    public LeaseBlobManager(CloudPageBlob leaseBlob) {

    public void releaseLease(String leaseId) {

    public String acquireLease(int leaseTimeInSec, String leaseId) {

    public boolean renewLease(String leaseId) {

The following config values will be introduced for this implementation:

  • Azure Job Coordinator: job.coordinator.factory = org.apache.samza.AzureJobCoordinatorFactory
  • Azure Storage Connection String: = DefaultEndpointsProtocol=https;AccountName="Insert your account name";AccountKey="Insert your account key" 

Implementation and Test Plan

  • Implement the AzureJobCoordinator, LeaderElection and Lock functionality

  • Add metrics to monitor the new features

  • Implement necessary unit tests and integration tests for the added functionalities (Details: TBD)

Compatibility, Deprecation, and Migration Plan

The changes made in this proposal will be backward compatible.

Rejected Alternatives

  1. Processor Liveness: Each worker processor heartbeating directly to the leader processor through HTTP requests.
  2. State information: Separate request response channel
    • The leader will keep track of whether every processor has the updated version of the job model through HTTP requests. 
    • All processors will pause functioning until everybody has the updated job model version. (Workers pause when they get job model update notification from the leader.)
    • When a processor gets the updated job model version, it sends a notification to the leader.
    • Once the leader gets notifications from all the worker processes, it will send a Http request to all workers to resume functioning.
  3. State information: Per processor blob
    • Every processor will have a separate blob associated with it (apart from the shared lease blob which only the leader can write to). 
    • Each processor writes to its blob periodically to assert that it is alive. 
    • The leader will monitor these blobs to maintain the list of active processors, and delete any blob that has not been modified(written to) since an allowed time. This means that the processor died or lost connection temporarily.
    • In case of any orphaned processors (no associated blobs), the processor will detect the absence of the blob and take necessary action.
    • The leader will also maintain the barrier state, and list of processors that have the updated job model by monitoring worker blobs.
    • Every worker will monitor the leader blob to check its job model version and barrier state. It will pause/resume its functioning according to the barrier state that the leader is advertising.



  • No labels