You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 18 Next »

Status

Current state: Accepted

Discussion threadthread

JIRAs:

Release: 2.1

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams represents continuously updating computations over entities via KTable. 

Kafka Streams currently provides record caches to control the rate at which updates are emitted from KTables: 

The relevant configuration parameters (cache.max.bytes.buffering and commit.interval.ms) apply to the entire Streams app. This is based on the theory that (1) rate-limiting is primarily an operational concern for the app operator and (2) cache size is best configured at the machine level so that the total memory footprint can be managed. In terms of stream processing logic, rate-limiting does not affect the semantics of the application, so it didn't seem to belong in the Streams DSL directly.

Request shaping

The current design does not account for the effect of rate-limiting on downstream consumers. For example, some applications will contact an external system for every event, so they may prefer to limit this stream more aggressively than the rest of the application.


Easier config

Users of Streams have reported difficulty in achieving their desired results with the existing cache-based model. This is variously due to confusion (because the model yields complex interactions with the various operators) or because of actual bugs in the implementation (but the mentioned complexity makes it difficult to decide if observed behavior is a bug or not). Having a more explicit and semantically tighter controls will make improve both of these struggles.

"Final result" of a window aggregation

Windowed computations in particular present a unique opportunity. The output of a windowed computation is a KTable in which the keys are annotated with window ids. Because the window has a defined end, we know that the keys belonging to that window can never be modified again once that end time has passed, except by late-arriving events. If we bound the lateness of events, we can know the "final value" of the window. This is especially useful for taking irrevocable actions in response to window results, such as sending an email or alert when a windowed aggregation is below a certain threshold.

See  KAFKA-6556 - Getting issue details... STATUS  for one example of this.

Proposed Changes

New operator: "suppress"

I'm proposing to add a new KTable operator: suppress.

Suppress is for dropping some updates from a KTable's changelog stream in a semantically sound way.

At the moment, I'm proposing to provide two main kinds of suppressions:

  1. All but the final result for windows. You can use suppress get exactly one final result per window/key for windowed computations. This includes both time and session windows.
    1. This feature requires adding a "grace period" parameter for windows.
  2. Intermediate Events. You can use suppress to wait for more updates on the same key before emitting them downstream. There are two main configuration knobs to tune here:
    1. How long to wait for more updates before emitting. This is an amount of time, measured either from the event time (for regular KTables) or from the window end (for windowed KTables), to buffer up each key before emitting them downstream.
    2. How much memory to use for buffering events (and what to do when it fills up). Suppress needs to remember the last observed event for each key, which takes memory. You can configure the amount of memory either by number of keys or by raw heap size. You can configure what to do when the buffer runs out of memory:
      1. Emit when full. This option is basically "best effort" suppression. If suppress runs out of memory, it will emit the oldest event.
      2. Spill to disk. This option is for when you need suppression to be strict. If the operator runs out of memory, it will allocate a RocksDB database and begin storing new records there. This may be complicated to implement, and consequently might be a future addition.
      3. Shut down. There is an option to run with the assumption that your "live set" of suppression events will fit within memory. If this assumption turns out to be false, the application will run out of memory and shut down. This may sound strange, but it's no different than using an InMemoryKeyValueStore or even an ArrayList. In the case of suppress, you can either leave the memory buffer unconstrained, which will cause an application crash if it runs out of memory, or you can configure it with a memory constraint and tell it to shut down gracefully when it runs out. Either one is safe from a correctness standpoint, but the latter will result in faster recovery. 

Suppression patterns

Final window results per key

You can achieve exactly one event per key per window by configuring:

  • A grace period on the window. This would be a balance of how late you expect events to be and how long you're willing to wait before finalizing the window and emitting the result.
  • Suppress with "untilWindowClose". There's a specific "Suppressed" configuration for this feature. It allows you to configure the memory buffer and decide whether to shut down or switch to disk.

Best-effort rate limit per key

Configure:

  • Emit-when-full memory strategy
  • Suppress with "untilWindowClose": the inverse of the desired rate. If the desired rate is R (events per second), the inverse is the buffering time: 1/R (seconds per event).

Strict rate limit per key

Configure:

  • Unlimited memory (either with spill-to-disk or just not limiting the memory)
  • Suppress with "untilWindowClose": the inverse of the desired rate. If the desired rate is R (events per second), the inverse is the buffering time: 1/R (seconds per event).

