
Status

Current state: Discussion

Discussion thread: (tbd)

JIRAs:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams represents continuously updating computations over entities via KTable. 

Kafka Streams currently provides record caches to control the rate at which updates are emitted from KTables.

The relevant configuration parameters (cache.max.bytes.buffering and commit.interval.ms) apply to the entire Streams app. This is based on the theory that (1) rate-limiting is primarily an operational concern for the app operator and (2) cache size is best configured at the machine level so that the total memory footprint can be managed. In terms of stream processing logic, rate-limiting does not affect the semantics of the application, so it didn't seem to belong in the Streams DSL directly.
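For reference, both parameters are ordinary app-wide Streams configs. A minimal sketch of today's mechanism (the cache size and interval below are arbitrary):

// Today's rate limiting is app-wide: one cache size and one commit interval
// shared by every KTable in the topology.
final Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10 MB cache for the whole app
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);                  // flush/forward at most every 30 seconds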

Request shaping

The current design does not account for the effect of rate-limiting on downstream consumers. For example, some applications will contact an external system for every event, so they may prefer to limit this stream more aggressively than the rest of the application.


Easier config

Users of Streams have reported difficulty in achieving their desired results with the existing cache-based model. This is variously due to confusion (the model yields complex interactions with the various operators) or to actual bugs in the implementation (and that same complexity makes it difficult to decide whether an observed behavior is a bug or not). More explicit, semantically tighter controls will improve both situations.

"Final value" of a window

Windowed computations in particular present a unique opportunity. The output of a windowed computation is a KTable in which the keys are annotated with window ids. Because the window has a defined end, we know that the keys belonging to that window can never be modified again once that end time has passed, except by late-arriving events. If we bound the lateness of events, we can know the "final value" of the window. This is especially useful for taking irrevocable actions in response to window results, such as sending an email or alert when a windowed aggregation is below a certain threshold.

See KAFKA-6556 for one example of this.

Proposed Changes

New operator: "suppress"

I'm proposing to add a new KTable operator: suppress.

Suppress is for dropping some updates from a KTable's changelog stream in a semantically sound way.

At the moment, I'm proposing to provide two main kinds of suppression (a combined sketch follows the list):

  1. Late Events. You can use suppress to bound the lateness of events in the stream. "Lateness" specifically means the difference between record time and stream time; for example, if stream time has advanced to 12:15 and a record arrives bearing timestamp 12:00, that record is 15 minutes late.
  2. Intermediate Events. You can use suppress to wait for more updates on the same key before emitting them downstream. There are two main configuration knobs to tune here:
    1. How long to wait for more updates before emitting. This is an amount of time, measured either from the event time (for regular KTables) or from the window end (for windowed KTables), to buffer each key before emitting it downstream.
    2. How much memory to use for buffering events (and what to do when it fills up). Suppress needs to remember the last observed event for each key, which takes memory. You can configure the amount of memory either by number of keys or by raw heap size. You can configure what to do when the buffer runs out of memory:
      1. Emit when full. This option is basically "best effort" suppression. If suppress runs out of memory, it will emit the oldest event.
      2. Spill to disk. This option is for when you need suppression to be strict. If the operator runs out of memory, it will allocate a RocksDB database and begin storing new records there. This may be complicated to implement, and consequently might be a future addition.
      3. Shut down. There is an option to run with the assumption that your "live set" of suppression events will fit within memory. If this assumption turns out to be false, the application will run out of memory and shut down. This may sound strange, but it's no different than using an InMemoryKeyValueStore or even an ArrayList. In the case of suppress, you can either leave the memory buffer unconstrained, which will cause an application crash if it runs out of memory, or you can configure it with a memory constraint and tell it to shut down gracefully when it runs out. Either one is safe from a correctness standpoint, but the latter will result in faster recovery. 
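To make these knobs concrete, here is a sketch of how both kinds of suppression might be combined using the API proposed below (the durations, key count, and strategy are arbitrary placeholders):

// Sketch: both kinds of suppression, showing each knob from the list above.
final Suppression suppression = new Suppression()
    .suppressLateEvents(Duration.ofMinutes(10))              // 1: bound lateness
    .suppressIntermediateEvents(
        new IntermediateSuppression()
            .emitAfter(Duration.ofMinutes(10))               // 2.1: how long to wait
            .bufferKeys(1_000)                               // 2.2: memory bound, by key count
            .bufferFullStrategy(BufferFullStrategy.EMIT)     // 2.2: what to do when full
    );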

Suppression patterns

Final window results per key

You can achieve exactly one event per key per window by configuring:

  • Unlimited memory (either with spill-to-disk or just not limiting the memory)
  • Choose a maximum time (T) to wait after window-end for late events. Configure suppress to drop late events after T and also to suppress intermediate events for up to T. This will result in exactly one event for each key in each window, which represents the "final" event for that window.

Best-effort rate limit per key

Configure:

  • Emit-when-full memory strategy
  • The inverse of the desired rate. If the desired rate is R (events per second), the inverse is the buffering time: 1/R (seconds per event).

Strict rate limit per key

Configure:

  • Unlimited memory (either with spill-to-disk or just not limiting the memory)
  • The inverse of the desired rate. If the desired rate is R (events per second), the inverse is the buffering time: 1/R (seconds per event).
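For illustration, this pattern might look like the following with the proposed API, using a 30-second period (R = 1/30 events per second) and spill-to-disk to satisfy the unlimited-memory requirement:

table
    .suppress(
        new Suppression()
            .suppressIntermediateEvents(
                new IntermediateSuppression()
                    .emitAfter(Duration.ofSeconds(30))                      // 1/R
                    .bufferFullStrategy(BufferFullStrategy.SPILL_TO_DISK)   // never emit early, even when full
            )
    )
    .toStream(); // etc.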

Public Interfaces

We will add the new operator to KTable:

public interface KTable<K, V> {


  /**
   * Suppress some updates from this changelog stream, determined by the supplied {@link Suppression}.
   *
   * This controls what updates downstream table and stream operations will receive.
   *
   * @param suppression Configuration object determining what, if any, updates to suppress.
   * @return A new KTable with the desired suppression characteristics.
   */
  KTable<K, V> suppress(final Suppression suppression);


}


We will also create the config object Suppression:

public class Suppression {
    public enum BufferFullStrategy {
        EMIT,
        SPILL_TO_DISK,
        SHUT_DOWN
    }

    public static class IntermediateSuppression {
        public IntermediateSuppression();

        public IntermediateSuppression emitAfter(final Duration timeToWaitForMoreEvents);

        public IntermediateSuppression bufferKeys(final long numberOfKeysToRemember);

        public IntermediateSuppression bufferBytes(final long bytesToUseForSuppressionStorage);

        public IntermediateSuppression bufferFullStrategy(final BufferFullStrategy bufferFullStrategy);
    
    }

    public Suppression();

    public Suppression suppressLateEvents(final Duration maxAllowedLateness);

    public Suppression suppressIntermediateEvents(final IntermediateSuppression intermediateSuppression);

}


Examples

Here are some examples of programs using the new operator to achieve various goals.

Final window results per key

Imagine we wish to send an alert if a user has fewer than 3 events in an hour. For the example, we'll wait up to 10 minutes after the hour ends for late-arriving events.

builder.<Integer, String>stream("events")
    .groupByKey()
    .windowedBy(TimeWindows.of(3600_000)) // one-hour windows (in milliseconds)
    .count()
    .suppress(
        new Suppression()
            .suppressLateEvents(Duration.ofMinutes(10))  // drop events more than 10 minutes late
            .suppressIntermediateEvents(
                new IntermediateSuppression().emitAfter(Duration.ofMinutes(10)) // hold results until the lateness bound passes
            )
    )
    .toStream()
    .filter((key, value) -> value < 3)
    .foreach((key, value) -> sendAlert("User " + key.key() + " had fewer than 3 events in window " + key.window())); // sendAlert: user-supplied

Note that we can handle limited memory in a few different ways:

// Option 1: expect not to run out of memory
windowCounts
    .suppress(
        new Suppression()
            .suppressLateEvents(Duration.ofMinutes(10))
            .suppressIntermediateEvents(
                new IntermediateSuppression().emitAfter(Duration.ofMinutes(10))
            )
    );


// Option 2: shut down gracefully if we need too much memory
windowCounts
    .suppress(
        new Suppression()
            .suppressLateEvents(Duration.ofMinutes(10))
            .suppressIntermediateEvents(
                new IntermediateSuppression()
                    .emitAfter(Duration.ofMinutes(10))
                    .bufferBytes(5_000_000)
                    .bufferFullStrategy(SHUT_DOWN)      // <- NOTE
            )
    );


// Option 3: Start using disk if we need too much memory
windowCounts
    .suppress(
        new Suppression()
            .suppressLateEvents(Duration.ofMinutes(10))
            .suppressIntermediateEvents(
                new IntermediateSuppression()
                    .emitAfter(Duration.ofMinutes(10))
                    .bufferBytes(5_000_000)
                    .bufferFullStrategy(SPILL_TO_DISK)   // <- NOTE
            )
    );

Any of these constructions yields a strict guarantee that each windowed key will emit exactly one event: the final result for its window.

Rate-limited updates

Suppose we wish to reduce the rate of updates from a KTable to roughly one update every 30s per key. We don't want to use too much memory for this, and we don't think we'll have updates for more than 1000 keys at any one time.

table.suppress(
    new Suppression()
        .suppressIntermediateEvents(
            new IntermediateSuppression()
                .emitAfter(Duration.ofSeconds(30))
                .bufferKeys(1000)
                .bufferFullStrategy(EMIT)
        )
).toStream(); // etc.

Bounded lateness

If, for some reason, we wish to guarantee that late updates won't flow downstream from a certain point in the topology, we can use suppress for this as well:

table
    .suppress(new Suppression().suppressLateEvents(Duration.ofMinutes(10)))
    .toStream(); // etc


Compatibility, Deprecation, and Migration Plan

This is a completely new and orthogonal operation, so I don't expect any compatibility or migration problems.

One deprecation we could consider in the future is to revisit the state store caching mechanism, but that mechanism also serves to limit I/O to the state stores, so I think it should be a separate discussion.

Rejected Alternatives

There are many alternative ways to achieve this behavior.

Chief among them, I considered dedicated operators: one for final results on windowed tables and another for update suppression on non-windowed ones. But the current proposal supports the same behavior with just one new operator.

