You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

Status

Current state: Discussion

Discussion threadthread

JIRAs:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams represents continuously updating computations over entities via KTable. 

Kafka Streams currently provides record caches to control the rate at which updates are emitted from KTables: 

The relevant configuration parameters (cache.max.bytes.buffering and commit.interval.ms) apply to the entire Streams app. This is based on the theory that (1) rate-limiting is primarily an operational concern for the app operator and (2) cache size is best configured at the machine level so that the total memory footprint can be managed. In terms of stream processing logic, rate-limiting does not affect the semantics of the application, so it didn't seem to belong in the Streams DSL directly.

Request shaping

The current design does not account for the effect of rate-limiting on downstream consumers. For example, some applications will contact an external system for every event, so they may prefer to limit this stream more aggressively than the rest of the application.


Easier config

Users of Streams have reported difficulty in achieving their desired results with the existing cache-based model. This is variously due to confusion (because the model yields complex interactions with the various operators) or because of actual bugs in the implementation (but the mentioned complexity makes it difficult to decide if observed behavior is a bug or not). Having a more explicit and semantically tighter controls will make improve both of these struggles.

"Final value" of a window

Windowed computations in particular present a unique opportunity. The output of a windowed computation is a KTable in which the keys are annotated with window ids. Because the window has a defined end, we know that the keys belonging to that window can never be modified again once that end time has passed, except by late-arriving events. If we bound the lateness of events, we can know the "final value" of the window. This is especially useful for taking irrevocable actions in response to window results, such as sending an email or alert when a windowed aggregation is below a certain threshold.

See  KAFKA-6556 - Getting issue details... STATUS  for one example of this.

Proposed Changes

New operator: "suppress"

I'm proposing to add a new KTable operator: suppress.

Suppress is for dropping some updates from a KTable's changelog stream in a semantically sound way.

At the moment, I'm proposing to provide two main kinds of suppressions:

  1. All but the final result for windows. You can use suppress get exactly one final result per window/key for windowed computations. This includes both time and session windows.
    1. This feature requires adding a "grace period" parameter for windows.
  2. Intermediate Events. You can use suppress to wait for more updates on the same key before emitting them downstream. There are two main configuration knobs to tune here:
    1. How long to wait for more updates before emitting. This is an amount of time, measured either from the event time (for regular KTables) or from the window end (for windowed KTables), to buffer up each key before emitting them downstream.
    2. How much memory to use for buffering events (and what to do when it fills up). Suppress needs to remember the last observed event for each key, which takes memory. You can configure the amount of memory either by number of keys or by raw heap size. You can configure what to do when the buffer runs out of memory:
      1. Emit when full. This option is basically "best effort" suppression. If suppress runs out of memory, it will emit the oldest event.
      2. Spill to disk. This option is for when you need suppression to be strict. If the operator runs out of memory, it will allocate a RocksDB database and begin storing new records there. This may be complicated to implement, and consequently might be a future addition.
      3. Shut down. There is an option to run with the assumption that your "live set" of suppression events will fit within memory. If this assumption turns out to be false, the application will run out of memory and shut down. This may sound strange, but it's no different than using an InMemoryKeyValueStore or even an ArrayList. In the case of suppress, you can either leave the memory buffer unconstrained, which will cause an application crash if it runs out of memory, or you can configure it with a memory constraint and tell it to shut down gracefully when it runs out. Either one is safe from a correctness standpoint, but the latter will result in faster recovery. 

Suppression patterns

Final window results per key

You can achieve exactly one event per key per window by configuring:

  • A grace period on the window. This would be a balance of how late you expect events to be and how long you're willing to wait before finalizing the window and emitting the result.
  • Suppress with "emitFinalResultsOnly". There's a specific "Suppress" configuration for this feature. It allows you to configure the memory buffer and decide whether to shut down or switch to disk.

Best-effort rate limit per key

Configure:

  • Emit-when-full memory strategy
  • The inverse of the desired rate. If the desired rate is R (events per second), the inverse is the buffering time: 1/R (seconds per event).

Strict rate limit per key

Configure:

  • Unlimited memory (either with spill-to-disk or just not limiting the memory)
  • The inverse of the desired rate. If the desired rate is R (events per second), the inverse is the buffering time: 1/R (seconds per event).

Details and Public Interfaces

There are two primary changes we're proposing: Altering the window definition and adding a KTable#suppress operator.

Add Grace Period to Window Spec Builders

For our new Suppress operator to support window final results, we need to define a point at which window results are actually final!

Currently, the only hint we have about this is the retention period (called until/maintainMs/retentionPeriod). Once we drop an old window, we can obviously never update it again. However, this isn't a convenient "final" point for our purpose. Retention time is typically very large (the default is one day), and users of IQ may need to keep the retention time large in order to support queries even over final windows. Plus, how long a window is retained is really a property of the window store implementation. In principle, an in-memory implementation might choose to retain events for a short time while a remote distributed store may keep them forever. This shouldn't prohibit the usage of final results, though.

