...
Code Block | ||||
---|---|---|---|---|
| ||||
// Proposed KTable overloads. Each one takes its optional settings as a single trailing argument
// (GroupByOptions, TopicOptions or Materialized) instead of separate serde/store parameters.

// Re-keys the table through selector; the options are typed on the new key/value pair.
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                       final GroupByOptions<KR, VR> groupByOptions);

// filter / filterNot keep the table's own types, so the Materialized is typed <K, V>.
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
                    final Materialized<K, V, KeyValueStore<K, V>> materialized);

KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
                       final Materialized<K, V, KeyValueStore<K, V>> materialized);

// The result holds VR values, so the Materialized is typed on VR rather than on the input type V.
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
                             final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

// Options are typed <K, V> to match this table's key and value types.
void to(final String topic, final TopicOptions<K, V> options);

// NOTE(review): Materialized is declared with three type parameters elsewhere in this document but
// only two are supplied here, and the parameter is named "options". Confirm whether
// TopicOptions<K, V> or a three-argument Materialized was intended; left as written.
KTable<K, V> through(final String topic, final Materialized<K, V> options);

// The three joins produce KTable<K, VR>; the Materialized value type therefore is VR as well.
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final Materialized<K, VR, KeyValueStore<K, VR>> materialized); |
Code Block | ||||
---|---|---|---|---|
| ||||
public static <K<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows); SessionWindowedKStream<KStateStoreSupplier<KeyValueStore<K, V>V>> sessionWindowedBypersistentKeyValueStore(SessionWindows sessionWindows); KTable<K, Long> count(final Materialized materialized); KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final String name, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR>Serde<K> aggValueSerdekeySerde, final Materialized<K, V, KeyValueStore<K, V>> materialized); | ||||
Code Block | ||||
| ||||
public interface WindowedKStream<K, V> { KTable<Windowed<K>, Long> count(); KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs); <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(final String name, final Aggregator<? super K, ? super V, VR> aggregator); <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V,final VR>Serde<K> aggregatorkeySerde, final Materialized<K, VR, WindowStore<K, VR>> materializedAs); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final String name, final Materialized<K, V, WindowStore<K, V>> materializedAs); } | ||||
Code Block | ||||
| ||||
public interface SessionWindowedKStream<K, V> { KTable<Windowed<K>, Long> count(); KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<K, Long>> materializedAs); <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final int capacity, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<K> keySerde, final Merger<? super K, T> sessionMerger); <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(final Aggregator<? super K, ? super V, VR> aggregator, String name, final Merger<? super K, T> sessionMerger, final Windows windows, final Materialized<K, VR, SessionStore<K, VR>> materializedAs); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Serde<K> keySerde, final Materialized<K, V, SessionStore<K, V>> materializedAs); } | ||||
Code Block | ||||
| ||||
/** * Used when materializing a state store */ public class Materialized<K, V, S extends StateStore> { public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName) public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier) public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde) public Materialized<K, V, S> withKeySerde(final Serde<K>Serde<V> valueSerde) public static <K, V> public Materialized<KStateStoreSupplier<SessionStore<K, V, S> withLoggingEnabledV>> persistentSessionStore(final Map<StringString name, String> topicConfig) public Materialized<K, V, S> withLoggingDisabled() public Materialized<K, V, S> withCachingEnabled() public Materialized<K, V, S> withCachingDisabled() } | ||||
Code Block | ||||
| ||||
/** * Optional params that can be passed to groupBy and groupByKey operations */ public class GroupByOptions<K, V> { public static <K, V> GroupByOptions<K, V> serdes(final Serde<K> keySerde, final Serde<V> valueSerde) { return new GroupByOptions<>(); } final SessionWindows windows, public GroupByOptions<K, V> withKeySerde() public GroupByOptions<K, V> withValueSerde() } | ||||
Code Block | ||||
| ||||
/** * Optional params that can be passed to join, leftJoin, outerJoin operations */ public class JoinOptions<K, V, VO> { public static <K, V, VO> JoinOptions<K, V, VO> serdes(final Serde<K> keySerde, final Serde <V> valueSerde, final Serde<VO> otherValueSerde) public JoinOptions<K, V, VO> withKeySerde(final Serde<K> keySerde) public JoinOptions<K, V, VO> withValueSerde(final Serde<V> valueSerde) publicfinal JoinOptions<K,Serde<K> VkeySerde, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) } | ||||
Code Block | ||||
| ||||
/** * Optional arguments that can be specified when doing to and through operations */ public class TopicOptions<K, V> { public static <K, V> TopicOptions<K, V> serdes(final Serde<K> keySerde, final Serde<V> valueSerde) { return new TopicOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde); } public static <K, V> TopicOptions<K, V> options(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde) { final Serde<V> valueSerde) return new TopicOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde).withStreamPartitioner(partitioner); } public TopicOptions<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner) { return null; } public TopicOptions<K, V> withValueSerde(final Serde<V> valueSerde) { public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<K, V>> supplier) public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier) public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier) |
Code Block | ||||
---|---|---|---|---|
| ||||
public synchronized <K, V> TopologyBuilder addGlobalStore(final StateStoreBuilder storeSupplier, return null; } public TopicOptions<K, V> withKeySerde(final Serde<K> keySerde) { return null; } } | ||||
Code Block | ||||
/** * options that can be used when printing to stdout our writing tofinal aString filesourceName, */ public class PrintOptions<K, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valueDeserializer, final String topic, final String processorName, final ProcessorSupplier stateUpdateSupplier) public synchronized final TopologyBuilder addStateStore(final StateStoreBuilder supplier, final String... processorNames) |
Code Block |
---|
// Proposed grouped-stream methods (the enclosing interface header is not part of this cell).

