Status

Current state: Accepted

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201705.mbox/%3CCANxwKLaVro6MBvUJW2RvoNLDO9-G87Y3Ox%2B5W66K_CxBqeVfgQ%40mail.gmail.com%3E

JIRA: SAMZA-871

Released: 0.13.0

Problem

Samza currently relies on YARN to detect whether a container is alive and valid. This detection fails when the NodeManager (NM) crashes: YARN reschedules the container on a different host without killing the old one, so two copies of the same container run at once and messages are processed twice. We need a way to ensure that invalid containers are killed, so that duplicate container launches are handled safely.

The proposal is to solve this by implementing a JobCoordinator HTTP endpoint that serves as a heartbeat between the containers and the JobCoordinator.

Motivation

With a direct heartbeat between the JobCoordinator and each SamzaContainer, container liveness no longer depends on the sync status of the YARN RM/NM/AM. The mechanism is also simple to implement and understand because its flow is synchronous.

Proposed Changes

JobCoordinator side

  • Expose a REST endpoint (eg: /containerHeartbeat) whose purpose is to receive periodic requests from the Samza container and respond with whether the container is in the Job Coordinator's current list of valid containers.

    $ curl <host>:<port>/containerHeartbeat?executionContainerId=container_1490224420978_0323_01_000282
    {
    	alive: true
    }
  • Endpoint could be a part of the JobModelManager's servlet which is currently used for retrieving the JobModel by the containers during startup.
  • Endpoint can accept an "executionContainerId" (eg: YARN container ID) and validate it against the state maintained by the Job Coordinator (eg: YarnAppState). Future implementations for other cluster managers need to implement this endpoint and expose the same validation.
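
The validation described above can be sketched roughly as follows. This is a minimal illustration, not Samza's actual servlet: the class HeartbeatEndpointSketch, its methods, and the in-memory set standing in for YarnAppState are all hypothetical, and it uses the JDK's built-in com.sun.net.httpserver rather than the JobModelManager's servlet. It also assumes the response is emitted as strict JSON with a quoted key.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the JobCoordinator-side heartbeat check. */
class HeartbeatEndpointSketch {
    // Stand-in for cluster-manager state such as YarnAppState (assumption).
    private final Set<String> validContainerIds = ConcurrentHashMap.newKeySet();

    void markValid(String id) { validContainerIds.add(id); }

    void markInvalid(String id) { validContainerIds.remove(id); }

    /** Extracts executionContainerId from a raw query string, or null if absent. */
    static String parseContainerId(String query) {
        if (query == null) {
            return null;
        }
        for (String pair : query.split("&")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2 && kv[0].equals("executionContainerId")) {
                return kv[1]; // YARN container IDs need no URL decoding
            }
        }
        return null;
    }

    /** Builds the response body: alive only if the ID is currently valid. */
    String handle(String query) {
        String id = parseContainerId(query);
        boolean alive = id != null && validContainerIds.contains(id);
        return "{\"alive\": " + alive + "}";
    }

    /** Serves /containerHeartbeat on the given port (0 picks a free port). */
    HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/containerHeartbeat", exchange -> {
            String json = handle(exchange.getRequestURI().getRawQuery());
            byte[] body = json.getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

A missing or unknown ID is answered with alive set to false rather than an HTTP error, so a stale container gets an unambiguous signal to shut itself down.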

Container side

  • In the LocalContainerRunner we can start a monitor that periodically polls the above endpoint to check if the container is valid.
    This new ContainerHeartbeatMonitor class accepts a callback and a ContainerHeartbeatClient (which implements the business logic to make heartbeat checks on the JC endpoint).
    The ContainerHeartbeatMonitor schedules a thread at a fixed rate, which uses the client to check whether the heartbeat is still valid. When the heartbeat fails, the callback is executed; it shuts down the container and sets state on LocalContainerRunner so that the main thread exits with a non-zero code.
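
The monitor loop can be sketched along these lines. This is an illustrative outline only: HeartbeatMonitorSketch is a hypothetical name, a BooleanSupplier stands in for the ContainerHeartbeatClient, and treating a thrown exception as a failed heartbeat is an assumption rather than something this proposal specifies.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a container-side heartbeat monitor. */
class HeartbeatMonitorSketch {
    private final Runnable onContainerExpired;    // callback that shuts the container down
    private final BooleanSupplier heartbeatCheck; // stand-in for ContainerHeartbeatClient
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "heartbeat-monitor");
            t.setDaemon(true); // do not keep the JVM alive just for heartbeats
            return t;
        });

    HeartbeatMonitorSketch(Runnable onContainerExpired, BooleanSupplier heartbeatCheck) {
        this.onContainerExpired = onContainerExpired;
        this.heartbeatCheck = heartbeatCheck;
    }

    /** Polls at a fixed rate; on the first failed heartbeat, runs the callback once. */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(() -> {
            boolean alive;
            try {
                alive = heartbeatCheck.getAsBoolean();
            } catch (RuntimeException e) {
                alive = false; // assumption: an unreachable coordinator counts as failure
            }
            if (!alive) {
                scheduler.shutdown(); // cancels future runs so the callback fires only once
                onContainerExpired.run();
            }
        }, 0, period, unit);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

In a runner, the callback would typically record that the container was invalidated and trigger shutdown, leaving the main thread to pick the non-zero exit code once the container has stopped.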

Public Interfaces

  • Set an environment variable "EXECUTION_ENV_CONTAINER_ID" (eg: YARN container ID) during container launch. The container reads this value to make requests to the above endpoint.
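
As a rough illustration of how a container might turn this variable into a heartbeat request URL, consider the sketch below. The class HeartbeatUrlSketch and the coordinatorUrl parameter are hypothetical; only the variable name, the /containerHeartbeat path, and the executionContainerId parameter come from this proposal. It uses the URLEncoder.encode(String, Charset) overload, which requires Java 10 or later.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical helper that derives the heartbeat URL from the environment. */
class HeartbeatUrlSketch {
    static final String ENV_VAR = "EXECUTION_ENV_CONTAINER_ID";

    /** Builds the heartbeat URI; env is passed in so it can be faked in tests. */
    static URI heartbeatUri(String coordinatorUrl, Map<String, String> env) {
        String id = env.get(ENV_VAR);
        if (id == null || id.isEmpty()) {
            throw new IllegalStateException(ENV_VAR + " is not set");
        }
        // Tolerate a trailing slash on the coordinator URL.
        String base = coordinatorUrl.endsWith("/")
            ? coordinatorUrl.substring(0, coordinatorUrl.length() - 1)
            : coordinatorUrl;
        String encodedId = URLEncoder.encode(id, StandardCharsets.UTF_8);
        return URI.create(base + "/containerHeartbeat?executionContainerId=" + encodedId);
    }
}
```

In a running container this would be called as heartbeatUri(coordinatorUrl, System.getenv()).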

Implementation and Test Plan

  • Introduce the new REST endpoints for YARN.
  • Implement a heartbeat monitor on the container side that is responsible for killing the container if needed.
  • Set up metrics for the number of invalid containers that made a call to the endpoint.
  • Add unit tests to verify compatibility.

Compatibility, Deprecation, and Migration Plan

The changes in this SEP currently target only the YARN deployment model and are not intended for Standalone.

Rejected Alternatives

  • Use CoordinatorStream as the heartbeat channel.
    • Pros: use async pub-sub model to avoid timeouts in sync methods, easy to scale to a large number of containers
    • Cons: protocol is more complex to implement; message/token delivery latency may be uncertain and could make the heartbeat process much longer.

 
