Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleKTable
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, GroupByOptions<KR, VR> groupByOptions);

KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized);

KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Materialized<K, V, KeyValueStore<K, V>> materialized);

void to(final String topic, final TopicOptions<V, V> options);

KTable<K, V> through(final String topic, final Materialized<K, V> options);

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final Materialized<K, V, KeyValueStore<K, VR>> materialized);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final Materialized<K, V, KeyValueStore<K, VR>> materialized);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final Materialized<K, V, KeyValueStore<K, VR>> materialized);

 

We add some new helper methods to Stores so people can conveniently and quickly create basic StateStoreSuppliers for use in the DSL or PAPI

Code Block
languagejava
titleStores
public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> persistentKeyValueStore(final String name,
                                                                                     final Serde<K> keySerde,
                                                                                     final Serde<V> valueSerde) 

public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(final String name,
                                                                                final Serde<K> keySerde,
                                                                                final Serde<V> valueSerde)

public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final String name,
                                                                final int capacity,
                                                                final Serde<K> keySerde,
                                                                final Serde<V> valueSerde) 

public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(final String name,
                                                                            final Windows windows,
                                                                            final Serde<K> keySerde,
                                                                            final Serde<V> valueSerde)

public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(final String name,
                                                                              final SessionWindows windows,
                                                                              final Serde<K> keySerde,
                                                                              final Serde<V> valueSerde) 

/**
 *  The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with
 *  caching, logging, and any other convenient wrappers provided by the KafkaStreams library
 */ 
public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<K, V>> supplier)

public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier)

public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier)

...