Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder DiscussionAccepted

Discussion thread: here 

JIRA: here

...

methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?
filter(Predicate, Named)YESstatic Named#as(String)N/AN/A
filterNot(Predicate, Named)YESstatic Named#as(String)N/AN/A
selectKey(KeyValueMapper, Named)YESstatic Named#as(String)N/AN/A
map(KeyValueMapper, Named)YESstatic Named#as(String)N/AN/A
mapValues(ValueMapper, Named)YESstatic Named#as(String)N/AN/A
mapValues(ValueMapperWithKey, Named)YESstatic Named#as(String)N/AN/A
flatMap(KeyValueMapper, Named)YESstatic Named#as(String)N/AN/A
flatMapValues(ValueMapper, Named)YESstatic Named#as(String)N/AN/A
flatMapValues(ValueMapperWithKey, Named)YESstatic Named#as(String)N/AN/A
print(Printed)NOstatic Printed#as(String)N/AN/A
foreach(ForeachAction, Named)YESstatic Named#as(String)N/AN/A
peek(ForeachAction, Named)YESstatic Named#as(String)N/AN/A
branch(Named, Predicate...)YESstatic Named#as(String)N/AN/A
through(String, Produced)NOstatic Produced#as(String)N/AN/A
to(String, Produced)NOstatic Produced#as(String)N/AN/A
to(TopicNameExtractor, Produced)NOstatic Produced#as(String)N/AN/A
transform(TransformerSupplier, Named, String... )YESstatic Named#as(String)N/AN/A
transformValues(ValueTransformerSupplier, Named, String...)YESstatic Named#as(String)N/AN/A
transformValues( ValueTransformerWithKeySupplier, Named, String...)YESstatic Named#as(String)N/AN/A
process(ProcessorSupplier, Named, String...)YESstatic Named#as(String)N/AN/A
join( KStream, ValueJoiner, JoinWindows windows, Joined)NOstatic Joined#as(final String name)static Joined#as(final String name)static Joined#as(final String name)
leftJoin(KStream, ValueJoiner, JoinWindows, Joined)NOstatic Joined#as(final String name)static Joined#as(final String name)static Joined#as(final String name)
outerJoin(KStream, ValueJoiner, JoinWindows, Joined)NOstatic Joined#as(final String name)static Joined#as(final String name)static Joined#as(final String name)
join(KTable, ValueJoiner, Joined)NOstatic Joined#as(final String name)static Joined#as(final String name)N/A
leftJoin(KTable, ValueJoiner, Joined)NOstatic Joined#as(final String name)static Joined#as(final String name)N/A
join(GlobalKTbale, KeyValueMapper, ValueJoiner, Named)YESstatic Named#as(String)
N/AN/A
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner, Named)YESstatic Named#as(String)
N/AN/A
flatTransform(TransformerSupplier, Named named, String... stateStoreNames)YESstatic Named#as(String)N/AN/A
flatTransformValues(ValueTransformerWithKeySupplier, Named,  String... )YESstatic Named#as(String)N/AN/A
flatTransformValues(ValueTransformerSupplier, Named, String...)YESstatic Named#as(String)N/AN/A


KTable (16 new methods)


methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?
filter(Predicate, Named)YESstatic Named#as(String)N/A(PREFIX + COUNT)
filter(Predicate, Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)
filterNot(Predicate, Named)YESstatic Named#as(String)N/A(PREFIX + COUNT)
filterNot(Predicate, Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)
mapValues(ValueMapper, Named)YESstatic Named#as(String)N/A(PREFIX + COUNT)
mapValues(ValueMapperWithKey, Named)YESstatic Named#as(String)N/A(PREFIX + COUNT)
mapValues(ValueMapper, Named, Materialized)YESstatic Named#as(String)N/Astatic Materialized#as(String)
mapValues(ValueMapperWithKey, Named, Materialized);YESstatic Named#as(String)N/Astatic Materialized#as(String)
suppress(Suppressed)NOSuppressed#withName(String)N/AN/A
transformValues(ValueTransformerWithKeySupplier, Named, String...)YESstatic Named#as(String)N/A(PREFIX + COUNT)
transformValues(ValueTransformerWithKeySupplier, Materialized, Named, String...)YESstatic Named#as(String)N/Astatic Materialized#as(String)
groupBy(KeyValueMapper, KeyValue, Grouped)NOstatic Grouped#as(String)static Grouped#as(String)N/A
join(KTable, ValueJoiner, Named)YESstatic Named#as(String)N/A(PREFIX + COUNT)
join(KTable, ValueJoiner, Named, Materialized)YESstatic Named#as(String)N/Astatic Materialized#as(String)
leftJoin(KTable, ValueJoiner, Named);YESstatic Named#as(String)N/A(PREFIX + COUNT)
leftJoin(KTable, ValueJoiner, Named, Materialized)YESstatic Named#as(String)N/Astatic Materialized#as(String)
outerJoin(KTable, ValueJoiner, Named);YESstatic Named#as(String)N/A(PREFIX + COUNT)
outerJoin(KTable, ValueJoiner, Named, Materialized)YESstatic Named#as(String)N/Astatic Materialized#as(String)
toStream(Named)YESstatic Named#as(String)N/AN/A
toStream(KeyValueMapper, Named)YESstatic Named#as(String)N/AN/A

...

User provided node name should follow the same restrictions that ones currently apply to state stores during the create of Materialized instance.

The Currently, the Materialized class relies on the static method TopicTopic#validate. validate(). This method ensure that a provided name only contains legal characters [a-zA-Z0-9._-] and have a maximum length of 249.

...

We propose to copy methods from Topic.validate to Named.validate in order to be able to Topic#validate into Named. This new method will be used validate both store names and node names using the same method. This also have the benefit . The benefit is to remove a dependency with the code core module.

In addition, in case of a bad name, the Materialized class will throw a TopologyException instead of InvalidTypeException while building the topologytopology in case of a unvalid name instead of InvalidTopicException .


Proposed Changes

  • Implement the new interface NamedOperation and default class Named 
  • Update all parameter class to implement NamedOperation : Produced Consumed Printed Joined Grouped Suppressed
  • Overload methods stateless for classes KStreams, KTables, KGroupedStream, KGroupedTable, TimeWindowedKStream, TimeWindowedKTable
  • The processor names specified by developer will be used in place of the static processor prefix. Statics prefixes will still be used if no custom processor name are specified.
  • Processor names should follow the same restrictions as the topic names. So legal characters are [a-zA-Z0-9._-] and the maximum length us of 249.


Below is an application example : 

...