
KIP-221: Enhance KStream with Connecting Topic Creation and Repartition Hint



Current state: Under Discussion

Discussion thread: TBD

JIRA: KAFKA-6037, KAFKA-4835

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


The main motivation of this KIP is stated in the related JIRA: "Today the downstream sub-topology's parallelism (aka the number of tasks) are purely dependent on the upstream sub-topology's parallelism, which ultimately depends on the source topic's num.partitions. However this does not work perfectly with dynamic scaling scenarios". Letting the stream topology create repartition topics with a customized number of partitions gives users more flexibility. Also, for APIs like #to or #through, KStream has no control over sink topic creation, which means users have to create their own connecting Kafka topic for every new application, which is cumbersome. We therefore propose extending the #Produced and #Grouped APIs so that a KStream application automatically creates connecting topics when the target topic has not been created yet.

Public interfaces

We shall expand the Produced API to contain numPartitions:
public class Produced<K, V> {
    protected Serde<K> keySerde;
    protected Serde<V> valueSerde;
    protected StreamPartitioner<? super K, ? super V> partitioner;
    protected final Integer numPartitions; // new

    // new overload that also takes the number of partitions
    public static <K, V> Produced<K, V> with(final Serde<K> keySerde,
                                             final Serde<V> valueSerde,
                                             final StreamPartitioner<? super K, ? super V> partitioner,
                                             final Integer numPartitions);
}

We also expand the Grouped API with a numPartitions configuration:
public class Grouped<K, V> {
    protected final Serde<K> keySerde;
    protected final Serde<V> valueSerde;
    protected final String name;
    protected final Integer numPartitions; // new

    // new overload that also takes the number of partitions
    public static <K, V> Grouped<K, V> with(final String name,
                                            final Serde<K> keySerde,
                                            final Serde<V> valueSerde,
                                            final Integer numPartitions);
}

Proposed Changes

When a number of partitions is configured, the KStream application will first issue a topic lookup request to check whether the target topic already exists. If it does, the application will log an error for the topic creation failure and still continue the initialization. For grouping operations, the repartition topic gets a hint on how many partitions it should be created with. If the repartition topic has not been created yet, it is created with the specified numPartitions; otherwise the upstream topic's partition count is used as the new topic's number of partitions. One open question is whether we should prevent destructive operations, such as changing the repartition topic size when the current topic's numPartitions != configured numPartitions, since this could cause data loss.
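
The decision flow described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the proposed implementation: the class PartitionCountSketch and its methods are hypothetical names, the "otherwise" case is read here as "no numPartitions hint was configured", and an existing topic is left untouched because the mismatch handling is still an open question.

```java
import java.util.OptionalInt;

/** Hypothetical helper, not part of Kafka Streams. */
final class PartitionCountSketch {
    /** Sentinel for "numPartitions not configured" (the default of -1 named in this KIP). */
    static final int NOT_CONFIGURED = -1;

    /**
     * Picks the partition count for a connecting or repartition topic.
     *
     * @param configured numPartitions hint, or NOT_CONFIGURED
     * @param existing   partition count of the topic if the lookup found it
     * @param upstream   partition count of the upstream topic
     */
    static int resolve(int configured, OptionalInt existing, int upstream) {
        if (existing.isPresent()) {
            // Assumption: an existing topic is never resized; the app logs and continues.
            return existing.getAsInt();
        }
        if (configured != NOT_CONFIGURED) {
            // Topic is missing and a hint was given: create it with the hinted size.
            return configured;
        }
        // Assumption: without a hint, fall back to the upstream partition count.
        return upstream;
    }

    /** True when a hint was given but the existing topic has a different size. */
    static boolean isMismatch(int configured, OptionalInt existing) {
        return configured != NOT_CONFIGURED
                && existing.isPresent()
                && existing.getAsInt() != configured;
    }

    public static void main(String[] args) {
        System.out.println(resolve(6, OptionalInt.empty(), 4));
        System.out.println(isMismatch(6, OptionalInt.of(4)));
    }
}
```

A caller could use isMismatch to decide whether to log a warning or fail fast, depending on how the open question about destructive changes is resolved.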

Backward Compatibility

This is a pure KStream library change that shouldn't affect previously set up applications. If numPartitions is not configured by the user, it defaults to -1, which leaves the existing topology initialization logic unchanged.
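
As a rough illustration of the default described above, the sketch below models only the numPartitions handling. GroupedSketch, its factory methods, and hasPartitionHint are hypothetical stand-ins, not the real Grouped class; the only detail taken from this KIP is the default value of -1.

```java
/** Hypothetical stand-in for the proposed Grouped extension; serdes are omitted. */
final class GroupedSketch {
    /** Default named in this KIP: -1 means "no hint, keep existing behavior". */
    static final int DEFAULT_NUM_PARTITIONS = -1;

    final String name;
    final int numPartitions;

    private GroupedSketch(String name, int numPartitions) {
        this.name = name;
        this.numPartitions = numPartitions;
    }

    /** Existing-style factory: no partition hint, so older code paths are unaffected. */
    static GroupedSketch as(String name) {
        return new GroupedSketch(name, DEFAULT_NUM_PARTITIONS);
    }

    /** New-style factory carrying the partition hint. */
    static GroupedSketch with(String name, int numPartitions) {
        return new GroupedSketch(name, numPartitions);
    }

    /** Callers consult this before applying any new topic-creation logic. */
    boolean hasPartitionHint() {
        return numPartitions != DEFAULT_NUM_PARTITIONS;
    }

    public static void main(String[] args) {
        System.out.println(GroupedSketch.as("counts").hasPartitionHint());
        System.out.println(GroupedSketch.with("counts", 8).hasPartitionHint());
    }
}
```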

Rejected Alternatives