// Returns the windowed view of this stream for the given window definition.
<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows);

// Returns the session-windowed view of this stream for the given session windows.
SessionWindowedKStream<K, V> sessionWindowedBy(SessionWindows sessionWindows);

// Result values are Long, so the Materialized is typed <K, Long> instead of being left raw.
KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<K, Long>> materialized);

// reduce keeps the value type V, so the Materialized is typed <K, V>.
KTable<K, V> reduce(final Reducer<V> reducer,
                    final Materialized<K, V, KeyValueStore<K, V>> materialized);

// The aggregate type is VR, so the Materialized is typed on VR (as in the windowed variants),
// not on the input value type V.
// NOTE(review): aggValueSerde sits alongside Materialized, which has its own withValueSerde;
// confirm whether both are meant to stay.
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final Materialized<K, VR, KeyValueStore<K, VR>> materialized); |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Method set of a windowed stream. Every method returns a {@code KTable} keyed by
 * {@code Windowed<K>}, and each operation is declared twice: once without a materialization
 * argument and once with a {@code Materialized} typed against a {@code WindowStore}.
 */
public interface WindowedKStream<K, V> {

    /** {@code count} without a materialization argument; values are {@code Long}. */
    KTable<Windowed<K>, Long> count();

    /** {@code count} with a {@code Materialized} over {@code WindowStore<K, Long>}. */
    KTable<Windowed<K>, Long> count(
            final Materialized<K, Long, WindowStore<K, Long>> materializedAs);

    /** {@code aggregate} into {@code VR} from an initializer and an aggregator. */
    <VR> KTable<Windowed<K>, VR> aggregate(
            final Initializer<VR> initializer,
            final Aggregator<? super K, ? super V, VR> aggregator);

    /** Same as above, plus a {@code Materialized} over {@code WindowStore<K, VR>}. */
    <VR> KTable<Windowed<K>, VR> aggregate(
            final Initializer<VR> initializer,
            final Aggregator<? super K, ? super V, VR> aggregator,
            final Materialized<K, VR, WindowStore<K, VR>> materializedAs);

