Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When writing stateful low-level Processors and Transformers with Kafka Streams, the processors (or transformers; I'll use "processors" to refer to both for brevity) often want to "own" one or more state stores whose details are not important to the business logic of the application.  However, when incorporating these into a topology defined by the high-level DSL using KStream#process, you're forced to specify the state store names so the topology is wired up correctly.  This creates a clumsy pattern where the "owned" state store's name must be passed alongside the TransformerSupplier, when the supplier could just as easily supply that information itself.

...

This gives topologies built with Processors (and Transformers) the same "reads top to bottom" clarity as topologies built with the high-level DSL.

Public Interfaces

Add an interface StateStoreConnector that allows the implementor to specify state stores that should be connected to this processor/transformer (defaulting to no stores).

StateStoreConnector

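import java.util.Collections;
import java.util.List;
import org.apache.kafka.streams.state.StoreBuilder;

public interface StateStoreConnector {
    // Stores to add to the topology and connect to this processor/transformer; none by default.
    default List<StoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}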

...

Change all Processor/TransformerSupplier interfaces to extend it:


TransformerSupplier

public interface TransformerSupplier<K, V, R> extends StateStoreConnector {
    ...
}

ValueTransformerSupplier

public interface ValueTransformerSupplier<V, VR> extends StateStoreConnector {
    ...
}

ValueTransformerWithKeySupplier

public interface ValueTransformerWithKeySupplier<K, V, VR> extends StateStoreConnector {
    ...
}

ProcessorSupplier

public interface ProcessorSupplier<K, V> extends StateStoreConnector {
    ...
}
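
To illustrate how a supplier could "own" its store under this proposal, here is a minimal, self-contained sketch. It does not use the real Kafka Streams classes: StoreBuilder, Processor and ProcessorSupplier below are simplified stand-ins, and DedupProcessorSupplier and the store name "dedup-store" are hypothetical names chosen for the example.

```java
import java.util.Collections;
import java.util.List;

// Simplified stand-in for org.apache.kafka.streams.state.StoreBuilder (assumption: only the name matters here).
interface StoreBuilder {
    String name();
}

// The interface proposed above, reproduced against the stand-in StoreBuilder.
interface StateStoreConnector {
    default List<StoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}

// Simplified stand-in for a low-level processor.
interface Processor<K, V> {
    void process(K key, V value);
}

// Stand-in for ProcessorSupplier after it extends StateStoreConnector.
interface ProcessorSupplier<K, V> extends StateStoreConnector {
    Processor<K, V> get();
}

// Hypothetical supplier that owns its store and reports it through stateStores().
final class DedupProcessorSupplier implements ProcessorSupplier<String, String> {
    static final String STORE_NAME = "dedup-store";

    // One builder instance per supplier, so repeated calls return the same object.
    private final StoreBuilder storeBuilder = () -> STORE_NAME;

    @Override
    public Processor<String, String> get() {
        // Business logic that would look up STORE_NAME is omitted from this sketch.
        return (key, value) -> { };
    }

    @Override
    public List<StoreBuilder> stateStores() {
        return Collections.singletonList(storeBuilder);
    }
}
```

With a supplier like this, the code that builds the topology would only need to pass the supplier itself; a supplier that does not override stateStores() keeps the current behavior and reports no stores.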

...

Additionally, the process method would add the StoreBuilders to the topology using builder.addStateStore() rather than requiring the user to do it themselves.  To solve the problem of addStateStore potentially being called twice for the same store (because more than one Supplier specifies it), the duplicate-store check in addStateStore will be relaxed to allow a duplicate when the same StoreBuilder instance is supplied for the same store name.
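
The relaxed duplicate check could look roughly like the following sketch. It is not the actual topology builder code: TopologySketch, NamedStoreBuilder and the simplified StoreBuilder are stand-ins invented for the example, and IllegalStateException stands in for whatever exception the real builder throws.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for org.apache.kafka.streams.state.StoreBuilder.
interface StoreBuilder {
    String name();
}

// Hypothetical builder implementation used only to create distinct instances.
final class NamedStoreBuilder implements StoreBuilder {
    private final String name;

    NamedStoreBuilder(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}

// Hypothetical stand-in for the part of the topology builder that tracks stores.
final class TopologySketch {
    private final Map<String, StoreBuilder> stores = new LinkedHashMap<>();

    void addStateStore(StoreBuilder builder) {
        StoreBuilder existing = stores.get(builder.name());
        if (existing == null) {
            // First registration of this store name.
            stores.put(builder.name(), builder);
        } else if (existing != builder) {
            // Same name but a different builder instance: still rejected.
            throw new IllegalStateException(
                "StateStore " + builder.name() + " is already added with a different builder.");
        }
        // Same name and same instance: tolerated as a no-op.
    }

    int storeCount() {
        return stores.size();
    }
}
```

The comparison is by reference (!=) rather than equals(), which matches the "same StoreBuilder instance" wording above; two suppliers that share one builder object can both report it, while two independently created builders with the same name still conflict.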

Compatibility, Deprecation, and Migration Plan

Because the added interface method is a default method with a reasonable default (no stores), the addition is backwards compatible.  However, given that there would now be two ways to "connect" state stores to a low-level processor, we have to specify how they behave together.  I believe the correct decision is to enforce that, for a given call of stream.process(...) or stream.transform(...), state stores can be specified in only one of the two ways: either through the stateStoreNames argument or by implementing the stateStores method of StateStoreConnector.  Attempting to do both should throw an exception that makes it clear the user must choose one way or the other.
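
As a rough sketch of that rule, the check could be expressed as below. ConnectionCheck and resolveStoreNames are hypothetical names, StoreBuilder is a simplified stand-in, and the choice of IllegalArgumentException is an assumption; the proposal does not say which exception type would be thrown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for org.apache.kafka.streams.state.StoreBuilder.
interface StoreBuilder {
    String name();
}

// The interface proposed above, reproduced against the stand-in StoreBuilder.
interface StateStoreConnector {
    default List<StoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}

// Hypothetical helper that a process(...)/transform(...) implementation might call.
final class ConnectionCheck {
    private ConnectionCheck() { }

    // Returns the store names to connect, rejecting calls that use both mechanisms.
    static List<String> resolveStoreNames(StateStoreConnector supplier, String... stateStoreNames) {
        List<StoreBuilder> owned = supplier.stateStores();
        if (!owned.isEmpty() && stateStoreNames.length > 0) {
            throw new IllegalArgumentException(
                "Specify state stores either via stateStoreNames or via stateStores(), not both.");
        }
        if (owned.isEmpty()) {
            // Existing behavior: names come from the varargs argument.
            return Arrays.asList(stateStoreNames);
        }
        // New behavior: names come from the builders the supplier reports.
        List<String> names = new ArrayList<>();
        for (StoreBuilder builder : owned) {
            names.add(builder.name());
        }
        return names;
    }
}
```

Existing callers that pass only stateStoreNames take the first return path unchanged, which is what keeps the change backwards compatible.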

No migration tools are required since it's a relatively minor library change.

Alternatives

Have the added method on the Supplier interfaces only return store names, not builders

This solves the original issue only partially, but with perhaps less "API risk."  The String... stateStoreNames argument would no longer be needed on the KStream methods, but the user would still need to manually add the StoreBuilders to the Topology.  The downside is we don't achieve the full reduction of "wiring up" code required when building the topology (the user still needs to know to call topology.addStateStore()), but the upside is that the StoreBuilder is less coupled to the *Supplier.  I don't consider this upside significant, but perhaps there are other use cases I'm not considering.
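
For comparison, a names-only variant might look like the sketch below. StoreNameConnector and NamesOnlyTopologySketch are hypothetical names, and IllegalStateException stands in for the real exception type. The point of the sketch is that registering the store remains a separate step the user has to remember.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical names-only version of the connector interface.
interface StoreNameConnector {
    default List<String> stateStoreNames() {
        return Collections.emptyList();
    }
}

// Hypothetical stand-in for the topology; stores are tracked by name only.
final class NamesOnlyTopologySketch {
    private final Set<String> registered = new HashSet<>();

    // Under this alternative the user must still make this call themselves.
    void addStateStore(String storeName) {
        registered.add(storeName);
    }

    // Connecting fails if the supplier names a store that was never registered.
    void connect(StoreNameConnector supplier) {
        for (String name : supplier.stateStoreNames()) {
            if (!registered.contains(name)) {
                throw new IllegalStateException("StateStore " + name + " is not added yet.");
            }
        }
    }
}
```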

Do nothing

This is a "quality of life" API improvement, and nothing more, so maybe it's unneeded churn.  I favor doing something (obviously) because I think that while small, this change can be a major usability improvement for the low-level API.

...