Status

Current state: Under Discussion

Discussion thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP by Julien Brunet, Adam Souquières, Sébastien Viale, Marie-Laure Momplot

The current TopologyTestDriver only supports test input and output topics with a single partition, and therefore does not allow testing of topologies involving repartition operations.

The inability to test multi-partitioned streams leads to complex manual testing that cannot be automated or scripted — or, in many cases, results in missing tests altogether.

As a workaround, developers often rely on the EmbeddedKafkaCluster to run integration-style tests with multiple partitions. However, this approach requires spinning up a local Kafka cluster, managing configurations, and dealing with non-trivial setup and teardown logic. This makes it less user-friendly, slower, and harder to integrate into fast-running unit test suites.

For instance, when a key is modified before applying the .process() operator, Kafka Streams does not automatically create a repartition topic. This behavior can cause issues that go undetected when relying solely on single-partition unit tests.

This KIP proposes to introduce multi-partition support in the TopologyTestDriver, enabling more accurate and convenient stream testing while improving automated unit test coverage.

Public Interfaces

TopologyTestDriver class changes 

  • Add a new createInputTopic overload to TopologyTestDriver that takes the number of partitions as an additional parameter.
  • Add a new createOutputTopic overload to TopologyTestDriver that takes the number of partitions as an additional parameter.
public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Instant startTimestamp, final Duration autoAdvance, final int numPartitions);
public final <K, V> TestOutputTopic<K, V> createOutputTopic(final String topicName, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer, final int numPartitions);

TestRecord class changes 

  • Add a new constructor to TestRecord that accepts a partition number as an additional argument.

TestRecord(final K key, final V value, final Headers headers, final Instant recordTime, final int partition);
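To illustrate the shape of the proposed constructor, the following is a minimal self-contained stand-in (SimpleTestRecord is a hypothetical class used purely for illustration, not the real org.apache.kafka.streams.test.TestRecord; it omits Headers handling, and it only rejects negative partitions, leaving the range check against the topic's partition count to pipeInput):

```java
import java.time.Instant;

// Hypothetical stand-in mirroring the proposed TestRecord constructor shape.
public class TestRecordSketch {
    static final class SimpleTestRecord<K, V> {
        final K key;
        final V value;
        final Instant recordTime;
        final int partition;

        SimpleTestRecord(final K key, final V value, final Instant recordTime, final int partition) {
            if (partition < 0) {
                // A negative partition is always invalid; the check against the
                // topic's actual partition count happens later, at pipeInput.
                throw new IllegalArgumentException("partition must be non-negative: " + partition);
            }
            this.key = key;
            this.value = value;
            this.recordTime = recordTime;
            this.partition = partition;
        }
    }

    public static void main(final String[] args) {
        final SimpleTestRecord<String, String> r =
                new SimpleTestRecord<>("key0", "value0", Instant.ofEpochMilli(1000L), 1);
        System.out.println(r.partition); // prints 1
    }
}
```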

Proposed Changes

Internally, TopologyTestDriver will maintain topic metadata that includes:

  • The number of partitions for each topic

  • A task per partition to simulate the log structure
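As a rough sketch, the bookkeeping described above could be as simple as a map from topic name to one simulated task per partition (TopicMetadataSketch and PartitionTask are hypothetical names; the actual internal representation in TopologyTestDriver may differ):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical internal bookkeeping: one simulated task per partition of each topic.
public class TopicMetadataSketch {
    static final class PartitionTask {
        final String topic;
        final int partition;
        final List<Object> log = new ArrayList<>(); // simulated partition log

        PartitionTask(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    final Map<String, List<PartitionTask>> tasksByTopic = new HashMap<>();

    void createTopic(final String topic, final int numPartitions) {
        final List<PartitionTask> tasks = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            tasks.add(new PartitionTask(topic, p));
        }
        tasksByTopic.put(topic, tasks);
    }

    int partitionCount(final String topic) {
        return tasksByTopic.get(topic).size();
    }

    public static void main(final String[] args) {
        final TopicMetadataSketch driver = new TopicMetadataSketch();
        driver.createTopic("topic1", 2);
        System.out.println(driver.partitionCount("topic1")); // prints 2
    }
}
```

Freezing this map during the explicit setup phase is what keeps the task set immutable once records start flowing.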

To avoid introducing unnecessary complexity when supporting multiple partitions and tasks—both in the TopologyTestDriver implementation and for users—we propose introducing a dedicated setup phase.

Instead of dynamically creating and updating tasks as topics are defined or as records are piped, all input and output topics must be created upfront, before any records can be processed.

The revised control flow would be:

  1. Instantiate the TopologyTestDriver.

  2. Create all required input and output topics.

  3. Call an explicit initialization method which:

    • Creates the corresponding Tasks and GlobalTasks.

    • Completes internal setup.


  4. Only after initialization can records be piped and outputs be read.

When a record is sent via pipeInput, it is routed to the corresponding task based on:

  1. The explicitly provided partition number in the TestRecord or

  2. The partition determined by the key hash (consistent with DefaultPartitioner behavior)
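The routing rule above can be sketched as follows. PartitionRoutingSketch and resolvePartition are hypothetical names, and hashCode() stands in for the murmur2 hash of the serialized key bytes that DefaultPartitioner actually applies:

```java
// Sketch of pipeInput routing: an explicit partition wins; otherwise the key
// is hashed onto a partition. The real driver would hash the serialized key
// bytes with murmur2, as DefaultPartitioner does; hashCode() here is a
// simplified stand-in for illustration.
public class PartitionRoutingSketch {

    static int resolvePartition(final Integer explicitPartition, final Object key, final int numPartitions) {
        if (explicitPartition != null) {
            if (explicitPartition < 0 || explicitPartition >= numPartitions) {
                // Out-of-range partitions are rejected with an exception.
                throw new IllegalArgumentException("Invalid partition: " + explicitPartition);
            }
            return explicitPartition;
        }
        // Key-based routing; masking the sign bit keeps the result non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(final String[] args) {
        System.out.println(resolvePartition(1, "key0", 2)); // prints 1 (explicit partition)
        final int p = resolvePartition(null, "key0", 2);    // key-based: 0 or 1
        System.out.println(p);
    }
}
```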

Example Usage

// Setup and initialization
TopologyTestDriver driver = new TopologyTestDriver(...);
TestInputTopic<String, String> inputTopic = driver.createInputTopic("topic1", 2);
TestOutputTopic<String, String> outputTopic = driver.createOutputTopic("topic2", 2);

// Topics exist, but no records can be piped until init() is called; without a partition count, topics default to 1 partition
driver.init(); // tasks and global tasks are created

// Partitions 0 and 1 are valid
TestRecord<String, String> record0 = new TestRecord<>("key0", "value0", 1000L, 0);
TestRecord<String, String> record1 = new TestRecord<>("key1", "value1", 1000L, 1);

inputTopic.pipeInput(record0);
inputTopic.pipeInput(record1);

// Invalid partition → exception
TestRecord<String, String> record2 = new TestRecord<>("key2", "value2", 1000L, 2);
inputTopic.pipeInput(record2); // throws IllegalArgumentException

// Reading output
List<TestRecord<String, String>> outputRecords = outputTopic.readRecordsToList();

// Filter by partition if needed
List<TestRecord<String, String>> partition0Records = outputRecords.stream()
        .filter(r -> r.getPartition() == 0)
        .collect(Collectors.toList());

Compatibility, Deprecation, and Migration Plan

  • Backward compatible: Existing methods and behavior remain unchanged.

    Existing tests that rely on the current single-partition behavior will continue to work without modification. If no partition count is specified (defaulting to one partition), the TopologyTestDriver may automatically initialize internally to preserve existing behavior.

    The explicit setup phase is only required when using the new multi-partition setup. This ensures:

    • No breaking changes to existing tests.

    • A clear and deterministic setup phase for multi-partition scenarios.

    • A smooth migration path for users who wish to adopt the new functionality incrementally.


  • No existing code or tests will break

Test Plan

Add unit tests verifying:

  1. Multi-partition input topics route data correctly based on:

    • Explicit partition

    • Key-based partitioning

  2. Repartition and join operations behave as expected across multiple partitions.

  3. Backward compatibility: 1-partition default still behaves identically to current implementation.

Rejected Alternatives

Defining the partition number directly in the TopologyTestDriver constructor would simplify the setup. However, this approach restricts testing to topics that share the same number of partitions. To better emulate the actual Kafka behavior, we opted to specify the partition number in the input and output topic declarations instead.

Adding a partition argument to TestInputTopic and TestOutputTopic would introduce many additional overloads. It would be cleaner and more consistent with the Producer API to specify the partition number on the TestRecord instead.

Dynamically updating the Tasks and GlobalTasks whenever an input topic is created with a partition number higher than the current maximum was also considered. However, this would introduce additional internal complexity and require mutating tasks at runtime, making the driver lifecycle less predictable.
