You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 11 Next »

Status

Current state: Under Discussion

Discussion thread: here
JIRA: KAFKA-7277

Motivation

Right now, the Streams API universally represents time as milliseconds since the Unix epoch.

There's nothing wrong, per se, with this, but Duration is more ergonomic for an API.

What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration.

Public Interfaces

We need to add the following methods to the public API:

Public API changes
Windows {
	//Existing methods. Will be deprecated.
	public Windows<W> until(final long durationMs) throws IllegalArgumentException;
	public long maintainMs();
	public abstract long size();

	//New methods.
	public Windows<W> until(final Duration duration) throws IllegalArgumentException;
	public Duration maintain();
	public abstract Duration size();
}

class Window {
	//Existing methods. Will be deprecated.
	public Window(final long startMs, final long endMs) throws IllegalArgumentException;
	public long start();
	public long end();

	//New methods.
	public Window(final Instant start, final Instant end) throws IllegalArgumentException;
	public LocalDateTime start();
	public LocalDateTime end();
}

JoinWindows {
    //Existing methods. Will be deprecated.
    public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException;
    public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException;
    public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException;
    public JoinWindows until(final long durationMs) throws IllegalArgumentException;
    public long maintainMs();

    //New methods.
    public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException;
    public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException;
    public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException;
    public JoinWindows until(final Duration duration) throws IllegalArgumentException;
    public Duration maintain();
}

SessionWindows {
	//Existing methods. Will be deprecated.
    public static SessionWindows with(final long inactivityGapMs);
    public SessionWindows until(final long durationMs) throws IllegalArgumentException;
    public long inactivityGap();
    public long maintainMs();

	//New methods.
    public static SessionWindows with(final Duration inactivityGap);
    public SessionWindows until(final Duration duration) throws IllegalArgumentException;
    public Duration inactivityGap();
    public Duration maintain();
}

TimeWindowedDeserializer {
	//Existing methods. Will be deprecated.
	public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize);
	public Long getWindowSize();

	//New methods.
	public TimeWindowedDeserializer(final Deserializer<T> inner, final Duration windowSize);
	public Duration getWindowSize();
}

TimeWindows {
	//Existing methods. Will be deprecated.
	public static TimeWindows of(final long sizeMs) throws IllegalArgumentException;
	public TimeWindows advanceBy(final long advanceMs);
	public Map<Long, TimeWindow> windowsFor(final long timestamp);
    public long size();
    public TimeWindows until(final long durationMs) throws IllegalArgumentException;
    public long maintainMs();

	//New methods.
	public static TimeWindows of(final Duration size) throws IllegalArgumentException;
	public TimeWindows advanceBy(final Duration advance);
	public Map<Long, TimeWindow> windowsFor(Instant timestamp);
    public Duration size();
    public TimeWindows until(final Duration duration) throws IllegalArgumentException;
    public Duration maintain();
}

UnlimitedWindows {
	//Existing methods. Will be deprecated.
    public UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException;
    public Map<Long, UnlimitedWindow> windowsFor(final long timestamp);
	public long size();
	public long maintainMs();

	//New methods.
    public UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException;
    public Map<Long, UnlimitedWindow> windowsFor(final Instant timestamp);
	public Duration size();
	public Duration maintain();
}

ProcessorContext {
    //Existing method. Will be deprecated.
    Cancellable schedule(final long intervalMs,
                         final PunctuationType type,
                         final Punctuator callback);

	//New method.
    Cancellable schedule(final Duration interval,
                         final PunctuationType type,
                         final Punctuator callback);
}

ReadOnlyWindowStore<K, V> {
    //Existing methods. Will be deprecated.
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

	//New methods.
    WindowStoreIterator<V> fetch(K key, Instant from, Duration duration);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant timeFrom, Duration duration);
    KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Duration duration);
}

SessionBytesStoreSupplier {
    //Existing methods. Will be deprecated.
    long segmentIntervalMs();
    long retentionPeriod();

    //New methods. 
    Duration segmentInterval();
    Duration retentionPeriod();
}

SessionStore {
	//Existing methods. Will be deprecated.
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long earliestSessionEndTime, final long latestSessionStartTime);
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, long earliestSessionEndTime, final long latestSessionStartTime);

	//New methods.
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, Instant earliestSessionEnd, final Instant latestSessionStart);
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, Instant earliestSessionEnd, final Instant latestSessionStart);
}

Stores {
	//Existing methods. Will be deprecated.
	public static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                         final long retentionPeriod,
        	                                                     final long windowSize,
            	                                                 final boolean retainDuplicates);

	public static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                         final long retentionPeriod,
        	                                                     final long windowSize,
            	                                                 final boolean retainDuplicates,
                	                                             final long segmentInterval);

	public static SessionBytesStoreSupplier persistentSessionStore(final String name,
    	                                                           final long retentionPeriod);

	//New methods.
	public static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                         final Duration retentionPeriod,
        	                                                     final Duration windowSize,
            	                                                 final boolean retainDuplicates);

	public static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                         final Duration retentionPeriod,
        	                                                     final Duration windowSize,
            	                                                 final boolean retainDuplicates,
                	                                             final Duration segmentInterval);

	public static SessionBytesStoreSupplier persistentSessionStore(final String name,
    	                                                           final Duration retentionPeriod);
}

WindowBytesStoreSupplier {
    //Existing methods. Will be deprecated.
    long segmentIntervalMs();
    long windowSize();
    long retentionPeriod();

	//New methods.
    Duration segmentInterval();
    Duration windowSize();
    Duration retentionPeriod();
}

KafkaStreams {
	//Existing method. Will be deprecated.
	public synchronized boolean close(final long timeout, final TimeUnit timeUnit);

	//New method
	public synchronized boolean close(final Duration timeout);
}

StreamsMetrics {
	//Existing method. Will be deprecated.
	void recordLatency(final Sensor sensor,
    	               final long startNs,
        	           final long endNs);

	//New method.
	void recordLatency(final Sensor sensor,
    	               final Instant start,
        	           final Duration recDuration);
}


Proposed Changes

New methods are proposed for the public API; see the "Public Interfaces" section above.

Compatibility, Deprecation, and Migration Plan

No compatibility issues are foreseen.

Rejected Alternatives

The alternative, representing time with long parameters, is what is currently implemented.

  • No labels