Status
Current state: Under Discussion
Discussion thread: thread
JIRA: KAFKA-8403
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Users sometimes want to query the intermediate state of a suppression, that is, the records currently held in the suppression buffer. However, as of 2.3, the KTable#suppress operator (added in 2.1) lacks this functionality.
Proposed Changes
This KIP proposes to make the suppression buffer state queryable and to add an overload that takes the queryable store name: KTable#suppress(Suppressed, String). Users can then query the state of the suppression buffer as follows:
// A queryable interface to the suppression buffer, named "suppression-buffer".
final ReadOnlyKeyValueStore<String, String> myMapStore =
    kafkaStreams.store("suppression-buffer",
                       QueryableStoreTypes.suppressionBuffer(Serdes.String(), Serdes.String()));
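To make "intermediate state" concrete, here is a minimal, library-free sketch of the idea. It is not Kafka Streams code and does not reflect the actual buffer implementation: it only models a buffer that keeps the latest value per key until it is flushed, together with a read-only view that a query could use in the meantime. The names SuppressionBufferSketch, ReadOnlyView, put, flush, and readOnlyView are hypothetical and exist only for this illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a suppression buffer; every name here is hypothetical. */
final class SuppressionBufferSketch<K, V> {

    /** Read-only query view over the buffered (not yet emitted) records. */
    interface ReadOnlyView<K, V> {
        V get(K key);   // returns null if the key is not currently buffered
        int size();     // number of keys currently buffered
    }

    // Latest not-yet-emitted value per key, kept in first-insertion order.
    private final Map<K, V> pending = new LinkedHashMap<>();

    /** Buffers an update; a later update for the same key replaces the earlier one. */
    void put(final K key, final V value) {
        pending.put(key, value);
    }

    /** Emits everything buffered so far and clears the buffer. */
    List<Map.Entry<K, V>> flush() {
        final List<Map.Entry<K, V>> emitted = new ArrayList<>();
        for (final Map.Entry<K, V> e : pending.entrySet()) {
            emitted.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        pending.clear();
        return emitted;
    }

    /** Exposes the buffer contents without allowing modification. */
    ReadOnlyView<K, V> readOnlyView() {
        return new ReadOnlyView<K, V>() {
            @Override public V get(final K key) { return pending.get(key); }
            @Override public int size() { return pending.size(); }
        };
    }

    public static void main(final String[] args) {
        final SuppressionBufferSketch<String, String> buffer = new SuppressionBufferSketch<>();
        buffer.put("a", "1");
        buffer.put("a", "2");   // replaces the earlier value for "a"
        buffer.put("b", "3");

        final ReadOnlyView<String, String> view = buffer.readOnlyView();
        System.out.println(view.get("a"));          // 2
        System.out.println(view.size());            // 2
        System.out.println(buffer.flush().size());  // 2
        System.out.println(view.size());            // 0
    }
}
```

Between put and flush, downstream operators in this model see nothing, yet the view already reports the pending value for each key. That window is the state this KIP proposes to expose.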
Public Interfaces
Add a new method, KTable#suppress(Suppressed, String)
public interface KTable<K, V> {
    ...

    /**
     * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
     * <p>
     * This controls what updates downstream table and stream operations will receive.
     *
     * @param suppressed         Configuration object determining what, if any, updates to suppress
     * @param queryableStoreName A queryableStoreName of suppression buffer
     * @return A new {@code KTable} with the desired suppression characteristics.
     */
    KTable<K, V> suppress(final Suppressed<? super K> suppressed, final String queryableStoreName);

    ...
}
Add a new QueryableStoreType, SuppressionBufferType
public final class QueryableStoreTypes {
    ...

    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore() {
        return new TimestampedKeyValueStoreType<>();
    }

    ...

    public static class SuppressionBufferType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {

        private final Serde<K> keySerde;
        private final Serde<V> valueSerde;

        SuppressionBufferType(final Serde<K> keySerde, final Serde<V> valueSerde) {
            super(new HashSet<>(Arrays.asList(
                TimeOrderedKeyValueBuffer.class,
                ReadOnlyKeyValueStore.class)));
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
        }

        @Override
        public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider storeProvider, final String storeName) {
            return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
        }

        public Serde<K> keySerde() {
            return keySerde;
        }

        public Serde<V> valueSerde() {
            return valueSerde;
        }
    }
}
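SuppressionBufferType passes two classes to its matcher superclass. The following library-free sketch shows one way such class-based matching can work, under the assumption that a store is accepted only when it is an instance of every listed type. That assumption should be checked against the real QueryableStoreTypeMatcher. All names in the sketch (StoreTypeMatcherSketch, BufferLike, ReadOnlyLike, QueryableBuffer, PlainBuffer) are hypothetical stand-ins, not Kafka classes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Library-free sketch of class-based store matching; every name here is hypothetical. */
final class StoreTypeMatcherSketch {

    // Stand-ins for the two types listed in SuppressionBufferType's constructor.
    interface BufferLike { }
    interface ReadOnlyLike { }

    // One store that implements both interfaces, and one that implements only the first.
    static final class QueryableBuffer implements BufferLike, ReadOnlyLike { }
    static final class PlainBuffer implements BufferLike { }

    private final Set<Class<?>> matchTo;

    StoreTypeMatcherSketch(final Class<?>... required) {
        this.matchTo = new HashSet<>(Arrays.asList(required));
    }

    /** Accepts a store only if it is an instance of every required type (assumed rule). */
    boolean accepts(final Object store) {
        for (final Class<?> required : matchTo) {
            if (!required.isInstance(store)) {
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        final StoreTypeMatcherSketch matcher =
            new StoreTypeMatcherSketch(BufferLike.class, ReadOnlyLike.class);
        System.out.println(matcher.accepts(new QueryableBuffer())); // true
        System.out.println(matcher.accepts(new PlainBuffer()));     // false
    }
}
```

If the real matcher behaves this way, a buffer would have to satisfy both TimeOrderedKeyValueBuffer and ReadOnlyKeyValueStore to be returned for a suppression-buffer query.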
Compatibility, Deprecation, and Migration Plan
None.
Rejected Alternatives
Adding KTable#suppress(Suppressed, Materialized)
Since KTable#suppress does not change the key or value type of the KTable instance, Materialized#with[KeySerde, ValueSerde] would have to be ignored. Since the underlying data structure, InMemoryTimeOrderedKeyValueBuffer, supports neither caching nor logging, the Materialized#withCaching[Enabled, Disabled] and Materialized#withLogging[Enabled, Disabled] options would also be ignored. For these reasons, a Materialized instance carries far more configuration than this parameter needs, unless InMemoryTimeOrderedKeyValueBuffer is changed to behave like the other StateStore classes.