Status

Current state: "Under Discussion"

Discussion thread: TDB

JIRA: KAFKA-15143 - Getting issue details... STATUS

Released: 3.8 (target)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When KIP-478 and KIP-820 were implemented and merged, they introduced the new Processor API, replacing the previously weakly-typed Transformer that were used.

This change to the Processor API also introduced the MockProcessorContext class to the Kafka Streams test-utils to enable unit testing for any topologies, however, with the strongly-typed nature of the API, FixedKeyProcessor cannot use ProcessorContext and in addition the MockProcessorContext, this KIP adds the MockFixedKeyProcessorContext to enable unit testing for FixedKeyProcessor as well.

Along with the introduction of the MockFixedKeyProcessorContext, another interface - TestFixedKeyRecordFactory, is also required to allow for the creation of FixedKeyRecords in a testing capacity, since the existing InternalFixedKeyRecordFactory  should not be used for such a scenario. Internally to the TestFixedKeyRecordFactory  the InternalFixedKeyRecordFactory  is called to abstract any changes away from the public API.

Public Interfaces

The new MockFixedKeyProcessorContext class.

// module: kafka-streams-test-utils
// package: org.apache.kafka.streams.processor.api;

public class MockFixedKeyProcessorContext<K, V> extends MockProcessorContext<K, V> implements FixedKeyProcessorContext<K, V> {

    public MockFixedKeyProcessorContext();
    public MockFixedKeyProcessorContext(final Properties config);
    public MockFixedKeyProcessorContext(final Properties config, final TaskId taskId, final File dummyFile);

    @Override
    public <K1 extends K, V1 extends V> void forward(final FixedKeyRecord<K1, V1> record);

    @Override
    public <K1 extends K, V1 extends V> void forward(final FixedKeyRecord<K1, V1> record, final String childName),
}

The new TestFixedKeyRecordFactory class

// module: kafka-streams-test-utils
// package: org.apache.kafka.streams.processor.api;

public final class TestFixedKeyRecordFactory {
    
	private TestFixedKeyRecordFactory();
	
 	public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final K key, final V value, final Headers headers);
 	public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final K key, final V value, final long timestamp);
	public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final K key, final V value);
	public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final K key, final V value, final long timestamp, final Headers headers);
	
	// All methods call this method and this calls InternalFixedKeyRecordFactory
 	public static <K, V> FixedKeyRecord<K, V> createFixedKeyRecord(final Record<K, V> record);

}


Proposed Changes

We add new classes to allow testing custom processor that implement the FixedKeyProcessor class.

Compatibility, Deprecation, and Migration Plan

This KIP only adds a new class, so there is no compatibility concerns.

Test Plan

Regular unit-testing is sufficient.

Documentation Plan

The section on testing should be updated: https://kafka.apache.org/37/documentation/streams/developer-guide/testing.html

Rejected Alternatives

n/a

  • No labels