...
Code Block |
---|
|
/**
* A suppression buffer that only supports read operations.
*
* @param <K> type of the record keys
* @param <V> type of the record values
*/
public interface ReadOnlySuppressionBuffer<K, V> extends ReadOnlyKeyValueStore<K, V> {
/**
* Returns suppressed view of the value associated with {@code key} with timestamp, if exists.
* If not, returns null.
*
* @param key record key
* @return Suppressed view of the value associated with given key, with timestamp (if exists)
*/
NullableValueAndTimestamp<V> priorValueForBuffered(K key);
/**
* Returns the number of key/value pairs suppressed in this buffer.
*
* @return the number of key/value pairs suppressed in this buffer
*/
int numRecords();
/**
* Returns the size of this buffer, in bytes.
*
* @return the size of the buffer, in bytes
*/
long bufferSize();
/**
* Returns the timestamp of the oldest record in this buffer. {@link Long#MAX_VALUE} iff the buffer is empty.
*
* @return the timestamp of the oldest record in this buffer
*/
long minTimestamp();
} |
Dislike to ValueAndTimestamp, NullableValueAndTimestamp can contain null as its value; it was introduced since suppression buffer can return a null value with a timestamp.
Code Block |
---|
|
public final class NullableValueAndTimestamp<V> {
protected final V value;
protected final long timestamp;
protected NullableValueAndTimestamp(final V value, final long timestamp) {
this.value = value;
this.timestamp = timestamp;
}
/**
* Create a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance.
*
* @param value the value
* @param timestamp the timestamp
* @param <V> the type of the value
* @return a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance
*/
public static <V> NullableValueAndTimestamp<V> make(final V value,
final long timestamp) {
return new NullableValueAndTimestamp<>(value, timestamp);
}
/**
* Return the wrapped {@code value} of the given {@code valueAndTimestamp} parameter
* if the parameter is not {@code null}.
*
* @param nullableValueAndTimestamp a {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance; can be {@code null}
* @param <V> the type of the value
* @return the wrapped {@code value} of {@code valueAndTimestamp} if not {@code null}; otherwise {@code null}
*/
public static <V> V getValueOrNull(final NullableValueAndTimestamp<V> nullableValueAndTimestamp) {
return nullableValueAndTimestamp == null ? null : nullableValueAndTimestamp.value();
}
public V value() {
return value;
}
public long timestamp() {
return timestamp;
}
public ValueAndTimestamp<V> toValueAndTimestamp() {
return value == null ? null : ValueAndTimestamp.make(value, timestamp);
}
@Override
public String toString() {
return "(" + value + "," + timestamp + ")";
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final NullableValueAndTimestamp<?> that = (NullableValueAndTimestamp<?>) o;
return timestamp == that.timestamp &&
Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(value, timestamp);
}
} |
2. Add Stores#[SuppressionBytesStoreSupplier, suppressionBufferBuilder]
...