...
Change to TimeWindowedKStream.aggregate - adding overloaded method with additional parameter.
Proposed Changes
Add overloaded aggregate method, which accepts additional lateMessagesTopicName as last parameter:
...
Code Block language java <VR> KTable<Windowed<K>, VR> aggregate(final
...
Initializer<VR> initializer,
...
final
...
Aggregator<? super K, ? super V,
...
VR> aggregator,
...
final Named named,
...
final
...
Materialized<K, VR,
...
WindowStore<Bytes, byte[]>> materialized,
...
final String lateMessagesTopicName);
- Optionally create additional SinkNode, if respective parameter is filled.
- Conditionally forward messages to the SinkNode
- Minor change to the forward-implementation: by default each message is sent to all sub-nodes, however new node-for-late-messages should be excluded from generic processing.
...
Alternative approach would be providing api-users ability to define additional handler for late-messages, e.g.
...
Code Block | ||
---|---|---|
| ||
<VR> KTable<Windowed<K>, VR> aggregate(final |
...
Initializer<VR> initializer, |
...
final |
...
Aggregator<? super K, ? super V, |
...
VR> aggregator, |
...
final Named named, |
...
final |
...
Materialized<K, VR, |
...
WindowStore<Bytes, byte[]>> materialized, |
...
final |
...
BiConsumer<? super K, ? super |
...
V> lateMessageConsumer); |
However, common use-case of kafka-streams is writing data to topics.
...