...
Lock interface to replace current Latch interface
/**
 * Minimal lock abstraction: one operation to acquire the lock and one to release it.
 */
public interface Lock {

  /**
   * Acquires the lock.
   */
  void lock();

  /**
   * Releases the lock.
   */
  void unlock();
}
JobCoordinator
/**
 * API sketch of the {@code JobCoordinator} implementation for Azure.
 * Only the method signatures are listed; both bodies are empty.
 */
public class AzureJobCoordinator implements JobCoordinator {

  /** Start hook of the coordinator; the body is empty in this sketch. */
  public void start() {
  }

  /** Stop hook of the coordinator; the body is empty in this sketch. */
  public void stop() {
  }
}
/**
 * API sketch of the blob helper operations: job model, barrier state and
 * live processor list.
 *
 * <p>Only signatures are given. The bodies are empty, so the non-void methods
 * will not compile until they are implemented.
 */
public class BlobUtils {

  /**
   * Publishes a job model under a version.
   *
   * @param jobModelVersion version identifier of the job model
   * @param jobModel job model to publish
   * @return presumably {@code true} when the publish succeeded -- TODO confirm
   */
  public boolean publishJobModel(String jobModelVersion, JobModel jobModel) {
  }

  /**
   * Looks up the job model for a version.
   *
   * @param jobModelVersion version identifier of the job model
   * @return the job model for that version
   */
  public JobModel getJobModel(String jobModelVersion) {
  }

  /**
   * @return the job model version (presumably the latest published one -- TODO confirm)
   */
  public String getJobModelVersion() {
  }

  /**
   * Publishes the barrier state.
   *
   * @param state barrier state to publish
   * @param leaseId lease id supplied by the caller (presumably the lease that
   *     authorizes the write -- TODO confirm)
   * @return presumably {@code true} when the publish succeeded -- TODO confirm
   */
  public boolean publishBarrierState(String state, String leaseId) {
  }

  /**
   * @return the barrier state
   */
  public String getBarrierState() {
  }

  /**
   * Publishes the list of live processors.
   *
   * @param processors processor identifiers to publish
   * @param leaseId lease id supplied by the caller (presumably the lease that
   *     authorizes the write -- TODO confirm)
   * @return presumably {@code true} when the publish succeeded -- TODO confirm
   */
  public boolean publishLiveProcessorList(List<String> processors, String leaseId) {
  }

  /**
   * @return the list of live processors
   */
  public List<String> getLiveProcessorList() {
  }
}
public
class TableUtils
{
public void addProcessorEntity(String jmVersion, String pid, int liveness, boolean isLeader) {}
public ProcessorEntity getEntity(String jmVersion, String pid) {{}
public void updateHeartbeat(String jmVersion, String pid, int liveness) {}
public void updateIsLeader(String jmVersion, String pid, boolean isLeader) {}
public void deleteProcessorEntity(String jmVersion, String pid) {}
public Iterable<ProcessorEntity> getEntitiesWithPartition(String partitionKey) {}
public Set<String> getActiveProcessorsList(AtomicReference<String> currentJMVersion) {}
}
/**
 * Table entity for a single processor, carrying a liveness value and a leader flag.
 */
public class ProcessorEntity extends TableServiceEntity {

  // Liveness value of the processor (presumably a heartbeat counter -- TODO confirm).
  private int liveness;

  // Leader flag of the processor.
  private boolean isLeader;
}
Lock (replaces Latch)
/**
 * API sketch of the {@link Lock} implementation for Azure.
 *
 * <p>A concrete class implementing {@code Lock} must define both interface
 * methods, so they are listed here as stubs with empty bodies.
 */
public class AzureLock implements Lock {

  /** Acquires the lock; the body is empty in this sketch. */
  @Override
  public void lock() {
  }

  /** Releases the lock; the body is empty in this sketch. */
  @Override
  public void unlock() {
  }
}
LeaderElection
public
class AzureLeaderElector implements LeaderElector {LeaseBlobManager leaseBlobManager;@Override
public void setLeaderElectorListener(LeaderElectorListener listener) {
}
/**
* Try and acquire a lease on the shared blob.
*/
@Override
public void tryBecomeLeader() {
}
}
/**
* Release the lease
*/
@Override
public void resignLeadership() {
}
@Override
public boolean amILeader() {
return false;
}public class LeaseBlobManager {private CloudBlobContainer container;
private CloudPageBlob leaseBlob;
public LeaseBlobManager(CloudBlobClient blobClient, String containerName, String blobNameCloudPageBlob leaseBlob) {
}
public void releaseLease(String leaseId) {
}
public String acquireLease(int leaseTimeInSec, String leaseId, long length) {
}
public boolean renewLease(String leaseId) {
}
}
...
The following config values will be introduced for this implementation:
- Azure Job Coordinator: job.coordinator.factory = org.apache.samza.azure.AzureJobCoordinatorFactory
- Azure Storage Account Name
- Azure Storage Account Key
- Connection String: azure.storage.connect = DefaultEndpointsProtocol=https;AccountName="Insert your account name";AccountKey="Insert your account key"
Implementation and Test Plan
Implement the AzureJobCoordinator, LeaderElection, and Lock (formerly Latch) functionality
Add metrics to monitor the new features
Implement necessary unit tests and integration tests for the added functionalities (Details: TBD)
...