Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, when a new Topology is built through the KStreams DSL, the processors are named automatically.
The generated names are prefixed according to the operation (e.g. KSTREAM-SOURCE, KSTREAM-FILTER, KSTREAM-MAP).
To debug or understand a topology, the processor lineage can be displayed with the method Topology#describe(). However, a complex topology with dozens of operations can be hard to understand when the processor names carry no meaning.
It would be useful to be able to set more meaningful names. For example, a processor name could describe the business rule performed by a map() operation.
Public Interfaces
First, we propose to add one new class, Processed, that can be used to provide the optional parameters when processing each record (i.e. to set the processor name). This class will be used by stateless operations (e.g. map(), filter()).
package org.apache.kafka.streams.kstream;

import java.util.Objects;

/**
 * This class is used to provide the optional parameters when processing each record.
 */
public class Processed {

    protected final String processorName;

    protected Processed(final String processorName) {
        this.processorName = processorName;
    }

    protected Processed(final Processed processed) {
        this(processed.getProcessorName());
    }

    /**
     * Create a Processed instance with provided processor name.
     * @param processorName name to use to describe the {@link org.apache.kafka.streams.processor.Processor}.
     * @return A new {@link Processed} instance configured with processorName
     */
    public static Processed with(final String processorName) {
        return new Processed(processorName);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Processed processed = (Processed) o;
        return Objects.equals(processorName, processed.processorName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(processorName);
    }

    public String getProcessorName() {
        return processorName;
    }
}
Then we propose to add new methods to the existing classes Consumed, Produced, Joined and Printed so that custom processor names can be defined.
Example for the Consumed class:
/**
 * Create an instance of {@link Consumed} with provided processor name.
 *
 * @param processorName the processor name to be used. If {@code null} a default processor name will be generated
 * @param <K> key type
 * @param <V> value type
 * @return a new instance of {@link Consumed}
 */
public static <K, V> Consumed<K, V> with(final String processorName) {
    return new Consumed<>(null, null, null, null, processorName);
}

/**
 * Configure the instance of {@link Consumed} with a processor name.
 *
 * @param processorName the processor name to be used. If {@code null} a default processor name will be generated
 * @return this
 */
public Consumed<K, V> withProcessorName(final String processorName) {
    this.processorName = processorName;
    return this;
}
Proposed Changes
- Implement the new class Processed and update the KStream/KStreamImpl methods that add stateless processors (e.g. map(), mapValues(), filter()).
- Update the Consumed, Produced and Joined classes to be able to set a processor name.
- The processor names specified by the developer will be used in place of the static processor prefixes. The static prefixes will still be used if no custom processor name is specified.
Below is an application example:
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("topic-input", Consumed.with("CONSUME-FROM-INPUT-TOPIC"))
        .filter((k, v) -> true, Processed.with("FILTER-NULL-VALUE"))
        .map((k, v) -> KeyValue.pair(k, v.toUpperCase()), Processed.with("MAP-TO-UPPERCASE"))
        .to("topic-output", Produced.with("PRODUCE-TO-OUTPUT-TOPIC"));

System.out.println(builder.build().describe().toString());

---- (output)----
Topologies:
   Sub-topology: 0
    Source: CONSUME-FROM-INPUT-TOPIC-0000000000 (topics: [topic-input])
      --> FILTER-NULL-VALUE-0000000001
    Processor: FILTER-NULL-VALUE-0000000001 (stores: [])
      --> MAP-TO-UPPERCASE-0000000002
      <-- CONSUME-FROM-INPUT-TOPIC-0000000000
    Processor: MAP-TO-UPPERCASE-0000000002 (stores: [])
      --> PRODUCE-TO-OUTPUT-TOPIC-0000000003
      <-- FILTER-NULL-VALUE-0000000001
    Sink: PRODUCE-TO-OUTPUT-TOPIC-0000000003 (topic: topic-output)
      <-- MAP-TO-UPPERCASE-0000000002
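The names in the output above suggest a simple rule: the custom name replaces the static prefix, and the zero-padded 10-digit node index is still appended. The sketch below models only that rule, as inferred from the example output. ProcessorNameSketch and newName are hypothetical names used for illustration and are not part of Kafka Streams; the real change would live in the internal builder classes.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper, not Kafka Streams code: models the proposed naming rule only. */
final class ProcessorNameSketch {

    // One counter per topology under construction, shared by all nodes.
    private final AtomicInteger index = new AtomicInteger(0);

    /**
     * Builds a node name from the custom name when one is given,
     * otherwise from the static prefix (assumed to end with '-'),
     * followed by the zero-padded 10-digit node index.
     */
    String newName(final String customName, final String defaultPrefix) {
        final String prefix = (customName != null) ? customName + "-" : defaultPrefix;
        return prefix + String.format(Locale.ROOT, "%010d", index.getAndIncrement());
    }

    public static void main(final String[] args) {
        final ProcessorNameSketch names = new ProcessorNameSketch();
        // Custom name supplied: used in place of the static prefix.
        System.out.println(names.newName("CONSUME-FROM-INPUT-TOPIC", "KSTREAM-SOURCE-"));
        // No custom name: falls back to the static prefix.
        System.out.println(names.newName(null, "KSTREAM-FILTER-"));
    }
}
```

Because the index keeps incrementing whether or not a custom name is given, named and unnamed processors can be mixed in one topology without name collisions under this assumed rule.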
Compatibility, Deprecation, and Migration Plan
No compatibility issues foreseen.
Rejected Alternatives
No rejected alternatives.