    /** {@code reduce} with a {@code Reducer<V>}; the value type stays {@code V}. */
    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    /** Same as above, plus a {@code Materialized} over {@code WindowStore<K, V>}. */
    KTable<Windowed<K>, V> reduce(
            final Reducer<V> reducer,
            final Materialized<K, V, WindowStore<K, V>> materializedAs);
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Method set of a session-windowed stream. Every method returns a {@code KTable} keyed by
 * {@code Windowed<K>}; the overloads that accept a {@code Materialized} type it against a
 * {@code SessionStore}. Both {@code aggregate} overloads additionally require a {@code Merger}.
 */
public interface SessionWindowedKStream<K, V> {

    /** {@code count} without a materialization argument; values are {@code Long}. */
    KTable<Windowed<K>, Long> count();

    /** {@code count} with a {@code Materialized} over {@code SessionStore<K, Long>}. */
    KTable<Windowed<K>, Long> count(
            final Materialized<K, Long, SessionStore<K, Long>> materializedAs);

    /**
     * {@code aggregate} into {@code VR} from an initializer, an aggregator and a session merger.
     * NOTE(review): {@code T} appears only in the merger's type and is not tied to {@code VR};
     * confirm whether {@code Merger<? super K, VR>} was intended.
     */
    <VR, T> KTable<Windowed<K>, VR> aggregate(
            final Initializer<VR> initializer,
            final Aggregator<? super K, ? super V, VR> aggregator,
            final Merger<? super K, T> sessionMerger);

    /** Same as above, plus a {@code Materialized} over {@code SessionStore<K, VR>}. */
    <VR, T> KTable<Windowed<K>, VR> aggregate(
            final Initializer<VR> initializer,
            final Aggregator<? super K, ? super V, VR> aggregator,
            final Merger<? super K, T> sessionMerger,
            final Materialized<K, VR, SessionStore<K, VR>> materializedAs);

    /** {@code reduce} with a {@code Reducer<V>}; the value type stays {@code V}. */
    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    /** Same as above, plus a {@code Materialized} over {@code SessionStore<K, V>}. */
    KTable<Windowed<K>, V> reduce(
            final Reducer<V> reducer,
            final Materialized<K, V, SessionStore<K, V>> materializedAs);
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Used when materializing a state store.
 *
 * Signature sketch only: method bodies are omitted. The static {@code as} factories create an
 * instance from a store name or a {@code StateStoreSupplier}; every {@code with*} method
 * returns {@code Materialized<K, V, S>} so calls can be chained.
 */
public class Materialized<K, V, S extends StateStore> {
    /** Factory taking the store name. */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)
    /** Factory taking a supplier of the store type {@code S}. */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)
    /** Supplies the serde for values of type {@code V}. */
    public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde)
    /** Supplies the serde for keys of type {@code K}. */
    public Materialized<K, V, S> withKeySerde(final Serde<K> keySerde)
    /** Logging toggle (on), taking a topic configuration map. */
    public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> topicConfig)
    /** Logging toggle (off). */
    public Materialized<K, V, S> withLoggingDisabled()
    /** Caching toggle (on). */
    public Materialized<K, V, S> withCachingEnabled()
    /** Caching toggle (off). */
    public Materialized<K, V, S> withCachingDisabled()
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Optional params that can be passed to groupBy and groupByKey operations
*/
public class GroupByOptions<K, V> {
public static <K, V> GroupByOptions<K, V> serdes(final Serde<K> keySerde, final Serde<V> valueSerde) {
return new GroupByOptions<>();
}
public GroupByOptions<K, V> withKeySerde()
public GroupByOptions<K, V> withValueSerde()
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Optional parameters accepted by the join, leftJoin and outerJoin operations.
 *
 * Signature sketch only: method bodies are omitted. Three serdes are involved: one for the key
 * type {@code K}, one for this side's value type {@code V}, and one for the other side's value
 * type {@code VO}.
 */
public class JoinOptions<K, V, VO> {
    /** Factory taking all three serdes at once. */
    public static <K, V, VO> JoinOptions<K, V, VO> serdes(final Serde<K> keySerde,
                                                          final Serde<V> valueSerde,
                                                          final Serde<VO> otherValueSerde)

