Child pages
  • KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This mocked ProcessorContext would enable unit tests like the following:

Code Block
languagejava
// ===========================
// Initialize the test harness
 
final Properties config = new Properties();
final MockProcessorContext context = new MockProcessorContext("test-app", new TaskId(0, 0), config);
final KeyValueStore<String, Long> myStore = ...;
context.register(myStore, false, null);
final Processor<String, Long> processor = new MyProcessor();
processor.init(context);

// ==============================
// Verify some processor behavior
processor.process("a", 1L);
processor.process("a", 2L);
final KeyValue<Object, Object> capture1 = context.forwarded().get(0);
Assert.assertEquals(new KeyValue<>("a", 1L), capture1);
final KeyValue<Object, Object> capture2 = context.forwarded().get(1);
Assert.assertEquals(new KeyValue<>("a", 2L), capture2);

Assert.assertTrue(context.committed());

context.resetForwards();
context.resetCommits();

processor.process("a", 3L);
final KeyValue<Object, Object> capture3 = context.forwarded().get(0);
Assert.assertEquals(new KeyValue<>("a", 3L), capture3);
Assert.assertFalse(context.committed());
 
// ===============================
// Verify some Punctuator behavior
context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);
Assert.assertNull(myStore.get("a")); // assuming the Processor registers a Punctuator that clears this key

 

Compatibility, Deprecation, and Migration Plan

...