Status
Current state: "Under Discussion"
Discussion thread: TDB
JIRA: - KAFKA-15143Getting issue details... STATUS
Released: 3.8 (target)
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
When KIP-478 and KIP-820 were implemented and merged, they introduced the new Processor API, replacing the previously weakly-typed Transformer that were used.
This change to the Processor API also introduced the MockProcessorContext
class to the Kafka Streams test-utils to enable unit testing for any topologies, however, with the strongly-typed nature of the API, FixedKeyProcessor
cannot use ProcessorContext
and in addition the MockProcessorContext
, this KIP adds the MockFixedKeyProcessorContext
to enable unit testing for FixedKeyProcessor
as well.
Along with the introduction of the MockFixedKeyProcessorContext
, another interface - TestFixedKeyRecordFactory,
is also required to allow for the creation of FixedKeyRecords
in a testing capacity, since the existing InternalFixedKeyRecordFactory
should not be used for such a scenario. Internally to the TestFixedKeyRecordFactory
the InternalFixedKeyRecordFactory
is called to abstract any changes away from the public API.
Public Interfaces
The new MockFixedKeyProcessorContext
class.
// module: kafka-streams-test-utils // package: org.apache.kafka.streams.processor.api; public class MockFixedKeyProcessorContext<K, V> extends MockProcessorContext<K, V> implements FixedKeyProcessorContext<K, V> { public MockFixedKeyProcessorContext(); public MockFixedKeyProcessorContext(final Properties config); public MockFixedKeyProcessorContext(final Properties config, final TaskId taskId, final File dummyFile); @Override public <K1 extends K, V1 extends V> void forward(final FixedKeyRecord<K1, V1> record); @Override public <K1 extends K, V1 extends V> void forward(final FixedKeyRecord<K1, V1> record, final String childName), }
The new TestFixedKeyRecordFactory
class
// module: kafka-streams-test-utils // package: org.apache.kafka.streams.processor.api; public final class TestFixedKeyRecordFactory { private TestFixedKeyRecordFactory(); public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final K key, final V value, final Headers headers); public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final K key, final V value, final long timestamp); public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final K key, final V value); public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final K key, final V value, final long timestamp, final Headers headers); // All methods call this method and this calls InternalFixedKeyRecordFactory public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final Record<K, V> record); }
Proposed Changes
We add new classes to allow testing custom processor that implement the FixedKeyProcessor
class.
Compatibility, Deprecation, and Migration Plan
This KIP only adds a new class, so there is no compatibility concerns.
Test Plan
Regular unit-testing is sufficient.
Documentation Plan
The section on testing should be updated: https://kafka.apache.org/37/documentation/streams/developer-guide/testing.html
Rejected Alternatives
n/a