...
Code Block | ||||
---|---|---|---|---|
| ||||
void print(final PrintOptions<KPrinted<K, V> printOptionsprinted); KStream<K, V> through(final String topic, final TopicOptions<KPartitioned<K, V> topicOptionspartitioned); void to(final String topic, final TopicOptions<VPartitioned<V, V> topicOptionspartitioned); KGroupedStream<K, V> groupByKey(final GroupByOptions<KSerialized<K, V> groupByOptionsserialized); <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector, GroupByOptions<KRSerialized<KR, V> groupByOptionsserialized); <VO, VR> KStream<K, VR> join(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<KJoined<K, V, VO> options); <VT, VR> KStream<K, VR> join(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final JoinOptions<KJoined<K, V, VT> options); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<KJoined<K, V, VO> options); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<KJoined<K, V, VT> options); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<KJoined<K, V, VO> options); <VT, VR> KStream<K, VR> outerJoin(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<KJoined<K, V, VT> options); |
Code Block | ||||
---|---|---|---|---|
| ||||
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, GroupByOptions<KRSerialized<KR, VR> groupByOptionsserialized); KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized); KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized); <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Materialized<K, V, KeyValueStore<K, V>> materialized); void to(final String topic, final TopicOptions<VPartitioned<V, V> options); KTable<K, V> through(final String topic, final Materialized<K, V> options); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VVR, KeyValueStore<K, VR>> materialized); <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VVR, KeyValueStore<K, VR>> materialized); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VVR, KeyValueStore<K, VR>> materialized); |
...
We add some new helper methods to Stores so people can conveniently and quickly create basic StateStoreSuppliers for use in the DSL or PAPI. We will also deprecate the existing Stores.create(...)
Code Block | ||||
---|---|---|---|---|
| ||||
public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> persistentKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final String name, final int capacity, final Serde<K> keySerde, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(final String name, final Windows windows, final Serde<K> keySerde, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(final String name, final SessionWindows windows, final Serde<K> keySerde, final Serde<V> valueSerde) /** * The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with * caching, logging, and any other convenient wrappers provided by the KafkaStreams library */ public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<K, V>> supplier) public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier) public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier) |
...
Code Block | ||||
---|---|---|---|---|
| ||||
<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows); SessionWindowedKStream<K, V> sessionWindowedBy(SessionWindows sessionWindows); KTable<K, Long> count(final Materialized materialized); KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final Materialized<K, VVR, KeyValueStore<K, V>>VR>> materialized); |
Code Block | ||||
---|---|---|---|---|
| ||||
KTable<K, Long> count(final Materialized<K, V, KeyValueStore<K, V>> materialized); KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<K, V>> materialized); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Aggregator<? super K, ? super V, VR> subtractor, final Materialized<K, VVR, KeyValueStore<K, V>>VR>> materialized); |
New classes and interfaces:
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Optional params that can be passed to groupBy and groupByKey operations */ public class GroupByOptions<KSerialized<K, V> { public static <K, V> GroupByOptions<KSerialized<K, V> serdeswith(final Serde<K> keySerde, final Serde<V> valueSerde) { public static <K, V> returnSerialized<K, newV> GroupByOptions<>with(); final Serde<K> keySerde, final } Serde<V> valueSerde, final Serde<V> otherValueSerde) public GroupByOptions<KSerialized<K, V> withKeySerde(final Serde<K> keySerde) public GroupByOptions<KSerialized<K, V> withValueSerde(final Serde valueSerde) } |
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Optional params that can be passed to join, leftJoin, outerJoin operations */ public class JoinOptions<KJoined<K, V, VO> { public static <K, V, VO> JoinOptions<KJoined<K, V, VO> serdeswith(final Serde<K> keySerde, final Serde <V> valueSerde, final Serde<VO> otherValueSerde) public JoinOptions<K static <K, V, VO> Joined<K, V, VO> withKeySerdekeySerde(final Serde<K> keySerde) public JoinOptions<Kstatic <K, V, VO> withValueSerde(final Serde<V> valueSerde) Joined<K, V, VO> valueSerde(final Serde<V> valueSerde) public JoinOptions<K static <K, V, VO> Joined<K, V, VO> withOtherValueSerdeotherValueSerde(final Serde<VO>Serde<V> otherValueSerdevalueSerde) } | ||||
Code Block | ||||
| ||||
/** * Optional arguments thatpublic canJoined<K, beV, specifiedVO> whenwithKeySerde(final doing to and through operations */ public class TopicOptions<K, V> {Serde<K> keySerde) public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde) public static <KJoined<K, V> TopicOptions<KV, V>VO> serdeswithOtherValueSerde(final Serde<VO> otherValueSerde) } |
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Optional arguments that can be specified when doing to and through operations */ public static <K, V> Partitioned<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) Serde<K> keySerde, final Serde<V> valueSerde) { return new TopicOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde); } public static <K, V> TopicOptions<KPartitioned<K, V> optionswith(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde) { public static <K, V> Partitioned<K, V> keySerde(final Serde<K> keySerde) public static return<K, newV> TopicOptions<KPartitioned<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde).withStreamPartitioner(partitioner); } public TopicOptions<K valueSerde(final Serde<V> valueSerde) public static <K, V> Partitioned<K, V> withStreamPartitionerstreamPartitioner(final StreamPartitioner<K, V> partitioner) { public Partitioned<K, V> withStreamPartitioner(final StreamPartitioner<K, V> return null; } public TopicOptions<Kpartitioner) public Partitioned<K, V> withValueSerde(final Serde<V> valueSerde) { public Partitioned<K, V> withKeySerde(final return null; } public TopicOptions<K, V> withKeySerde(final Serde<K> keySerde) { return null; } }Serde<K> keySerde) |
Code Block | ||||
---|---|---|---|---|
| ||||
/** * options that can be used when printing to stdout our writing to a file */ public class PrintOptions<K, V> { public static <K, V> PrintOptions<K, V> labeled(final String label) public static <K, V> PrintOptions<K, V> toFile(final String filepath) public static <K, V> PrintOptions<K, V> sysOut(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label) class Printed<K, V> { public static <K, V> PrintOptions<KPrinted<K, V> toFile(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String labelfilepath) public static <K, V> Printed<K, V> toSysOut() public PrintOptions<KPrinted<K, V> withLabel(final String label) publicprivate PrintOptions<KPrinted<K, V> withFile(final String filepath) public PrintOptions<KPrinted<K, V> withKeySerde(final Serde<K> keySerde) public PrintOptions<KPrinted<K, V> withValueSerde(final Serde<V> keySerde) public PrintOptions<KPrinted<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper) } |
...
|
Code Block |
---|
/** * Implementations of this will provide the ability to wrap a given StateStore * with or without caching/loggging etc. */ public interface StateStoreBuilder<T extends StateStore> { StateStoreBuilder<T> withCachingEnabled(); StateStoreBuilder<T> withCachingDisabled(); StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config); StateStoreBuilder<T> withLoggingDisabled(); T build(); } |
...