Status
Current state: ACCEPTED
Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/202001.mbox/browser
JIRA: SAMZA-2373
Released: Samza 1.5
Problem
Samza operates in a multi-tenant environment with cluster managers like Yarn and Mesos, where a single host can run multiple Samza containers. Because cluster managers like Yarn are often configured with soft limits, and because Samza has no notion of dynamic workload balancing, a host can land in a situation where it is underperforming and it becomes desirable to move one or more containers from that host to other hosts. Today this is not possible without affecting other jobs on the hot host or manually restarting the affected job. In other use cases, such as resetting checkpoints for a single container or supporting canary or rolling bounces, the ability to restart a single container or a subset of containers without restarting the whole job is highly desirable.
This doc addresses the problem of restarting/moving a single container of a job without affecting other containers of the same job or other jobs running on the same host.
Motivation
Alleviating Hot Host Problems: Yarn, as the resource manager for Samza, is configured to operate on soft limits (a job sets vcores for a container or uses the default; this is the minimum guarantee of resources to be provided by Yarn), and most customers go with the defaults and fail to right-size their job. In addition, there is no notion of a dynamic cluster balancer in Samza at LinkedIn today, although Yarn to some capacity acts as a cluster balancer if configured with hard limits. Because of this, a host often lands in a situation where the containers on it are underperforming (also referred to as a hot host) because it has CPU-heavy containers running on it while other hosts are underutilized. It then becomes desirable to move a container to a different host. To achieve this today, the following solutions exist:
- Rewrite the locality mapping in coordinator stream and restart the job
- Take the hot host out of rotation, which kills containers from all the jobs running on that host. Then trigger a restart of the job whose container is supposed to be moved so that Yarn tries to allocate some other host for it. Once the container starts on another host, put the hot host back in rotation so that the other containers that were killed as a result of taking the host out can be restarted on the hot host again. It ain't easy!
If the ability to move a container exists, an operator can, at the simplest, manually move containers to different hosts, or write simple scripts to automate that.
Canary / Rolling Bounces: When there is a bug in the Samza framework code that affects Samza container deployment, a Samza engineer needs to manually restart the container process on the given machine with the given binary version. This involves multiple steps (e.g. manually identifying and logging in to the container host before using kill -9 to stop the process), which is inconvenient. With restart ability, the same system can be used to build support for canary or rolling bounces for YARN-based Samza deployments; restart ability can easily be extended to deploy a single container or a subset of containers using a different version of application code.
Resetting Checkpoints: The Startpoint API has made resetting checkpoints easy, but it still needs a dev to restart the job once the startpoints have been set. Restarting the whole job can potentially be avoided with the ability to restart a single container or a few containers.
Draining a host: Moving all running containers from a host sequentially to other hosts or in parallel.
Fix a Job in Degraded State: Users often find it desirable to restart just a single container for various reasons, for example underperforming containers, or when only a few containers run into exceptions because some partitions have corrupt messages. Today Samza kills the job if a container has run into exceptions more than a fixed, configured number of times. In these scenarios it is desirable for users to keep the job running in a degraded state, fix only one or a few containers of the job, and issue a restart for them.
Dynamic Workload Balancer: The ability to move and restart containers is the fundamental building block for developing a load balancer (like Cruise Control) for Samza. At its simplest, this load balancer can be a script trying to balance the cluster; later it can be built into a more sophisticated system.
Heterogeneous containers: Each Samza job today has homogeneous containers (with regard to memory & vcore configurations). One of the desired use cases for Samza in the future is the ability to restart a container with a different size (memory & CPU), which can be used by any auto-sizing engine designed for Samza.
Selectively Spin Up StandBy Containers: Samza has a Hot StandBy Containers feature for reducing stateful restoration times. Enabling this feature for a job involves at least doubling the containers (in the simplest case, every container has 1 standby replica). Customers are reluctant to enable this since doubling the containers increases the cost to serve. To improve adoption of this feature, we can build the ability to spin up StandBy containers for a single container or a subset of containers while the job is running; these StandBy containers can then be used for failover to reduce downtime.
Requirements
Goals
- Build the ability to move a container (non-AM) of a running job from its existing host to a preferred host (if specified) or any host
- If an existing stateful container has a standby container, move the active container to the standby container
- If an existing stateful container is without a standby container, spin up a standby container and then move the active to the standby once the standby has bootstrapped, to reduce downtime [Stretch]
- Build the ability to restart containers with the same host preferences
- Build the ability to selectively spin up standby for one or a subset of containers [Stretch]
- Expose these APIs via a tool or dashboard for operability
Non-Goals
- This system does not intend to solve the problem of dynamic workload balancing, i.e. Cruise Control. It may act as a building block for one later.
- Solving the canary problem for the YARN-based deployment model is out of scope for this solution; however, the system built should be easily extensible to support canary
- This system will not have built-in intelligence to find a better host match for a container; it will make simplistic decisions as per the params passed by the user.
SLA / SCALE / LIMITS (Assumptions)
- At a time, the AM for a single job will serve only one request per container; parallel requests across containers are still supported. If a control request is underway, any other request issued on the same container will be queued. The same assumption holds for in-flight requests on standby and active containers, i.e. if any container placement request is in progress for an active container or its standby replica, all subsequent placement actions on either are queued
- Actions are de-queued, sorted in order of the timestamps populated by the client, and executed in that order
- The system should be capable of scaling to be used across different jobs at the same time
General Requirements
At a high level the system can be divided into two logical components:
- Container Placement Handler (CPH): serves as a dispatcher of specific control requests from a control plane. It fetches container placement control actions from the control plane, queues them up and dispatches requests to containers of a job (non-AM) according to a policy (details below)
- Container Placement Service: an API layer built around the Cluster Manager to move containers. It reacts to control actions from CPH and interacts with the underlying cluster manager to execute them
Requirements for Container Placement Handler
- Should be able to handle multiple move requests for different containers at the same time for a single job
- Should provide a good monitoring mechanism
- Should be general enough to be coupled with any Control Plane
- Should allow queuing requests for multiple containers across multiple jobs at the same time
- Should be easily extensible to build canary support for yarn deployments
Requirements for Container Placement Service
- Under no circumstances should Control Actions result in an inconsistent / non-working state of the job (i.e., #running-containers should equal number of configured containers).
- Should not be tightly coupled with YARN, should be extensible to be used by any cluster manager implementation
Failure Scenarios & Assumptions
- Each control action will be associated with a deployment id of an app it is intended to be issued upon
- Across Job restarts, any requests issued for the previous deployment incarnation will not be respected. This includes AM restarts (today if the AM restarts that implies a job restart)
Glossary
Term | Description
---|---
Cluster Manager | Resource manager used by Samza, e.g. Yarn, Mesos, etc.
Job Coordinator (JC) { also referred to as ApplicationMaster (AM) for Yarn } | Each Samza application has a JC that manages the application’s workload, asks the RM for containers and handles notifications when one of its containers fails.
Node Manager (NM) | A single node in a Yarn cluster
Resource Manager (RM) | Central service that provides scheduling, heartbeats and liveness monitoring to all applications running in the YARN cluster
Host Affinity | The ability of Samza to allocate a container to the same machine across job restarts is referred to as host-affinity.
Container processorId | Samza allocates container resource ids, e.g. 0, 1, 2, which remain the same across restarts of a job
Proposed Solution:
Solution 1. Write locality to Coordinator Stream (Metastore) and restart job [Rejected]
This approach proposes to directly change the container locality messages (container-to-host mapping messages) in the Metadata Store (currently Kafka) and issue a restart.
Solution 2. Container Placement API [Accepted]
API design
On the basis of types of Control actions, the commands are the following:
1. Container Placement Actions (Move / Restart)
API | placeContainer
---|---
Description | Active Container: Stops the container process on source-host and starts it on the requested destination (destination-host, ANY_HOST, or its standby's host). StandBy Container: Stops the container process on source-host and starts it on the requested destination.
Parameters | deploymentId: unique identifier of the deployed app for which the action is taken; processor-id: Samza resource id of the container, e.g. 0, 1, 2; destination-host: valid hostname / "ANY_HOST" / "STANDBY"; request-expiry-timeout [optional]: timeout for any resource request to the cluster manager
Status code | CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED
Returns | UUID for the client to query the status of the request
Failure Scenarios | There are the following cases under which a request to place a container might fail (see Failure Scenarios below)
Note: For supporting canary, the above parameter list can easily be extended to support the following parameters:
Parameters | app-version: user application version [optional]; samza-version: Samza framework version [optional]; jvm-args: arbitrary string to be used as JVM arguments [optional]
2. Container Status
API | containerStatus
---|---
Description | Gives the status & info of a container placement request, e.g. whether the container is running or stopped and which control commands have been issued on it
Parameters | processor-id: Samza resource id of the container, e.g. 0, 1, 2; deploymentId: unique identifier of the deployed app for which the action is taken
Status code | BAD_REQUEST, SUCCEEDED
Returns | Status of the container placement action
3. Enable & Disable StandBy [Stretch]
API | controlStandBy
---|---
Description | Starts or stops a StandBy container for the given active container
Parameters | processor-id: Samza resource id of the container, e.g. 0, 1, 2; deploymentId: unique identifier of the deployed app for which the action is taken
Status code | CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED
Returns | UUID for the client to query the status of the request
Architecture
For implementing a scalable container placement control system, the proposed solution is divided into two parts:
Part 1. Container Placement Handler
- Control Plane is a channel outside the job that allows control actions to be taken by multiple controllers such as the Samza Dashboard or the Startpoints controller.
- ContainerPlacementHandler is a stateless handler registered to control plane that dispatches placement actions to invoke Container Placement Service APIs
This control plane can be implemented in the following ways
Option 1: Samza Metastore API [Preferred]
Samza Metastore provides an API to write to the coordinator stream. One simple way to expose the Container Placement API is for the Container Placement Handler to have a coordinator stream consumer polling control messages from the coordinator stream under a separate namespace (different from the one where the locality messages of containers live) and acting on them.
Option 2: Stub a Rest Endpoint in JC [Rejected Alternative]
It is fairly simple to embed a REST endpoint in the JC, or extend the existing REST endpoint that exposes configs in the JC, to support the APIs listed above.
Implementation Details:
- ContainerPlacementHandler is a stateless handler dispatching ContainerPlacementRequestMessages from the Metastore to the Container Placement Service, and ContainerPlacementResponseMessages from the Container Placement Service to the Metastore, so that external controllers can query the status of an action. (PR)
- The Metastore used by default in Samza today is Kafka (the coordinator stream), which is used to store configs & container mappings and is log compacted
- ContainerPlacementRequestMessage & ContainerPlacementResponseMessage are maintained in the same namespace using NamespaceAwareMetaStore ("samza-place-container-v1")
Key-Value Format
The key for storing a ContainerPlacementRequestMessage or ContainerPlacementResponseMessage in the Metastore is chosen to be UUID + "." + messageType (ContainerPlacementRequestMessage or ContainerPlacementResponseMessage). The value is the payload containing the ContainerPlacementRequestMessage or ContainerPlacementResponseMessage. Messages are written to and read from the Metastore through the MetadataStore abstraction.
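To make the key layout concrete, here is a minimal sketch of composing and parsing such keys; the class and helper names are illustrative, not part of the actual implementation.

// Illustrative sketch: composing and splitting metastore keys of the form "<UUID>.<subType>".
// The helper names below are hypothetical, not part of the Samza codebase.
import java.util.UUID;

public class PlacementMessageKeys {
  static final String REQUEST_SUBTYPE = "ContainerPlacementRequestMessage";
  static final String RESPONSE_SUBTYPE = "ContainerPlacementResponseMessage";

  // Key under which a request message is stored, e.g. "f068175b-....ContainerPlacementRequestMessage"
  static String requestKey(UUID uuid) {
    return uuid.toString() + "." + REQUEST_SUBTYPE;
  }

  // Key under which the corresponding response is stored; it shares the same UUID prefix,
  // so a client that issued a request can derive the response key directly.
  static String responseKey(UUID uuid) {
    return uuid.toString() + "." + RESPONSE_SUBTYPE;
  }

  // Extract the UUID back from a stored key.
  static UUID uuidOf(String key) {
    return UUID.fromString(key.substring(0, key.indexOf('.')));
  }
}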
ContainerPlacementRequestMessage:
Key | Value | Field Description | Field Type
---|---|---|---
"UUID.subType" | uuid | Unique identifier of a request message | Required
 | processorId | Logical processor id (0, 1, 2) of the container | Required
 | deploymentId | Unique identifier for a deployment | Required
 | subType | Type of message, here: ContainerPlacementRequestMessage | Required
 | destinationHost | Destination host where the container is desired to be moved | Required
 | statusCode | Status of the request | Required
 | timestamp | The timestamp of the request message | Required
 | requestExpiry | Request expiry which acts as a timeout for any resource request to the cluster resource manager | Optional
Sample KV
Key | Value |
---|---|
[1,"samza-place-container-v1","f068175b-c9b6-4f34-982b-ecb5619f21de.ContainerPlacementRequestMessage"] | {"processorId":"1","deploymentId":"app-atttempt-001","subType":"ContainerPlacementRequestMessage","uuid":"f068175b-c9b6-4f34-982b-ecb5619f21de","destinationHost":"ANY_HOST","statusCode":"CREATED","timestamp":1578693870484} |
ContainerPlacementResponseMessage:
Key | Value | Field Description | Field Type
---|---|---|---
"UUID.subType" | uuid | Unique identifier of a response message | Required
 | processorId | Logical processor id (0, 1, 2) of the container | Required
 | deploymentId | Unique identifier for a deployment | Required
 | subType | Type of message, here: ContainerPlacementResponseMessage | Required
 | destinationHost | Destination host where the container is desired to be moved | Required
 | statusCode | Status of the response | Required
 | responseMessage | Response message in conjunction with the status | Required
 | timestamp | The timestamp of the response message | Required
 | requestExpiry | Request expiry which acts as a timeout for any resource request to the cluster resource manager | Optional
Sample KV
Key | Value |
---|---|
[1,"samza-place-container-v1","88b0d30c-d518-4307-9e8e-c8529eb30f04.ContainerPlacementResponseMessage"] | {"processorId":"1","deploymentId":"app-atttempt-001","subType":"ContainerPlacementResponseMessage","responseMessage":"Request is accepted","uuid":"88b0d30c-d518-4307-9e8e-c8529eb30f04","destinationHost":"ANY_HOST","statusCode":"ACCEPTED","timestamp":1578694070875} |
Challenges with Metastore
The Metastore today (Kafka) is at-least-once & eventually consistent; hence ContainerPlacementService has to do in-memory caching of the UUIDs of accepted actions so that it does not take a request twice if duplicates are delivered. But the in-memory cache must not be unbounded, since that could result in the job running out of memory. The size of a UUID is 16 bytes; at most a job might have, say, 500 containers, so one request action per container for 500 containers results in 0.008 MB of increased memory (just the in-memory lookup). If we cache, say, the last 20K actions (which can accommodate 40 failovers of 500 containers in the current scenario), the memory used will be at most 0.64 MB if we implement a FIFO cache (in-memory lookup + FIFO queue).
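A minimal sketch of such a bounded FIFO de-duplication cache (illustrative only; the class name, capacity handling and synchronization are assumptions, not the actual Samza implementation):

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Bounded FIFO cache of recently accepted action UUIDs, used to drop duplicate
// ContainerPlacementRequestMessages delivered by an at-least-once metastore.
public class AcceptedActionCache {
  private final Set<UUID> accepted;

  public AcceptedActionCache(int maxEntries) {
    // LinkedHashMap in insertion order + removeEldestEntry gives FIFO eviction,
    // so memory stays bounded (e.g. ~20K UUIDs as estimated above).
    this.accepted = Collections.newSetFromMap(new LinkedHashMap<UUID, Boolean>() {
      @Override
      protected boolean removeEldestEntry(Map.Entry<UUID, Boolean> eldest) {
        return size() > maxEntries;
      }
    });
  }

  // Returns true the first time a UUID is seen; false for duplicates.
  public synchronized boolean markAccepted(UUID uuid) {
    return accepted.add(uuid);
  }
}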
GC policy for stale messages in metastore
- One way to delete stale ContainerPlacementMessages is to delete the requests/responses from the previous incarnation of the job from the metastore on job restarts; this is the responsibility of ContainerPlacementService
- Once a request is complete, ContainerPlacementService can issue an async delete to clean up the request from the metastore (a sketch of both clean-up paths follows this list)
- Request/response messages can be externally cleaned up by a tool
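A hedged sketch of both clean-up paths, using the ContainerPlacementMetadataStore interface listed later in this doc; the readAllRequestMessages() helper and the getDeploymentId/getUuid accessors are assumptions, not part of the documented interface.

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Illustrative GC sketch for stale placement messages. Only
// deleteAllContainerPlacementMessages(UUID) comes from the interface in this doc.
public class PlacementMessageGc {
  private final ContainerPlacementMetadataStore store;
  private final String currentDeploymentId;

  public PlacementMessageGc(ContainerPlacementMetadataStore store, String currentDeploymentId) {
    this.store = store;
    this.currentDeploymentId = currentDeploymentId;
  }

  // On job restart: drop every request/response written for a previous deployment incarnation.
  public void cleanUpPreviousIncarnation() {
    for (ContainerPlacementRequestMessage request : readAllRequestMessages()) {
      if (!currentDeploymentId.equals(request.getDeploymentId())) { // accessor name assumed
        store.deleteAllContainerPlacementMessages(request.getUuid());
      }
    }
  }

  // On request completion: delete the request (and its response) asynchronously so the
  // metastore does not accumulate completed actions.
  public void cleanUpCompleted(UUID uuid) {
    CompletableFuture.runAsync(() -> store.deleteAllContainerPlacementMessages(uuid));
  }

  // Hypothetical enumeration helper; a metastore-backed listing is not shown in this doc.
  private Iterable<ContainerPlacementRequestMessage> readAllRequestMessages() {
    throw new UnsupportedOperationException("illustrative only");
  }
}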
Part 2. Container Placement Service
The Container Placement Service is a set of APIs built around the AM to move/restart containers. The Container Placement Service periodically gets a queue of placement actions per container, which it issues in parallel across different containers but sequentially on any one container. Each placement request has a "deploymentId" attached to it, because if the job restarts, all placement actions queued for the previous deployment must be disregarded and deleted. Samza internally generates an id for each run of a job ("app.run.id") at the job planning phase; we can use that id as the "deploymentId" for placement requests. A sketch of this dispatch behavior is shown below.
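A minimal sketch of the dispatch behavior described above (illustrative only; registerContainerPlacementAction comes from the ContainerManager interface later in this doc, while the accessor names and the hasActionInProgress check are assumptions):

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Illustrative dispatch sketch: placement actions are processed in parallel across
// containers but strictly sequentially for a given processorId, and any action tagged
// with a stale deploymentId is discarded.
public class PlacementActionDispatcher {
  private final Map<String, Queue<ContainerPlacementRequestMessage>> pendingByProcessor = new HashMap<>();
  private final String currentDeploymentId;

  public PlacementActionDispatcher(String currentDeploymentId) {
    this.currentDeploymentId = currentDeploymentId;
  }

  // Called periodically with the actions read from the metastore, already sorted by the
  // client-populated timestamp.
  public void enqueue(List<ContainerPlacementRequestMessage> actions) {
    for (ContainerPlacementRequestMessage action : actions) {
      if (!currentDeploymentId.equals(action.getDeploymentId())) { // accessor name assumed
        continue; // stale action from a previous deployment incarnation: ignore/delete
      }
      pendingByProcessor
          .computeIfAbsent(action.getProcessorId(), id -> new ArrayDeque<>()) // accessor name assumed
          .add(action);
    }
  }

  // Dispatch at most one in-flight action per container; the rest stay queued.
  public void dispatchNext(ContainerManager containerManager, ContainerAllocator allocator) {
    for (Map.Entry<String, Queue<ContainerPlacementRequestMessage>> entry : pendingByProcessor.entrySet()) {
      ContainerPlacementRequestMessage next = entry.getValue().peek();
      if (next != null && !containerManager.hasActionInProgress(entry.getKey())) { // hypothetical check
        entry.getValue().poll();
        containerManager.registerContainerPlacementAction(next, allocator);
      }
    }
  }
}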
The solution proposes to refactor & simplify the current AM code & introduce a ContainerManager, a single entity managing container actions like start and stop for both active and standby containers. Listed below are the functions of ContainerManager & the proposed refactoring around the AM code:
- Remove the HostAwareContainerAllocator & ContainerAllocator split, simplifying the ContainerAllocator into a single lightweight entity allocating requests to available resources (PR1, PR2)
- Introduce ContainerManager which acts as a brain for validating and issuing any actions on containers in the Job Coordinator for both active & Standby containers. (PR)
- Transfer state & validation of container launch & expired request handling from ContainerAllocator to ContainerManager
- Transfer state & lifecycle management of Container allocator & resource request on job start (reading locality mapping) from ClusterResourceManager.CallBack(ContainerProcessManager) to ContainerManager*
- Encapsulate logic and state related to container placement actions like move and restart for active & standby containers in ContainerManager (PR-1, TBD)
- It is ContainerManager’s duty to validate any ContainerPlacementRequestMessages & also invalidate messages from the previous deployment incarnation
- It is ContainerManager’s duty to write ContainerPlacementResponseMessages to Metastore for the external controller to query the status of the request
- ContainerPlacementMetadata is a metadata holder for container actions (ControlActionMetadata), holding the request id, current status, requested resources, etc.
Note: *ClusterResourceManager.Callback (ContainerProcessManager) is tightly coupled with ClusterBasedJobCoordinator today. In phase 1 of the implementation, all the proposed changes will be done except for moving the state & lifecycle management of the Container Allocator & the resource requests on job start (reading the locality mapping) from ClusterResourceManager.Callback (ContainerProcessManager) to ContainerManager, so that this feature can be developed faster. Hence ContainerProcessManager will still be tied to ClusterBasedJobCoordinator and will intercept any container placement requests.
2.1 Container Move
2.1.1 Stateless Container Move & Stateful Container Move (without Standby)
Option 1: Reserve-Stop-Move: Request resources first and only make the move once the resources are allocated by the ClusterResourceManager (Preferred)
Option 2: Stop-Reserve-Move: Stop the container and then make a resource request for preferred resources and attempt a start (Rejected Alternative)
Orchestration of Stateless Container Move using Option 1: The following sequence of steps is involved in moving a container. Note that a configurable number of retries is used when requesting preferred hosts from the ClusterResourceManager. A sketch of this flow follows the list.
- ContainerPlacementHandler dispatches the move request to ContainerProcessManager
- ContainerProcessManager dispatches the placement request to ContainerManager, which validates the action and issues a preferred resource request via ContainerAllocator (async)
- Now there are two possible scenarios:
- Scenario 1: Requested resources are allocated by the ClusterResourceManager
- In this case, ContainerManager initiates a container shutdown and waits for the ContainerProcessManager callback on successfully stopping the container
- On getting the successful stop callback, ContainerManager issues a container start request to the ClusterResourceManager on the allocated resources
- If the container start request succeeds, a success notification is sent (updating the Control Action Metadata); otherwise a new resource request to start this container on the source host is issued and a failure notification is sent to the user
- Scenario 2: The resource request expires
- In cases where the resource request expires because the ClusterResourceManager is not able to allocate the requested resources, a failure notification is sent and the active container remains unaffected
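A pseudo-Java sketch of how the two outcomes above could be handled (illustrative only; apart from SamzaResource and ContainerPlacementMessage.StatusCode, which are named in this doc, the helper methods are assumptions, not the actual JobCoordinator code):

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of reacting to the outcome of a preferred-resource request
// issued for a container move action.
class MoveFlowSketch {

  // Outcome 1: the ClusterResourceManager allocated the requested destination resource.
  void onResourceAllocated(String processorId, SamzaResource allocatedResource) {
    // Stop the active container first; the launch happens only after the
    // ContainerProcessManager callback confirms the stop.
    requestContainerStop(processorId);
    pendingDestination.put(processorId, allocatedResource);
  }

  void onContainerStopped(String processorId, String sourceHost) {
    SamzaResource destination = pendingDestination.remove(processorId);
    if (launchContainer(processorId, destination)) {
      notifyStatus(processorId, ContainerPlacementMessage.StatusCode.SUCCEEDED);
    } else {
      // Fall back: request the source host again (ANY_HOST if that also fails)
      // and report the placement action as FAILED.
      requestResource(processorId, sourceHost);
      notifyStatus(processorId, ContainerPlacementMessage.StatusCode.FAILED);
    }
  }

  // Outcome 2: the resource request expired; the active container was never stopped.
  void onResourceRequestExpired(String processorId) {
    notifyStatus(processorId, ContainerPlacementMessage.StatusCode.FAILED);
  }

  // --- hypothetical helpers standing in for JobCoordinator internals ---
  private final Map<String, SamzaResource> pendingDestination = new HashMap<>();
  private void requestContainerStop(String processorId) {}
  private void requestResource(String processorId, String host) {}
  private boolean launchContainer(String processorId, SamzaResource resource) { return true; }
  private void notifyStatus(String processorId, ContainerPlacementMessage.StatusCode code) {}
}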
Who writes the new locality mapping after a successful move?
Samza containers write their new locality message to the metadata store on a successful start (code); hence after a successful move, the container writes its new locality itself.
Failure Scenarios:
- If the preferred resources cannot be acquired, the active container is never stopped and a failure notification is sent for the ContainerPlacementRequest
- If the ContainerPlacementManager is not able to stop the active container (3.1 #1 above fails), the request is marked failed & a failure notification is sent for the ContainerPlacementRequest
- If the ClusterResourceManager fails to start the stopped active container on the accrued destination host, the container falls back to the source host and a failure notification is sent for the ContainerPlacementRequest. If the container fails to start on the source host, an attempt is made to start it on ANY_HOST
Note: ClusterResourceManager.Callback (ContainerProcessManager) is tightly coupled with ClusterBasedJobCoordinator today. In phase 1 of the implementation, all the proposed changes (in part 2) will be done except for moving the state & lifecycle management of the Container Allocator & the resource requests on job start (reading the locality mapping) from ClusterResourceManager.Callback (ContainerProcessManager) to ContainerManager, so that this feature can be developed faster. Hence ContainerProcessManager will still be tied to ClusterBasedJobCoordinator and will intercept any container placement requests.
Option 3: Stateful without Standby (spin up a StandBy container & then move) (Phase 2) [Stretch]
If we expose an API to spin up a StandBy container for an active container, a client can then make two requests: one to spin up the StandBy, then periodically monitor the lag of the StandBy container (using Diagnostics). Once the StandBy container has a steady lag and is ready to move, this case becomes Case 1 mentioned above. (More details TBD later.) A client-side sketch of this flow is shown below.
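A hedged client-side sketch of this stretch flow; only writeContainerPlacementRequestMessage and the "STANDBY" destination come from this doc, while the standby-spin-up request, the lag check and the 10-second lag threshold are placeholders/assumptions:

import java.time.Duration;
import java.util.UUID;

// Illustrative client sketch: spin up a standby, wait for it to catch up, then move the
// active onto it.
public class StandbyThenMoveClient {
  private final ContainerPlacementMetadataStore store;

  public StandbyThenMoveClient(ContainerPlacementMetadataStore store) {
    this.store = store;
  }

  public UUID moveViaStandby(String deploymentId, String processorId) throws InterruptedException {
    requestStandby(deploymentId, processorId);           // hypothetical controlStandBy request (stretch API)
    while (standbyLagSeconds(processorId) > 10) {        // hypothetical lag check via Diagnostics
      Thread.sleep(Duration.ofSeconds(30).toMillis());
    }
    // Once the standby has a steady, small lag this reduces to the move case above:
    // request the active container to be placed on its standby.
    return store.writeContainerPlacementRequestMessage(
        deploymentId, processorId, "STANDBY", null, System.currentTimeMillis());
  }

  private void requestStandby(String deploymentId, String processorId) { /* stretch API, TBD */ }
  private long standbyLagSeconds(String processorId) { return 0; } // placeholder
}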
2.1.2 Stateful Container Move with Standby Enabled
Stateful container move is divided into two cases based on whether the job has a StandBy container for it or not.
Let's take the case where the job has a stateful container C on H1 and a StandBy container C’ enabled on H2.
Option 1: Stop-Reserve-Move: Initiate a Stop on C & issue a Standby failover (Preferred - Phase 1*)
In this option, C is stopped on H1 and a failover is initiated from H1 to H2 with the current semantics of the failover of an active container in the Hot Standby feature. This ensures C moves either to H2 (in the best case) or to some other host (when the cluster manager cannot allocate capacity on H2, i.e. H2 does not have enough spare capacity).
Option 2: Reserve-Stop-Move: Initiate a Standby move first then issue a move for Active Container (Preferred - Phase 2*)
- In this option, a request is first issued to stop C’
- Then H2 is requested from the cluster manager to start C
- A stop is issued on C only if H2 can be allocated (#2 succeeds)
- Then C’ is requested to start on ANY_HOST except H1, H2
Two ways to achieve this:
- A client can make two calls: first move C’ to ANY_HOST apart from H2, H1 (similar to a stateless move), then move C onto C’'s host H2 (similar to a stateless move), so the state maintenance lives on the client (a sketch of this approach follows this list)
- Maintain state in the JC to accomplish this
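A sketch of the first, client-driven approach (illustrative only; the getStatusCode accessor on the response message is an assumption, and since the API as defined cannot exclude specific hosts, the standby is simply moved to ANY_HOST here):

import java.util.UUID;

// Illustrative client sketch of the two-call approach: first move the standby C' off of H2,
// then move the active C onto H2. Waiting for the first action to succeed (via the response
// message) is the client's responsibility in this approach; error handling is omitted.
public class StandbyAwareMoveClient {
  private final ContainerPlacementMetadataStore store;

  public StandbyAwareMoveClient(ContainerPlacementMetadataStore store) {
    this.store = store;
  }

  public void moveActiveToStandbyHost(String deploymentId, String activeProcessorId,
      String standbyProcessorId, String standbyHostH2) throws InterruptedException {
    // Step 1: move the standby C' anywhere else, freeing capacity on H2.
    UUID standbyMove = store.writeContainerPlacementRequestMessage(
        deploymentId, standbyProcessorId, "ANY_HOST", null, System.currentTimeMillis());
    waitForSuccess(standbyMove);

    // Step 2: move the active C onto H2 (a stateless-style move to a concrete host).
    UUID activeMove = store.writeContainerPlacementRequestMessage(
        deploymentId, activeProcessorId, standbyHostH2, null, System.currentTimeMillis());
    waitForSuccess(activeMove);
  }

  private void waitForSuccess(UUID uuid) throws InterruptedException {
    while (store.readContainerPlacementResponseMessage(uuid)
        .map(r -> r.getStatusCode() != ContainerPlacementMessage.StatusCode.SUCCEEDED) // accessor assumed
        .orElse(true)) {
      Thread.sleep(5000);
    }
  }
}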
* Phase 1 & Phase 2 refer to the implementation phases, please see the section: Implementation Plan
Option 3: Reserve-Stop-Move: First Request H2 from Cluster Manager and initiate the failover once you get H2 as a preferred resource [Rejected Alternative]
- In this option, C’ is stopped on H2 and a failover is initiated from H1 to H2 with the current semantics developed by the Hot Standby feature
- In case the move fails, only the standby is affected
Orchestration of StandBy Enabled Container Move using Option 1 (Phase 1)
- ContainerPlacementHandler dispatches the move request to ContainerProcessManager
- ContainerProcessManager registers a move & initiates a failover with StandByContainerManager; the failover sequence of the Hot Standby feature is then followed
2.2 Container Restart
This is much simpler compared to Move. We can either make a restart equivalent to a move, or issue a stop on a container so that ContainerProcessManager tries to start that container again. Let's discuss these options in detail:
Option 1: Reserve-Stop-Start: Restart is equivalent to move on the same host [Preferred]
- In this option, resources are requested on the same host first, before issuing a stop on the container
- Once the resources are accrued, the container is stopped & requested to start
- So this is equivalent to the move semantics above
Both stateless & stateful container restarts will be equivalent to a Stateless Container Move on the same host, since the container is restarted on the same host.
Option 2: Stop-Reserve-Start: Restart is equivalent to stopping container first then attempting to start it [Rejected Alternative]
- In this option, a container is issued a stop first
- Once the container stops, resources are requested for starting on the same host (last seen host)
- Once the resources are accrued, the container is issued a start on the same host
Usage Example:
place-container --deployment-id 1581635852024-5117e303 --app-name snjain-test-cp --app.id = 1 --processor-id 4 --request-expiry 10 --destination-host abc.prod.com
@CommandLine.Command(name = "place-container", description = "Request to move/restart container at destination-host")
public class ContainerPlacementTool {
  ...
  _appName = // read from command line
  _appId = // read from command line
  _deploymentId = // read from command line
  _processorId = // read from command line
  _destinationHost = // read from command line
  _requestExpiry = // read from command line

  MetadataStore metadataStore = buildMetadataStore(_appName, _appId);
  try {
    ContainerPlacementMetadataStore containerPlacementMetadataStore = new ContainerPlacementMetadataStore(metadataStore);
    containerPlacementMetadataStore.start();
    Duration requestExpiry = _requestExpiry != null ? Duration.ofSeconds(_requestExpiry) : null;
    UUID uuid = containerPlacementMetadataStore.writeContainerPlacementRequestMessage(
        _deploymentId, _processorId, _destinationHost, requestExpiry, System.currentTimeMillis());
    System.out.println("Request received, query the status using: " + uuid);
  } finally {
    metadataStore.close();
  }
}
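A companion sketch of how a client might poll the status of a placement request using the returned UUID and the ContainerPlacementMetadataStore interface below (the getStatusCode and getResponseMessage accessors are assumed names):

import java.util.Optional;
import java.util.UUID;

// Illustrative sketch: poll the metastore for the response written by the Job Coordinator
// for a given placement request UUID.
public class PlacementStatusPoller {
  public static void printStatus(ContainerPlacementMetadataStore store, UUID uuid) {
    Optional<ContainerPlacementResponseMessage> response = store.readContainerPlacementResponseMessage(uuid);
    if (response.isPresent()) {
      System.out.println("Status: " + response.get().getStatusCode()
          + ", message: " + response.get().getResponseMessage());
    } else {
      System.out.println("No response yet for request: " + uuid);
    }
  }
}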
Public Interfaces
/**
 * Encapsulates the request or response payload information between the ContainerPlacementHandler service and external
 * controllers issuing placement actions
 */
@InterfaceStability.Evolving
public abstract class ContainerPlacementMessage {

  public enum StatusCode {
    /** Indicates that the container placement action is created */
    CREATED,
    /** Indicates that the container placement action was rejected because the request was deemed invalid */
    BAD_REQUEST,
    /** Indicates that the container placement action is accepted and waiting to be processed */
    ACCEPTED,
    /** Indicates that the container placement action is in progress */
    IN_PROGRESS,
    /** Indicates that the container placement action has succeeded */
    SUCCEEDED,
    /** Indicates that the container placement action has failed */
    FAILED;
  }

  /**
   * UUID attached to a message, used to identify duplicate request messages written to the metastore and to avoid
   * re-taking actions even if the metastore is eventually consistent
   */
  protected final UUID uuid;
  /**
   * Unique identifier for a deployment so messages can be invalidated across job restarts;
   * for example, a YARN-based cluster manager should set this to the app attempt id
   */
  protected final String deploymentId;
  // Logical container id: 0, 1, 2
  protected final String processorId;
  // Destination host where the container is desired to be moved
  protected final String destinationHost;
  // Optional request expiry which acts as a timeout for any resource request to the cluster resource manager
  protected final Duration requestExpiry;
  // Status of the current request
  protected final StatusCode statusCode;
  // Timestamp of the request or response message
  protected final long timestamp;

  protected ContainerPlacementMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, StatusCode statusCode, long timestamp) {…}
}
/**
 * Encapsulates the request sent from the external controller to the JobCoordinator to take a container placement action
 */
public class ContainerPlacementRequestMessage extends ContainerPlacementMessage {

  public ContainerPlacementRequestMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, long timestamp) {...}

  public ContainerPlacementRequestMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      long timestamp) {...}
}
/**
 * Encapsulates the response sent from the JobCoordinator for a container placement action
 */
public class ContainerPlacementResponseMessage extends ContainerPlacementMessage {
  // Returned status of the request
  private String responseMessage;

  public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, StatusCode statusCode, String responseMessage, long timestamp) {...}

  public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      StatusCode statusCode, String responseMessage, long timestamp) {...}
}
Other Interfaces
/**
 * Entity managing reads and writes to the metastore for {@link org.apache.samza.container.placement.ContainerPlacementRequestMessage}
 * and {@link org.apache.samza.container.placement.ContainerPlacementResponseMessage}
 */
public class ContainerPlacementMetadataStore {

  /**
   * Writes a {@link ContainerPlacementRequestMessage} to the underlying metastore. This method should be used by external controllers
   * to issue a request to the JobCoordinator
   *
   * @param deploymentId identifier of the deployment
   * @param processorId logical id of the samza container 0,1,2
   * @param destinationHost host where the container is desired to move
   * @param requestExpiry optional per request expiry timeout for requests to cluster manager
   * @param timestamp timestamp of the request
   * @return uuid generated for the request
   */
  public UUID writeContainerPlacementRequestMessage(String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, long timestamp) {...}

  /**
   * Reads a {@link ContainerPlacementRequestMessage} from the underlying metastore
   * @param uuid uuid of the request
   * @return ContainerPlacementRequestMessage if it is present
   */
  public Optional<ContainerPlacementRequestMessage> readContainerPlacementRequestMessage(UUID uuid) {...}

  /**
   * Reads a {@link ContainerPlacementResponseMessage} from the underlying metastore
   * @param uuid uuid of the request
   * @return ContainerPlacementResponseMessage if it is present
   */
  public Optional<ContainerPlacementResponseMessage> readContainerPlacementResponseMessage(UUID uuid) {..}

  /**
   * Deletes a {@link ContainerPlacementRequestMessage} if present, identified by the key {@code uuid}
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementRequestMessage(UUID uuid) {..}

  /**
   * Deletes a {@link ContainerPlacementResponseMessage} if present, identified by the key {@code uuid}
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementResponseMessage(UUID uuid) {..}

  /**
   * Deletes both {@link ContainerPlacementRequestMessage} and {@link ContainerPlacementResponseMessage} identified by uuid
   * @param uuid uuid of the request and response message
   */
  public void deleteAllContainerPlacementMessages(UUID uuid) {...}

  /**
   * Deletes all {@link ContainerPlacementMessage}s
   */
  public void deleteAllContainerPlacementMessages() {..}

  /**
   * Writes a {@link ContainerPlacementResponseMessage} to the underlying metastore.
   * This method should be used by the Job Coordinator only to write responses to a Container Placement Action
   * @param responseMessage response message
   */
  void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage responseMessage) {..}
}
/**
 * Stateless handler that periodically dispatches {@link ContainerPlacementRequestMessage}s read from the metadata store to the Job Coordinator
 */
public class ContainerPlacementRequestAllocator implements Runnable {
  @Override
  public void run() {...}
}
public class ContainerManager {

  /**
   * Registers a container placement action to move the running container to the destination host
   *
   * @param requestMessage request containing details of the placement request
   * @param containerAllocator to request physical resources
   */
  public void registerContainerPlacementAction(ContainerPlacementRequestMessage requestMessage, ContainerAllocator containerAllocator) {...}

  /**
   * Handles the container start action for both active & standby containers. This method is invoked by the allocator thread
   *
   * @param request pending request for the preferred host
   * @param preferredHost preferred host to start the container
   * @param allocatedResource resource allocated from {@link ClusterResourceManager}
   * @param resourceRequestState state of the request in {@link ContainerAllocator}
   * @param allocator to request resources from the {@link ClusterResourceManager}
   *
   * @return true if the container launch is complete, false if the container launch is in progress
   */
  boolean handleContainerLaunch(SamzaResourceRequest request, String preferredHost, SamzaResource allocatedResource,
      ResourceRequestState resourceRequestState, ContainerAllocator allocator) {..}

  /**
   * Handles the container launch failure for active containers and standby (if enabled).
   *
   * @param processorId logical id of the container eg 1,2,3
   * @param containerId last known id of the container deployed
   * @param preferredHost host on which the container is requested to be deployed
   * @param containerAllocator allocator for requesting resources
   */
  void handleContainerLaunchFail(String processorId, String containerId, String preferredHost, ContainerAllocator containerAllocator) {...}

  /**
   * Handles the state update on successful launch of a container
   *
   * @param processorId logical processor id of the container 0,1,2
   */
  void handleContainerLaunchSuccess(String processorId) {...}

  /**
   * Handles the action to be taken after the container has been stopped.
   *
   * @param processorId logical id of the container eg 1,2,3
   * @param containerId last known id of the container deployed
   * @param preferredHost host on which the container was last deployed
   * @param exitStatus exit code returned by the container
   * @param preferredHostRetryDelay delay to be incurred before requesting resources
   * @param containerAllocator allocator for requesting resources
   */
  void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus,
      Duration preferredHostRetryDelay, ContainerAllocator containerAllocator) {..}

  /**
   * Handles an expired resource request for both active and standby containers.
   *
   * @param processorId logical id of the container
   * @param preferredHost host on which the container is requested to be deployed
   * @param request pending request for the preferred host
   * @param allocator allocator for requesting resources
   * @param resourceRequestState state of the request in {@link ContainerAllocator}
   */
  void handleExpiredRequest(String processorId, String preferredHost, SamzaResourceRequest request,
      ContainerAllocator allocator, ResourceRequestState resourceRequestState) {..}
}
public class ContainerPlacementMetadata {

  /**
   * State to track the container failover
   */
  public enum ContainerStatus { RUNNING, STOP_IN_PROGRESS, STOPPED }

  // Container placement request message
  private final ContainerPlacementRequestMessage requestMessage;
  // Host where the container is actively running
  private final String sourceHost;
  // Resource requests issued during this failover
  private final Set<SamzaResourceRequest> resourceRequests;
  // State of the control action
  private ContainerPlacementMessage.StatusCode actionStatus;
  // State of the active container to track the failover
  private ContainerStatus containerStatus;
  // Represents information on the current status of the action
  private String responseMessage;
}
Implementation and Test Plan
Implementation Plan
Phase | Feature | Timeline
---|---|---
Phase 1 | | Q4 2019
Phase 2 | | TBD
Test Specifications
- Unit testing: mocking to simulate end-to-end behavior
- Manual Testing: Complete testing plan: https://docs.google.com/spreadsheets/d/1v-fw0pHxKRobGkALDCno4FuPCsBhdepQ86vIGLHWu54/edit?usp=drive_web&ouid=115242259601904072922
- Local deployment testing with Virtual Private Cluster (LXC with Samza)
- Integration Testing: Create a test job using VPC or in the real cluster that automates testing for taking control actions on containers
Compatibility, Deprecation, and Migration Plan
- The new interfaces & APIs introduced should not affect any part of user code & should be backward compatible. No migration is needed for this new feature.
- Jobs using an older Samza version will be able to discard the control messages written to the metastore
Rejected Alternatives
Note: The rejected alternatives are described in the design above alongside the preferred options, for the sake of consistency in reading.
References
- https://www.cloudera.com/documentation/enterprise/5-10-x/topics/cm_mc_rolling_restart.html
- SEP-19: Hot standby state for Samza applications