...
Sometimes, the user may want to query the intermediate state of the suppression. However, as of 2.34, the KTable#suppress
operator (added 2.1) lacks this functionality.
Proposed Changes
This KIP proposes the following to make the suppression buffer state queriable, and add a variant method to specify the queryableStoreName: :
- Add ReadOnlySuppressionBuffer, SuppressionBuffer.
- Add Stores#[SuppressionBytesStoreSupplier, suppressionBufferBuilder].
- Add a new QueryableStoreType, SuppressionBufferType.
- Add KTable#suppress(Suppressed,
...
- Materialized).
- Add TopologyTestDriver#getSuppressionBuffer.
User can query the state of suppression buffer like the following:
Code Block |
---|
|
// A queryable interface to the suppression buffer, named "suppression-buffer".
final ReadOnlyKeyValueStore<StringReadOnlySuppressionBuffer<String, String> myMapStore =
kafkaStreams.store(StoreQueryParameters.fromNameAndType("suppression-buffer", QueryableStoreTypes.suppressionBuffer(Serdes.String(), Serdes.String())); |
Public Interfaces
...
1. Add ReadOnlySuppressionBuffer, SuppressionBuffer
SuppressionBuffer
is a writable interface of suppression buffer; dislike to TimeOrderedKeyValueStore
(current implementation), this is user-face interface.
Code Block |
---|
|
/**
* Interface for storing the suppressed records.
* <p>
* This class is not thread-safe.
*
* @param <K> type of the record keys
* @param <V> type of the record values
*/
public interface KTable<KSuppressionBuffer<K, V> extends StateStore, ReadOnlySuppressionBuffer<K, V> {
/**
* An ...
evicted record.
*
* @param <K> type of the record keys
/** * @param <V> type of the record values
* @see
* Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
* <p>
* This controls what updates downstream table and stream operations will receive./
final class Eviction<K, V> {
private final K key;
private final Change<V> value;
private final RecordContext recordContext;
public Eviction(final K key, final Change<V> value, final RecordContext recordContext) {
this.key = key;
this.value = value;
this.recordContext = recordContext;
}
public K key() {
return key;
}
public Change<V> value() {
return value;
}
public RecordContext recordContext() {
return recordContext;
}
@Override
public String toString() {
return "Eviction{key=" + key + ", value=" + value + ", recordContext=" + recordContext + '}';
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final Eviction<?, ?> eviction = (Eviction<?, ?>) o;
return Objects.equals(key, eviction.key) &&
Objects.equals(value, eviction.value) &&
Objects.equals(recordContext, eviction.recordContext);
}
@Override
public int hashCode() {
return Objects.hash(key, value, recordContext);
}
}
/**
* Add a value associated with this key.
*
* @param time
* @param key
* @param suppressedvalue
* @param recordContext
Configuration object determining what, if any, updates to suppress*/
void put(long time, K key, Change<V> value, RecordContext recordContext);
/**
* Evict stored records which satisfy {@code predicate}.
*
* @param predicate a boolean {@link java.util.function.Supplier}
* @param callback the callback invoked after the eviction
*/
void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback);
|
ReadOnlySuppressionBuffer
provides read-only view of SuppressionBuffer
. It extends ReadOnlyKeyValueStore
so can be treated like a key-value mappings.
Code Block |
---|
|
/**
* A suppression buffer that only supports read operations.
*
* @param <K> type of the record keys
* @param <V> type of the record values
*/
public interface ReadOnlySuppressionBuffer<K, V> extends ReadOnlyKeyValueStore<K, V> {
/** queryableStoreName A queryableStoreName of suppression buffer
* @return A new Returns suppressed view of the value associated with {@code KTablekey} with the desired suppression characteristics. timestamp, if exists.
* If not, returns null.
*
* @param key record key
* @return Suppressed view of the value associated with given key, with timestamp (if exists)
*/
NullableValueAndTimestamp<V> priorValueForBuffered(K key);
/**
* Returns the number of key/value pairs suppressed in this buffer.
*
* @return the number of key/value pairs suppressed in this buffer
*/
KTable<K, V> suppress(final Suppressed<? super K> suppressed, final String queryableStoreName);
.... |
...
int numRecords();
/**
* Returns the size of this buffer, in bytes.
*
* @return the size of the buffer, in bytes
*/
long bufferSize();
/**
* Returns the timestamp of the oldest record in this buffer. {@link Long#MAX_VALUE} iff the buffer is empty.
*
* @return the timestamp of the oldest record in this buffer
*/
long minTimestamp();
} |
2. Add Stores#[SuppressionBytesStoreSupplier, suppressionBufferBuilder]
These methods are counterparts of other Stores
methods, like Stores#[inMemoryKeyValueStore, keyValueStoreBuilder]
. Using this methods, users can instantiate suppression buffer even in the lower level api.
Code Block |
---|
|
public final class QueryableStoreTypes { Stores {
...
/**
* Create an in-memory {@link SuppressionBytesStoreSupplier}.
*
* @param name name of the store (cannot be {@code null})
* @return an instance of a {@link SuppressionBytesStoreSupplier}
*/
public static SuppressionBytesStoreSupplier inMemorySuppressionBuffer(final String name);
...
public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<KStoreBuilder<SuppressionBuffer<K, ValueAndTimestamp<V>>>V>> timestampedKeyValueStore() {
suppressionBufferBuilder(final SuppressionBytesStoreSupplier supplier,
final Serde<K> keySerde,
return new TimestampedKeyValueStoreType<>();
}
...
public static class SuppressionBufferType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
final Serde<V> valueSerde);
...
} |
Here are the public interface of SuppressionBytesStoreSupplier
, SuppressionBufferBuilder
:
Code Block |
---|
|
/**
* A store supplier that can be used to create one or more {@link SuppressionBuffer SuppressionBuffer<Byte, byte[]>} instances.
*/
public interface SuppressionBytesStoreSupplier extends StoreSupplier<SuppressionBuffer<Bytes, byte[]>> {
/**
* Returns {@link SuppressionBuffer SuppressionBuffer<Byte, byte[]>} with logging enabled/disabled.
*/
SuppressionBuffer<Bytes, byte[]> get(boolean loggingEnabled);
} |
Code Block |
---|
|
public class SuppressionBufferBuilder<K, V> extends AbstractStoreBuilder<K, V, SuppressionBuffer<K, V>> {
private final SuppressionBytesStoreSupplier storeSupplier;
public SuppressionBufferBuilder(final SuppressionBytesStoreSupplier storeSupplier,
private final Serde<K> keySerde;,
private final Serde<V> valueSerde;
,
final Time time) {
SuppressionBufferType(final Serde<K> keySerde, final Serde<V> valueSerde) {
super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time);
this.storeSupplier = storeSupplier;
}
@Override
public SuppressionBuffer<K, V> build() {
return new MeteredSuppressionBuffer<>(
super(new HashSet<>(Arrays.asList( storeSupplier.get(enableLogging),
storeSupplier.metricsScope(),
time,
keySerde,
valueSerde);
TimeOrderedKeyValueBuffer.class,}
} |
Add a new QueryableStoreType, SuppressionBufferType
This change makes SuppressionBuffer queriable.
Code Block |
---|
|
public final class QueryableStoreTypes {
...
/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlySuppressionBuffer}.
*
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link ReadOnlyKeyValueStore.class)))QueryableStoreTypes.SuppressionBufferType}
*/
public static <K, V> QueryableStoreType<ReadOnlySuppressionBuffer<K, V>> suppressionBuffer() {
return new SuppressionBufferType<>();
}
...
public static class SuppressionBufferType<K, V> extends this.keySerde = keySerde;
QueryableStoreTypeMatcher<ReadOnlySuppressionBuffer<K, V>> {
SuppressionBufferType() {
this.valueSerde = valueSerde super(Collections.singleton(ReadOnlySuppressionBuffer.class));
}
@Override
public ReadOnlyKeyValueStore<KReadOnlySuppressionBuffer<K, V> create(final StateStoreProvider storeProvider,
final String storeName) {
return new CompositeReadOnlyKeyValueStore<>CompositeReadOnlySuppressionBuffer<>(storeProvider, this, storeName);
}
}
} |
4. Add a new method, KTable#suppress(Suppressed, Materialized)
Using thid method, users can specify the queriable name of suppression buffer.
Code Block |
---|
|
public interface KTable<K, V> {
...
/**
* Suppress some updates public Serde<K> keySerde() {from this changelog stream, determined by the supplied {@link Suppressed} configuration.
* <p>
* This returncontrols keySerde;
what updates downstream table and stream operations will }
receive.
*
* @param suppressed Configuration object determining public Serde<V> valueSerde() {
what, if any, updates to suppress
* @param queryableStoreName A queryableStoreName of returnsuppression valueSerde;buffer
* @return A new {@code KTable} with the desired suppression characteristics.
*/
KTable<K, }
} |
Compatibility, Deprecation, and Migration Plan
None.
Rejected Alternatives
Adding KTable#suppress(Suppressed, Materialized)
V> suppress(final Suppressed<? super K> suppressed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
.... |
5. Add TopologyTestDriver#getSuppressionBuffer
This method provides a testing functionality on suppression buffer.
Code Block |
---|
|
public class TopologyTestDriver implements Closeable {
...
public <K, V> SuppressionBuffer<K, V> getSuppressionBuffer(final String name) {
.... |
Compatibility, Deprecation, and Migration Plan
NoneSince KTable#suppress
does not change the Key, Value type of the KTable
instance, Materialized#with[KeySerde, ValueSerde]
should be ignored. Since the underlying data structure, InMemoryTimeOrderedKeyValueBuffer
, does not support caching or logging, Materialized#withCaching[Enabled, Disabled]
and Materialized#withLogging[Enabled, Disabled]
options are also ignored. For these reasons, a Materialized
instance is too big for parameter, unless InMemoryTimeOrderedKeyValueBuffer follows the other StateStore classes.