Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


Code Block
KStream<String> words = someWordsWeWantToCount(); // since this is what basically every kafka application does, right? :)
wordCountswords.filter((w, v) -> w.startsWith("a"))
          .peek(w -> metrics.wordsStartingWithAProcessed.increment())


In this particular case, we increment a metric counter (understanding that in failure cases some records may be double-counted due to retried work) and print every word observed out to System.out.  This provides useful information about the processing of the stream as opposed to the contents of the stream, which can be very useful when experimenting with stateful processors or otherwise diagnosing things that went wrong.