Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Lock interface to replace current Latch interface

    public interface Lock {
     /**
    * Acquires the lock
    */
     void lock();

    /**
    * Releases the lock
    */
     void unlock();
     }

     


  • JobCoordinator

    public class AzureJobCoordinator implements JobCoordinator {
     
    public void start() {}
    public void stop() {}
    }
    public class BlobUtils {

    public boolean publishJobModel(String jobModelVersion, JobModel jobModel) {}
    public JobModel getJobModel(String jobModelVersion) {}
    public String getJobModelVersion() {}
    public boolean publishBarrierState(String state, String leaseId) {}
    public String getBarrierState() {}
    public boolean publishLiveProcessorList(List<String> processors, String leaseId) {}
    public List<String> getLiveProcessorList() {}
    }
    public class TableUtils {

    public void addProcessorEntity(String jmVersion, String pid, int liveness, boolean isLeader) {}
    public ProcessorEntity getEntity(String jmVersion, String pid) {{}
    public void updateHeartbeat(String jmVersion, String pid, int liveness) {}
    public void updateIsLeader(String jmVersion, String pid, boolean isLeader) {}
    public void deleteProcessorEntity(String jmVersion, String pid) {}
    public Iterable<ProcessorEntity> getEntitiesWithPartition(String partitionKey) {}
     
    public Set<String> getActiveProcessorsList(AtomicReference<String> currentJMVersion) {}
    }

    public class ProcessorEntity extends TableServiceEntity {
     private int liveness;
    private boolean isLeader;
    }

     

     

  • LatchLock

    public class AzureLock implements Lock {}


  • LeaderElection

    public class AzureLeaderElector implements LeaderElector {
    LeaseBlobManager leaseBlobManager;

     @Override
    public void setLeaderElectorListener(LeaderElectorListener listener) {
    }

    /**
    * Try and acquire a lease on the shared blob.
    */
    @Override
    public void tryBecomeLeader() {
    }

    /**
    * Release the lease
    */
    @Override
    public void resignLeadership() {
    }

    @Override
    public boolean amILeader() {
    return false;
    }

    }
    public class LeaseBlobManager {
      private CloudBlobContainer container;
    private CloudPageBlob leaseBlob;

    public LeaseBlobManager(CloudBlobClient blobClient, String containerName, String blobNameCloudPageBlob leaseBlob) {
    }

    public void releaseLease(String leaseId) {
    }

    public String acquireLease(int leaseTimeInSec, String leaseId, long length) {
    }

    public boolean renewLease(String leaseId) {
    }
    }

...

The following config values will be introduced for this implementation:

  • Azure Job Coordinator: job.coordinator.factory = org.apache.samza.azure.AzureJobCoordinatorFactory for job.coordinator.factory.
  • Azure Storage Account Name
  • Azure Storage Account Key
  • Connection String: azure.storage.connect = DefaultEndpointsProtocol=https;AccountName="Insert your account name";AccountKey="Insert your account key" 

Implementation and Test Plan

  • Implement the AzureJobCoordinator, LeaderElection and Latch Lock functionality

  • Add metrics to monitor the new features

  • Implement necessary unit tests and integration tests for the added functionalities (Details: TBD)

...