Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/gfkxvtoswthf62h1p4t5zktn44qxsp0t
JIRA: KAFKA-18053
Motivation
This KIP aims to address several issues related to the configuration application process in Kafka Streams.
The current API for configuring Kafka Streams before building the topology is cumbersome and prone to errors. It can lead to awkward code and confusion for users, and in the worst case, some configurations may be silently ignored, resulting in unintended application behavior.
We aim to address three key issues:
Silent Misconfiguration
The KafkaStreams constructor requires application-level configurations, while the topology can be built without them. As a result, most users pass their configurations only to KafkaStreams, unaware that certain settings also need to be applied to the topology. This can lead to subtle misconfigurations that go unnoticed.
The list of topology-specific configs includes:
topology.optimization
processor.wrapper.class
default.dsl.store
ensure.explicit.internal.resource.naming
Note that default.dsl.store has been deprecated since 3.7.
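As a rough illustration of which settings are at risk, the sketch below uses only java.util to report the topology-specific entries present in a Properties object. Anything it returns would also have to reach the topology. The names TopologyKeys and topologySubset are hypothetical and not part of Kafka Streams; the key list is copied from the list above.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not a Kafka Streams class.
final class TopologyKeys {

    // Topology-specific config names as listed in this KIP.
    static final List<String> TOPOLOGY_SPECIFIC = List.of(
        "topology.optimization",
        "processor.wrapper.class",
        "default.dsl.store",
        "ensure.explicit.internal.resource.naming");

    // Returns the topology-specific entries set directly on props.
    // Properties#get is used, so nested default Properties are not consulted.
    static Map<String, Object> topologySubset(final Properties props) {
        final Map<String, Object> subset = new TreeMap<>();
        for (final String key : TOPOLOGY_SPECIFIC) {
            final Object value = props.get(key);
            if (value != null) {
                subset.put(key, value);
            }
        }
        return subset;
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("application.id", "demo-app");
        props.put("topology.optimization", "all");
        // Only the topology-specific entry is reported.
        System.out.println(topologySubset(props));
    }
}
```

If the returned map is non-empty and the configs were handed only to KafkaStreams, those entries are the ones that may be silently ignored.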
Fragmented Configuration APIs
Over time, multiple APIs have been introduced for setting configurations at different stages of topology construction. Users now face several ways to pass configurations, and some settings may work across multiple APIs, while others require a specific one. This inconsistency complicates the user experience.
Unnecessary Complexity with TopologyConfig
The TopologyConfig class was originally intended for internal use, specifically to separate configurations between different topologies, a feature that has since been deprecated and will soon be removed. Despite this, the most recent APIs for passing configurations into a topology (such as the Topology and StreamsBuilder constructors) use TopologyConfig instead of StreamsConfig. This extra wrapper is unnecessary, as StreamsConfig would suffice. In fact, TopologyConfig is internally constructed from StreamsConfig, which only adds a layer of complexity for users.
By addressing these issues, we aim to simplify configuration management, reduce the risk of misconfiguration, and provide a more intuitive API for Kafka Streams users.
Proposed Changes
- Deprecate TopologyConfig in favor of StreamsConfig.
- Deprecate StreamsBuilder#build(Properties) and direct users to pass configurations into the StreamsBuilder constructor instead.
- Deprecate StreamsBuilder and Topology constructors that accept TopologyConfig, replacing them with constructors that accept a Properties or Map.
- Improve misconfiguration detection by:
  - Throwing an exception if a topology-specific config is set in `StreamsConfig` but not applied to the topology.
  - Logging a warning if a topology-specific config is set in both places but has different values.
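The two detection rules above can be sketched in plain Java as follows. This is a minimal illustration under stated assumptions, not the Kafka Streams implementation: the class TopologyConfigCheck, the method validate, the use of IllegalStateException, and the message wording are all hypothetical, and warnings are returned as strings rather than logged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper for illustration only; not a Kafka Streams class.
final class TopologyConfigCheck {

    // Topology-specific config names as listed in this KIP.
    static final List<String> TOPOLOGY_SPECIFIC = List.of(
        "topology.optimization",
        "processor.wrapper.class",
        "default.dsl.store",
        "ensure.explicit.internal.resource.naming");

    // Returns one warning per key that is set in both maps with different values.
    // Throws if a key is set for the application but missing from the topology.
    // The exception type is an assumption; the KIP does not name one.
    static List<String> validate(final Map<String, ?> appConfigs,
                                 final Map<String, ?> topologyConfigs) {
        final List<String> warnings = new ArrayList<>();
        for (final String key : TOPOLOGY_SPECIFIC) {
            if (!appConfigs.containsKey(key)) {
                continue; // not set at the application level, nothing to compare
            }
            if (!topologyConfigs.containsKey(key)) {
                throw new IllegalStateException(
                    key + " is set for the application but was not passed to the topology");
            }
            if (!Objects.equals(appConfigs.get(key), topologyConfigs.get(key))) {
                warnings.add(key + " differs between application and topology configs");
            }
        }
        return warnings;
    }

    public static void main(final String[] args) {
        final Map<String, Object> app = Map.of("topology.optimization", "all");
        final Map<String, Object> topology = Map.of("topology.optimization", "none");
        // Same key, different values: reported as a warning, not an exception.
        validate(app, topology).forEach(System.out::println);
    }
}
```

Keys that are set only on the topology side are deliberately ignored here, since neither rule above covers that case.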
Old way to pass topology-specific config:
// The processor.wrapper.class, default.dsl.store, ensure.explicit.internal.resource.naming
// and topology.optimization configs must be set at different levels.
// No validation exists.
Properties properties = new Properties();
...
StreamsConfig streamsConfig = new StreamsConfig(properties);
TopologyConfig topologyConfig = new TopologyConfig(streamsConfig); // => for processor.wrapper.class, default.dsl.store, ensure.explicit.internal.resource.naming
StreamsBuilder streamsBuilder = new StreamsBuilder(topologyConfig);
Topology topology = streamsBuilder.build(properties); // => for topology.optimization
KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig);
New way to pass topology-specific config:
// The processor.wrapper.class, default.dsl.store, ensure.explicit.internal.resource.naming
// and topology.optimization configs must be passed to the StreamsBuilder constructor.
// An exception will be thrown if a topology-specific config is passed to KafkaStreams
// but not to the StreamsBuilder.
Properties properties = new Properties();
...
StreamsBuilder streamsBuilder = new StreamsBuilder(properties);
Topology topology = streamsBuilder.build();
StreamsConfig streamsConfig = new StreamsConfig(properties);
KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig);
Public Interfaces
Deprecations:
@Deprecated
public class TopologyConfig { ... }

public class StreamsBuilder {
    @Deprecated
    public StreamsBuilder(final TopologyConfig topologyConfigs) { ... }

    @Deprecated
    public synchronized Topology build(final Properties props) { ... }
}

public class Topology {
    @Deprecated
    public Topology(final TopologyConfig topologyConfigs) { ... }
}
Add new constructors:
public class StreamsBuilder {
    public StreamsBuilder(final Map<String, Object> configs) { ... }
    public StreamsBuilder(final Properties properties) { ... }
}

public class Topology {
    public Topology(final Map<String, Object> configs) { ... }
    public Topology(final Properties properties) { ... }
}
Compatibility, Deprecation, and Migration Plan
N/A
Test Plan
Unit tests should continue to succeed.
Rejected Alternatives
- Add StreamsBuilder & Topology constructors that accept a plain config map (or Properties) alongside the StreamsConfig constructors.
- Move all the topology-specific configs from StreamsConfig to TopologyConfig. This means removing them from StreamsConfig completely. While separating configs by application makes sense, proliferating config classes complicates discovery and expands the API surface.
- Deprecate the default StreamsBuilder constructor in order to force developers to pass configs in. Since the new check should help prevent developers from accidentally forgetting to pass the topology-specific configs into the Topology/StreamsBuilder, deprecating the default constructors seems unnecessary.