Current state: [ UNDER DISCUSSION ]

Discussion thread




Samza today supports RocksDB and MemDB as local data stores, which let users cache adjunct data for later use during stream processing. However, populating a data store is the end user's responsibility. This requires additional effort from end users to develop and maintain data stores and to handle corner cases such as reloading after consumers fall off. To avoid these issues, some users have adopted alternative solutions such as Voldemort and CouchBase. In addition, the table-oriented operations of the fluent API require working data to be made available by the system. On closer examination, the problem appears generic enough to be addressed by the data infrastructure.

A dataset delivered in a stream can be either bounded or unbounded. An example of an unbounded dataset is a database change stream; an example of a bounded dataset is the content of a file. When Samza is running in 24x7 mode, the stream for a bounded dataset may deliver multiple versions of that dataset.

This proposal focuses on unbounded datasets.


We want an adjunct data (AD) store that is a read-only cache and automatically stores streaming data for later use. Adjunct data can be accessed the same way as a key-value store in Samza, and we provide a consistent view of the data from the perspective of a Samza task or container. Data can be either partitioned or unpartitioned. If the dataset is small enough to fit in a RocksDB instance, the same copy is populated in every container via a broadcast stream; if it is too large to fit in one database instance, it is partitioned across the containers of a Samza job.


Theoretically, an AD store could be either local (RocksDB and MemDB) or centralized (CouchBase). In most cases, however, the use of a centralized data store is a side effect of the lack of a local adjunct data store. For now, we defer support for a centralized adjunct data store until we see clear evidence that it is needed.

Having an adjunct data store would enable a number of use cases:
  • Automatic maintenance of local cache
  • Table oriented operations

Proposed Changes

The proposed changes include

  • Introduce a new construct, AdjunctDataStoreManager, whose responsibilities include
    • Maintaining the mapping between system streams and adjunct data stores
    • Extracting the key and value from an IncomingMessageEnvelope
    • Saving data in stores
  • Introduce container-level storage engines to support broadcast streams as sources
  • Modify TaskInstance to consult AdjunctDataStoreManager before delivering a message to a task; if a message is stored in an adjunct store, it is not delivered to tasks
  • Introduce new configurations to "associate" a Samza K/V store with a system stream
  • Provide hooks to project an incoming message to the desired types (this is useful for storing a subset of the incoming message or a customized transformation)
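
The routing rule in the list above (consult the manager first, and deliver to the task only when the message was not stored) can be sketched as follows. This is a minimal, self-contained illustration rather than Samza code: Envelope, SketchAdjunctManager and SketchTaskInstance are hypothetical stand-ins for the incoming message envelope, AdjunctDataStoreManager and TaskInstance, and a plain in-memory map is assumed in place of a real K/V store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Samza's incoming message envelope.
final class Envelope {
    final String stream;
    final Object key;
    final Object value;

    Envelope(String stream, Object key, Object value) {
        this.stream = stream;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical manager: keeps the stream-to-store mapping and populates stores.
final class SketchAdjunctManager {
    private final Map<String, Map<Object, Object>> storesByStream = new HashMap<>();

    void associate(String stream, Map<Object, Object> store) {
        storesByStream.put(stream, store);
    }

    // Returns true if the message was saved in an adjunct store, false otherwise.
    boolean process(Envelope message) {
        Map<Object, Object> store = storesByStream.get(message.stream);
        if (store == null) {
            return false; // not an adjunct data stream
        }
        store.put(message.key, message.value);
        return true;
    }
}

// Hypothetical task instance: messages stored by the manager never reach the task.
final class SketchTaskInstance {
    private final SketchAdjunctManager manager;
    final List<Envelope> deliveredToTask = new ArrayList<>();

    SketchTaskInstance(SketchAdjunctManager manager) {
        this.manager = manager;
    }

    void onMessage(Envelope message) {
        if (!manager.process(message)) {
            deliveredToTask.add(message);
        }
    }
}
```

With a store associated to a stream named "profiles", a message on "profiles" ends up only in the store, while a message on any other stream is delivered to the task as before.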

Data partitioning

For small datasets that can easily fit in a local store, the AD stream can be configured as a broadcast stream and delivered to all containers. Within a container, one AD store instance per stream is instantiated and shared among all tasks within that container.

For datasets too large to fit in a local store, the AD stream must be partitioned; there is one AD store instance per stream and task.


An adjunct data store is particularly useful for storing data delivered from a database through a change data capture (CDC) system. It is important to understand the capabilities of this feature and its limitations. In general, a best effort will be made to provide consistency at different levels without sacrificing too much in other respects. The sections below discuss the consistency guarantees provided for unbounded and bounded datasets.

Unbounded dataset

When the content of a database is delivered by a CDC system, the size of the dataset (i.e. the database) is limited, but its changes can continue forever, so the dataset is unbounded from the receiver's perspective. The concept of versioning isn't necessary. It is important, however, for the adjunct data store to provide a consistent snapshot of the database; this can be achieved as long as the CDC system provides ordering and at-least-once delivery guarantees. When a dataset is delivered through one stream, we guarantee consistency at container level, i.e. once bootstrap is complete, the adjunct data store can be treated as a snapshot of the database within a container. When a dataset is delivered through multiple streams, we guarantee consistency at task level, i.e. once bootstrap is complete, the adjunct data store can be treated as a fraction of a snapshot of the database within a task. No guarantee is provided at job level.

Bounded dataset

When the sources are read-only files, for example a machine learning model, they are by nature size-bounded. However, we should expect new versions of the dataset to be produced over time. It is desirable to be able to incorporate new versions without interrupting current operation. As with an unbounded dataset, a copy of a set of files can be produced by the bootstrap process, after which processing of the main input follows. This requires the delivery system (system connector) to be able to inject markers into a stream to signal the end of a dataset. When an adjunct data store sees the marker, it knows the current dataset is complete, and it can "seal" the store and prepare for the next version. Any data arriving thereafter is stored in the next version. While building a new version, an adjunct data store continues to serve the current version; after the new version is built, it switches to the new version and discards the old one. This can work seamlessly, and the user sees only one version at a time.
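
The seal-and-switch behavior described above can be sketched with two in-memory maps. This is only an illustration under stated assumptions: VersionedStoreSketch and its method names are hypothetical, the end-of-dataset marker is modeled as an explicit method call, and a real implementation would manage underlying store instances rather than maps.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical versioned wrapper around two underlying stores.
final class VersionedStoreSketch {
    private Map<String, String> serving = new HashMap<>();  // version visible to readers
    private Map<String, String> building = new HashMap<>(); // next version being loaded
    private int servingVersion = 0;

    // Data arriving before the marker goes into the version under construction.
    synchronized void put(String key, String value) {
        building.put(key, value);
    }

    // Called when the end-of-dataset marker is seen: seal the new version,
    // switch readers to it, discard the old one and start the next version.
    synchronized void onEndOfDatasetMarker() {
        serving = building;
        building = new HashMap<>();
        servingVersion++;
    }

    // Readers only ever see the current sealed version.
    synchronized String get(String key) {
        return serving.get(key);
    }

    synchronized int servingVersion() {
        return servingVersion;
    }
}
```

Reads issued while a new version is loading keep returning values from the previously sealed version; the switch happens only when the marker arrives.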

When a file is delivered through one stream (unpartitioned), we guarantee a consistent snapshot (copy) of the entire file at container level; when a file is delivered through multiple streams (partitioned), we guarantee a consistent fraction of the snapshot at task level. No guarantee is provided at job level.

For record based datasets that are addressable using keys, at least once delivery semantics is sufficient. However, if the underlying dataset is not K/V in nature, exactly once semantics might be needed.

As the solution for bounded datasets is out of scope of this proposal, it is still subject to future changes.


When an AD stream is marked as bootstrap, it is guaranteed that an initial snapshot is built before processing of input streams starts; otherwise, input streams and AD streams are processed at the same time. After bootstrap, for change capture data, we keep updating the AD store as new updates arrive.

Key and Value

By default, we use the key and value in the IncomingMessageEnvelope to store an AD data item in a store, and the user provides the serde as is done today. Configuration options will be provided to allow the user to

  • construct user defined keys
  • construct user defined values

User-defined objects will be treated as POJOs, and we may provide a default serde for POJOs.
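
As an illustration of user-defined keys and values, the sketch below projects an incoming record onto a small POJO. All names here are hypothetical: RawMessage stands in for the incoming message envelope, Extractor mirrors the shape of the proposed key/value extractor, and the record is assumed to arrive as a map of fields.

```java
import java.util.Map;

// Hypothetical stand-in for the incoming message envelope.
final class RawMessage {
    final Object key;
    final Object message;

    RawMessage(Object key, Object message) {
        this.key = key;
        this.message = message;
    }
}

// Mirrors the shape of the proposed key/value extractor, over the stand-in type.
interface Extractor {
    Object getKey(RawMessage input);
    Object getValue(RawMessage input);
}

// Default behavior: use the key and value of the message as they are.
final class PassThroughExtractor implements Extractor {
    public Object getKey(RawMessage input) {
        return input.key;
    }

    public Object getValue(RawMessage input) {
        return input.message;
    }
}

// User-defined value type (a POJO holding a subset of the record).
final class MemberSummary {
    final String name;
    final String country;

    MemberSummary(String name, String country) {
        this.name = name;
        this.country = country;
    }
}

// User-defined extractor: key taken from a field, value projected to a POJO.
final class MemberSummaryExtractor implements Extractor {
    public Object getKey(RawMessage input) {
        return fields(input).get("memberId");
    }

    public Object getValue(RawMessage input) {
        Map<?, ?> f = fields(input);
        return new MemberSummary((String) f.get("name"), (String) f.get("country"));
    }

    // Assumes the message payload is a map of field names to values.
    private static Map<?, ?> fields(RawMessage input) {
        return (Map<?, ?>) input.message;
    }
}
```

The projection keeps only the fields the job needs, which is the main motivation for storing a subset of the incoming message.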

Broadcast Streams

Broadcast streams are shared by all tasks within a container. Likewise, when a broadcast stream is associated with an adjunct store, its content is visible to all tasks as well. To support this, we will introduce storage engines at container level. When a stream is marked as broadcast using the configuration task.broadcast.inputs and is also associated with an adjunct data store, the storage engine will be instantiated at container level. In parallel to TaskStorageManager, we will introduce the class ContainerStorageManager, which manages the lifecycle of the underlying stores. Populating such a store is assigned to one of the tasks; the remaining tasks only read from the store.


A job or container could be brought down intentionally or unintentionally; in either case, its state before shutdown must be recovered. The recovery process is carried out during startup. For adjunct data stores specifically, the requirement is to restore the stores, as far as possible, to their state prior to shutdown. Aspects to be considered are bootstrap versus non-bootstrap streams and persistent versus non-persistent stores.

Persistent stores

The table below summarizes the recovery actions to be taken for persistent stores.

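                                | No store found                              | Store found but not valid                  | Store found and valid
--------------------------------+---------------------------------------------+--------------------------------------------+--------------------------
Stream marked for bootstrap     | Create new store; force bootstrap           | Re-create store; force bootstrap           | Continue from last offset
Stream not marked for bootstrap | Create new store; continue from last offset | Re-create store; continue from last offset | Continue from last offset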
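
The decision table above for persistent stores can be written as a small pure function. The enum and class names below are hypothetical and chosen only for this sketch; they are not part of the proposed interfaces.

```java
// Hypothetical names for the three store conditions in the table.
enum StoreState { NOT_FOUND, FOUND_INVALID, FOUND_VALID }

// What to do with the store itself.
enum StoreAction { CREATE_NEW, RECREATE, KEEP }

// Where to start consuming the associated stream.
enum OffsetAction { FORCE_BOOTSTRAP, CONTINUE_FROM_LAST_OFFSET }

final class RecoveryDecision {
    final StoreAction storeAction;
    final OffsetAction offsetAction;

    RecoveryDecision(StoreAction storeAction, OffsetAction offsetAction) {
        this.storeAction = storeAction;
        this.offsetAction = offsetAction;
    }

    // Encodes the table for persistent stores, one case per column.
    static RecoveryDecision forPersistentStore(StoreState state, boolean markedForBootstrap) {
        // A store that had to be (re)built is bootstrapped only if the stream asks for it.
        OffsetAction afterRebuild = markedForBootstrap
            ? OffsetAction.FORCE_BOOTSTRAP
            : OffsetAction.CONTINUE_FROM_LAST_OFFSET;
        switch (state) {
            case NOT_FOUND:
                return new RecoveryDecision(StoreAction.CREATE_NEW, afterRebuild);
            case FOUND_INVALID:
                return new RecoveryDecision(StoreAction.RECREATE, afterRebuild);
            case FOUND_VALID:
                return new RecoveryDecision(StoreAction.KEEP, OffsetAction.CONTINUE_FROM_LAST_OFFSET);
            default:
                throw new IllegalArgumentException("unknown store state: " + state);
        }
    }
}
```

Keeping the decision free of side effects makes each cell of the table easy to cover with a unit test.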


Non-persistent stores

For non-persistent stores, bootstrap is always forced if a stream is marked for bootstrap; otherwise, consumption continues from the last offset.

Store validity detection

There are multiple ways to detect whether a store is valid. The initial implementation will compare the latest timestamp of the store's files with the current time; if the difference is larger than a predefined value, the store is deemed invalid. In the future, we may record the latest offset consumed before shutdown and compare it with the current offset available.
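
A minimal sketch of the timestamp-based check, using only the JDK file API, is shown below. The class name, the method signature and the choice to treat a missing or empty store directory as invalid are assumptions of this sketch, not decisions made by the proposal.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

final class StoreValiditySketch {
    // Returns true if the most recently modified file under storeDir is at most
    // maxAgeMillis older than nowMillis. A missing or empty directory is treated
    // as invalid (an assumption of this sketch).
    static boolean isValid(Path storeDir, long nowMillis, long maxAgeMillis) throws IOException {
        if (!Files.isDirectory(storeDir)) {
            return false;
        }
        long latest = -1L;
        try (Stream<Path> files = Files.walk(storeDir)) {
            for (Path p : (Iterable<Path>) files::iterator) {
                if (Files.isRegularFile(p)) {
                    latest = Math.max(latest, Files.getLastModifiedTime(p).toMillis());
                }
            }
        }
        return latest >= 0 && nowMillis - latest <= maxAgeMillis;
    }
}
```

Passing the current time as a parameter, instead of reading the clock inside the method, keeps the check deterministic in tests.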


During initialization of a Samza container, the following steps will be added to the sequence:

  • During container initialization
    • Retrieve from configuration all adjunct stores associated with broadcast streams
    • Instantiate each store, using the result of the previous step as input
    • Instantiate ContainerStorageManager
  • During task instance initialization
    • Instantiate storage engines and TaskStorageManager (current flow)
    • Instantiate AdjunctDataStoreManager, which takes the configuration, ContainerStorageManager and TaskStorageManager as input



Factory class to instantiate an adjunct data store manager. If not configured, a built-in implementation will be used.

stores.<store>.adstore.input - The name of the system stream to be associated with this store. A store can be associated with only one stream.
stores.<store>.adstore.key.value.extractor.factory - Factory class to instantiate a key/value extractor object that extracts the key and value from an IncomingMessageEnvelope. If not provided, the key and value in the IncomingMessageEnvelope are used.

In addition to configuration above, the following configuration items may be necessary

  • systems.<system>.streams.<stream>.samza.bootstrap: true
  • systems.<system>.streams.<stream>.samza.reset.offset: true

The latter is needed when an in-memory store is used.
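
To show how the store-to-stream association could be read from these settings, the sketch below scans a flat configuration map for stores.<store>.adstore.input keys. The class name is hypothetical, the configuration is modeled as a plain map rather than Samza's Config, and the "system.stream" value format and the rejection of comma-separated values are assumptions of this sketch.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class AdjunctStoreConfigSketch {
    // Matches keys of the form stores.<store>.adstore.input and captures <store>.
    private static final Pattern INPUT_KEY =
        Pattern.compile("stores\\.([^.]+)\\.adstore\\.input");

    // Returns store name -> associated system stream.
    static Map<String, String> storeToStream(Map<String, String> config) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            Matcher m = INPUT_KEY.matcher(entry.getKey());
            if (!m.matches()) {
                continue; // unrelated configuration key
            }
            String stream = entry.getValue().trim();
            // A store can be associated with only one stream.
            if (stream.isEmpty() || stream.contains(",")) {
                throw new IllegalArgumentException(
                    "store " + m.group(1) + " must be associated with exactly one stream");
            }
            result.put(m.group(1), stream);
        }
        return result;
    }
}
```

For example, a configuration containing stores.profiles.adstore.input with the (made-up) value kafka.member-profiles yields a single mapping from the store profiles to that stream; all other keys are ignored.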

Implementation notes

In addition to TaskStorageManager, two classes will be introduced: AdjunctDataStoreManager and ContainerStorageManager. AdjunctDataStoreManager is introduced mainly to encapsulate adjunct data related functionality; see the next section for its responsibilities. ContainerStorageManager provides a subset of the functionality of TaskStorageManager, but at container level. It may be helpful to extract a base class from these two classes to avoid duplication. The main responsibilities of ContainerStorageManager include

  • Maintain directories for the underlying stores (creation, deletion)
  • Detect whether the stores under management are still valid during startup
  • Delegate operations such as flush and commit to the underlying stores

Key classes


/**
 * Factory class for AdjunctDataStoreManager
 */
public interface AdjunctDataStoreManagerFactory {
    AdjunctDataStoreManager getAdjunctDataStoreManager(
      TaskName taskName,
      Config config,
      Collection<SystemStreamPartition> ssps,
      BaseStorageManager containerStorageManager,
      BaseStorageManager taskStorageManager,
      boolean shouldWriteToContainerStores,
      OffsetManager offsetManager);
}


/**
 * An AdjunctDataStoreManager instance is responsible for
 * 1. maintaining the mapping between system streams and adjunct data stores
 * 2. extracting the key and value from an IncomingMessageEnvelope
 * 3. populating adjunct data stores
 * 4. determining the system stream partitions whose offsets need to be reset during startup
 */
public interface AdjunctDataStoreManager {
    /**
     * Invoked before a message is passed to a task
     * @return true if the message is saved in a store, false otherwise
     */
    boolean process(IncomingMessageEnvelope message);
}


/**
 * A factory to instantiate key/value extractors
 */
public interface KeyValueExtractorFactory {
    KeyValueExtractor getKeyValueExtractor(Config config);
}


/**
 * A key/value extractor that extracts the key and value from an incoming
 * message envelope
 */
public interface KeyValueExtractor {
    Object getKey(IncomingMessageEnvelope input);
    Object getValue(IncomingMessageEnvelope input);
}

Public Interfaces

No changes to public interfaces.

Implementation and Test Plan

  • Introduce interfaces and implementation described above
  • Add unit tests to test and verify different scenarios
    • plain
    • bootstrap
    • broadcast
    • failover
  • Verify in different deployment environments. Note: the implementation should be agnostic to the deployment environment (YARN, Hadoop or standalone).

Compatibility, Deprecation, and Migration Plan

As this is a new feature, no plans are required for compatibility, deprecation and migration.

Rejected Alternatives

Scope of an AD store

Within a container, we have multiple choices:
  1. One AD store instance per stream - this option guarantees a consistent view within a container, but has side effects. Since data in AD streams may arrive at different times, maintaining a consistent snapshot becomes difficult. As multiple versions of a dataset evolve, we may have to maintain a large number of intermediate snapshot versions.

  2. One AD store instance per stream partition - this option guarantees a consistent view at stream partition level. Querying the store becomes a problem: when multiple partitions are assigned to a task, we have no way of knowing in which store instance a value resides and end up querying all instances. The implementation might be a bit simpler, but the price is very high.

  3. One AD store instance per stream and task - this is the compromise between 1 and 2; here we guarantee a consistent view per task. If multiple partitions are assigned to a task, we may still have to maintain multiple intermediate snapshot versions of an AD store instance, but the number of versions is lower than in #1. Since within LI no one is using the custom system partition grouper feature, only the open source portion will be impacted.

Option #3 was chosen.

AD store abstraction

For bounded datasets (such as files) delivered as streams, the adjunct data store may need to maintain multiple versions of the underlying stores in order to guarantee serving a consistent version. This would require an adjunct data store abstraction that internally manages versions of the underlying stores.

Key and value extractor

Another option considered is to embed the key extraction and value conversion logic in the serde layer. This would work, but it would make serdes asymmetric and less portable.