Details and Public Interfaces

There are two primary changes we're proposing: Altering the window definition and adding a KTable#suppress operator.

Add Grace Period to Window Spec Builders

For our new Suppress operator to support window final results, we need to define a point at which window results are actually final!

Currently, the only hint we have about this is the retention period (called until/maintainMs/retentionPeriod). Once we drop an old window, we can obviously never update it again. However, this isn't a convenient "final" point for our purpose. Retention time is typically very large (the default is one day), and users of IQ may need to keep the retention time large in order to support queries even over final windows. Plus, how long a window is retained is really a property of the window store implementation. In principle, an in-memory implementation might choose to retain events for a short time while a remote distributed store may keep them forever. This shouldn't prohibit the usage of final results, though.

To resolve this conflict, we're adding a new concept to the window spec: grace period. This is an amount of time that the window should accept late-arriving events after the window ends. After the grace period passes, the window is considered "closed" and will never be updated again. The grace period places a lower-bound constraint on the retention time, but otherwise has no implication on retention.

We will add a "grace" configuration to the window spec builders (TimeWindows, JoinWindows, UnlimitedWindows, and SessionWindows).

We are also deprecating the retention time and segment interval as specified on the window spec itself and moving them to the lower-level store configurations.

On TimeWindows, JoinWindows, SessionWindows:
/**
 * Reject late events that arrive more than {@code millisAfterWindowEnd}
 * after the end of its window.
 *
 * Lateness is defined as (stream_time - record_timestamp).
 *
 * @param millisAfterWindowEnd The grace period to admit late-arriving events to a window.
 * @return this updated builder
 */
@SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
public XWindows grace(final long millisAfterWindowEnd); // where XWindows is the appropriate builder


