Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
To resolve these issues, we propose to add a new API to control windowed aggregation output behavior and possibly other behavior later on.
Public Interfaces
There are three options to change the public interface in order of preference:
Option 1:
In the existing TimeWindowedKStream and SessionWindowedKStream interfaces.
Code Block |
---|
public interface TimeWindowedKStream<K, V> {
    TimeWindowedKStream<K, V> emitStrategy(EmitStrategy strategy);
}
public interface SessionWindowedKStream<K, V> {
    SessionWindowedKStream<K, V> emitFinal();
} |
A similar API will also be added to TimeWindowedCogroupedKStream and SessionWindowedCogroupedKStream.
Option 2:
In the existing KGroupedStream and CogroupedKStream interfaces.
emitStrategy(EmitStrategy strategy);
}
public interface EmitStrategy {
    enum StrategyType {
        ON_WINDOW_CLOSE,  // output final result
        ON_WINDOW_UPDATE; // output for every record
    }
    StrategyType type();
    static EmitStrategy onWindowClose() {
        return new WindowCloseStrategy();
    }
    static EmitStrategy onWindowUpdate() {
        // WindowUpdateStrategy is not shown; it would mirror WindowCloseStrategy
        return new WindowUpdateStrategy();
    }
}
public class WindowCloseStrategy implements EmitStrategy {
    WindowCloseStrategy() {}
    @Override
    public StrategyType type() {
        return StrategyType.ON_WINDOW_CLOSE;
    }
} |
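To make the difference between the two strategy types concrete, here is a minimal, self-contained sketch of a tumbling-window count. It is a simplified model, not the Kafka Streams implementation: the class EmitStrategySketch, its run method, and the "windowStart=count" output format are hypothetical names chosen for illustration, and it assumes non-negative timestamps that arrive in order and no grace period.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of a tumbling-window count; NOT the Kafka Streams implementation. */
class EmitStrategySketch {

    // Local copy of the two strategy types so the sketch is self-contained.
    enum StrategyType { ON_WINDOW_CLOSE, ON_WINDOW_UPDATE }

    /**
     * Counts records per tumbling window and returns what would be emitted,
     * each entry formatted as "windowStart=count".
     * Assumption: timestamps are non-negative and non-decreasing, no grace period.
     */
    static List<String> run(long[] timestamps, long windowSizeMs, StrategyType strategy) {
        TreeMap<Long, Long> counts = new TreeMap<>(); // window start -> running count
        List<String> emitted = new ArrayList<>();
        for (long ts : timestamps) {
            if (strategy == StrategyType.ON_WINDOW_CLOSE) {
                // Stream time is now ts, so every window with start + size <= ts is final.
                Map<Long, Long> closed = counts.headMap(ts - windowSizeMs, true);
                for (Map.Entry<Long, Long> e : closed.entrySet()) {
                    emitted.add(e.getKey() + "=" + e.getValue());
                }
                closed.clear(); // the view is backed by counts, so this drops the closed windows
            }
            long windowStart = ts - (ts % windowSizeMs);
            long count = counts.merge(windowStart, 1L, Long::sum);
            if (strategy == StrategyType.ON_WINDOW_UPDATE) {
                // Emit the intermediate result for every record.
                emitted.add(windowStart + "=" + count);
            }
        }
        return emitted;
    }
}
```

With timestamps 0, 5, 12, 25 and 10 ms windows, ON_WINDOW_UPDATE emits four intermediate results, while ON_WINDOW_CLOSE emits only the two final counts for the windows starting at 0 and 10; the window starting at 20 is still open and produces nothing yet.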
I also propose adding the following metrics, which measure the latency to emit final results and the number of records emitted by emit final:
Each exposed metric will have the following tags:
- thread-id = [thread ID]
- task-id = [task ID]
- processor-node-id = [node ID]
The following metrics will be exposed in Kafka Streams' metrics:
- emit-final-latency-max (max latency to emit final records when they could be emitted)
- emit-final-latency-avg (avg latency to emit final records when they could be emitted)
- emit-final-records-rate (rate of records emitted when they could be emitted)
- emit-final-records-total (total number of records emitted)
The recording level for all metrics will be DEBUG and their group will be stream-processor-node-metrics.
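As a rough illustration of what these four values represent, the sketch below keeps the underlying counters in plain Java. It is only an assumption-laden model: EmitFinalMetricsSketch and its methods are hypothetical, the real metrics would be registered as sensors through the Kafka Streams metrics infrastructure, and the rate here is a simple total-over-elapsed-time figure rather than Kafka's sampled rate.

```java
/** Hypothetical recorder for the proposed emit-final metrics; not the Kafka Streams sensors. */
class EmitFinalMetricsSketch {
    private long maxLatencyMs = 0;
    private long totalLatencyMs = 0;
    private long emitCalls = 0;
    private long recordsTotal = 0;

    /** Record one emit-final pass: how long it took and how many records it emitted. */
    void recordEmit(long latencyMs, long recordsEmitted) {
        maxLatencyMs = Math.max(maxLatencyMs, latencyMs);
        totalLatencyMs += latencyMs;
        emitCalls++;
        recordsTotal += recordsEmitted;
    }

    /** Models emit-final-latency-max. */
    long latencyMax() {
        return maxLatencyMs;
    }

    /** Models emit-final-latency-avg; returns 0.0 before anything is recorded. */
    double latencyAvg() {
        return emitCalls == 0 ? 0.0 : (double) totalLatencyMs / emitCalls;
    }

    /** Models emit-final-records-total. */
    long recordsTotal() {
        return recordsTotal;
    }

    /** Models emit-final-records-rate as records per second over the given elapsed time. */
    double recordsRate(long elapsedMs) {
        return elapsedMs <= 0 ? 0.0 : recordsTotal * 1000.0 / elapsedMs;
    }
}
```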
As for the implementation of emit-final: for time windows (hopping, tumbling, sliding) we already have the API needed to range over the finalized, immutable windows and emit them. For session windows, however, we do not yet have a corresponding API. Hence, as part of this KIP, we also propose adding a new range API to SessionStore to support this feature. Note that it will be added only to the SessionStore API, not to the ReadOnlySessionStore API, so that it is not exposed for interactive queries.
Code Block |
---|
public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {
    /**
     * Return all session window entries whose end time falls within the specified range (both ends inclusive).
     * This function would be used to retrieve all closed and immutable windows.
     *
     * @param earliestSessionEndTime earliest session end time to search from, inclusive
     * @param latestSessionEndTime   latest session end time to search to, inclusive
     */
    default KeyValueIterator<Windowed<K>, AGG> findSessions(final long earliestSessionEndTime,
                                                            final long latestSessionEndTime) {
        throw new UnsupportedOperationException(
            "This API is not supported by this implementation of SessionStore.");
    }
    // other existing functions
} |
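The intended semantics of the new range lookup can be sketched with an in-memory index keyed by session end time. This is an illustrative model only: SessionEndIndexSketch, its put method, and the "key@start-end" string descriptors are hypothetical, and a real SessionStore would return a KeyValueIterator over Windowed keys rather than a list of strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model of a lookup by session end time; not a real SessionStore. */
class SessionEndIndexSketch {
    // session end time -> descriptors ("key@start-end") of the sessions ending at that time
    private final NavigableMap<Long, List<String>> byEndTime = new TreeMap<>();

    void put(String key, long startMs, long endMs) {
        byEndTime.computeIfAbsent(endMs, t -> new ArrayList<>())
                 .add(key + "@" + startMs + "-" + endMs);
    }

    /** Return all sessions whose end time lies in [earliest, latest], ordered by end time. */
    List<String> findSessions(long earliestSessionEndTime, long latestSessionEndTime) {
        List<String> result = new ArrayList<>();
        if (earliestSessionEndTime > latestSessionEndTime) {
            return result; // empty range; subMap would throw for inverted bounds
        }
        for (List<String> sessions : byEndTime
                .subMap(earliestSessionEndTime, true, latestSessionEndTime, true)
                .values()) {
            result.addAll(sessions);
        }
        return result;
    }
}
```

Keeping the index sorted by end time is what makes the "all sessions closed up to time T" query a single range scan, which is the access pattern emit-final needs.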
Proposed Changes
We introduce the API change options discussed above to support emitting final results for windowed aggregations. We also introduce two metrics to measure the emit-final latency as well as the number of records emitted.
Compatibility, Deprecation, and Migration Plan
We also plan to introduce a new default state store that supports more efficient time-range lookups for emit-final windows. It won't be backward compatible with the existing state store and windowed aggregation type. However, if users continue to use their existing state stores, the change should be backward compatible, though possibly with worse performance.
Rejected Alternatives
First rejected option
Code Block |
---|
public interface KGroupedStream<K, V> {
    <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows); // Already exists
    TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows); // Already exists
    SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows); // Already exists
    <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows, EmitConfig config); // New
    TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows, EmitConfig config); // New
    SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows, EmitConfig config); // New
}
public interface EmitConfig {
    enum ConfigType {
        EMIT_FINAL,  // output final result
        EMIT_EAGER;  // output for every record
    }
    ConfigType type();
    static EmitConfig emitFinal() {
        return new EmitFinalConfig();
    }
    static EmitConfig emitEager() {
        return new EmitEagerConfig(); // EmitEagerConfig will be similar to EmitFinalConfig
    }
}
public class EmitFinalConfig implements EmitConfig {
    EmitFinalConfig() {}
    @Override
    public ConfigType type() {
        return ConfigType.EMIT_FINAL;
    }
} |
A similar API will be added to CogroupedKStream.
...
This option is rejected because
- It creates more overloads of the windowedBy function.
- It operates on KGroupedStream and tries to both turn it into a windowed stream and configure the emit policy for that stream. I think this goes against the builder pattern.
Second rejected option
In the existing Windows, SlidingWindows and SessionWindows classes.
Code Block |
---|
public abstract class Windows<W extends Window> {
    public abstract EmitConfig getEmitConfig(); // New
}
public final class SlidingWindows {
    public EmitConfig getEmitConfig(); // New (signature only; a final class cannot declare abstract methods)
}
public final class SessionWindows {
    public EmitConfig getEmitConfig(); // New (signature only)
}
public interface EmitConfig {
    enum ConfigType {
        EMIT_FINAL,  // output final result
        EMIT_EAGER;  // output for every record
    }
    ConfigType type();
    static EmitConfig emitFinal() {
        return new EmitFinalConfig();
    }
    static EmitConfig emitEager() {
        return new EmitEagerConfig(); // EmitEagerConfig will be similar to EmitFinalConfig
    }
}
public class EmitFinalConfig implements EmitConfig {
    EmitFinalConfig() {}
    @Override
    public ConfigType type() {
        return ConfigType.EMIT_FINAL;
    }
} |
...
In the Public Interface section, I listed 3 API change options. I prefer option 1, for these reasons:
- It's concise and is a builder-style interface, which is more descriptive when applied to TimeWindowedKStream and SessionWindowedKStream.
- In option 2, I feel WindowConfig should already be part of Window. In option 3, I feel EMIT_FINAL and EMIT_EAGER should be applied to Stream instead of Window.
The drawbacks of option 1 and 2 are:
...
This option is rejected because
- The emit config should operate on the stream and configure how its results are output. The window definition shouldn't control that.