Status

Current state: Adopted (1.0)

Discussion thread: [DISCUSS] KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines

JIRA: KAFKA-5651

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As the Kafka Streams DSL has evolved, some of its APIs have become heavily overloaded. For example, there are 8 different overloads of KStream#print. As more overloads are added, it becomes harder for a developer using a modern IDE to discover the right method, which interrupts their flow and hurts API usability.

...

Before we add many more overloaded methods, it is worthwhile exploring other options to see if we can provide a more concise and intuitive API.

Public Interfaces

New methods added to existing interfaces:

Code Block
languagejava
titleKStream
void print(final Printed<K, V> printed);

KStream<K, V> through(final String topic, final Produced<K, V> produced);

void to(final String topic, final Produced<K, V> produced);

KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized);

<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector, final Serialized<KR, V> serialized);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Joined<K, V, VO> options);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Joined<K, V, VT> options);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Joined<K, V, VO> options);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Joined<K, V, VT> options);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Joined<K, V, VO> options);


Code Block
languagejava
titleKTable
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, final Serialized<KR, VR> serialized);

KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 

 

We add some new helper methods to Stores so that people can conveniently and quickly create basic StoreSuppliers for use in the DSL or PAPI. We will also deprecate the existing Stores.create(...).

Code Block
languagejava
titleStores
public static <K, V> KeyValueBytesStoreSupplier persistentKeyValueStore(final String name)

public static <K, V> KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name)

public static <K, V> KeyValueBytesStoreSupplier lruMap(final String name)

public static <K, V> WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                                    final long retentionPeriod,
                                                                    final int numSegments,
                                                                    final long windowSize,
                                                                    final boolean retainDuplicates)

public static <K, V> SessionBytesStoreSupplier persistentSessionStore(final String name, final long retentionPeriod)

/**
 *  The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with
 *  caching, logging, and any other convenient wrappers provided by the KafkaStreams library
 */
public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final WindowBytesStoreSupplier supplier,
                                                                      final Serde<K> keySerde,
                                                                      final Serde<V> valueSerde)

public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                          final Serde<K> keySerde,
                                                                          final Serde<V> valueSerde)

public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final SessionBytesStoreSupplier supplier,
                                                                        final Serde<K> keySerde,
                                                                        final Serde<V> valueSerde)


Code Block
languagejava
titleTopology
public synchronized <K, V> Topology addGlobalStore(final StateStoreBuilder storeSupplier,
                                                   final String sourceName,
                                                   final TimestampExtractor timestampExtractor,
                                                   final Deserializer keyDeserializer,
                                                   final Deserializer valueDeserializer,
                                                   final String topic,
                                                   final String processorName,
                                                   final ProcessorSupplier stateUpdateSupplier)

public synchronized final Topology addStateStore(final StateStoreBuilder supplier, final String... processorNames)
 

 

 

Code Block
languagejava
titleKGroupedStream
<W extends Window> TimeWindowedKStream<K, V> windowedBy(Windows<W> timeWindows);

SessionWindowedKStream<K, V> windowedBy(SessionWindows sessionWindows);

KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);


Code Block
languagejava
titleKGroupedTable
KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Aggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

 

For StreamsBuilder we remove all stream, table, and globalTable overloads that take more than a single argument and replace them with:

Code Block
languagejava
titleStreamsBuilder
public synchronized <K, V> KStream<K, V> stream(final String topic)
 
public synchronized <K, V> KStream<K, V> stream(final String topic, final Consumed<K, V> options)
 
public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics, final Consumed<K, V> options)

public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics)

public synchronized <K, V> KStream<K, V> stream(final Pattern pattern, final Consumed<K, V> options)

public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed)

public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
 
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)

public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed)

public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
 
public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
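
To illustrate how a single options argument can stand in for many overloads, here is a minimal sketch in plain Java. ConsumedSketch and StreamsBuilderSketch are hypothetical stand-ins written only for this example: they mimic the shape of Consumed and StreamsBuilder#stream listed above, use strings in place of real serdes and reset policies, and are not the Kafka Streams classes.

```java
/** Hypothetical stand-in for Consumed<K, V>; strings stand in for serdes and reset policies. */
final class ConsumedSketch {
    private final String keySerde;
    private final String valueSerde;
    private final String resetPolicy;

    private ConsumedSketch(String keySerde, String valueSerde, String resetPolicy) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.resetPolicy = resetPolicy;
    }

    /** Nothing specified: every accessor reports "default". */
    static ConsumedSketch empty() {
        return new ConsumedSketch("default", "default", "default");
    }

    /** Static factory for the most common combination of options. */
    static ConsumedSketch with(String keySerde, String valueSerde) {
        return new ConsumedSketch(keySerde, valueSerde, "default");
    }

    /** Returns a copy with one more option set, so calls can be chained. */
    ConsumedSketch withOffsetResetPolicy(String resetPolicy) {
        return new ConsumedSketch(keySerde, valueSerde, resetPolicy);
    }

    String keySerde() { return keySerde; }
    String valueSerde() { return valueSerde; }
    String offsetResetPolicy() { return resetPolicy; }
}

/** Hypothetical stand-in for StreamsBuilder; it only describes what it would consume. */
final class StreamsBuilderSketch {
    /** Required argument only: delegates with an empty options object. */
    static String stream(String topic) {
        return stream(topic, ConsumedSketch.empty());
    }

    /** Required argument plus one options object instead of one overload per combination. */
    static String stream(String topic, ConsumedSketch consumed) {
        return topic + "[key=" + consumed.keySerde()
                + ", value=" + consumed.valueSerde()
                + ", reset=" + consumed.offsetResetPolicy() + "]";
    }
}
```

With this shape, adding a new optional setting means adding one with-method to the options class rather than another overload on every method that consumes a topic.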

 


New classes and interfaces:

Code Block
languagejava
titleTimeWindowedKStream
public interface TimeWindowedKStream<K, V> {

    KTable<Windowed<K>, Long> count();

    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materializedAs);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator,
                                           final Materialized<K, VR, WindowStore<Bytes, byte[]>> materializedAs);

    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                  final Materialized<K, V, WindowStore<Bytes, byte[]>> materializedAs);
}


Code Block
languagejava
titleSessionWindowedKStream
public interface SessionWindowedKStream<K, V> {

    KTable<Windowed<K>, Long> count();

    KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materializedAs);

    <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                              final Aggregator<? super K, ? super V, VR> aggregator,
                                              final Merger<? super K, T> sessionMerger);

    <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                              final Aggregator<? super K, ? super V, VR> aggregator,
                                              final Merger<? super K, T> sessionMerger,
                                              final Materialized<K, VR, SessionStore<Bytes, byte[]>> materializedAs);

    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                  final Materialized<K, V, SessionStore<Bytes, byte[]>> materializedAs);
}


Code Block
languagejava
titleMaterialized
/**
 * Used when materializing a state store, i.e., during an aggregation operation or KTable operations
 */
public class Materialized<K, V, S extends StateStore> {

    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)

    public static <K, V> Materialized<K, V, WindowStore<Bytes, byte[]>> as(final WindowBytesStoreSupplier supplier)

    public static <K, V> Materialized<K, V, SessionStore<Bytes, byte[]>> as(final SessionBytesStoreSupplier supplier)

    public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> as(final KeyValueBytesStoreSupplier supplier)

    public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde)

    public Materialized<K, V, S> withKeySerde(final Serde<K> keySerde)

    public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> topicConfig)

    public Materialized<K, V, S> withLoggingDisabled()

    public Materialized<K, V, S> withCachingEnabled()

    public Materialized<K, V, S> withCachingDisabled()

    public String storeName()

    public StoreSupplier<S> storeSupplier()

    public Serde<K> keySerde()

    public Serde<V> valueSerde()

    public boolean loggingEnabled()

    public Map<String, String> logConfig()

    public boolean cachingEnabled()

}


Code Block
languagejava
titleSerialized
/**
 * Optional params that can be passed to groupBy and groupByKey operations
 */
public class Serialized<K, V> {

    public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)

    public Serialized<K, V> withKeySerde(final Serde<K> keySerde)

    public Serialized<K, V> withValueSerde(final Serde<V> valueSerde)

    public Serde<K> keySerde()

    public Serde<V> valueSerde()
}


Code Block
languagejava
titleJoined
/**
 * Optional params that can be passed to join, leftJoin, outerJoin operations
 */
public class Joined<K, V, VO> {

    public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde, final Serde<V> valueSerde, final Serde<VO> otherValueSerde)

    public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde)

    public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde)

    public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> otherValueSerde)

    public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde)

    public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde)

    public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde)

    public Serde<K> keySerde()

    public Serde<V> valueSerde()

    public Serde<VO> otherValueSerde()
}


Code Block
languagejava
titleProduced
/**
 * Optional arguments that can be specified when doing to and through operations
 */
public class Produced<K, V> {

    public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)

    public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final StreamPartitioner<K, V> partitioner)

    public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde)

    public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde)

    public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)

    public Produced<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)

    public Produced<K, V> withValueSerde(final Serde<V> valueSerde)

    public Produced<K, V> withKeySerde(final Serde<K> keySerde)

    public Serde<K> keySerde()

    public Serde<V> valueSerde()

    public StreamPartitioner<K, V> streamPartitioner()
}


Code Block
languagejava
titlePrinted
 
/**
 * Options that can be used when printing to stdout or writing to a file
 */
public class Printed<K, V> {
    public static <K, V> Printed<K, V> toFile(final String filepath)

    public static <K, V> Printed<K, V> toSysOut()

    public Printed<K, V> withLabel(final String label)

    public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)

    public ProcessorSupplier<K, V> build(final String processorName)
}


Code Block
languagejava
titleConsumed
/**
 * Options for consuming a topic as a KStream or KTable
 */
public class Consumed<K, V> {
    public static <K, V>  Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final TimestampExtractor extractor, final Topology.AutoOffsetReset resetPolicy)
    public static <K, V>  Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
    public static <K, V>  Consumed<K, V> with(final TimestampExtractor extractor)
    public static <K, V>  Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy)
    
    public Consumed<K, V> withKeySerde(final Serde<K> keySerde)

    public Consumed<K, V> withValueSerde(final Serde<V> valueSerde)

    public Consumed<K, V> withTimestampExtractor(final TimestampExtractor timestampExtractor)

    public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy)
 
    public Serde<K> keySerde()

    public Serde<V> valueSerde()

    public TimestampExtractor timestampExtractor()

    public Topology.AutoOffsetReset offsetResetPolicy()
}


Code Block
languagejava
titleStateStoreBuilder
/**
 * Implementations of this will provide the ability to wrap a given StateStore
 * with or without caching/logging etc.
 */
public interface StoreBuilder<T extends StateStore> {

    StoreBuilder<T> withCachingEnabled();
    StoreBuilder<T> withLoggingEnabled(Map<String, String> config);
    StoreBuilder<T> withLoggingDisabled();
    T build();
    Map<String, String> logConfig();
    boolean loggingEnabled();
}
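
The builder contract above can be pictured with a small plain-Java sketch. StoreBuilderSketch is a hypothetical class invented for this example; it does not build real stores and only records which wrapper layers build() would apply. Two things are assumptions of the sketch rather than statements from this KIP: logging is on unless disabled, and the caching layer sits outside the logging layer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical builder that mirrors only the shape of the StoreBuilder interface. */
final class StoreBuilderSketch {
    private final String storeName;
    private boolean caching = false;
    private boolean logging = true; // assumption of this sketch: logging is on unless disabled

    StoreBuilderSketch(String storeName) {
        this.storeName = storeName;
    }

    StoreBuilderSketch withCachingEnabled() {
        caching = true;
        return this;
    }

    StoreBuilderSketch withLoggingDisabled() {
        logging = false;
        return this;
    }

    boolean loggingEnabled() {
        return logging;
    }

    /** Lists the layers from outermost to innermost; the order is an assumption of this sketch. */
    List<String> build() {
        List<String> layers = new ArrayList<>();
        if (caching) {
            layers.add("caching");
        }
        if (logging) {
            layers.add("logging");
        }
        layers.add(storeName); // the user-supplied store is always the innermost layer
        return layers;
    }
}
```

The point of the design is that the caller states what they want (caching, logging) and the builder decides how the wrappers are stacked around the supplied store.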


Code Block
languagejava
titleStoreSupplier
public interface StoreSupplier<T extends StateStore> {

    /**
     * Return the name of this state store supplier.
     * This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'
     *
     * @return the name of this state store supplier
     */
    String name();

    /**
     * Return a new {@link StateStore} instance.
     *
     * @return a new {@link StateStore} instance of type T
     */
    T get();
 
    /**
     * Return a String that is used as the scope for metrics recorded by Metered stores
     * @return metricsScope
     */
    String metricsScope();
}


Code Block
languagejava
titleWindowBytesStoreSupplier
/**
 * A store supplier that can be used to create one or more {@link WindowStore} instances of type &lt;Bytes, byte[]&gt;
 */
public interface WindowBytesStoreSupplier extends StoreSupplier<WindowStore<Bytes, byte[]>> {
    /**
     * The number of segments the store has. If your store is segmented then this should be the number of segments
     * in the underlying store. It is also used to reduce the amount of data that is scanned when caching is enabled
     *
     * @return number of segments
     */
    int segments();

    /**
     * The size of the windows any store created from this supplier is creating
     * @return window size
     */
    long windowSize();

    /**
     * Whether or not this store is retaining duplicate keys. Usually only true if the store is being used
     * for joins. Note this should return false if caching is enabled
     * @return true if duplicates should be retained
     */
    boolean retainDuplicates();

    /**
     * The time period for which the {@link WindowStore} will retain historic data
     * @return retentionPeriod
     */
    long retentionPeriod();
}


Code Block
languagejava
titleKeyValueBytesStoreSupplier
/**
 * A store supplier that can be used to create one or more {@link KeyValueStore} instances of type &lt;Bytes, byte[]&gt;
 */
public interface KeyValueBytesStoreSupplier extends StoreSupplier<KeyValueStore<Bytes, byte[]>> {

}


Code Block
languagejava
titleSessionBytesStoreSupplier
/**
 * A store supplier that can be used to create one or more {@link SessionStore} instances of type &lt;Bytes, byte[]&gt;
 */
public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<Bytes, byte[]>> {

    /**
     * The size of a segment, in milliseconds. Used when caching is enabled to segment the cache
     * and reduce the amount of data that needs to be scanned when performing range queries
     *
     * @return segmentInterval in milliseconds
     */
    long segmentIntervalMs();
}

 

Proposed Changes

...

Add the above methods, interfaces, and classes to the DSL. Deprecate the existing overloads on KStream, KTable, and KGroupedStream that take more than the required parameters; for example, KTable#filter(Predicate, String) and KTable#filter(Predicate, StateStoreSupplier) will be deprecated. StateStoreSupplier will also be deprecated. All versions of KTable#through and KTable#to will be deprecated in favour of KTable#toStream()#through and KTable#toStream()#to.

The new interface BytesStoreSupplier supersedes the existing StateStoreSupplier (which will remain untouched). This is so that we can provide a convenient way for users creating custom state stores to wrap them with caching, logging, etc. if they choose. To do this, we need to force the innermost store, i.e., the custom store, to be a store of type `<Bytes, byte[]>`.
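
A minimal plain-Java sketch of why the innermost store works on raw bytes: generic wrappers can then be written once, and typed access is layered on top with serializers. Every name here (BytesStoreSketch, InMemoryBytesStore, LoggingBytesStore, TypedStoreSketch, BytesStoreDemo) is hypothetical and invented for illustration; ByteBuffer stands in for Bytes, a list stands in for a changelog, and none of this is the Kafka Streams implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a store of type <Bytes, byte[]>. */
interface BytesStoreSketch {
    void put(ByteBuffer key, byte[] value);
    byte[] get(ByteBuffer key);
}

/** The "custom" innermost store: it only ever sees raw bytes. */
final class InMemoryBytesStore implements BytesStoreSketch {
    private final Map<ByteBuffer, byte[]> data = new HashMap<>();
    public void put(ByteBuffer key, byte[] value) { data.put(key, value); }
    public byte[] get(ByteBuffer key) { return data.get(key); }
}

/** A wrapper written once for all byte stores; the list stands in for a changelog. */
final class LoggingBytesStore implements BytesStoreSketch {
    private final BytesStoreSketch inner;
    final List<String> changelog = new ArrayList<>();

    LoggingBytesStore(BytesStoreSketch inner) { this.inner = inner; }

    public void put(ByteBuffer key, byte[] value) {
        // Decode a duplicate so the caller's buffer position is left untouched.
        changelog.add(StandardCharsets.UTF_8.decode(key.duplicate()).toString());
        inner.put(key, value);
    }

    public byte[] get(ByteBuffer key) { return inner.get(key); }
}

/** The outermost, typed layer: serializers translate between <K, V> and bytes. */
final class TypedStoreSketch<K, V> {
    private final BytesStoreSketch inner;
    private final Function<K, byte[]> keySerializer;
    private final Function<V, byte[]> valueSerializer;
    private final Function<byte[], V> valueDeserializer;

    TypedStoreSketch(BytesStoreSketch inner, Function<K, byte[]> keySerializer,
                     Function<V, byte[]> valueSerializer, Function<byte[], V> valueDeserializer) {
        this.inner = inner;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.valueDeserializer = valueDeserializer;
    }

    void put(K key, V value) {
        inner.put(ByteBuffer.wrap(keySerializer.apply(key)), valueSerializer.apply(value));
    }

    V get(K key) {
        byte[] raw = inner.get(ByteBuffer.wrap(keySerializer.apply(key)));
        return raw == null ? null : valueDeserializer.apply(raw);
    }
}

final class BytesStoreDemo {
    /** Writes one typed entry through both layers and reads it back. */
    static String roundTrip() {
        LoggingBytesStore logged = new LoggingBytesStore(new InMemoryBytesStore());
        TypedStoreSketch<String, Long> store = new TypedStoreSketch<String, Long>(
                logged,
                k -> k.getBytes(StandardCharsets.UTF_8),
                v -> ByteBuffer.allocate(Long.BYTES).putLong(v).array(),
                raw -> ByteBuffer.wrap(raw).getLong());
        store.put("clicks", 42L);
        return store.get("clicks") + " / " + logged.changelog;
    }
}
```

Because LoggingBytesStore never needs to know the key or value types, the same wrapper can sit around any custom store that speaks bytes.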

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
    • None - we will deprecate the existing methods so that existing users can continue to use them until they decide to change.

Rejected Alternatives

  • Using a more fluent API: this approach always results in intermediate stages that require a final build or apply call to create the underlying KStream/KTable etc. We felt that this wasn't quite right.
  • Builder for all params: rather than specifying the required params and optional params separately, we could make each method take a Builder that has all of the params. It was felt that this is a bit onerous for users who just want to use the required params and don't care about the options.