On Windows (and SessionWindows, although it's not abstract there):
+ /**
+  * Return the window grace period (the time to admit
+  * late-arriving events after the end of the window.
+  */
+ public abstract long gracePeriodMs();


/**
 ...
+ * @deprecated since 2.1. Use {@link Joined#retention()} 
+ * or {@link Materialized#retention}
+ * or directly configure the retention in a store supplier and use
+ * {@link Materialized#as(WindowBytesStoreSupplier)}.
 */
+ @Deprecated
public Windows<W> until(final long durationMs);

/**
 ...
 * @deprecated since 2.1. Use {@link Joined#retention()} or {@link Materialized#retention} instead.
 */
@Deprecated
public long maintainMs();


/**
 ...
 * @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
 */
@Deprecated
public long segmentInterval();


The Window/Session BytesStoreSupplier interface already includes retention period. The existing behavior is that that retention period overrides maintainMs if set on the window. We'll preserve this behavior.

We will add retention to Materialized:

/**
 * Configure retention period for window and session stores. Ignored for key/value stores.
 *
 * Overridden by pre-configured store suppliers
 * ({@link Materialized#as(SessionBytesStoreSupplier)} or {@link Materialized#as(WindowBytesStoreSupplier)}).
 *
 * @return itself
 */
public Materialized<K, V, S> withRetention(final long retentionMs);

New Suppress Operator

We will add the new operator to KTable:

public interface KTable<K, V> {


 /**
 * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed}.
 *
 * This controls what updates downstream table and stream operations will receive.
 *
 * @param suppressed Configuration object determining what, if any, updates to suppress.
 * @return A new KTable with the desired suppress characteristics.
 */
 KTable<K, V> suppress(final Suppressed<K, V> suppress);


}

Note the absence of a variant taking Materialized. The result of a suppression will always be (eventually) consistent with the source KTable, so I'm thinking right now that it would be "bad advice" to present the option to materialize it.

We will also create the config object Suppressed:

public interface Suppressed<K> {

/**
 * Marker interface for a buffer configuration that is "strict" in the sense that it will strictly
 * enforce the time bound and never emit early.
 */
interface StrictBufferConfig extends BufferConfig<StrictBufferConfig> {}

interface BufferConfig<BC extends BufferConfig<BC>> {

        /**
         * Create a size-constrained buffer in terms of the maximum number of keys it will store.
         */
        static BufferConfig<?> maxRecords(final long recordLimit);

        /**
         * Set a size constraint on the buffer in terms of the maximum number of keys it will store.
         */
        BC withMaxRecords(final long recordLimit);

        /**
         * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
         */
        static BufferConfig<?> maxBytes(final long byteLimit);

        /**
         * Set a size constraint on the buffer, the maximum number of bytes it will use.
         */
        BC withMaxBytes(final long byteLimit);

        /**
         * Create a buffer unconstrained by size (either keys or bytes).
         *
         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
         *
         * If there isn't enough heap available to meet the demand, the application will encounter an
         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
         *
         * This is a convenient option if you doubt that your buffer will be that large, but also don't
         * wish to pick particular constraints, such as in testing.
         *
         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
         * It will never emit early.
         */
        static StrictBufferConfig unbounded();

        /**
         * Set the buffer to be unconstrained by size (either keys or bytes).
         *
         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
         *
         * If there isn't enough heap available to meet the demand, the application will encounter an
         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
         *
         * This is a convenient option if you doubt that your buffer will be that large, but also don't
         * wish to pick particular constraints, such as in testing.
         *
         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
         * It will never emit early.
         */
        StrictBufferConfig withNoBound();

        /**
         * Set the buffer to gracefully shut down the application when any of its constraints are violated
         *
         * This buffer is "strict" in the sense that it will enforce the time bound or shut down.
         * It will never emit early.
         */
        StrictBufferConfig shutDownWhenFull();

        /**
         * Sets the buffer to use on-disk storage if it requires more memory than the constraints allow.
         *
         * This buffer is "strict" in the sense that it will never emit early.
         */
        StrictBufferConfig spillToDiskWhenFull();

        /**
         * Set the buffer to just emit the oldest records when any of its constraints are violated.
         *
         * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
         * duplicate results downstream, but does not promise to eliminate them.
         */
        BufferConfig emitEarlyWhenFull();
}

/**
 * Configure the suppression to emit only the "final results" from the window.
 *
 * By default all Streams operators emit results whenever new results are available.
 * This includes windowed operations.
 *
 * This configuration will instead emit just one result per key for each window, guaranteeing
 * to deliver only the final result. This option is suitable for use cases in which the business logic
 * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
 *
 * To accomplish this, the operator will buffer events from the window until the window close (that is,
 * until the end-time passes, and additionally until the grace period expires). Since windowed operators
 * are required to reject late events for a window whose grace period is expired, there is an additional
 * guarantee that the final results emitted from this suppression are eventually consistent with the upstream
 * operator and its queriable state, if enabled.
 *
 * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
 * This is required to be a "strict" config, since it would violate the "final results"
 * property to emit early and then issue an update later.
 * @param <K> The key type for the KTable to apply this suppression to. "Final results" mode is only available
 * on Windowed KTables (this is enforced by the type parameter).
 * @return a "final results" mode suppression configuration
 */
static <K extends Windowed> Suppressed<K> untilWindowCloses(final StrictBufferConfig bufferConfig);

/**
 * Configure the suppression to wait {@code timeToWaitForMoreEvents} amount of time after receiving a record
 * before emitting it further downstream. If another record for the same key arrives in the mean time, it replaces
 * the first record in the buffer but does <em>not</em> re-start the timer.
 *
 * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
 * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
 * @param <K> The key type for the KTable to apply this suppression to.
 * @return a suppression configuration
 */
 static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig);
}


Along with the suppression operator, we will add several metrics. Note that suppress will not add to the skipped-records metrics. "Skipped" records are records that are for one reason or another invalid. "Suppressed" records are intentionally dropped, just like filtered records. Likewise with events arriving later than the grace period for windows.

Metrics

Note: I'm not proposing roll-up metrics for these. They would be reported at the processor-node level. I suspect this is actually fine, and roll-ups can easily be added later if necessary.

