...
Code Block | ||||
---|---|---|---|---|
| ||||
public class SimpleTopicTest { private TopologyTestDriver testDriver; private TestInputTopic<String, String> inputTopic; private TestOutputTopic<String, String> outputTopic; @Before public void setup() { testDriver = new TopologyTestDriver(TestStream.getTopology(), TestStream.getConfig()); inputTopic = new TestInputTopic<String, String>TestInputTopic<>(testDriver, TestStream.INPUT_TOPIC, new Serdes.StringSerde(), new Serdes.StringSerde()); outputTopic = new TestOutputTopic<String, String>TestOutputTopic<>(testDriver, TestStream.OUTPUT_TOPIC, new Serdes.StringSerde(), new Serdes.StringSerde()); } @After public void tearDown() { testDriver.close(); } @Test public void testOneWord() { //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case inputTopic.pipeInput("Hello"); assertThat(outputTopic.readValue()).isEqualTo("Hello"); //No more output in topic assertThat(outputTopic.readRecord()).isNull(); } } |
...