Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder DiscussionDiscarded

Discussion thread: here

JIRA: KAFKA-10578 

...

The way to achieve this with Kafka Streams would be with something like this:


Code Block
languagejava

public class DiffAggregate {
  private final int currentValue;
  private final int lastDifference;

  public DiffAggregate() {
    this(0, 0);
  }

  public DiffAggregate(int currentValue, int lastDifference) {
    this.currentValue = currentValue;
    this.lastDifference = lastDifference;
  }

  public int getLastDifference() {
    return this.lastDifference;
  }

  public DiffAggregate next(int newValue) {
    return new DiffAggregate(newValue, newValue - currentValue);
  }
}

...

stream.groupByKey()
        .aggregate(DiffAggregate::new, (key, value, aggregate) -> aggregate.next(value))
        .mapValues(DiffAggregate::getLastDifference);

...