...
The number of partitions for each topic
A task per partition to simulate the log structure
To avoid introducing unnecessary complexity when supporting multiple partitions and tasks—both in the TopologyTestDriver implementation and for users—we propose introducing a dedicated setup phase.
Instead of dynamically creating and updating tasks as topics are defined or as records are piped, all input and output topics must be created upfront, before any records can be processed.
The revised control flow would be:
Instantiate the TopologyTestDriver.
Create all required input and output topics.
Call an explicit initialization method which:
Creates the corresponding Tasks and GlobalTasks.
Completes internal setup.
Only after initialization can records be piped and outputs be read.
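The lifecycle above can be sketched as a small state machine. This is an illustrative stand-in, not the actual TopologyTestDriver implementation; the class name DriverLifecycle and its methods are hypothetical, but the enforced ordering (create topics, then init(), then pipe) matches the proposed control flow.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed driver lifecycle: topics must be
// registered before init(), and records may only be piped after init().
public class DriverLifecycle {
    private final Map<String, Integer> partitionCounts = new HashMap<>();
    private boolean initialized = false;

    public void createInputTopic(String name, int partitions) {
        if (initialized)
            throw new IllegalStateException("Topics must be created before init()");
        partitionCounts.put(name, partitions);
    }

    public void init() {
        // In the real driver this would create the Tasks and GlobalTasks.
        initialized = true;
    }

    public void pipeInput(String topic, int partition) {
        if (!initialized)
            throw new IllegalStateException("Call init() before piping records");
        int count = partitionCounts.getOrDefault(topic, 1);
        if (partition < 0 || partition >= count)
            throw new IllegalArgumentException(
                "Invalid partition " + partition + " for topic " + topic);
        // In the real driver the record would now be routed to the task
        // owning this partition.
    }

    public static void main(String[] args) {
        DriverLifecycle driver = new DriverLifecycle();
        driver.createInputTopic("topic1", 2);
        driver.init();
        driver.pipeInput("topic1", 0); // valid: partition 0 of 2
    }
}
```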
When a record is sent via pipeInput, it is routed to the corresponding task based on:
The explicitly provided partition number in the TestRecord, or
The partition determined by the key hash (consistent with DefaultPartitioner behavior).
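The routing rule can be sketched as below. This is an illustrative stand-in, not the driver's actual code: the method partitionFor and its signature are hypothetical, and Kafka's DefaultPartitioner hashes the serialized key with murmur2, which is approximated here by Arrays.hashCode so the example stays self-contained.

```java
import java.util.Arrays;

// Hedged sketch of partition selection: an explicit partition on the
// TestRecord wins (after a range check); otherwise the key hash is
// masked to non-negative and taken modulo the partition count.
public class PartitionRouting {
    public static int partitionFor(byte[] keyBytes, Integer explicitPartition, int numPartitions) {
        if (explicitPartition != null) {
            if (explicitPartition < 0 || explicitPartition >= numPartitions)
                throw new IllegalArgumentException("Invalid partition: " + explicitPartition);
            return explicitPartition;
        }
        // Stand-in for murmur2: same shape, non-negative hash mod partitions.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Explicit partition takes precedence over the key hash.
        System.out.println(partitionFor("key0".getBytes(), 1, 2));
        // Key-based routing: deterministic for a given key.
        System.out.println(partitionFor("key0".getBytes(), null, 2));
    }
}
```

The important property, which the driver must share with a real broker, is that the same key always lands on the same partition, so per-key ordering is preserved.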
Example Usage
```java
// Setup and initialization
TopologyTestDriver driver = new TopologyTestDriver(...);
TestInputTopic<String, String> inputTopic = driver.createInputTopic("topic1", 2);
TestOutputTopic<String, String> outputTopic = driver.createOutputTopic("topic2", 2);
// Topics exist, but records cannot be piped yet; without an explicit
// partition count, a topic defaults to a single partition.
driver.init(); // tasks and global tasks are created

// Partitions 0 and 1 are valid
TestRecord<String, String> record0 = new TestRecord<>("key0", "value0", 1000L, 0);
TestRecord<String, String> record1 = new TestRecord<>("key1", "value1", 1000L, 1);
inputTopic.pipeInput(record0);
inputTopic.pipeInput(record1);

// Invalid partition → exception
TestRecord<String, String> record2 = new TestRecord<>("key2", "value2", 1000L, 2);
inputTopic.pipeInput(record2); // throws IllegalArgumentException

// Reading output
List<TestRecord<String, String>> outputRecords = outputTopic.readRecordsToList();

// Filter by partition if needed
List<TestRecord<String, String>> partition0Records = outputRecords.stream()
    .filter(r -> r.getPartition() == 0)
    .collect(Collectors.toList());
```
...
Backward compatible: Existing methods and behavior remain unchanged.
Existing tests that rely on the current single-partition behavior will continue to work without modification. If no partition count is specified (defaulting to one partition), the TopologyTestDriver may automatically initialize internally to preserve existing behavior. The explicit setup phase is only required when using the new multi-partition setup. This ensures:
No breaking changes to existing tests.
A clear and deterministic setup phase for multi-partition scenarios.
A smooth migration path for users who wish to adopt the new functionality incrementally.
No existing code or tests will break
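The auto-initialization path described above can be sketched as follows. The class AutoInitDriver and its flag are hypothetical; the point is only that a driver configured with default single-partition topics initializes itself transparently on first use, while the multi-partition path requires the explicit init() call.

```java
// Hypothetical sketch of lazy auto-init preserving today's behavior:
// single-partition drivers need no init() call, multi-partition ones do.
public class AutoInitDriver {
    private final boolean multiPartition;
    private boolean initialized = false;

    public AutoInitDriver(boolean multiPartition) {
        this.multiPartition = multiPartition;
    }

    public void pipeInput(String topic) {
        if (!initialized) {
            if (multiPartition)
                throw new IllegalStateException(
                    "Explicit init() required for multi-partition topics");
            initialized = true; // transparent auto-init, as in the current driver
        }
        // ... route the record to the single task for partition 0
    }

    public void init() {
        initialized = true;
    }

    public boolean isInitialized() {
        return initialized;
    }

    public static void main(String[] args) {
        AutoInitDriver legacy = new AutoInitDriver(false);
        legacy.pipeInput("topic1"); // works without init(), as today
    }
}
```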
...
Multi-partition input topics route data correctly based on:
Explicit partition
Key-based partitioning
Repartition and join operations behave as expected across multiple partitions.
Backward compatibility: 1-partition default still behaves identically to current implementation.
Rejected Alternatives
Defining the partition number directly in the TopologyTestDriver constructor would simplify the setup. However, this approach restricts testing to topics that share the same number of partitions. To better emulate the actual Kafka behavior, we opted to specify the partition number in the input and output topic declarations instead.
Adding a partition argument to TestInputTopic and TestOutputTopic would introduce many additional overloads. It would be cleaner and more consistent with the Producer API to specify the partition number on the TestRecord instead.
Dynamically updating the Tasks and GlobalTasks whenever an input topic is created with a partition number higher than the current maximum was also considered. However, this would introduce additional internal complexity and require mutating tasks at runtime, making the driver lifecycle less predictable.