Metrics we'll add:

  • late records (new records older than the grace period) are currently metered as "skipped-records" and logged at WARN level. As noted, this is not correct, so we will change the logs to DEBUG level and add new metrics: 
    • average and max observed lateness of all records: to help configure the grace period
      • (INFO) record-lateness-[avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of dropped events for closed windows
      • (INFO) late-record-drop-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
  • expired records (new records for segments older than the state store's retention period) are currently not metered and logged at DEBUG level. Since the grace period is currently equivalent to the retention period, this should currently be rare, as such events would never reach the state store but be marked as skipped and dropped in the processor. However, since we are deprecating `Windows.until` and defining retention only on the state store, it would become much more common. Therefore, we'll add some new state store metrics:
    • rate and total of events for expired windows
      • (INFO) expired-window-record-drop-[rate | total] type=stream-[storeType]-state-metrics client-id=<threadId> task-id=<taskId> [storeType]-state-id=[storeName]
  • intermediate event suppression
    • current, average, and peak intermediate suppression buffer size
      • (INFO) suppression-mem-buffer-size-[current | avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • current, average, and peak intermediate suppression disk buffer size (only present when using the "SPILL_TO_DISK" buffer-full strategy)
      • (INFO) suppression-disk-buffer-size-[current | avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • intermediate suppression buffer eviction rate and total: to how often events are emitted early (only present when using the "EMIT" buffer-full strategy)
      • (INFO) suppression-mem-buffer-evict-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of intermediate result suppressions
      • (INFO) intermediate-result-suppression-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • min and average intermediate suppression overtime: to determine whether the intermediate suppression emitAfter is delaying longer than necessary. This metric may be unnecessary, since it's equivalent to (timeToWaitForMoreEventsConfig - observedLatenessMetric).
      • (INFO) intermediate-result-suppression-overtime-[min | avg] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>

Examples

Here are some examples of programs using the new operator to achieve various goals.

Final window results per key

Imagine we wish to send an alert if a user has fewer than 3 events in an hour. For the example, we'll wait up to 10 minutes after the hour ends for late-arriving events.

builder.<Integer, String>stream("events")
    .groupByKey()
    .windowedBy(TimeWindows.of(3600_000).grace(600_000)
    .count()
    .suppress(untilWindowCloses(BufferConfig.unbounded()))
    .toStream()
    .filter((key, value) -> value < 3)
    .foreach((key, value) -> sendAlert("User " + key.key() + " had fewer than 3 events in window " + key.window()));

Note that we can handle limited memory in a few different ways:

// Option 1: expect not to run out of memory
windowCounts
  .suppress(untilWindowCloses(unbounded()))


// Option 2: shut down gracefully if we need too much memory
windowCounts
  .suppress(untilWindowCloses(maxBytes(5_000_000).shutDownWhenFull()))


// Option 3: Start using disk if we need too much memory
windowCounts
  .suppress(untilWindowCloses(maxBytes(5_000_000).spillToDiskWhenFull()))

Any of these constructions yield a strict guarantee that each windowed key will emit exactly one event.

Note about the "shut down when full" option:

This may seem like a strange option for production code, but consider that in practice there is limited heap size available. As with all data structures, if you need to store more data than fits in memory, then you will run out of memory and crash. For Java in particular, as the available heap approaches 0, the garbage collector will consume more and more CPU, which can cause the application to become unresponsive long before an actual OutOfMemoryError occurs.

The idea of this option is to pick some reasonably large bound to allow a graceful and performant shutdown before this occurs.

Then, the operator can increase the heap size and restart, or the developer can decrease the grace period (decreasing the required heap), or choose another strategy, such as on-disk buffering.


Rate-limited updates

Suppose we wish to reduce the rate of updates from a KTable to roughly one update every 30s per key. We don't want to use too much memory for this, and we don't think we'll have updates for more than 1000 keys at any one time.

table
  .suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000)))
  .toStream(); // etc.


Compatibility, Deprecation, and Migration Plan

The only part of the KIP that's relevant to existing APIs is the deprecation of Windows#until/maintainMs. I've described above how the deprecation warnings will look, and also what new APIs will replace them. All the implementations will be done in such a way that existing Streams applications will have exactly the same semantics before and after this KIP, so there's no concern about continuing to use the deprecated APIs.

One other change we could consider in the future is to revisit the state store caching mechanism, but that also serves the function of limiting i/o to the state store, so I think that should be a separate discussion.

Rejected Alternatives

There are many alternative ways to achieve this behavior.

At the top of the list, I also considered having dedicated operators for final events on windowed tables and update suppression on non-windowed ones. But the current proposal supports the same behavior with just one new operator.

We also considered having windowed computations directly provide the "final results" feature via an "Emitted" config object, but ultimately settled on adding the grace period to the window and letting "suppress" deal with suppressing all but the final result.

In fact, I previously proposed not to support "final results" directly, but instead to allow a suppression with an upper bound on lateness. using the same time for this lateness bound and intermediate suppression would naturally yield final results only. But we judged that this API was too esoteric. The version we have now is much more straightforward for this use case.

  • No labels