    /** Supplies the serde for keys of type {@code K}. */
    public JoinOptions<K, V, VO> withKeySerde(final Serde<K> keySerde)

    /** Supplies the serde for values of type {@code V}. */
    public JoinOptions<K, V, VO> withValueSerde(final Serde<V> valueSerde)

    /** Supplies the serde for the other side's values of type {@code VO}. */
    public JoinOptions<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde)
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Optional arguments that can be specified when doing to and through operations
 */
public class TopicOptions<K, V> {
    // Settings captured by the with* methods; null until one is supplied.
    private Serde<K> keySerde;
    private Serde<V> valueSerde;
    private StreamPartitioner<K, V> partitioner;

    /**
     * Creates options carrying a key serde and a value serde.
     *
     * @param keySerde   serde for keys of type K
     * @param valueSerde serde for values of type V
     * @return a new instance with both serdes recorded
     */
    public static <K, V> TopicOptions<K, V> serdes(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new TopicOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde);
    }

    /**
     * Creates options carrying a partitioner together with both serdes.
     *
     * @param partitioner partitioner for records of type K, V
     * @param keySerde    serde for keys of type K
     * @param valueSerde  serde for values of type V
     * @return a new instance with all three settings recorded
     */
    public static <K, V> TopicOptions<K, V> options(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new TopicOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde).withStreamPartitioner(partitioner);
    }

    /**
     * Records the partitioner.
     *
     * @param partitioner partitioner for records of type K, V
     * @return this instance, so the factory methods' chained calls do not dereference null
     */
    public TopicOptions<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner) {
        this.partitioner = partitioner;
        return this;
    }

    /**
     * Records the value serde.
     *
     * @param valueSerde serde for values of type V
     * @return this instance, for chaining
     */
    public TopicOptions<K, V> withValueSerde(final Serde<V> valueSerde) {
        this.valueSerde = valueSerde;
        return this;
    }

    /**
     * Records the key serde.
     *
     * @param keySerde serde for keys of type K
     * @return this instance, for chaining
     */
    public TopicOptions<K, V> withKeySerde(final Serde<K> keySerde) {
        this.keySerde = keySerde;
        return this;
    }
} |
Code Block |
---|
/** * options that can be used when printing to stdout our writing to a file */ public class PrintOptions<K, V> { public static <K, V> PrintOptions<K, V> labeled(final String label) public static <K, V> PrintOptions<K, V> toFile(final String filepath) public static <K, V> PrintOptions<K, V> sysOut(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label) V> { public static <K, V> PrintOptions<K, V> labeledtoFile(final String label) public static <K, V> filePath, final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label) public PrintOptions<K, V> toFilewithLabel(final String filepathlabel) public static <K, V> PrintOptions<K, V> sysOutwithFile(final KeyValueMapper<? super K, ? super V, String> mapper, String filepath) public PrintOptions<K, V> withKeySerde(final Serde<K> keySerde,) final Serde<V>public valSerdePrintOptions<K, V> withValueSerde(final StringSerde<V> labelkeySerde) public static <K, V> PrintOptions<K, V> toFilewithKeyValueMapper(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label) public PrintOptions<K, V> withLabel(final String label) public PrintOptions<K, V> withFile(final String filepath)String> mapper) } |
Code Block |
---|
/** * Implementations of this will provide the ability to wrap a given StateStore * with or without caching/loggging etc. */ public interface StateStoreBuilder<T extends StateStore> { public PrintOptions<K, V> withKeySerde(final Serde<K> keySerde) StateStoreBuilder<T> withCachingEnabled(); StateStoreBuilder<T> withCachingDisabled(); publicStateStoreBuilder<T> PrintOptions<KwithLoggingEnabled(Map<String, V> withValueSerde(final Serde<V> keySerde) String> config); public PrintOptions<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)StateStoreBuilder<T> withLoggingDisabled(); T build(); } |
Proposed Changes
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
...