DUE TO SPAM, SIGN-UP IS DISABLED. Goto Selfserve wiki signup and request an account.
Status
Current state: "Under Discussion"
Discussion thread: TDB
JIRA: KAFKA-15143 - Getting issue details... STATUS
Released: 3.8 (target)
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
When KIP-478 and KIP-820 were implemented and merged, they introduced the new Processor API, replacing the previously weakly-typed Transformer that were used.
This change to the Processor API also introduced the MockProcessorContext class to the Kafka Streams test-utils to enable unit testing for any topologies, however, with the strongly-typed nature of the API, FixedKeyProcessor cannot use ProcessorContext and in addition the MockProcessorContext, this KIP adds the MockFixedKeyProcessorContext to enable unit testing for FixedKeyProcessor as well.
Along with the introduction of the MockFixedKeyProcessorContext, another interface - TestFixedKeyRecordFactory, is also required to allow for the creation of FixedKeyRecords in a testing capacity, since the existing InternalFixedKeyRecordFactory should not be used for such a scenario. Internally to the TestFixedKeyRecordFactory the InternalFixedKeyRecordFactory is called to abstract any changes away from the public API.
Public Interfaces
The new MockFixedKeyProcessorContext class.
// module: kafka-streams-test-utils
// package: org.apache.kafka.streams.processor.api;
public class MockFixedKeyProcessorContext<K, V> extends MockProcessorContext<K, V> implements FixedKeyProcessorContext<K, V> {
public MockFixedKeyProcessorContext();
public MockFixedKeyProcessorContext(final Properties config);
public MockFixedKeyProcessorContext(final Properties config, final TaskId taskId, final File dummyFile);
@Override
public <K1 extends K, V1 extends V> void forward(final FixedKeyRecord<K1, V1> record);
@Override
public <K1 extends K, V1 extends V> void forward(final FixedKeyRecord<K1, V1> record, final String childName),
}
The new TestFixedKeyRecordFactory class
// module: kafka-streams-test-utils
// package: org.apache.kafka.streams.processor.api;
public final class TestFixedKeyRecordFactory {
private TestFixedKeyRecordFactory();
public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final K key, final V value, final Headers headers);
public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final K key, final V value, final long timestamp);
public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final K key, final V value);
public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final K key, final V value, final long timestamp, final Headers headers);
// All methods call this method and this calls InternalFixedKeyRecordFactory
public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final Record<K, V> record);
}
Proposed Changes
We add new classes to allow testing custom processor that implement the FixedKeyProcessor class.
Compatibility, Deprecation, and Migration Plan
This KIP only adds a new class, so there is no compatibility concerns.
Test Plan
Regular unit-testing is sufficient.
Documentation Plan
The section on testing should be updated: https://kafka.apache.org/37/documentation/streams/developer-guide/testing.html
Rejected Alternatives
n/a