To resolve this conflict, we're adding a new concept to the window spec: grace period. This is an amount of time that the window should accept late-arriving events after the window ends. After the grace period passes, the window is considered "closed" and will never be updated again. The grace period places a lower-bound constraint on the retention time, but otherwise has no implication on retention.

We will add a "grace()" configuration to the window spec builders (Windows, TimeWindows, JoinWindows, UnlimitedWindows, and SessionWindows.

We are also deprecating the retention time and segment interval as specified on the window spec itself and moving them to the lower-level store configurations.

+ /**
+  * Reject late events that arrive more than {@code afterWindowEnd}
+  * after the end of its window.
+  *
+  * @param afterWindowEnd The grace period to admit late-arriving events to a window.
+  * @return this updated builder
+  */
+ public Windows<W> grace(final Duration afterWindowEnd);

+ /**
+  * Return the window grace period (the time to admit
+  * late-arriving events after the end of the window.
+  */
+ public Duration grace();


/**
 ...
+ * @deprecated since 2.1. Use {@link Joined#retention()} 
+ * or {@link Materialized#retention}
+ * or directly configure the retention in a store supplier and use
+ * {@link Materialized#as(WindowBytesStoreSupplier)}.
 */
+ @Deprecated
public Windows<W> until(final long durationMs);

/**
 ...
 * @deprecated since 2.1. Use {@link Joined#retention()} or {@link Materialized#retention} instead.
 */
@Deprecated
public long maintainMs();


/**
 ...
 * @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
 */
@Deprecated
public long segmentInterval();

Note for UnlimitedWindows in particular, the grace period is meaningless because the window never ends. Therefore, we provide this interface instead:

/**
 * Throws an {@link IllegalArgumentException} because the window never ends and the
 * grace period is therefore meaningless.
 *
 * @throws IllegalArgumentException on every invocation
 */
@Override
public Windows<UnlimitedWindow> grace(final Duration afterWindowEnd);


The Window/Session BytesStoreSupplier interface already includes retention period. The existing behavior is that that retention period overrides maintainMs if set on the window. We'll preserve this behavior.

We will add retention to the Joined config object, which is already used to configure the underlying stores for KStream joins:

/**
 * Configure the retention time for events in the join.
 * Events are only considered for joining while they are retained.
 * 
 * @param retention
 * @return
 */
public Joined<K, V, VO> withRetention(final Duration retention);


public Duration retention();

Likewise, we will add retention to Materialized:

/**
 * Configure retention period for window and session stores. Ignored for key/value stores.
 *
 * Overridden by pre-configured store suppliers
 * ({@link Materialized#as(SessionBytesStoreSupplier)} or {@link Materialized#as(WindowBytesStoreSupplier)}).
 *
 * @return itself
 */
public Materialized<K, V, S> withRetention(final Duration retention);

New Suppress Operator

We will add the new operator to KTable:

public interface KTable<K, V> {


 /**
 * Suppress some updates from this changelog stream, determined by the supplied {@link Suppress}.
 *
 * This controls what updates downstream table and stream operations will receive.
 *
 * @param suppress Configuration object determining what, if any, updates to suppress.
 * @return A new KTable with the desired suppress characteristics.
 */
 KTable<K, V> suppress(final Suppress<K, V> suppress);


}

Note the absence of a variant taking Materialized. The result of a suppression will always be (eventually) consistent with the source KTable, so I'm thinking right now that it would be "bad advice" to present the option to materialize it.

We will also create the config object Suppress:

public interface Suppress<K, V> {

    interface TimeDefinition<K, V> {
        long time(ProcessorContext context, K k, V v);
    }

    enum BufferFullStrategy {
        EMIT,
        SPILL_TO_DISK,
        SHUT_DOWN
    }

    interface BufferConfig<K, V> {
        static <K, V> BufferConfig<K, V> withBufferKeys(final long numberOfKeysToRemember)

        BufferConfig<K, V> bufferKeys(final long numberOfKeysToRemember);

        static <K, V> BufferConfig<K, V> withBufferBytes(final long bytesToUseForSuppressionStorage,
                                                         final Serializer<K> keySerializer,
                                                         final Serializer<V> valueSerializer)

        BufferConfig<K, V> bufferBytes(final long bytesToUseForSuppressionStorage, 
                                       final Serializer<K> keySerializer,
                                       final Serializer<V> valueSerializer);

        static <K, V> BufferConfig<K, V> withBufferFullStrategy(final BufferFullStrategy bufferFullStrategy)

        BufferConfig<K, V> bufferFullStrategy(final BufferFullStrategy bufferFullStrategy);
    }

    interface IntermediateSuppression<K, V> {
        static <K, V> IntermediateSuppression<K, V> withEmitAfter(final Duration timeToWaitForMoreEvents)

        IntermediateSuppression<K, V> emitAfter(final Duration timeToWaitForMoreEvents);

        static <K, V> IntermediateSuppression<K, V> withBufferConfig(final BufferConfig<K, V> bufferConfig)

        IntermediateSuppression<K, V> bufferConfig(final BufferConfig<K, V> bufferConfig);
    }

    static <K extends Windowed, V> Suppress<K, V> emitFinalResultsOnly(final BufferConfig<K, V> bufferConfig)

    static <K, V> Suppress<K, V> intermediateEvents(final IntermediateSuppression<K, V> intermediateSuppression)
}

Metrics

Along with the suppression operator, we will add several metrics. Note that suppress will not add to the skipped-records metrics. "Skipped" records are records that are for one reason or another invalid. "Suppressed" records are intentionally dropped, just like filtered records. Likewise with events arriving later than the grace period for windows.

Note: I'm not proposing roll-up metrics for these. They would be reported at the processor-node level. I suspect this is actually fine, and roll-ups can easily be added later if necessary.

Metrics we'll add:

  • late records (new records for expired windows) are currently metered as "skipped-records" and logged at WARN level. As noted, this is not correct, so we will change the logs to DEBUG level and add new metrics:
    • average and max observed lateness of all records: to help configure the grace period
      • (INFO) event-lateness-[avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of dropped events for closed windows
      • (INFO) late-event-drop-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
  • intermediate event suppression
    • current, average, and peak intermediate suppression buffer size
      • (INFO) suppression-mem-buffer-size-[current | avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • current, average, and peak intermediate suppression disk buffer size (only present when using the "SPILL_TO_DISK" buffer-full strategy)
      • (INFO) suppression-disk-buffer-size-[current | avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • intermediate suppression buffer eviction rate and total: to how often events are emitted early (only present when using the "EMIT" buffer-full strategy)
      • (INFO) suppression-mem-buffer-evict-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of intermediate event suppressions
      • (INFO) intermediate-event-suppression-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • min and average intermediate suppression overtime: to determine whether the intermediate suppression emitAfter is delaying longer than necessary. This metric may be unnecessary, since it's equivalent to (timeToWaitForMoreEventsConfig - observedLatenessMetric).
      • (INFO) intermediate-event-suppression-overtime-[min | avg] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>

Examples

Here are some examples of programs using the new operator to achieve various goals.

Final window results per key

Imagine we wish to send an alert if a user has fewer than 3 events in an hour. For the example, we'll wait up to 10 minutes after the hour ends for late-arriving events.

builder.<Integer, String>stream("events")
    .groupByKey()
    .windowedBy(TimeWindows.of(3600_000).grace(Duration.ofMinutes(10))
    .count()
    .suppress(emitFinalResultsOnly(withBufferFullStrategy(SHUT_DOWN)))
    .toStream()
    .filter((key, value) -> value < 3)
    .foreach((key, value) -> sendAlert("User " + key.key() + " had fewer than 3 events in window " + key.window()));

Note that we can handle limited memory in a few different ways:

// Option 1: expect not to run out of memory
windowCounts
  .suppress(emitFinalResultsOnly(withBufferKeys(Long.MAX_VALUE)))


// Option 2: shut down gracefully if we need too much memory
windowCounts
  .suppress(
    emitFinalResultsOnly(
      withBufferBytes(5_000_000, keySerializer, valueSerializer)
        .bufferFullStrategy(SHUT_DOWN)
    )
  );


// Option 3: Start using disk if we need too much memory
windowCounts
  .suppress(
    emitFinalResultsOnly(
      withBufferBytes(5_000_000, keySerializer, valueSerializer)
        .bufferFullStrategy(SPILL_TO_DISK)
    )
  );

Any of these constructions yield a strict guarantee that each windowed key will emit exactly one event.

Rate-limited updates

Suppose we wish to reduce the rate of updates from a KTable to roughly one update every 30s per key. We don't want to use too much memory for this, and we don't think we'll have updates for more than 1000 keys at any one time.

table.suppress(
  intermediateEvents(
    withEmitAfter(Duration.ofSeconds(30))
      .bufferConfig(
        withBufferKeys(1000).bufferFullStrategy(EMIT)
      )
  )
).toStream(); // etc.


Compatibility, Deprecation, and Migration Plan

The only part of the KIP that's relevant to existing APIs is the deprecation of Windows#until/maintainMs. I've described above how the deprecation warnings will look, and also what new APIs will replace them. All the implementations will be done in such a way that existing Streams applications will have exactly the same semantics before and after this KIP, so there's no concern about continuing to use the deprecated APIs.

One other change we could consider in the future is to revisit the state store caching mechanism, but that also serves the function of limiting i/o to the state store, so I think that should be a separate discussion.

Rejected Alternatives

There are many alternative ways to achieve this behavior.

At the top of the list, I also considered having dedicated operators for final events on windowed tables and update suppression on non-windowed ones. But the current proposal supports the same behavior with just one new operator.

We also considered having windowed computations directly provide the "final results" feature via an "Emitted" config object, but ultimately settled on adding the grace period to the window and letting "suppress" deal with suppressing all but the final result.

In fact, I previously proposed not to support "final results" directly, but instead to allow a suppression with an upper bound on lateness. using the same time for this lateness bound and intermediate suppression would naturally yield final results only. But we judged that this API was too esoteric. The version we have now is much more straightforward for this use case.


  • No labels