Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/gfkxvtoswthf62h1p4t5zktn44qxsp0t

JIRA: KAFKA-18053

Motivation

This KIP aims to address several issues related to the configuration application process in Kafka Streams.

The current API for configuring Kafka Streams before building the topology is cumbersome and prone to errors. It can lead to awkward code and confusion for users, and in the worst case, some configurations may be silently ignored, resulting in unintended application behavior.

We aim to address three key issues:

Silent Misconfiguration

The KafkaStreams constructor requires application-level configurations, while the topology can be built without them. As a result, most users pass their configurations only to KafkaStreams, unaware that certain settings also need to be applied to the topology. This can lead to subtle misconfigurations that go unnoticed.

The list of topology-specific configs includes:

  • topology.optimization
  • processor.wrapper.class 
  • default.dsl.store
  • ensure.explicit.internal.resource.naming

Note that default.dsl.store has been deprecated since 3.7.
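To make the list above concrete, the following sketch collects the four keys into a set and picks out the ones a user has actually set in a Properties object, i.e. the entries that must also reach the topology. The class TopologySpecificConfigs and its extract method are hypothetical illustrations, not part of the Kafka Streams API; only the config key strings come from the list above.

```java
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper (not a Kafka Streams API): isolates the
// topology-specific settings from a full application configuration.
final class TopologySpecificConfigs {

    // Config keys that only take effect if they reach the topology.
    static final Set<String> KEYS = Set.of(
            "topology.optimization",
            "processor.wrapper.class",
            "default.dsl.store",                        // deprecated since 3.7
            "ensure.explicit.internal.resource.naming");

    private TopologySpecificConfigs() { }

    // Returns the topology-specific String entries present in props, sorted by key.
    static Map<String, String> extract(final Properties props) {
        final Map<String, String> result = new TreeMap<>();
        for (final String key : props.stringPropertyNames()) {
            if (KEYS.contains(key)) {
                result.put(key, props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("application.id", "my-app");
        props.put("topology.optimization", "all");
        // Only topology.optimization is topology-specific here.
        System.out.println(extract(props)); // {topology.optimization=all}
    }
}
```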

Fragmented Configuration APIs

Over time, multiple APIs have been introduced for setting configurations at different stages of topology construction. Users now face several ways to pass configurations, and some settings may work across multiple APIs, while others require a specific one. This inconsistency complicates the user experience.

Unnecessary Complexity with TopologyConfig

The TopologyConfig class was originally intended for internal use, specifically to separate configurations between different topologies, a feature that has since been deprecated and will soon be removed. Despite this, the most recent APIs for passing configurations into a topology (such as the Topology and StreamsBuilder constructors) use TopologyConfig instead of StreamsConfig. This extra wrapper is unnecessary, as StreamsConfig would suffice: TopologyConfig is itself constructed from a StreamsConfig, so it only adds a layer of complexity for users.

By addressing these issues, we aim to simplify configuration management, reduce the risk of misconfiguration, and provide a more intuitive API for Kafka Streams users.

Proposed Changes

  • Deprecate TopologyConfig in favor of StreamsConfig.
  • Deprecate StreamsBuilder#build(Properties) and direct users to pass configurations into the StreamsBuilder constructor instead.
  • Deprecate StreamsBuilder and Topology constructors that accept TopologyConfig, replacing them with constructors that accept a Properties or Map.
  • Improve misconfiguration detection by:
    • Throwing an exception if a topology-specific config is set in `StreamsConfig` but not applied to the topology.
    • Logging a warning if a topology-specific config is set in both places but has different values.
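The two detection rules can be sketched as a comparison between the configs given to KafkaStreams and the configs given to the StreamsBuilder or Topology. This is a minimal sketch under stated assumptions: the class TopologyConfigValidator, its validate method, the choice of IllegalStateException, and collecting warnings into a list instead of writing to a logger are all hypothetical and not the KIP's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch of the two misconfiguration checks; not Kafka Streams code.
final class TopologyConfigValidator {

    static final Set<String> TOPOLOGY_SPECIFIC_KEYS = Set.of(
            "topology.optimization",
            "processor.wrapper.class",
            "default.dsl.store",
            "ensure.explicit.internal.resource.naming");

    private TopologyConfigValidator() { }

    // streamsConfigs: configs passed to KafkaStreams.
    // topologyConfigs: configs passed to the StreamsBuilder/Topology.
    // Returns warnings for keys set in both places with different values;
    // throws if a key is set only at the KafkaStreams level.
    static List<String> validate(final Map<String, ?> streamsConfigs,
                                 final Map<String, ?> topologyConfigs) {
        final List<String> warnings = new ArrayList<>();
        for (final String key : TOPOLOGY_SPECIFIC_KEYS) {
            if (!streamsConfigs.containsKey(key)) {
                continue; // not set at the application level, nothing to check
            }
            if (!topologyConfigs.containsKey(key)) {
                // Rule 1: set for KafkaStreams but never applied to the topology.
                throw new IllegalStateException(
                        key + " is set in StreamsConfig but was not passed to the topology");
            }
            final Object appValue = streamsConfigs.get(key);
            final Object topologyValue = topologyConfigs.get(key);
            if (!Objects.equals(appValue, topologyValue)) {
                // Rule 2: set in both places with different values; warn only.
                warnings.add(key + ": KafkaStreams=" + appValue + ", topology=" + topologyValue);
            }
        }
        return warnings;
    }

    public static void main(final String[] args) {
        // Same value in both places: no exception and no warning.
        System.out.println(validate(
                Map.of("topology.optimization", "all"),
                Map.of("topology.optimization", "all"))); // []

        // Set only for KafkaStreams: rejected.
        try {
            validate(Map.of("topology.optimization", "all"), Map.of());
        } catch (final IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Treating a mismatch as a warning rather than an error mirrors the proposal: a missing topology config is almost certainly a mistake, while two differing values may be intentional.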

Old way to pass topology-specific config:

// The processor.wrapper.class, default.dsl.store, ensure.explicit.internal.resource.naming
// and topology.optimization configs must be set at different levels,
// and no validation exists
Properties properties = new Properties();
...
StreamsConfig streamsConfig = new StreamsConfig(properties);
TopologyConfig topologyConfig = new TopologyConfig(streamsConfig);  // => for processor.wrapper.class, default.dsl.store, ensure.explicit.internal.resource.naming
StreamsBuilder streamsBuilder = new StreamsBuilder(topologyConfig);
Topology topology = streamsBuilder.build(properties); // => for topology.optimization
KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig);

New way to pass topology-specific config:

// The processor.wrapper.class, default.dsl.store, ensure.explicit.internal.resource.naming and topology.optimization
// configs must be passed to the StreamsBuilder constructor.
// An exception is thrown if any topology-specific config is passed to KafkaStreams but not to the StreamsBuilder.
Properties properties = new Properties();
...
StreamsBuilder streamsBuilder = new StreamsBuilder(properties);
Topology topology = streamsBuilder.build();
StreamsConfig streamsConfig = new StreamsConfig(properties);
KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig);

Public Interfaces

Deprecations:

@Deprecated
public class TopologyConfig { ... }

public class StreamsBuilder {

    @Deprecated
    public StreamsBuilder(final TopologyConfig topologyConfigs) { ... }

    @Deprecated
    public synchronized Topology build(final Properties props) { ... }

}

public class Topology {

    @Deprecated
    public Topology(final TopologyConfig topologyConfigs) { ... }
}


Add new constructors:

public class StreamsBuilder {

    public StreamsBuilder(final Map<String, Object> configs) { ... }

    public StreamsBuilder(final Properties properties) { ... }

}

public class Topology {

    public Topology(final Map<String, Object> configs) { ... }

    public Topology(final Properties properties) { ... }

}
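A common way to provide both overloads is to have the Properties constructor convert its argument and delegate to the Map constructor. The sketch below shows that pattern on a hypothetical stand-in class, BuilderSketch; the KIP only specifies the public signatures above, not how the real StreamsBuilder and Topology implement them.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for StreamsBuilder/Topology, showing only the
// Map/Properties constructor pair; not Kafka Streams code.
final class BuilderSketch {

    private final Map<String, Object> configs;

    BuilderSketch(final Map<String, Object> configs) {
        // Read-only copy, so later changes to the caller's map are not observed.
        this.configs = Collections.unmodifiableMap(new HashMap<>(configs));
    }

    BuilderSketch(final Properties properties) {
        this(toMap(properties));
    }

    // Copies the entries stored directly in properties; values keep their type.
    // Note: defaults chained via new Properties(defaults) are not included.
    private static Map<String, Object> toMap(final Properties properties) {
        final Map<String, Object> map = new HashMap<>();
        for (final Map.Entry<Object, Object> entry : properties.entrySet()) {
            if (!(entry.getKey() instanceof String)) {
                throw new IllegalArgumentException("Non-string config key: " + entry.getKey());
            }
            map.put((String) entry.getKey(), entry.getValue());
        }
        return map;
    }

    Map<String, Object> configs() {
        return configs;
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("topology.optimization", "all");
        props.put("num.stream.threads", 2); // non-String value is kept as-is
        System.out.println(new BuilderSketch(props).configs().get("num.stream.threads")); // 2
    }
}
```

Iterating over entrySet() rather than stringPropertyNames() keeps typed values such as Integer instead of silently dropping them, which fits the goal of avoiding silently ignored settings; the trade-off is that chained defaults are not picked up.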

Compatibility, Deprecation, and Migration Plan

N/A

Test Plan

Existing unit tests should continue to pass.

Rejected Alternatives

  1. Add StreamsBuilder & Topology constructors that accept a plain config map (or Properties) alongside the StreamsConfig constructors. 
  2. Move all the topology-specific configs from StreamsConfig to TopologyConfig. This means removing them from StreamsConfig completely. 
    While separating configs by application makes sense, proliferating config classes complicates discovery and expands the API surface.
  3. Deprecate the default StreamsBuilder constructor, in order to force developers to pass configs in. However, since the new check should prevent developers from
    accidentally forgetting to pass the topology-specific configs into the Topology/StreamsBuilder, deprecating the default constructors seems unnecessary.

