...
Proposed Changes
This KIP proposes an option to make suppression state queryable by adding a queryable flag to Suppressed.
Public Interfaces
...
Code Block | ||
---|---|---|
| ||
public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
    ...
    /**
     * Make the suppression queryable.
     *
     * @return The same configuration with query enabled.
     */
    Suppressed<K> enableQuery();
    /**
     * Returns true iff the query is enabled.
     */
    boolean isQueryEnabled();
.... |
2. Scala DSL
Code Block | ||
---|---|---|
| ||
class KTable[K, V](val inner: KTableJ[K, V]) {
...
/**
* Suppress some updates from this changelog stream, determined by the supplied [[Suppressed]] configuration.
*
* This controls what updates downstream table and stream operations will receive.
*
* @param suppressed Configuration object determining what, if any, updates to suppress.
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
* should be materialized.
* @return A new KTable with the desired suppression characteristics.
* @see `org.apache.kafka.streams.kstream.KTable#suppress`
*/
def suppress(suppressed: org.apache.kafka.streams.kstream.Suppressed[_ >: K],
materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
new KTable(inner.suppress(suppressed, materialized))
.... |
The user can query the suppressed view by the name given in Suppressed#name, provided that Suppressed#isQueryEnabled is true.
Calling Suppressed#enableQuery without first specifying a name via Suppressed#withName is not allowed; in that case an IllegalArgumentException is thrown.
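The naming rule above can be illustrated with a minimal, self-contained sketch. `SuppressedSketch` and its `unnamed()` factory are hypothetical stand-ins written for this example, not the Kafka Streams `Suppressed` class. The sketch also assumes the name check happens at the moment `enableQuery` is called, which the text above does not specify.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for the proposed Suppressed additions.
 * This is NOT the Kafka Streams class; it only models the name/query-flag rule.
 */
final class SuppressedSketch {
    private final String name;          // set through withName; null when unnamed
    private final boolean queryEnabled; // set through enableQuery

    private SuppressedSketch(String name, boolean queryEnabled) {
        this.name = name;
        this.queryEnabled = queryEnabled;
    }

    /** Hypothetical factory: an unnamed configuration with query disabled. */
    static SuppressedSketch unnamed() {
        return new SuppressedSketch(null, false);
    }

    /** Models Suppressed#withName: returns a copy carrying the given name. */
    SuppressedSketch withName(String name) {
        return new SuppressedSketch(Objects.requireNonNull(name, "name"), queryEnabled);
    }

    /**
     * Models the proposed Suppressed#enableQuery.
     * Assumption: the check is eager, so withName must be called first.
     */
    SuppressedSketch enableQuery() {
        if (name == null) {
            throw new IllegalArgumentException("enableQuery requires a name; call withName first");
        }
        return new SuppressedSketch(name, true);
    }

    /** Models the proposed Suppressed#isQueryEnabled. */
    boolean isQueryEnabled() {
        return queryEnabled;
    }

    /** The name under which the suppressed view would be queried. */
    String name() {
        return name;
    }

    public static void main(String[] args) {
        // Named first, then query enabled: accepted.
        SuppressedSketch queryable =
            SuppressedSketch.unnamed().withName("suppressed-view").enableQuery();
        System.out.println(queryable.name() + " queryable: " + queryable.isQueryEnabled());

        // No name set: rejected with IllegalArgumentException.
        try {
            SuppressedSketch.unnamed().enableQuery();
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because one name serves both the operation and the queryable view, a caller has only one identifier to keep track of. An implementation could also defer the check until the topology is built, which would make the order of `withName` and `enableQuery` irrelevant.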
Compatibility, Deprecation, and Migration Plan
None.
Rejected Alternatives
KTable#suppress(Suppressed, Materialized<K, V, KeyValueStore>)
At first glance, this approach seems more consistent with the existing APIs that have a Materialized variant (e.g., KTable#filter(Predicate) and KTable#filter(Predicate, Materialized)). However, it introduces two concepts of a name for the same operation: Suppressed#name and Materialized#name. This is not feasible.
The current API for the Materialized variant is a legacy of the nameless operators that predate KIP-307. In this case we already have the Suppressed class, so there is no need to stay consistent with the old Materialized variant methods. This alternative is therefore rejected.
KTable#suppress(Suppressed, String)
Another alternative is to pass the state store name directly. This approach is neither consistent with the existing APIs nor clear in its semantics, since it also introduces two concepts of a name for the same operation. This alternative is therefore rejected.