Current state: 'Accepted'
Discussion thread: here
JIRA: KAFKA-8233
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This KIP is inspired by the discussion in KIP-456: Helper classes to make it simpler to write test logic with TopologyTestDriver.
Stream application code is very compact, and the test code is often a much larger code base than the application itself. That is why test code should be easy to read and understand, and therefore easy to maintain.
KIP-456 proposed adding an alternative way to write to input topics and read from output topics. This KIP enhances those classes and deprecates the old functionality, so that test writers have one clear interface to use.
With the old TopologyTestDriver, you need to call ConsumerRecordFactory to create the ConsumerRecord that is passed to the pipeInput method to write to a topic. Likewise, when calling readOutput to consume from a topic, you need to provide the correct Deserializers each time.
You easily end up writing helper methods in your test classes. This can be avoided by adding generic input and output topic classes that implement the needed functionality.
The logic of the old TopologyTestDriver is also confusing: you pipe ConsumerRecords to produce records to an input topic, and you receive ProducerRecords when consuming from an output topic.
The non-existing topic and no-record-in-the-queue scenarios are changed to throw an exception instead of returning null.
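The difference between returning null and throwing can be sketched in plain Java. This is a simplified stand-in, not the Kafka implementation: the class `StrictOutputQueue` and the method `pollOrNull` are hypothetical names, and the exception type follows the `NoSuchElementException` noted for the TestOutputTopic read methods.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical stand-in for an output topic; NOT the Kafka implementation.
final class StrictOutputQueue<V> {
    // ArrayDeque rejects null elements, so a null from poll() always means "empty".
    private final Queue<V> records = new ArrayDeque<>();

    void add(final V value) {
        records.add(value);
    }

    boolean isEmpty() {
        return records.isEmpty();
    }

    // Old style: null signals "nothing to read", which a test can forget to check.
    V pollOrNull() {
        return records.poll();
    }

    // New style: fail at the read itself with a descriptive exception.
    V readValue() {
        final V value = records.poll();
        if (value == null) {
            throw new NoSuchElementException("No record in the queue");
        }
        return value;
    }
}
```

With the throwing variant, a test that reads one record too many fails at the read call instead of later with a NullPointerException in an unrelated assertion.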
package org.apache.kafka.streams;

public class TopologyTestDriver {
    public TopologyTestDriver(Topology topology, Properties config); // existing constructor
    @Deprecated
    public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs);
    public TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime);

    @Deprecated
    public void advanceWallClockTime(long advanceMs); // can trigger wall-clock-time punctuation
    public void advanceWallClockTime(Duration advance); // can trigger wall-clock-time punctuation

    // Deprecate old pipe and read methods
    @Deprecated
    public void pipeInput(ConsumerRecord<byte[], byte[]> record); // can trigger event-time punctuation
    @Deprecated
    public void pipeInput(List<ConsumerRecord<byte[], byte[]>> records); // can trigger event-time punctuation
    @Deprecated
    public ProducerRecord<byte[], byte[]> readOutput(String topic);
    @Deprecated
    public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);

    // Methods for TestTopic object creation
    public final <K, V> TestOutputTopic<K, V> createOutputTopic(final String topicName, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer);
    // Uses current system time as start timestamp. Auto-advance is disabled.
    public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serializer<K> keySerializer, final Serializer<V> valueSerializer);
    // Uses provided startTimestamp and autoAdvance duration for timestamp generation
    public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Instant startTimestamp, final Duration autoAdvance);

    ...
}
package org.apache.kafka.streams;

public class TestInputTopic<K, V> {
    // Created by TopologyTestDriver; constructors are package private

    // Timestamp handling
    // The record timestamp can be provided when piping input; otherwise the internally
    // tracked time is used, configured with these parameters:
    // startTimestamp: the initial timestamp for generated records; if not provided, the current system time is used.
    // autoAdvance: the time increment per generated record; if not provided, auto-advance is disabled.

    // Advances the internally tracked time.
    void advanceTime(final Duration advance);

    // Methods to pipe a single record
    void pipeInput(final V value);
    void pipeInput(final K key, final V value);

    // Use the provided timestamp; does not auto-advance the internally tracked time.
    void pipeInput(final V value, final Instant timestamp);
    void pipeInput(final K key, final V value, final Instant timestamp);

    // Method with a long timestamp to support easier migration of old tests
    void pipeInput(final K key, final V value, final long timestampMs);

    // If the record timestamp is set, does not auto-advance the internally tracked time.
    void pipeInput(final TestRecord<K, V> record);

    // Methods to pipe a list of records
    void pipeValueList(final List<V> values);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues);

    // Use the provided timestamp; does not auto-advance the internally tracked time.
    void pipeValueList(final List<V> values, final Instant startTimestamp, final Duration advanceMs);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues, final Instant startTimestamp, final Duration advanceMs);

    // If the record timestamp is set, does not auto-advance the internally tracked time.
    void pipeRecordList(final List<? extends TestRecord<K, V>> records);
}
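The timestamp rules in the comments above can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not the Kafka implementation: the class `TrackedClock` and its method names are hypothetical, disabled auto-advance is modelled as `Duration.ZERO`, and the sketch assumes that the first generated record receives the start timestamp itself.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of the timestamp handling described above; NOT the Kafka implementation.
final class TrackedClock {
    private Instant currentTime;
    private final Duration autoAdvance;

    TrackedClock(final Instant startTimestamp, final Duration autoAdvance) {
        this.currentTime = startTimestamp;
        this.autoAdvance = autoAdvance; // Duration.ZERO models "auto-advance disabled"
    }

    // Timestamp for a record piped without an explicit timestamp:
    // return the tracked time, then advance it by autoAdvance (assumed ordering).
    Instant nextTimestamp() {
        final Instant timestamp = currentTime;
        currentTime = currentTime.plus(autoAdvance);
        return timestamp;
    }

    // Timestamp for a record piped with an explicit timestamp:
    // the provided value is used and the tracked time is left untouched.
    Instant explicitTimestamp(final Instant provided) {
        return provided;
    }

    // Counterpart of advanceTime(Duration): moves the tracked time forward manually.
    void advanceTime(final Duration advance) {
        currentTime = currentTime.plus(advance);
    }
}
```

Under these assumptions, a clock started at epoch with a one-second auto-advance hands out 0 ms and 1000 ms for the first two records, is unaffected by a record piped with an explicit timestamp, and continues with 2000 ms afterwards.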
package org.apache.kafka.streams;

public class TestOutputTopic<K, V> {
    // Created by TopologyTestDriver; constructors are package private

    // Methods to check queue size
    final long getQueueSize();
    final boolean isEmpty();

    // Methods to read output; throw NoSuchElementException if there is no record in the queue
    V readValue();
    KeyValue<K, V> readKeyValue();
    TestRecord<K, V> readRecord();

    // Output as collection
    List<V> readValuesToList();
    List<KeyValue<K, V>> readKeyValuesToList();
    Map<K, V> readKeyValuesToMap();
    List<TestRecord<K, V>> readRecordsToList();
}
package org.apache.kafka.streams.test;

public class TestRecord<K, V> {
    // Constructors
    public TestRecord(final V value);
    public TestRecord(final K key, final V value);
    public TestRecord(final K key, final V value, final Headers headers);
    public TestRecord(final K key, final V value, final Instant recordTime);
    public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime);
    public TestRecord(final K key, final V value, final Headers headers, final Long timestamp);

    // Constructors based on an existing record
    public TestRecord(final ConsumerRecord<K, V> record);
    public TestRecord(final ProducerRecord<K, V> record);

    // Methods like in ProducerRecord / ConsumerRecord
    public Headers headers();
    public K key();
    public V value();
    public Long timestamp();

    // Getters
    public Headers getHeaders();
    public K getKey();
    public V getValue();
    public Instant getRecordTime();

    // Overrides
    public String toString();
    public boolean equals(Object o);
    public int hashCode();
}
package org.apache.kafka.streams.test;

// Recommended to use normal assertion library methods
@Deprecated
public class OutputVerifier {
    ...

package org.apache.kafka.streams.test;

// Similar functionality now in TestInputTopic
@Deprecated
public class ConsumerRecordFactory<K, V> {
    ...
This improvement adds the TestInputTopic class, which replaces the TopologyTestDriver and ConsumerRecordFactory methods with a single class for writing to input topics, and the TestOutputTopic class, which collects the TopologyTestDriver read methods and provides type-safe read methods.
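The idea of binding serialization to the topic object once, rather than supplying it on every call, can be sketched in plain Java. This is an illustrative model only and not the Kafka implementation: the class `BoundInputTopic`, the `sink` list that stands in for the topic, and the use of `Function<V, byte[]>` in place of a Kafka Serializer are all assumptions made for this example.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a typed input topic; NOT the Kafka implementation.
final class BoundInputTopic<V> {
    private final List<byte[]> sink;                  // stands in for the topic's raw records
    private final Function<V, byte[]> valueSerializer; // bound once at creation

    BoundInputTopic(final List<byte[]> sink, final Function<V, byte[]> valueSerializer) {
        this.sink = sink;
        this.valueSerializer = valueSerializer;
    }

    // The test passes a typed value; serialization happens behind the scenes.
    void pipeInput(final V value) {
        sink.add(valueSerializer.apply(value));
    }
}
```

Because the value type is fixed by the generic parameter, piping a value of the wrong type fails at compile time instead of at run time.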
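import static org.assertj.core.api.Assertions.assertThat;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class SimpleTopicTest {

    private TopologyTestDriver testDriver;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;

    @Before
    public void setup() {
        testDriver = new TopologyTestDriver(TestStream.getTopology(), TestStream.getConfig());
        // Input topics serialize typed records; output topics deserialize them.
        inputTopic = testDriver.createInputTopic(TestStream.INPUT_TOPIC, new StringSerializer(), new StringSerializer());
        outputTopic = testDriver.createOutputTopic(TestStream.OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void testOneWord() {
        // Feed the word "Hello" to inputTopic with no Kafka key; the timestamp is irrelevant in this case
        inputTopic.pipeInput("Hello");
        assertThat(outputTopic.readValue()).isEqualTo("Hello");
        // No more output in topic
        assertThat(outputTopic.isEmpty()).isTrue();
    }
}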
There are no compatibility issues.
Tests that use the old TopologyTestDriver methods can still use the deprecated methods.