Current state: ACCEPTED

Discussion thread:

JIRA SAMZA-1714 - Getting issue details... STATUS



At runtime, application code can be initialized through InitableTask.init for the low-level API and InitiableFunction.init for the high-level API. These init functions have access to a TaskContext object which contains ways to access certain functionality provided by the Samza framework, such as tables and metrics. For the high-level API, applications can also define their own runtime context objects through the ContextManager.

There are a few challenges with this API:

  • Scope of context objects can be unclear (e.g. task context vs. container context)
  • Inconsistencies between low-level and high-level API

In addition, there is some new functionality to add:

  • ApplicationDescriptor needs to have an API to specify how to create application-defined context, and this description needs to be serializable so that it can be passed throughout all containers
  • Applications should be able to specify container-level context

Proposed Changes

  1. The API for initializing low-level and high-level applications will be updated to accept a new Context object type. This Context will have a structure which clarifies the scope of context objects and provides consistent functionality across low-level and high-level.
  2. New methods will be added to ApplicationDescriptor for accepting factories that will be used to create application-defined context objects.

Public Interfaces


public interface Context {
  JobContext getJobContext();
  ContainerContext getContainerContext();
  TaskContext getTaskContext();
  ApplicationContainerContext getApplicationContainerContext();
  ApplicationTaskContext getApplicationTaskContext();

Here are further descriptions for what is in the context:

  • getJobContext: framework-provided context at the job level
    • getJobName: Name of the job
    • getJobId: Id for the instance of the job
    • getConfig: Configuration for the job
  • getContainerContext: framework-provided context at the container level
    • getContainerMetricsRegistry: used to register application metrics at the container level
    • getContainerModel: metadata about the container and its tasks
      • getProcessorId: id for this container
      • getTasks: mapping from task name to task model for the tasks on this container
  • getTaskContext: framework-provided context at the task level
    • getTaskMetricsRegistry: used to register application metrics at the task level
    • getStore(storeName): key-value store corresponding to the storeName
    • getTable(tableId): Table corresponding to the tableId
    • getTaskModel: metadata about the task
      • getTaskName: name of the task
      • getSystemStreamPartitions: get all of the SystemStreamPartitions for this task
      • getChangelogPartition: get the partition that is used for changelogs for this task
    • getCallbackScheduler: used to register/delete scheduled callbacks
    • setStartingOffset(systemStreamPartition, offset): set the offset to start processing at for a given systemStreamPartition
  • getApplicationContainerContext: application-provided context at the container level (needs to be casted to concrete type after accessed by application code)
  • getApplicationTaskContext: application-provided context at the task level (needs to be casted to concrete type after accessed by application code)

Application Context

Application Context and Factories
public interface ApplicationContainerContext {
  void start();
  void stop();

public interface ApplicationTaskContext {
  void start();
  void stop();

public interface ApplicationContainerContextFactory<T extends ApplicationContainerContext> {
  T create(JobContext jobContext, ContainerContext containerContext);

public interface ApplicationTaskContextFactory<T extends ApplicationTaskContext> {
  T create(JobContext jobContext, ContainerContext containerContext, TaskContext taskContext, ApplicationContainerContext applicationContainerContext);

Initable APIs

Initable APIs
public interface InitableTask {
  void init(Context context);
public interface InitableFunction {
  void init(Context context);


public interface ApplicationDescriptor<S extends ApplicationDescriptor> {

  S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory);
  S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory);

Implementation and Test Plan


One main part of the implementation is to add the new interfaces that make up the Context and its internal objects.

The existing InitableTask.init and InitableFunction.init methods need to be changed to accept the Context. Several other internal components, such as operators and tables, also use the old TaskContext and/or SamzaContainerContext, so those will need to be updated to use the new APIs.

Application context lifecycles

For ApplicationContainerContext, the start() method will be called before the init() methods of the Initable objects are called. The stop() method will be called after the close() methods of the Closable objects are called.

For ApplicationTaskContext, the start() method will be called after the init() methods of the Initable objects are called and before the container enters the run loop. The stop() method will be called after the run loop exits and before the close() methods of the Closable objects are called.

Compatibility, Deprecation, and Migration Plan

The new Context APIs are not backwards compatible with the old ones. However, they do provide the same access to context information, just through different methods. Therefore, it should be relatively easy to change to use the new Context APIs.

Rejected Alternatives

  1. Access metadata (e.g. container model, task model) separately from runtime objects (e.g. metrics registry): This could help to differentiate certain types of context objects, but it requires some extra wiring and would not be as well consolidated as the proposed solution.
  2. Continue to use the existing TaskContext API, and put the additional necessary context into that object: This would be a bit more compatible with the old flow, but the structure of the context will not be as intuitive.
  3. Add a way to save context objects on the processor while in rebalancing mode: In the current proposed solution, if the SamzaContainer gets restarted (e.g. during rebalancing), then the container context will get recreated. However, that could take time, so if some of the context could be saved, then the restart could be faster. However, it would add more complexity to provide a way to determine if some context can be reused, as some context may not be able to be reused if the job model changes.

  • No labels