Status

Current state: Under Discussion

Discussion thread: here and here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP by Julien Brunet, Adam Souquières, Sébastien Viale, Marie-Laure Momplot

The current TopologyTestDriver only supports test input and output topics with a single partition, and therefore does not allow testing of topologies involving repartition operations.

The inability to test multi-partitioned streams forces developers into complex manual testing that cannot be automated or scripted. In many cases, such topologies end up not being tested at all.

As a workaround, developers often rely on the EmbeddedKafkaCluster to run integration-style tests with multiple partitions. However, this approach requires spinning up a local Kafka cluster, managing configurations, and dealing with non-trivial setup and teardown logic. This makes it less user-friendly, slower, and harder to integrate into fast-running unit test suites.

For instance, when a key is modified (e.g. with selectKey() or map()) before a .process() call, Kafka Streams does not automatically insert a repartition topic. Records that share the new key may then be processed by different tasks and written to different state store instances. Tests that rely only on single-partition topics cannot detect this problem.

This KIP proposes to introduce multi-partition support in the TopologyTestDriver, enabling more accurate and convenient stream testing while improving automated unit test coverage.

Public Interfaces

TopologyTestDriverBuilder (new class) 

A new builder class is introduced to support the multi-partition setup phase:

public class TopologyTestDriverBuilder {
    public TopologyTestDriverBuilder(Topology topology);
    public TopologyTestDriverBuilder withConfig(Properties config);
    public TopologyTestDriverBuilder withInitialWallClockTime(Instant initialWallClockTime);
    public TopologyTestDriverBuilder declareTopic(String topicName, int partitions);
    public TopologyTestDriver build();
}

Multi-partition mode is implicitly enabled when at least one topic is declared with a partition count greater than 1.
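The implicit mode switch can be sketched as a small piece of bookkeeping. This is a hypothetical illustration, not the proposed implementation. The class name TopicDeclarations, the insertion-ordered map, and the rejection of partition counts below 1 are all assumptions; the KIP does not specify how invalid counts are handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bookkeeping behind TopologyTestDriverBuilder.declareTopic(...).
final class TopicDeclarations {
    private final Map<String, Integer> partitionsByTopic = new LinkedHashMap<>();

    TopicDeclarations declareTopic(String topicName, int partitions) {
        // Assumption: counts below 1 are rejected (not specified by the KIP).
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be >= 1, got " + partitions);
        }
        partitionsByTopic.put(topicName, partitions);
        return this; // allows builder-style chaining
    }

    // Undeclared topics fall back to a single partition (legacy semantics).
    int partitionsFor(String topicName) {
        return partitionsByTopic.getOrDefault(topicName, 1);
    }

    // Multi-partition mode is implied once any declared topic has more than one partition.
    boolean isMultiPartition() {
        return partitionsByTopic.values().stream().anyMatch(n -> n > 1);
    }
}
```

Topics that are never passed to declareTopic() report one partition. This matches the rule that undeclared topics keep single-partition semantics.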

TopologyTestDriver class changes 

  • Add new methods to get state stores by partition:
// New per-partition store accessors
public StateStore getStateStore(final String name, final int partition);
public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name, final int partition);
public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name, final int partition);
public <K, V> VersionedKeyValueStore<K, V> getVersionedKeyValueStore(final String name, final int partition);
public <K, V> WindowStore<K, V> getWindowStore(final String name, final int partition);
public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(final String name, final int partition);
public <K, V> SessionStore<K, V> getSessionStore(final String name, final int partition);

// Per-partition accessors for header-aware stores
public <K, V> KeyValueStore<K, ValueTimestampHeaders<V>> getTimestampedKeyValueStoreWithHeaders(final String name, final int partition);
public <K, V> WindowStore<K, ValueTimestampHeaders<V>> getTimestampedWindowStoreWithHeaders(final String name, final int partition);
public <K, V> SessionStoreWithHeaders<K, V> getSessionStoreWithHeaders(final String name, final int partition);

TestRecord class changes 

  • Add a new constructor to TestRecord that accepts a partition number as an additional argument.

public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime, final int partition) {}
  • All existing constructors that do not take an explicit partition argument default partition to -1, indicating "no explicit partition set".
  • Update equals() and hashCode() to include the partition field. Existing code that does not set a partition explicitly will produce partition = -1 on both sides of an equality check, preserving backward compatibility.
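  • Add a getPartition() accessor, which the example usage below relies on to filter output records by partition:
public int getPartition() {}
  • Add a utility method for partition-agnostic comparison:
public boolean equalsIgnorePartition(final Object o) {}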

This allows tests that do not care about partition placement to use assertTrue(expected.equalsIgnorePartition(actual)).
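The equality contract can be illustrated with a simplified, hypothetical value class. PartitionedTestRecord is not the real TestRecord. It keeps only the key, value, record time and partition, omits headers for brevity, and uses -1 as the "no explicit partition" sentinel.

```java
import java.time.Instant;
import java.util.Objects;

// Hypothetical, simplified stand-in for TestRecord (headers omitted).
final class PartitionedTestRecord<K, V> {
    static final int NO_PARTITION = -1; // sentinel: no explicit partition set

    final K key;
    final V value;
    final Instant recordTime;
    final int partition;

    PartitionedTestRecord(K key, V value, Instant recordTime) {
        this(key, value, recordTime, NO_PARTITION);
    }

    PartitionedTestRecord(K key, V value, Instant recordTime, int partition) {
        this.key = key;
        this.value = value;
        this.recordTime = recordTime;
        this.partition = partition;
    }

    // Compares every field except the partition.
    boolean equalsIgnorePartition(Object o) {
        if (this == o) return true;
        if (!(o instanceof PartitionedTestRecord)) return false;
        PartitionedTestRecord<?, ?> that = (PartitionedTestRecord<?, ?>) o;
        return Objects.equals(key, that.key)
            && Objects.equals(value, that.value)
            && Objects.equals(recordTime, that.recordTime);
    }

    // Full equality additionally requires the same partition.
    @Override
    public boolean equals(Object o) {
        return equalsIgnorePartition(o) && partition == ((PartitionedTestRecord<?, ?>) o).partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, value, recordTime, partition);
    }
}
```

Output records carry their resolved partition. A strict equals() check against an expected record built without a partition therefore fails, while equalsIgnorePartition() still succeeds.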

Note that -1 never appears in output TestRecords. Records read via readRecordsToList() always carry the actual resolved partition.

-1 exists only as a sentinel value on input records for which no explicit partition was set.

Proposed Changes

Internally, TopologyTestDriver will maintain topic metadata that includes:

  • The number of partitions for each topic

  • One task per (sub-topology, partition) pair, to simulate the partitioned log structure

To avoid introducing unnecessary complexity when supporting multiple partitions and tasks, both in the TopologyTestDriver implementation and for users, we propose introducing a dedicated setup phase.

Instead of dynamically creating and updating tasks as topics are defined or as records are piped, all input and output topics must be declared upfront, before any records can be processed.

The revised control flow would be:

Setup phase (builder chain):


TopologyTestDriverBuilder is the entry point for multi-partition tests. The driver enters multi-partition mode when at least one declared topic has more than one partition. 

Topics declared with a count of 1 (or not declared at all via the legacy constructors) retain existing single-partition semantics.

  1. Create a TopologyTestDriverBuilder for the topology.
  2. Supply the configuration and initial wall-clock time via withConfig() and withInitialWallClockTime().
  3. Declare topic partition counts via declareTopic(name, n).
  4. Call build(), which creates all Tasks and GlobalTasks.

Only after build() can records be piped and outputs be read.

If all topics are declared with a partition count of 1, or no topics are declared at all, the driver behaves identically to the legacy constructors. In that case the builder is simply a cleaner alternative.

The existing constructors remain the preferred path for legacy single-partition tests and require no migration.

Execution phase (identical in both modes): 

  1. Create input/output topic handles via createInputTopic / createOutputTopic. 
  2. Pipe records via pipeInput. The driver resolves the target partition as follows:
    • from the explicit partition on the TestRecord, if one is set;
    • otherwise by key hash (murmur2 % n, consistent with BuiltInPartitioner);
    • or by round-robin for null keys.
  3. After each pipeInput, the driver drains all tasks to quiescence before returning. Tasks are processed in ascending stream-time order. When stream-times are equal (e.g. immediately after a fan-out), ascending (subtopologyId, partition) is used as a deterministic tie-breaker.
  4. Read outputs via readRecordsToList(), filtering by partition if needed.

Partition routing

When a record is piped via pipeInput, the target partition is resolved using the following priority order: 

  1. Explicit partition: if a partition number is set on the TestRecord, it is used directly. If it is out of range [0, n), an IllegalArgumentException is thrown.
  2. Null key: if no explicit partition is set and the key is null, the record is distributed via round-robin across all partitions of the topic.
  3. Key-based routing: if no explicit partition is set and the key is non-null, the partition is computed from the serialized key bytes as:
    Utils.toPositive(Utils.murmur2(keyBytes)) % n, which matches the behaviour of BuiltInPartitioner.

Cases 2 and 3 will be handled internally by a new TopologyTestDriverPartitioner that implements the Partitioner interface.
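The priority order can be sketched in plain Java without a Kafka dependency. The class PartitionRouter and its method names are hypothetical. The murmur2 and toPositive methods below are re-implementations written to mirror org.apache.kafka.common.utils.Utils.murmur2 and Utils.toPositive; a real implementation would call those directly. The round-robin counter is assumed to be kept per topic and to start at 0, as the example usage comments suggest.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical router for one topic with n partitions, following the KIP's priority order.
final class PartitionRouter {
    static final int NO_PARTITION = -1; // TestRecord sentinel: no explicit partition

    private final int numPartitions;
    private final AtomicInteger roundRobin = new AtomicInteger(0); // assumed per-topic counter

    PartitionRouter(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    int route(int explicitPartition, byte[] keyBytes) {
        // 1. An explicit partition wins, but must lie in [0, n).
        if (explicitPartition != NO_PARTITION) {
            if (explicitPartition < 0 || explicitPartition >= numPartitions) {
                throw new IllegalArgumentException("partition " + explicitPartition
                    + " out of range [0, " + numPartitions + ")");
            }
            return explicitPartition;
        }
        // 2. Null key: round-robin across all partitions.
        if (keyBytes == null) {
            return toPositive(roundRobin.getAndIncrement()) % numPartitions;
        }
        // 3. Non-null key: same formula as BuiltInPartitioner.
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    // Mirrors Utils.toPositive: clears the sign bit rather than using Math.abs.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Mirrors Utils.murmur2 (32-bit MurmurHash2 with seed 0x9747b28c).
    static int murmur2(byte[] data) {
        int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }
}
```

With a single partition, both the round-robin and key-hash branches always return 0. This is why the null-key rule is a no-op in single-partition mode.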

Processing order on fan-out

When a single pipeInput causes records to fan out across multiple downstream tasks, the driver drains all tasks to quiescence before returning.

On each scheduling step, the task with the lowest current stream-time is selected next.

When multiple tasks share the same stream-time, the tie is broken by ascending (subtopologyId, partition), i.e. TaskId natural order. Ties are the common case immediately after a fan-out, because all newly enqueued downstream tasks start with no prior stream-time.

This ordering is deterministic and stable, guaranteed by the TreeMap<TaskId, StreamTask> used internally to store tasks.
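The selection rule can be sketched as a small, self-contained simulation. SimTaskId and DrainScheduler are hypothetical stand-ins for TaskId and the driver's internal loop. The sketch makes three simplifications:
  • stream-time is the largest timestamp a task has processed so far;
  • -1 means "nothing processed yet";
  • downstream records are enqueued up front rather than produced during processing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for TaskId, ordered by (subtopologyId, partition).
final class SimTaskId implements Comparable<SimTaskId> {
    final int subtopologyId;
    final int partition;

    SimTaskId(int subtopologyId, int partition) {
        this.subtopologyId = subtopologyId;
        this.partition = partition;
    }

    @Override
    public int compareTo(SimTaskId other) {
        int bySubtopology = Integer.compare(subtopologyId, other.subtopologyId);
        return bySubtopology != 0 ? bySubtopology : Integer.compare(partition, other.partition);
    }

    @Override
    public String toString() {
        return subtopologyId + "_" + partition;
    }
}

// Hypothetical drain loop: lowest stream-time first, ties broken by TreeMap (TaskId) order.
final class DrainScheduler {
    static final long UNKNOWN = -1L; // no record processed yet

    private final TreeMap<SimTaskId, Deque<Long>> queues = new TreeMap<>();
    private final Map<SimTaskId, Long> streamTimes = new HashMap<>();

    void enqueue(SimTaskId task, long timestamp) {
        queues.computeIfAbsent(task, t -> new ArrayDeque<>()).addLast(timestamp);
        streamTimes.putIfAbsent(task, UNKNOWN);
    }

    // Processes records until every queue is empty; returns "task@timestamp" in processing order.
    List<String> drain() {
        List<String> order = new ArrayList<>();
        while (true) {
            SimTaskId next = null;
            for (Map.Entry<SimTaskId, Deque<Long>> e : queues.entrySet()) {
                if (e.getValue().isEmpty()) continue;
                // Strict '<' keeps the earlier (lower TaskId) task when stream-times tie.
                if (next == null || streamTimes.get(e.getKey()) < streamTimes.get(next)) {
                    next = e.getKey();
                }
            }
            if (next == null) return order; // quiescence: nothing left to process
            long ts = queues.get(next).pollFirst();
            streamTimes.put(next, Math.max(streamTimes.get(next), ts));
            order.add(next + "@" + ts);
        }
    }
}
```

Example: records enqueued for tasks 1_0, 0_1 and 0_0 with equal timestamps are processed as 0_0, 0_1, 1_0, whatever order they were enqueued in. A task that has advanced its stream-time yields to a task that has not processed anything yet.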

Store access in multi-partition mode

 In multi-partition mode, state stores are partitioned: each task owns its own store instance. 

  1. Partitioned stores (non-global): the accessors that take only a store name, i.e. getStateStore(name), getKeyValueStore(name), getSessionStore(name), and getWindowStore(name), throw IllegalStateException in multi-partition mode, since no single partition can be inferred.
    The per-partition overload must be used instead:
       driver.getKeyValueStore("counts", 0); // partition 0
       driver.getKeyValueStore("counts", 1); // partition 1
  2. Global stores: global stores are not partitioned. They accumulate records from all partitions into a single shared instance.
    The name-only accessors remain valid and unchanged in multi-partition mode.
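The accessor contract can be modelled with a small, hypothetical registry. PartitionedStoreRegistry and its methods are illustrative names, not part of the proposed API. Returning null for an unknown store name is an assumption made only to keep the sketch short.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry modelling the name-only vs per-partition accessor contract.
final class PartitionedStoreRegistry<S> {
    // store name -> (partition -> store instance owned by that partition's task)
    private final Map<String, Map<Integer, S>> partitioned = new HashMap<>();
    // store name -> single shared global store instance
    private final Map<String, S> global = new HashMap<>();
    private final boolean multiPartition;

    PartitionedStoreRegistry(boolean multiPartition) {
        this.multiPartition = multiPartition;
    }

    void registerPartitioned(String name, int partition, S store) {
        partitioned.computeIfAbsent(name, n -> new HashMap<>()).put(partition, store);
    }

    void registerGlobal(String name, S store) {
        global.put(name, store);
    }

    // Name-only lookup: always valid for global stores; for partitioned stores only in single-partition mode.
    S get(String name) {
        if (global.containsKey(name)) {
            return global.get(name);
        }
        if (multiPartition) {
            throw new IllegalStateException("Store '" + name
                + "' is partitioned; use get(name, partition) in multi-partition mode");
        }
        Map<Integer, S> byPartition = partitioned.get(name);
        return byPartition == null ? null : byPartition.get(0); // assumption: null if unknown
    }

    // Per-partition lookup for non-global stores.
    S get(String name, int partition) {
        Map<Integer, S> byPartition = partitioned.get(name);
        return byPartition == null ? null : byPartition.get(partition); // assumption: null if unknown
    }
}
```

With the proposed driver API, a test that needs a table-wide view of a partitioned store would loop over partitions 0 to n-1, calling driver.getKeyValueStore("counts", p), and merge the results.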

Summary of Contract

The multi-partition functionality introduced by this KIP is strictly opt-in and does not affect existing test setups.

Existing usages of TopologyTestDriver remain fully unchanged. In particular, code relying on the current default behavior, where topics are not explicitly declared before use, will continue to work as before.

  • Single-partition mode (default):
    • fully backward compatible
    • no setup phase required
    • identical behavior to current TopologyTestDriver
  • Multi-partition mode (opt-in):
    • requires explicit setup phase
    • all topics and partitions must be declared upfront
    • execution is deterministic and partition-aware
    • state stores become partition-scoped

Example Usage Multi-Partition Mode

// Multi-partition mode (new)
TopologyTestDriver driver = new TopologyTestDriverBuilder(topology)
    .withConfig(config)
    .withInitialWallClockTime(Instant.now())
    .declareTopic("input", 3)
    .declareTopic("output", 3)
    .build();

TestInputTopic<String, String> input = driver.createInputTopic(
    "input", Serdes.String().serializer(), Serdes.String().serializer());
TestOutputTopic<String, String> output = driver.createOutputTopic(
    "output", Serdes.String().deserializer(), Serdes.String().deserializer());

// Pipe with explicit partition (new constructor: key, value, headers, recordTime, partition)
input.pipeInput(new TestRecord<>("key0", "value0", new RecordHeaders(), Instant.ofEpochMilli(1000L), 0));
// Pipe with key-based routing (murmur2 % 3)
input.pipeInput("key1", "value1");
// Null key → distributed round-robin across partitions (partition = counter++ % 3)
// First call  → partition 0
// Second call → partition 1
// Third call  → partition 2
input.pipeInput(new TestRecord<>(null, "value2", Instant.ofEpochMilli(1000L)));
input.pipeInput(new TestRecord<>(null, "value3", Instant.ofEpochMilli(1000L)));
input.pipeInput(new TestRecord<>(null, "value4", Instant.ofEpochMilli(1000L)));

// Read all outputs, filter by partition if needed
List<TestRecord<String, String>> all = output.readRecordsToList();
List<TestRecord<String, String>> p0  = all.stream()
    .filter(r -> r.getPartition() == 0)
    .collect(Collectors.toList());

Example Usage Single-Partition Mode

// OPTION 1
// Single-partition mode (unchanged, zero migration cost)
TopologyTestDriver driver = new TopologyTestDriver(topology, config);

TestInputTopic<String, String> input = driver.createInputTopic(
    "input", Serdes.String().serializer(), Serdes.String().serializer());
TestOutputTopic<String, String> output = driver.createOutputTopic(
    "output", Serdes.String().deserializer(), Serdes.String().deserializer());

input.pipeInput("key1", "value1"); // identical to current behaviour


// OPTION 2
// Single-partition mode using TopologyTestDriverBuilder - no need to declare topics
TopologyTestDriver driver = new TopologyTestDriverBuilder(topology)
    .withConfig(config)
    .withInitialWallClockTime(Instant.now())
    .build();

TestInputTopic<String, String> input = driver.createInputTopic(
    "input", Serdes.String().serializer(), Serdes.String().serializer());
TestOutputTopic<String, String> output = driver.createOutputTopic(
    "output", Serdes.String().deserializer(), Serdes.String().deserializer());

input.pipeInput("key1", "value1"); // identical to current behaviour
 

Compatibility, Deprecation, and Migration Plan

  • Backward compatible: All existing TopologyTestDriver constructors, methods, and behaviors remain unchanged and compatible. Existing tests require no modification. 

Single-partition mode behaves exactly as the current release does. No new code path is entered and no existing behaviour changes, so pre-existing tests are not at risk of regression.

  • Topics not declared via the builder default to a single partition, preserving current semantics.
  • All existing TestRecord constructors default partition to -1. Existing assertEquals calls between two records created without an explicit partition remain valid, since both sides will carry partition = -1.
  • There is no distinct single-partition mode in the builder. A topic declared with 1 partition simply has one partition, and the same code path is used regardless. This means the existing TopologyTestDriver constructors could be deprecated and eventually removed in a follow-up KIP without loss of functionality. Users would simply migrate to the builder, either with declareTopic("topic", 1) or with no declaration at all.

Note:

In single-partition mode, the null-key rule is a no-op since partition 0 is the only partition. 

Test Plan

Add unit tests verifying:

  1. Multi-partition input topics route data correctly based on:

    • Explicit partition

    • Key-based partitioning

    • Null-key records are distributed via round-robin across all partitions, and never cause a NullPointerException.
  2. Repartition and join operations behave as expected across multiple partitions.

  3. Backward compatibility: 1-partition default still behaves identically to current implementation.

Rejected Alternatives

Defining the partition number directly in the TopologyTestDriver constructor would simplify the setup. However, this approach restricts testing to topics that share the same number of partitions. To better emulate actual Kafka behavior, where each topic has its own partition count, we opted to declare the partition count per topic via declareTopic() on the builder instead.

Adding a partition argument to TestInputTopic and TestOutputTopic would introduce many additional overloads. It would be cleaner and more consistent with the Producer API to specify the partition number on the TestRecord instead.

Dynamically updating the Tasks and GlobalTasks whenever an input topic is created with a partition number higher than the current maximum was also considered. However, this would introduce additional internal complexity and require mutating tasks at runtime, making the driver lifecycle less predictable.

