You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Motivation

Right now the Streams API universally represents time as milliseconds since the Unix epoch.

There's nothing wrong, per se, with this, but Duration is more ergonomic for an API.

What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration.

Public Interfaces

We need to add the following methods to the public API:

Public API changes
public interface ProcessorContext {
    // Existing method: the punctuation interval is passed as a raw long
    // (milliseconds, per the parameter name).
    Cancellable schedule(final long intervalMs,
                         final PunctuationType type,
                         final Punctuator callback);


    // New method: same call shape, but the interval is a java.time.Duration.
    // It overloads the existing method by the type of the first parameter.
    Cancellable schedule(final Duration interval,
                         final PunctuationType type,
                         final Punctuator callback);
}


ReadOnlyWindowStore<K, V> {
    // Existing methods: the time range is given as two long bounds (timeFrom, timeTo).
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);

    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);

    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);


    // New methods: the time range is given as a start point plus a Duration.
    // The start-time parameter is named timeFrom, as in the existing overloads,
    // because the key-range overload already has a key parameter named "from";
    // two parameters with the same name in one signature do not compile.
    // NOTE(review): LocalDateTime carries no zone or offset, so converting it to
    // an epoch-based timestamp needs a zone - confirm which zone is intended, or
    // consider Instant instead.
    WindowStoreIterator<V> fetch(K key, LocalDateTime timeFrom, Duration duration);

    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, LocalDateTime timeFrom, Duration duration);

    KeyValueIterator<Windowed<K>, V> fetchAll(LocalDateTime timeFrom, Duration duration);
}


SessionBytesStoreSupplier {
    // Existing methods: both values are returned as long.
    long segmentIntervalMs();

    long retentionPeriod();


    // New methods: the same values returned as java.time.Duration.
    // segmentInterval() is distinguishable from segmentIntervalMs() by name.
    Duration segmentInterval();

    // NOTE(review): this has the same name and (empty) parameter list as the
    // existing long retentionPeriod(); Java cannot overload on return type alone,
    // so both cannot be declared on the same type - a distinct name is needed.
    Duration retentionPeriod();

}


WindowBytesStoreSupplier {
    // Existing methods: all three values are returned as long.
    long segmentIntervalMs();

    long windowSize();

    long retentionPeriod();


    // New methods: the same values returned as java.time.Duration.
    // segmentInterval() is distinguishable from segmentIntervalMs() by name.
    // NOTE(review): windowSize() and retentionPeriod() below have the same name
    // and (empty) parameter list as the existing long-returning methods; Java
    // cannot overload on return type alone, so distinct names are needed.
    Duration segmentInterval();

    Duration windowSize();

    Duration retentionPeriod();
}


JoinWindows {
    // Existing methods: time differences and durations are passed and returned
    // as long values (milliseconds, per the *Ms names).
    public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException;


    public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException;

    public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException;


    public JoinWindows until(final long durationMs) throws IllegalArgumentException;


    public long maintainMs();


    // New methods: java.time.Duration counterparts of the methods above.
    // of/before/after/until overload their long-based twins by parameter type;
    // maintain() is distinguishable from maintainMs() by name.
    public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException;

    public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException;

    public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException;

    public JoinWindows until(final Duration duration) throws IllegalArgumentException;

    public Duration maintain();
}


SessionWindows {
	// Existing methods: the inactivity gap and retention are passed and returned
	// as long values (milliseconds, per the *Ms names).
    public static SessionWindows with(final long inactivityGapMs);


    public SessionWindows until(final long durationMs) throws IllegalArgumentException;


    public long inactivityGap();


    public long maintainMs();


	// New methods: java.time.Duration counterparts of the methods above.
	// with/until overload their long-based twins by parameter type; maintain()
	// is distinguishable from maintainMs() by name.
	// NOTE(review): Duration inactivityGap() has the same name and (empty)
	// parameter list as the existing long inactivityGap(); Java cannot overload
	// on return type alone, so a distinct name is needed.
    public static SessionWindows with(final Duration inactivityGap);

    public SessionWindows until(final Duration duration) throws IllegalArgumentException;

    public Duration inactivityGap();

    public Duration maintain();
}


TimeWindows {
	// Existing methods: sizes, advances, timestamps and retention are passed and
	// returned as long values (milliseconds, per the *Ms names).
	public static TimeWindows of(final long sizeMs) throws IllegalArgumentException;


	public TimeWindows advanceBy(final long advanceMs);


	public Map<Long, TimeWindow> windowsFor(final long timestamp);


    public long size();


    public TimeWindows until(final long durationMs) throws IllegalArgumentException;


    public long maintainMs();


	// New methods: java.time counterparts of the methods above.
	// of/advanceBy/windowsFor/until overload their long-based twins by parameter
	// type; maintain() is distinguishable from maintainMs() by name.
	// NOTE(review): windowsFor takes a LocalDateTime, which carries no zone or
	// offset - confirm which zone is used to map it to a timestamp, or consider
	// Instant instead.
	// NOTE(review): Duration size() has the same name and (empty) parameter list
	// as the existing long size(); Java cannot overload on return type alone, so
	// a distinct name is needed.
	public static TimeWindows of(final Duration size) throws IllegalArgumentException;

	public TimeWindows advanceBy(final Duration advance);

	public Map<Long, TimeWindow> windowsFor(LocalDateTime timestamp);


    public Duration size();

    public TimeWindows until(final Duration duration) throws IllegalArgumentException;

    public Duration maintain();
}


Windows {
	// Existing methods: retention and window size are passed and returned as
	// long values (milliseconds, per the *Ms names).
	public Windows<W> until(final long durationMs) throws IllegalArgumentException;


	public long maintainMs();


	public abstract long size();


	// New methods: java.time.Duration counterparts of the methods above.
	// until overloads its long-based twin by parameter type; maintain() is
	// distinguishable from maintainMs() by name.
	// NOTE(review): abstract Duration size() has the same name and (empty)
	// parameter list as the existing abstract long size(); Java cannot overload
	// on return type alone, so a distinct name is needed.
	public Windows<W> until(final Duration duration) throws IllegalArgumentException;

	public Duration maintain();

	public abstract Duration size();
}


Proposed Changes

New methods in the public API are proposed. See the "Public Interfaces" section.

Compatibility, Deprecation, and Migration Plan

No compatibility issues foreseen.

Rejected Alternatives

The alternative solution, using long parameters, is what is implemented right now.

  • No labels