...
No Format |
---|
public interface Suppressed<K> { /** * Marker interface for a buffer configuration that is "strict" in the sense that it will strictly * enforce the time bound and never emit early. */ interface StrictBufferConfig extends BufferConfig<StrictBufferConfig> {} /** * Marker interface for a buffer configuration that will strictly enforce size constraints * (bytes and/or number of records) on the buffer, so it is suitable for reducing duplicate * results downstream, but does not promise to eliminate them entirely. */ interface EagerBufferConfig extends BufferConfig<EagerBufferConfig> {} interface BufferConfig<BC extends BufferConfig<BC>> { /** * Create a size-constrained buffer in terms of the maximum number of keys it will store. */ static EagerBufferConfig maxRecords(final long recordLimit); /** * Set a size constraint on the buffer in terms of the maximum number of keys it will store. */ BC withMaxRecords(final long recordLimit); /** * Create a size-constrained buffer in terms of the maximum number of bytes it will use. */ static EagerBufferConfig maxBytes(final long byteLimit); /** * Set a size constraint on the buffer, the maximum number of bytes it will use. */ BC withMaxBytes(final long byteLimit); /** * Create a buffer unconstrained by size (either keys or bytes). * * As a result, the buffer will consume as much memory as it needs, dictated by the time bound. * * If there isn't enough heap available to meet the demand, the application will encounter an * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that * JVM processes under extreme memory pressure may exhibit poor GC behavior. * * This is a convenient option if you doubt that your buffer will be that large, but also don't * wish to pick particular constraints, such as in testing. * * This buffer is "strict" in the sense that it will enforce the time bound or crash. * It will never emit early. 
*/ static StrictBufferConfig unbounded(); /** * Set the buffer to be unconstrained by size (either keys or bytes). * * As a result, the buffer will consume as much memory as it needs, dictated by the time bound. * * If there isn't enough heap available to meet the demand, the application will encounter an * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that * JVM processes under extreme memory pressure may exhibit poor GC behavior. * * This is a convenient option if you doubt that your buffer will be that large, but also don't * wish to pick particular constraints, such as in testing. * * This buffer is "strict" in the sense that it will enforce the time bound or crash. * It will never emit early. */ StrictBufferConfig withNoBound(); /** * Set the buffer to gracefully shut down the application when any of its constraints are violated * * This buffer is "strict" in the sense that it will enforce the time bound or shut down. * It will never emit early. */ StrictBufferConfig shutDownWhenFull(); /** * Sets the buffer to use on-disk storage if it requires more memory than the constraints allow. * * This buffer is "strict" in the sense that it will never emit early. */ StrictBufferConfig spillToDiskWhenFull(); /** * Set the buffer to just emit the oldest records when any of its constraints are violated. * * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing * duplicate results downstream, but does not promise to eliminate them. */ EagerBufferConfig emitEarlyWhenFull(); } /** * Configure the suppression to emit only the "final results" from the window. * * By default all Streams operators emit results whenever new results are available. * This includes windowed operations. * * This configuration will instead emit just one result per key for each window, guaranteeing * to deliver only the final result. 
This option is suitable for use cases in which the business logic * requires a hard guarantee that only the final result is propagated. For example, sending alerts. * * To accomplish this, the operator will buffer events from the window until the window close (that is, * until the end-time passes, and additionally until the grace period expires). Since windowed operators * are required to reject late events for a window whose grace period is expired, there is an additional * guarantee that the final results emitted from this suppression are eventually consistent with the upstream * operator and its queryable state, if enabled. * * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results. * This is required to be a "strict" config, since it would violate the "final results" * property to emit early and then issue an update later. * @param <K> The key type for the KTable to apply this suppression to. "Final results" mode is only available * on Windowed KTables (this is enforced by the type parameter). * @return a "final results" mode suppression configuration */ static Suppressed<Windowed> untilWindowCloses(final StrictBufferConfig bufferConfig); /** * Configure the suppression to wait {@code timeToWaitForMoreEvents} amount of time after receiving a record * before emitting it further downstream. If another record for the same key arrives in the meantime, it replaces * the first record in the buffer but does <em>not</em> re-start the timer. * * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events. * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results. * @param <K> The key type for the KTable to apply this suppression to. 
* @return a suppression configuration */ static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig); /** * Use the specified name for the suppression node in the topology. * <p> * This can be used to insert a suppression without changing the rest of the topology names * (and therefore not requiring an application reset). * <p> * Note however, that once a suppression has buffered some records, removing it from the topology would cause * the loss of those records. * <p> * A suppression can be "disabled" with the configuration {@code untilTimeLimit(Duration.ZERO, ...)}. * * @param name The name to be used for the suppression node and changelog topic * @return The same configuration with the addition of the given {@code name}. */ Suppressed<K> withName(final String name); } |
...
Offset order is maintained when a key is updated, regardless of timestamp:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | emit | notes |
---|---|---|---|---|---|---|---|
0 | A | x | 0 | (A,x,0) | --- | --- | |
1 | A | y | 1 | (A,y,1) | --- | --- | Subsequent update to A overwrites the prior value |
The same holds when the update carries an earlier timestamp:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | emit | notes |
---|---|---|---|---|---|---|---|
0 | A | x | 1 | (A,x,1) | --- | --- | |
1 | A | w | 0 | (A,w,0) | --- | --- | Subsequent update to A overwrites the prior value (even though this is an earlier event by time) |
The behavior is straightforward with no late events
Enforcing a key limit of 2:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | emit | notes |
---|---|---|---|---|---|---|---|
0 | A | w | 0 | (A,w,0) | --- | --- | |
1 | A | x | 1 | (A,x,1) | --- | --- | |
2 | B | y | 2 | (A,x,1) | (B,y,2) | --- | |
3 | C | z | 3 | (B,y,2) | (C,z,3) | (A,x,1) | A is the oldest when we violate the key constraint |
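The key-limit table above can be reproduced with a small simulation. This is only a sketch of the eviction rule under stated assumptions, not the proposed implementation: `KeyLimitedBuffer` and its `put` method are hypothetical names, and the buffer is modeled as a plain in-memory map that evicts the entry with the lowest timestamp once the key count exceeds the limit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the proposed API.
class KeyLimitedBuffer {
    private static final class Buffered {
        final String value;
        final long timestamp;

        Buffered(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final int maxKeys;
    private final Map<String, Buffered> buffer = new HashMap<>();

    KeyLimitedBuffer(int maxKeys) {
        this.maxKeys = maxKeys;
    }

    // Stores the record (overwriting any prior value for the key), then evicts
    // lowest-timestamp entries until the key limit holds. Returns what was emitted.
    List<String> put(String key, String value, long timestamp) {
        buffer.put(key, new Buffered(value, timestamp));
        List<String> emitted = new ArrayList<>();
        while (buffer.size() > maxKeys) {
            String oldestKey = null;
            for (Map.Entry<String, Buffered> e : buffer.entrySet()) {
                if (oldestKey == null
                        || e.getValue().timestamp < buffer.get(oldestKey).timestamp) {
                    oldestKey = e.getKey();
                }
            }
            Buffered evicted = buffer.remove(oldestKey);
            emitted.add("(" + oldestKey + "," + evicted.value + "," + evicted.timestamp + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        KeyLimitedBuffer buffer = new KeyLimitedBuffer(2);
        System.out.println(buffer.put("A", "w", 0)); // []
        System.out.println(buffer.put("A", "x", 1)); // []
        System.out.println(buffer.put("B", "y", 2)); // []
        System.out.println(buffer.put("C", "z", 3)); // [(A,x,1)]
    }
}
```

Because eviction looks only at timestamps, the sketch does not care in which order the keys were inserted.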
Enforcing a byte limit of 3 (each character is one byte):
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
0 | A | xx | 0 | (A,xx,0) | --- | --- | --- | |
1 | A | yy | 1 | (A,yy,1) | --- | --- | --- | |
2 | B | zz | 2 | (B,zz,2) | --- | --- | (A,yy,1) | A is the oldest entry when the size constraint is violated, so we emit it. |
Enforcing "emit after 2ms":
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
0 | A | w | 0 | (A,w,0) | --- | --- | --- | |
1 | A | x | 1 | (A,x,1) | --- | --- | --- | |
2 | B | y | 2 | (A,x,1) | (B,y,2) | --- | --- | |
3 | C | z | 3 | (B,y,2) | (C,z,3) | --- | (A,x,1) | The stream time is now 3, so we emit all the records up to time 1 (this is just A) |
Note: newly added late events can be immediately evicted.
Enforcing "emit after 2ms":
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
0 | A | w | 3 | (A,w,3) | --- | --- | --- | |
1 | A | x | 1 | --- | --- | --- | (A,x,1) | The stream time is 3, and the timestamp of A is now 1, so we have to emit it. |
2 | B | y | 1 | --- | --- | --- | (B,y,1) | The stream time is 3, and the timestamp of B is 1, so we have to emit it. |
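Both "emit after 2ms" tables above follow from one rule: stream time is the highest timestamp seen so far, and any buffered record whose timestamp is at least 2ms behind stream time is emitted. The following is a minimal sketch of that rule, assuming an in-memory map; `TimeLimitedBuffer` and `put` are hypothetical names, not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the proposed API.
class TimeLimitedBuffer {
    private static final class Buffered {
        final String key;
        final String value;
        final long timestamp;

        Buffered(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long waitMs;
    private long streamTime = Long.MIN_VALUE;
    private final Map<String, Buffered> buffer = new HashMap<>();

    TimeLimitedBuffer(long waitMs) {
        this.waitMs = waitMs;
    }

    // Advances stream time, stores the record (overwriting the key), and emits
    // every buffered record whose timestamp is <= streamTime - waitMs.
    List<String> put(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        buffer.put(key, new Buffered(key, value, timestamp));

        List<Buffered> expired = new ArrayList<>();
        for (Buffered b : buffer.values()) {
            if (b.timestamp <= streamTime - waitMs) {
                expired.add(b);
            }
        }
        // Emit in timestamp order; ties are broken by key to keep output stable.
        expired.sort((a, b) -> a.timestamp != b.timestamp
                ? Long.compare(a.timestamp, b.timestamp)
                : a.key.compareTo(b.key));

        List<String> emitted = new ArrayList<>();
        for (Buffered b : expired) {
            buffer.remove(b.key);
            emitted.add("(" + b.key + "," + b.value + "," + b.timestamp + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        TimeLimitedBuffer late = new TimeLimitedBuffer(2);
        System.out.println(late.put("A", "w", 3)); // []
        System.out.println(late.put("A", "x", 1)); // [(A,x,1)]
        System.out.println(late.put("B", "y", 1)); // [(B,y,1)]
    }
}
```

The late-event case needs no special handling: the update to A lowers its timestamp to 1 while stream time stays at 3, so the record is already due when it arrives.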
Likewise with a key constraint of 2:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | emit | notes |
---|---|---|---|---|---|---|---|
0 | A | w | 0 | (A,w,0) | --- | --- | |
1 | A | x | 1 | (A,x,1) | --- | --- | |
2 | B | y | 2 | (A,x,1) | (B,y,2) | --- | |
3 | C | z | 0 | (A,x,1) | (B,y,2) | (C,z,0) | Even though it is the most recently added, C is still the oldest event when the key constraint is violated |
And of course with a size constraint of 3 bytes as well:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
0 | A | xx | 0 | (A,xx,0) | --- | --- | --- | |
1 | A | yy | 1 | (A,yy,1) | --- | --- | --- | |
2 | B | zz | 0 | (A,yy,1) | --- | --- | (B,zz,0) | Even though B is the most recently added event, it is still the oldest one by timestamp when the size constraint is violated |
Big records can push multiple events out of the buffer
Size constraint of 3 bytes:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
0 | A | x | 0 | (A,x,0) | --- | --- | --- | |
1 | B | y | 1 | (A,x,0) | (B,y,1) | --- | --- | |
2 | C | zzz | 2 | (C,zzz,2) | --- | --- | (A,x,0),(B,y,1) | No other records can fit in the buffer with C, and A and B are both older than C |
In fact, events can be so big they don't fit in the buffer at all.
Still 3 bytes:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
0 | A | x | 0 | (A,x,0) | --- | --- | --- | |
1 | B | y | 1 | (A,x,0) | (B,y,1) | --- | --- | |
2 | C | zzzz | 2 | --- | --- | --- | (A,x,0),(B,y,1),(C,zzzz,2) | A and B are both older than C, so they must be emitted before C, and C itself doesn’t fit in the buffer, so it must then be immediately emitted. |
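The two 3-byte tables above can be checked with a short simulation. As a sketch only, it assumes that a record's size is the length of its value (one byte per character, keys not counted), which is the accounting the tables appear to use. `ByteLimitedBuffer` and `put` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the proposed API.
class ByteLimitedBuffer {
    private static final class Buffered {
        final String value;
        final long timestamp;

        Buffered(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long maxBytes;
    private long usedBytes = 0;
    private final Map<String, Buffered> buffer = new HashMap<>();

    ByteLimitedBuffer(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Stores the record, then evicts lowest-timestamp entries until the buffer
    // fits in maxBytes. Assumption: size = value length, one byte per character.
    List<String> put(String key, String value, long timestamp) {
        Buffered previous = buffer.put(key, new Buffered(value, timestamp));
        if (previous != null) {
            usedBytes -= previous.value.length();
        }
        usedBytes += value.length();

        List<String> emitted = new ArrayList<>();
        while (usedBytes > maxBytes) {
            String oldestKey = null;
            for (Map.Entry<String, Buffered> e : buffer.entrySet()) {
                if (oldestKey == null
                        || e.getValue().timestamp < buffer.get(oldestKey).timestamp) {
                    oldestKey = e.getKey();
                }
            }
            Buffered evicted = buffer.remove(oldestKey);
            usedBytes -= evicted.value.length();
            emitted.add("(" + oldestKey + "," + evicted.value + "," + evicted.timestamp + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        ByteLimitedBuffer buffer = new ByteLimitedBuffer(3);
        System.out.println(buffer.put("A", "x", 0));    // []
        System.out.println(buffer.put("B", "y", 1));    // []
        System.out.println(buffer.put("C", "zzzz", 2)); // [(A,x,0), (B,y,1), (C,zzzz,2)]
    }
}
```

The oversized record is not special-cased. It is inserted like any other record and then evicted by the same loop once it is the oldest remaining entry, which yields the emit order shown in the table.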
Rejected alternative: evicting by offset instead of timestamp.
This causes strange behavior when there is a time constraint involved:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
1 | A | x | 2 | (A,x,2) | --- | --- | --- | |
2 | B | y | 1 | (A,x,2) | (B,y,1) | --- | --- | |
3 | C | z | 3 | (A,x,2) | (B,y,1) | (C,z,3) | --- | Even though B is old enough to emit, it’s not at the head of the queue, so we can’t emit it |
4 | C | zz | 4 | (C,zz,4) | --- | --- | (A,x,2),(B,y,1) | It’s now time to emit A, and once we do, it’s no longer blocking B, so we can emit it as well. |
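The head-of-line blocking in the table above can be simulated as follows. This is a sketch under assumptions: the table does not state the time limit, so 2ms is assumed to match the earlier examples, and the offset-ordered queue is modeled with a `LinkedHashMap`, which keeps a key's original position when the key is updated. `OffsetOrderedBuffer` and `put` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the rejected design; not part of the proposed API.
class OffsetOrderedBuffer {
    private static final class Buffered {
        final String value;
        final long timestamp;

        Buffered(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long waitMs;
    private long streamTime = Long.MIN_VALUE;
    // Iterates in insertion (offset) order; updating a key keeps its position.
    private final Map<String, Buffered> queue = new LinkedHashMap<>();

    OffsetOrderedBuffer(long waitMs) {
        this.waitMs = waitMs;
    }

    // Only the head of the queue may be emitted. A head that is not yet due
    // blocks every record behind it, even records that are already due.
    List<String> put(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        queue.put(key, new Buffered(value, timestamp));

        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Buffered>> it = queue.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Buffered> head = it.next();
            if (head.getValue().timestamp > streamTime - waitMs) {
                break; // head is not due yet, so nothing behind it can be emitted
            }
            emitted.add("(" + head.getKey() + "," + head.getValue().value
                    + "," + head.getValue().timestamp + ")");
            it.remove();
        }
        return emitted;
    }

    public static void main(String[] args) {
        OffsetOrderedBuffer buffer = new OffsetOrderedBuffer(2); // assumed 2ms limit
        System.out.println(buffer.put("A", "x", 2));  // []
        System.out.println(buffer.put("B", "y", 1));  // []
        System.out.println(buffer.put("C", "z", 3));  // [] although B is already due
        System.out.println(buffer.put("C", "zz", 4)); // [(A,x,2), (B,y,1)]
    }
}
```

At offset 3, B is 2ms behind stream time but stays buffered because A, which arrived first, is not yet due.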
Rejected alternative: evicting by timestamp only when the buffer is time constrained, and using offset order otherwise
...