...
Current state: Under Discussion
Discussion thread: thread
JIRA: KAFKA-8403
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Sometimes the user wants to query the intermediate state of a suppression. However, as of 2.3, the `KTable#suppress` operator (added in 2.1) lacks this functionality.
Since 2.1, Kafka Streams provides a way to suppress the intermediate state of a KTable (KIP-328: Ability to suppress updates for KTables). The `KTable#suppress` operator introduced in KIP-328 controls which updates downstream table and stream operations receive. With this feature, the contents of the upstream table are split into two disjoint groups: the intermediate states held in the suppression buffer, and the final states emitted to the downstream table. The user can look up the value associated with a specific key in the downstream table by querying the upstream table (KIP-67: Queryable state for Kafka Streams), since every key-value mapping in the downstream table is also stored in the upstream table.
However, this has a limitation. It works if the user only wants to retrieve the value associated with a specific key (like `ReadOnlyKeyValueStore#get`). But if the user wants an iterator over the suppressed view (like `ReadOnlyKeyValueStore#range` or `ReadOnlyKeyValueStore#all`), we are stuck, since there is no way to tell in advance which keys have been flushed out.
One available workaround is to materialize the downstream table, e.g. `downstreamTable.filter((key, value) -> true, Materialized.as("final-state"))`. However, this approach is cumbersome.
Proposed Changes
This KIP proposes to add an option to make the suppression state queryable by adding a queryable flag to Suppressed. The user can then query the state of the suppression buffer as follows:
Code Block | ||
---|---|---|
| ||
// A queryable interface to the suppression buffer, named "suppression-buffer".
final ReadOnlyKeyValueStore<String, String> myMapStore =
kafkaStreams.store("suppression-buffer", QueryableStoreTypes.suppressionBuffer(Serdes.String(), Serdes.String())); |
Public Interfaces
...
Code Block | ||
---|---|---|
| ||
public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
    ...
    /**
     * Make the suppression queryable.
     *
     * @return The same configuration with query enabled.
     */
    Suppressed<K> enableQuery();
    /**
     * Returns true iff the query is enabled.
     */
    boolean isQueryEnabled();
    .... |
Add a new QueryableStoreType, SuppressionBufferType
Code Block | ||
---|---|---|
| ||
public final class QueryableStoreTypes {
    ...
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore() {
        return new TimestampedKeyValueStoreType<>();
    }
    ...
    public static class SuppressionBufferType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
        private final Serde<K> keySerde;
        private final Serde<V> valueSerde;
        SuppressionBufferType(final Serde<K> keySerde, final Serde<V> valueSerde) {
            super(new HashSet<>(Arrays.asList(
                TimeOrderedKeyValueBuffer.class,
                ReadOnlyKeyValueStore.class)));
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
        }
        @Override
        public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider storeProvider,
                                                  final String storeName) {
            return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
        }
        public Serde<K> keySerde() {
            return keySerde;
        }
        public Serde<V> valueSerde() {
            return valueSerde;
        }
    }
} |
The user can query the suppressed view by Suppressed#name if Suppressed#isQueryEnabled is true.
Calling Suppressed#enableQuery without first specifying a name with Suppressed#withName is not allowed. In this case, an IllegalArgumentException is thrown.
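The naming rule above can be illustrated with a small, self-contained sketch. `SuppressedSketch` and its `unnamed()` factory are hypothetical stand-ins written for this illustration; they are not the Kafka Streams `Suppressed` class. The sketch assumes the check happens inside `enableQuery` itself (the KIP text does not say exactly where the exception is raised), and it returns an immutable copy rather than mutating the configuration.

```java
import java.util.Objects;

// Hypothetical stand-in for the Suppressed configuration described in this KIP.
// It models only the name and the queryable flag; it is not the Kafka Streams class.
final class SuppressedSketch {
    private final String name;          // null until withName(...) has been called
    private final boolean queryEnabled;

    private SuppressedSketch(final String name, final boolean queryEnabled) {
        this.name = name;
        this.queryEnabled = queryEnabled;
    }

    // Hypothetical factory: a configuration with no name and queries disabled.
    static SuppressedSketch unnamed() {
        return new SuppressedSketch(null, false);
    }

    // Returns a copy carrying the given operator name.
    SuppressedSketch withName(final String name) {
        return new SuppressedSketch(Objects.requireNonNull(name, "name"), queryEnabled);
    }

    // Assumption: the name check is performed here, at the time the flag is set.
    SuppressedSketch enableQuery() {
        if (name == null) {
            throw new IllegalArgumentException(
                "enableQuery requires a name; call withName first");
        }
        return new SuppressedSketch(name, true);
    }

    String name() {
        return name;
    }

    boolean isQueryEnabled() {
        return queryEnabled;
    }
}
```

Under these assumptions, `SuppressedSketch.unnamed().withName("suppression-buffer").enableQuery()` yields a configuration whose name can serve as the store name, while `SuppressedSketch.unnamed().enableQuery()` fails with an IllegalArgumentException.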
Compatibility, Deprecation, and Migration Plan
None.
Rejected Alternatives
KTable#suppress(Suppressed, Materialized<K, V, KeyValueStore>)
At first glance, this approach feels more consistent with the existing APIs that have a Materialized variant (e.g., KTable#filter(Predicate) and KTable#filter(Predicate, Materialized)). However, it introduces two names for the same operation: Suppressed#name and Materialized#name. That is not workable.
The current API for the Materialized variant is just a legacy of the nameless operators that predate KIP-307. In this case, we already have the Suppressed class and do not need to stay consistent with the old Materialized variant methods.
In addition, since KTable#suppress does not change the key or value type of the KTable instance, Materialized#with[KeySerde, ValueSerde] would have to be ignored. Since the underlying data structure, InMemoryTimeOrderedKeyValueBuffer, does not support caching or logging, the Materialized#withCaching[Enabled, Disabled] and Materialized#withLogging[Enabled, Disabled] options would also be ignored. For these reasons, a Materialized instance is too heavyweight as a parameter, unless InMemoryTimeOrderedKeyValueBuffer follows the other StateStore classes. So rejected.
KTable#suppress(Suppressed, String)
Another alternative is passing the state store name directly. This approach is neither consistent with the existing APIs nor has clear semantics, since it also introduces two concepts for the same operation. So rejected.