Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams supports different ways to window a stream for aggregation, but all of them are extremely sensitive to late arriving data. To handle this, Kafka Streams introduced the concept of a "grace period": a window can be retained beyond its end time so that its result can still be updated by records that arrive after the window end has passed. This concept is powerful, but it does not help users who want to aggregate based on offset order alone, without discarding any late arriving records.

A common use case for such a windowed aggregation is to collect records arriving within a very small window (usually a few seconds) into a batch before passing it to a consumer that can handle batched messages more effectively than individual messages. An example is collecting records for a few seconds, suppressing them, and then sending them all together as a single RPC to another service for processing. A naive implementation using existing mechanics would look like:

stream
	.groupByKey()
	.windowedBy(TimeWindows.ofSizeAndNoGrace(Duration.ofSeconds(10)))
	.aggregate(ArrayList::new, (k, v, kvList) -> { kvList.add(KeyValue.pair(k, v)); return kvList; })
	.suppress(Suppressed.untilWindowCloses(...))

This approach, however, makes handling late arriving data intractable. The user is forced to choose between two options:

  1. Use a tumbling window with grace period of 0 and ignore late arriving events
  2. Use a tumbling window with a grace period large enough to account for late arriving data and suppress for much longer than desired, introducing excessive end-to-end latency (and potentially undue memory pressure)

Typically the behavior that users want with this type of operation is to group keys together until a certain amount of time passes and emit them as a single record, always accepting late arriving records and just putting them into whatever the “current” window is. Today the only way to implement this semantic is using a punctuation and manually storing events in an aggregation (which is inefficient for many reasons and can cause severe bottlenecks).
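For illustration, the punctuation workaround can be sketched as a self-contained simulation in plain Java. The class and method names below are hypothetical, and plain collections stand in for a Kafka Streams state store and stream-time punctuator:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Rough simulation of the manual workaround: buffer records per key in a
// "store" and flush the whole batch downstream once the (simulated) stream
// time has advanced past the punctuation interval. All names are illustrative.
public class PunctuateBatcher {
    private final long intervalMs;
    private long lastFlushTime = 0L;
    private final Map<String, List<String>> store = new HashMap<>();
    private final List<Map<String, List<String>>> emitted = new ArrayList<>();

    PunctuateBatcher(final long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // In a real processor this would be process() plus a stream-time
    // punctuator; here the flush check piggybacks on record arrival.
    void process(final long streamTimeMs, final String key, final String value) {
        store.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        if (streamTimeMs - lastFlushTime >= intervalMs) {
            emitted.add(new HashMap<>(store)); // forward the batch downstream
            store.clear();
            lastFlushTime = streamTimeMs;
        }
    }

    List<Map<String, List<String>>> emitted() {
        return emitted;
    }
}
```

Keeping this store, the flush bookkeeping, and (in a real topology) the punctuator in sync by hand duplicates work the DSL already performs for windowed aggregations, which is one reason a first-class window type is preferable.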

Public Interfaces

We will be introducing a new type of Windows that always computes the current window based on the current stream time instead of event timestamp:

public final class BatchWindows extends Windows<TimeWindow> {

    /**
     * Return a window definition with the given window size.
     * <p>
     * This represents "batching" window semantics, which are fixed-size,
     * gap-less, non-overlapping windows. Batched windows use the current
     * stream time when determining which window an event fits into instead
     * of the event time (as the other window types use). This allows users
     * to batch together records of the same key over a window of time.
     * <p>
     * The window boundaries are inclusive of the start point and exclusive
     * of the end point. This means that a BatchWindows of size 1000 would have
     * windows from [0,1000), [1000,2000), etc.
     * <p>
     * Note that batch windows will always accept late arriving data, and
     * should only be used in situations where the ordering of data is not
     * necessarily important to the semantics of your application.
     */
    public static BatchWindows ofSize(final Duration size) { ... }

}

Here is an example of how events would map to windows:
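A minimal, self-contained Java sketch of the mapping, assuming a window size of 10 seconds (the class name is illustrative; the helper mirrors the stream-time computation proposed below):

```java
// Illustrative mapping of events to BatchWindows of size 10 seconds. The
// window is derived from observed stream time alone, so a late event (event
// time 4s arriving at stream time 13s) lands in the currently open window
// [10s, 20s) rather than the window its event time would suggest.
public class WindowMapping {
    static final long SIZE_MS = 10_000L;

    // same computation as the proposed BatchWindows#windowsFor
    static long windowStartFor(final long observedStreamTimeMs) {
        return (observedStreamTimeMs / SIZE_MS) * SIZE_MS;
    }

    public static void main(final String[] args) {
        System.out.println(windowStartFor(1_000L));  // event at stream time 1s  -> window [0, 10000)
        System.out.println(windowStartFor(12_000L)); // event at stream time 12s -> window [10000, 20000)
        System.out.println(windowStartFor(13_000L)); // late event (event time 4s) at stream time 13s -> window [10000, 20000)
    }
}
```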

The example in the motivation section then becomes:

stream
	.groupByKey()
	.windowedBy(BatchWindows.ofSize(Duration.ofSeconds(10)))
	.aggregate(ArrayList::new, (k, v, kvList) -> { kvList.add(KeyValue.pair(k, v)); return kvList; })
	.suppress(Suppressed.untilWindowCloses(...))

All events that arrive within 10 seconds of stream time (independent of their event time) will be aggregated together, suppressed, and emitted once stream time has advanced past the window end.

Proposed Changes

First, we need to expand the Windows class to take in the current stream time when computing the window for a given event:

public abstract class Windows<W extends Window> {
  ...
  public Map<Long, W> windowsFor(
    final long timestamp, 
    final long observedStreamTime
  ) {
    return windowsFor(timestamp);
  }

  // As part of this KIP we will deprecate this method. It is technically
  // public (it lives in a public package), though users are not expected
  // to implement it.
  @Deprecated
  public abstract Map<Long, W> windowsFor(final long timestamp);
}

The implementation of BatchWindows will then use the observedStreamTime when computing which window the current event should fall into:

@Override
public Map<Long, TimeWindow> windowsFor(
  final long timestamp,
  final long observedStreamTime
) {
  // the event timestamp is intentionally ignored
  final long windowStart = (observedStreamTime / sizeMs) * sizeMs;
  return Map.of(windowStart, new TimeWindow(windowStart, windowStart + sizeMs));
}

The gracePeriodMs for BatchWindows is always zero, which means there will only ever be one open window per key and events will always fall within this window. Here is the documentation for the remaining public members of the class:

/**
 * A fixed-size, stream-time based window specification used for aggregations.
 * <p>
 * The semantics of batched aggregations are: every size() milliseconds, compute the aggregate total for the last
 * size() milliseconds. This is equivalent in semantics to tumbling TimeWindows (an advance equal to the window size).
 * <p>
 * This class differs from TimeWindows in that the window for a given event is computed based on the current observed
 * stream time as opposed to the timestamp of the given record. This effectively makes the windows ordered with respect
 * to the offset of the record instead of the timestamp and allows for batching together records within a window.
 */
public final class BatchWindows extends Windows<TimeWindow> {
  
  /**
   * Returns the window size, measured in milliseconds.
   */
  public long size() { ... }

  /**
   * Returns 0, as offset-ordered windows have no concept of "late arriving" records
   * because stream time is monotonically increasing.
   */
  public long gracePeriodMs() { return 0L; }

}
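To make the "one open window" property above concrete, here is a small self-contained sketch (illustrative names, not part of the proposed API): with a grace period of 0, a window is closed exactly when observed stream time reaches its end, which is the same instant the next window opens.

```java
// With gracePeriodMs() == 0, a window [start, start + size) stops accepting
// records as soon as observed stream time reaches start + size -- precisely
// when the next window opens. Names here are illustrative only.
public class OpenWindowCheck {
    static final long SIZE_MS = 10_000L;
    static final long GRACE_MS = 0L;

    static long windowStartFor(final long streamTimeMs) {
        return (streamTimeMs / SIZE_MS) * SIZE_MS;
    }

    // a window is open iff stream time has not passed its close time
    static boolean isOpen(final long windowStart, final long streamTimeMs) {
        return streamTimeMs < windowStart + SIZE_MS + GRACE_MS;
    }

    public static void main(final String[] args) {
        final long streamTime = 12_345L;
        final long current = windowStartFor(streamTime); // 10000
        System.out.println(isOpen(current, streamTime));           // true: the single open window
        System.out.println(isOpen(current - SIZE_MS, streamTime)); // false: previous window already closed
    }
}
```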


Compatibility, Deprecation, and Migration Plan

N/A

Test Plan

Nothing beyond standard unit and integration tests is required; these will cover situations such as out-of-order and late arriving events.

Rejected Alternatives

  • Instead of introducing a new window type, we could add a property like Strictness on the existing TimeWindows type. This would be more confusing than the proposed API, since grace periods do not apply to the type of windows suggested here.
  • Names we discussed but didn't like: GlobalWindow, StreamTimeWindow, ProcessTimeWindow, OffsetOrderedWindow.
  • Implementing a more general purpose batching mechanism that can batch based on number of records, wall clock time, or any other batching strategy. This is useful, but out of scope for this proposal, as it introduces a different level of non-determinism that is not covered by the existing grace/windowing semantics. The way batching and suppression interrelate is also an open question.