Table of Contents |
---|
Status
Current state: Under DiscussionDiscarded
Discussion thread: here
JIRA: KAFKA-10578
...
The way to achieve this with Kafka Streams would be with something like this:
Code Block | ||
---|---|---|
| ||
public class DiffAggregate {
private final int currentValue;
private final int lastDifference;
public DiffAggregate() {
this(0, 0);
}
public DiffAggregate(int currentValue, int lastDifference) {
this.currentValue = currentValue;
this.lastDifference = lastDifference;
}
public int getLastDifference() {
return this.lastDifference;
}
public DiffAggregate next(int newValue) {
return new DiffAggregate(newValue, newValue - currentValue);
}
}
...
stream.groupByKey()
.aggregate(DiffAggregate::new, (key, value, aggregate) -> aggregate.next(value))
.mapValues(DiffAggregate::getLastDifference); |
...