...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
class Window {
//New methods.
Instant startTime();
Instant endTime();
}
JoinWindows {
//Existing methods. Will be deprecated.
static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException;
JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException;
JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException;
//Existing method. Will be removed.
JoinWindows grace(final long millisAfterWindowEnd);
//New methods.
static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException;
JoinWindows before(final Duration timeDifference) throws IllegalArgumentException;
JoinWindows after(final Duration timeDifference) throws IllegalArgumentException;
JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException;
}
Materialized {
//Existing method. Will be removed.
Materialized<K, V, S> withRetention(final long retentionMs);
//New method.
Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException;
}
SessionWindows {
//Existing methods. Will be deprecated.
static SessionWindows with(final long inactivityGapMs);
//Existing methods. Will be removed.
SessionWindows grace(final long millisAfterWindowEnd);
//New methods.
static SessionWindows with(final Duration inactivityGap) throws IllegalArgumentException;
SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException;
}
TimeWindows {
//Existing methods. Will be deprecated.
static TimeWindows of(final long sizeMs) throws IllegalArgumentException;
TimeWindows advanceBy(final long advanceMs);
//Existing method. Will be removed.
TimeWindows grace(final long millisAfterWindowEnd);
//New methods.
static TimeWindows of(final Duration size) throws IllegalArgumentException;
TimeWindows advanceBy(final Duration advance);
TimeWindows grace(final Duration afterWindowEnd);
}
UnlimitedWindows {
//Existing methods. Will be deprecated.
UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException;
//New methods.
UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException;
}
ProcessorContext {
//Existing method. Will be deprecated.
Cancellable schedule(final long intervalMs,
final PunctuationType type,
final Punctuator callback);
//New method.
Cancellable schedule(final Duration interval,
final PunctuationType type,
final Punctuator callback) throws IllegalArgumentException;
}
ReadOnlyWindowStore<K, V> {
//Deprecated methods.
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
//New methods.
//This changed after initial KIP voting based on [PR discussion](https://github.com/apache/kafka/pull/5682#discussion_r222494244)
WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant from, Instant to) throws IllegalArgumentException;
KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;
}
WindowStore {
//New methods with default implementation that checks arguments and pass it to existing fetch methods.
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) throws IllegalArgumentException;
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) throws IllegalArgumentException;
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) throws IllegalArgumentException;
}
Stores {
//Existing methods. Will be deprecated.
static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriodMs,
final long windowSizeMs,
final boolean retainDuplicates);
//This method added after KIP voting. Based on John Roesler comment(https://github.com/apache/kafka/pull/5682#discussion_r221472187)
static SessionBytesStoreSupplier persistentSessionStore(final String name,
final long retentionPeriod);
static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriodMs,
final long windowSizeMs,
final boolean retainDuplicates,
final long segmentIntervalMs);
//New methods.
static WindowBytesStoreSupplier persistentWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException;
static SessionBytesStoreSupplier persistentSessionStore(final String name,
final Duration retentionPeriod) throws IllegalArgumentException;
}
KafkaStreams {
//Existing method. Will be deprecated.
public synchronized boolean close(final long timeout, final TimeUnit timeUnit);
//New method
public synchronized boolean close(final Duration timeout) throws IllegalArgumentException;
}
|
...
- reject negative numbers
- make 0 just signal and return immediately (after checking the state once)
Default implementation of fetch methods in WindowStore.
Compatibility, Deprecation, and Migration Plan
...