...
Current state: Under Discussion
Discussion thread: thread
JIRA: KAFKA-8403
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block | ||
---|---|---|
| ||
/** * A suppression buffer that only supports read operations. * * @param <K> type of the record keys * @param <V> type of the record values */ public interface ReadOnlySuppressionBuffer<K, V> extends ReadOnlyKeyValueStore<K, V> { /** * Returns suppressed view of the value associated with {@code key} with timestamp, if exists. * If not, returns null. * * @param key record key * @return Suppressed view of the value associated with given key, with timestamp (if exists) */ NullableValueAndTimestamp<V> priorValueForBuffered(K key); /** * Returns the number of key/value pairs suppressed in this buffer. * * @return the number of key/value pairs suppressed in this buffer */ int numRecords(); /** * Returns the size of this buffer, in bytes. * * @return the size of the buffer, in bytes */ long bufferSize(); /** * Returns the timestamp of the oldest record in this buffer. {@link Long#MAX_VALUE} iff the buffer is empty. * * @return the timestamp of the oldest record in this buffer */ long minTimestamp(); } |
Dislike to ValueAndTimestamp
, NullableValueAndTimestamp
can contain null as its value; it was introduced since suppression buffer can return a null value with a timestamp.
...
Add a new QueryableStoreType, SuppressionBufferType
This change makes SuppressionBuffer
queriable.
Code Block | ||
---|---|---|
| ||
public final class QueryableStoreTypes { ... /** * A {@link QueryableStoreType} that accepts {@link ReadOnlySuppressionBuffer}. * * @param <K> key type of the store * @param <V> value type of the store * @return {@link QueryableStoreTypes.SuppressionBufferType} */ public static <K, V> QueryableStoreType<ReadOnlySuppressionBuffer<K, V>> suppressionBuffer() { return new SuppressionBufferType<>(); } ... public static class SuppressionBufferType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySuppressionBuffer<K, V>> { SuppressionBufferType() { super(Collections.singleton(ReadOnlySuppressionBuffer.class)); } @Override public ReadOnlySuppressionBuffer<K, V> create(final StateStoreProvider storeProvider, final String storeName) { return new CompositeReadOnlySuppressionBuffer<>(storeProvider, this, storeName); } } } |
...