Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-19935
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Streams applications often evolve their topology over time. When a topology changes, the following can happen unintentionally:
Internal topics such as changelog, repartition, or subscription / response topics change their names.
Source topics for a given topology are modified.
Sink topics for a topology are modified.
For stateful applications this can have serious operational impact:
When a state store name changes, Kafka Streams creates a new changelog topic. The application may start from an empty state while the old changelog still contains the previous state.
When a repartition topic name changes, the repartitioned state feeding downstream operators may be reset.
When internal subscription or response topics used by foreign key joins change, the internal protocol can be impacted.
When source or sink topic sets change unexpectedly, the application may start consuming from or producing to the wrong topics.
A particularly subtle and dangerous case is when applications rely on unnamed internal components (for example, unnamed state stores and internal topics that use auto-generated names) and the topology is evolved by adding a new source node near the beginning of the topology. In this situation:
Kafka Streams assigns incremental identifiers to unnamed nodes and stores (for example, KSTREAM-AGGREGATE-STATE-STORE-0000000001).
Adding a new source early in the topology can shift these identifiers for all downstream nodes.
As a result, the corresponding internal changelog and repartition topic names also change, even though the logical processing semantics may be intended to remain the same.
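The identifier shift can be illustrated with a simplified model. The sketch below does not use Kafka Streams itself; it assumes that a single shared counter hands out zero-padded, ten-digit suffixes in construction order. The class name NameIndexShiftDemo and the prefix lists are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of auto-generated naming: one shared counter
// assigns a zero-padded suffix to every unnamed element in construction order.
final class NameIndexShiftDemo {

    // Returns the generated names for the given prefixes, in order.
    static List<String> generateNames(final List<String> prefixes) {
        final List<String> names = new ArrayList<>();
        int index = 0;
        for (final String prefix : prefixes) {
            names.add(String.format("%s%010d", prefix, index++));
        }
        return names;
    }

    public static void main(final String[] args) {
        // Original topology: one source followed by an aggregation store.
        final List<String> before = generateNames(List.of(
            "KSTREAM-SOURCE-", "KSTREAM-AGGREGATE-STATE-STORE-"));
        // Evolved topology: a new source is added first, shifting later indices.
        final List<String> after = generateNames(List.of(
            "KSTREAM-SOURCE-", "KSTREAM-SOURCE-", "KSTREAM-AGGREGATE-STATE-STORE-"));
        System.out.println("store before: " + before.get(1));
        System.out.println("store after:  " + after.get(2));
    }
}
```

In this model the store suffix moves from 0000000001 to 0000000002 although the aggregation itself is unchanged, which is the kind of implicit rename the validator is meant to surface.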
In practice, simply replaying the source topics is often not enough to reconstruct the previous state:
If the application only processes records “after the last commit” in normal operation, and
some keys appear very infrequently (for example, a restaurant with very few orders compared to others),
then the new state store built on top of the new internal topics may never converge to the previous state, because the rare keys that existed in the old store may not reappear in the input.
This leads to permanent divergence between the old and new state, which can be critical for applications that depend on consistent state (for example, financial aggregates, counters, or long-lived customer profiles).
Kafka Streams already provides mechanisms and KIPs to improve topology evolution and naming, but they primarily operate at configuration or runtime level, and many users still rely on unnamed internal topics and stores.
There is currently no built-in tooling that allows developers to compare two topologies offline and highlight these internal and external topic changes before deployment, especially in the “new source + unnamed internal topic” scenario described above.
This KIP proposes a snapshot-based, rule-based topology validator in the streams test utils module:
The validator compares two topology descriptions produced by topology.describe().toString().
It parses these descriptions into a simplified topology context.
It applies a configurable set of rules that look for specific types of changes in that context.
In the initial version, the built-in rules detect changes in:
Internal changelog topics
Internal repartition topics
Internal subscription or response topics
Source topics
Sink topics
The validator is designed to be:
Offline – it runs purely on topology descriptions without requiring a running cluster.
Rule-based and extensible – new rules can be added over time without changing the core design.
Policy-agnostic – it does not decide whether a change is “correct” or “incorrect”; instead it surfaces structured TopologyChange entries that tests and CI pipelines can interpret (for example, by failing on certain change types and only logging warnings for others).
Importantly, the validator is not intended to be a complete or sound definition of topology compatibility.
Because it operates on a snapshot of the topology description and a finite set of simple rules, it can produce both false positives and false negatives:
It may flag changes that are intentional and safe for a given application, for example a deliberate rename of an internal topic where state migration is handled manually.
It may miss incompatibilities that are not visible at the topic level or that require deeper semantic understanding of the processing logic.
The goal of this KIP is therefore not to guarantee compatibility, but to provide a lightweight, opt-in guardrail that highlights risky topic-level changes early in development and CI.
Developers remain responsible for interpreting the reported changes in the context of their application.
This allows potentially risky topology changes, in particular implicit renames caused by adding new sources while using unnamed internal components, to be discovered and reviewed earlier, rather than being discovered in production.
Public Interfaces
All new types are added to the streams test utils module, under a package such as org.apache.kafka.streams.
Enum TopologyChangeType
Represents the type of a detected change.
public enum TopologyChangeType {
    INTERNAL_CHANGELOG_TOPIC_CHANGED,
    INTERNAL_CHANGELOG_TOPIC_REMOVED,
    INTERNAL_CHANGELOG_TOPIC_ADDED,
    INTERNAL_REPARTITION_TOPIC_CHANGED,
    INTERNAL_REPARTITION_TOPIC_REMOVED,
    INTERNAL_REPARTITION_TOPIC_ADDED,
    INTERNAL_SUBSCRIPTION_TOPIC_CHANGED,
    INTERNAL_SUBSCRIPTION_TOPIC_REMOVED,
    INTERNAL_SUBSCRIPTION_TOPIC_ADDED,
    SOURCE_TOPIC_CHANGED,
    SOURCE_TOPIC_REMOVED,
    SOURCE_TOPIC_ADDED,
    SINK_TOPIC_CHANGED,
    SINK_TOPIC_REMOVED,
    SINK_TOPIC_ADDED
}
This KIP only defines types related to internal topics and to source and sink topics.
More types could be added in future work.
Class TopologyChange
Represents a single detected change.
public final class TopologyChange {
    private final TopologyChangeType type;
    private final String detail;
    public TopologyChange(final TopologyChangeType type, final String detail) {
        this.type = type;
        this.detail = detail;
    }
    // Support public getter such as type().
    ...
}
The detail field is a human-readable description, for example a message listing the added or removed topic names.
Class TopologyChanges
public final class TopologyChanges {
    private final List<TopologyChange> changes;
    // Support public getter.
}
The TopologyChanges type is a simple container for one or more TopologyChange instances.
Rules are intended to be composable. The validator will run all configured rules and aggregate their results.
Class ParsedTopologyContext
A simplified summary of a topology, extracted from the string representation returned by Topology.describe().toString().
public final class ParsedTopologyContext {
    private final Set<String> changelogTopics;
    private final Set<String> repartitionTopics;
    private final Set<String> subscriptionTopics;
    private final Set<String> sourceTopics;
    private final Set<String> sinkTopics;
    // Support public getter.
}
In the initial version, this context only exposes:
- Internal changelog topics.
- Internal repartition topics.
- Internal subscription / response topics
- Source topics
- Sink topics
The type is intentionally generic so that more topology summary fields can be added in future without breaking existing rules.
Interface TopologyParser
Abstraction to parse the string description of a topology into a ParsedTopologyContext.
public interface TopologyParser {
    ParsedTopologyContext parse(String topologyDescription);
}
The validator will assume that topologyDescription is the result of topology.describe().toString().
The default implementation shipped with streams test utils will:
- Parse the description line by line.
- Extract:
  - Internal changelog topics
  - Internal repartition topics
  - Internal subscription and response topics
  - Source topics
  - Sink topics
Extraction is based on the format of TopologyDescription.toString(), the existing naming conventions for internal topics, and a set of well-defined regular expression patterns, one per category.
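A minimal sketch of this kind of line-by-line extraction is shown below. It is not the proposed DefaultTopologyParser. It assumes description lines of the form "Source: <name> (topics: [a, b])", "Sink: <name> (topic: t)" and "Processor: <name> (stores: [s])". It further assumes that changelog topics can be derived from store names with a "-changelog" suffix (ignoring the application id prefix), that repartition topics end in "-repartition", and that subscription and response topics contain "FK-JOIN-SUBSCRIPTION". The class name TopologyTopicExtractor is hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: extracts topic categories from a topology description string.
final class TopologyTopicExtractor {
    private static final Pattern SOURCE =
        Pattern.compile("^\\s*Source:\\s+\\S+\\s+\\(topics: \\[(.*)\\]\\)");
    private static final Pattern SINK =
        Pattern.compile("^\\s*Sink:\\s+\\S+\\s+\\(topic: (.*)\\)");
    private static final Pattern STORES =
        Pattern.compile("^\\s*Processor:\\s+\\S+\\s+\\(stores: \\[(.*)\\]\\)");

    final Set<String> sourceTopics = new TreeSet<>();
    final Set<String> sinkTopics = new TreeSet<>();
    final Set<String> repartitionTopics = new TreeSet<>();
    final Set<String> subscriptionTopics = new TreeSet<>();
    final Set<String> changelogTopics = new TreeSet<>();

    static TopologyTopicExtractor parse(final String description) {
        final TopologyTopicExtractor result = new TopologyTopicExtractor();
        for (final String line : description.split("\\R")) {
            Matcher m = SOURCE.matcher(line);
            if (m.find()) {
                for (final String topic : m.group(1).split(",\\s*")) {
                    result.classify(topic.trim(), result.sourceTopics);
                }
                continue;
            }
            m = SINK.matcher(line);
            if (m.find()) {
                result.classify(m.group(1).trim(), result.sinkTopics);
                continue;
            }
            m = STORES.matcher(line);
            if (m.find()) {
                for (final String store : m.group(1).split(",\\s*")) {
                    if (!store.isBlank()) {
                        // Assumption: a changelog topic is named after its store.
                        result.changelogTopics.add(store.trim() + "-changelog");
                    }
                }
            }
        }
        return result;
    }

    // Internal topics are recognized by name; everything else is external.
    private void classify(final String topic, final Set<String> external) {
        if (topic.isEmpty()) {
            return;
        }
        if (topic.endsWith("-repartition")) {
            repartitionTopics.add(topic);
        } else if (topic.contains("FK-JOIN-SUBSCRIPTION")) {
            subscriptionTopics.add(topic);
        } else {
            external.add(topic);
        }
    }
}
```

Lines that match none of the patterns (for example the "-->" and "<--" edge lines) are skipped, so a change in the description format would silently reduce what the sketch extracts.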
Interface TopologyValidationRule
A rule that inspects the previous and current parsed topology context and returns detected changes.
public interface TopologyValidationRule {
    TopologyChanges validate(ParsedTopologyContext previous,
                             ParsedTopologyContext current);
}
Class TopologyValidationResult
Represents the aggregated changes across all rules.
public final class TopologyValidationResult {
    private final List<TopologyChange> topologyChanges;
    public List<TopologyChange> changes() {
        return topologyChanges;
    }
    public boolean hasChangeTypeOf(TopologyChangeType... types) {
        // returns true if there is at least one change whose type matches any given type
    }
    public void warnIfHasChangeTypeOf(TopologyChangeType... types) {
        // if there is a change of any given type, log a warning with the details
    }
    public void exceptionIfHasChangeTypeOf(TopologyChangeType... types) {
        // if there is a change of any given type, throw a TopologyValidationException
    }
}
- The core responsibility of this class is to carry the list of TopologyChange entries.
- The helper methods hasChangeTypeOf, warnIfHasChangeTypeOf, and exceptionIfHasChangeTypeOf are convenience APIs that simplify usage in tests and CI. The exact behavior and exception type can be refined during KIP review.
- Applications are free to ignore the helper methods and work directly with the list of TopologyChange entries if they prefer to implement custom policies.
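One possible shape for the matching helpers is sketched below, under stated assumptions. ChangeKind and ResultSketch are hypothetical, trimmed-down stand-ins for TopologyChangeType and TopologyValidationResult, and IllegalStateException stands in for the TopologyValidationException named above, whose exact type is still open.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for TopologyChangeType with only three values.
enum ChangeKind { CHANGELOG_CHANGED, REPARTITION_CHANGED, SOURCE_ADDED }

// Hypothetical stand-in for TopologyValidationResult.
final class ResultSketch {
    private final List<ChangeKind> kinds;

    ResultSketch(final List<ChangeKind> kinds) {
        this.kinds = List.copyOf(kinds);
    }

    // True if at least one recorded change matches any of the given kinds.
    boolean hasChangeTypeOf(final ChangeKind... wanted) {
        if (wanted.length == 0) {
            return false;
        }
        final Set<ChangeKind> lookup = EnumSet.copyOf(Arrays.asList(wanted));
        return kinds.stream().anyMatch(lookup::contains);
    }

    // Throws if any recorded change matches one of the given kinds.
    void exceptionIfHasChangeTypeOf(final ChangeKind... wanted) {
        if (hasChangeTypeOf(wanted)) {
            throw new IllegalStateException("Detected topology changes: " + kinds);
        }
    }
}
```

In this sketch a call without arguments matches nothing; whether an empty argument list should instead mean "any change" is a design choice left to KIP review.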
Class TopologyValidator
The main entry point for the validator.
public final class TopologyValidator {
    private final TopologyParser parser;
    private final List<TopologyValidationRule> rules;
    public TopologyValidationResult validate(final String previousTopology,
                                             final String currentTopology) {
        final ParsedTopologyContext prev = parser.parse(previousTopology);
        final ParsedTopologyContext curr = parser.parse(currentTopology);
        final TopologyChangesBuilder changesBuilder = new TopologyChangesBuilder();
        for (TopologyValidationRule rule : rules) {
            final TopologyChanges changes = rule.validate(prev, curr);
            changesBuilder.addChanges(changes);
        }
        return changesBuilder.buildResult();
    }
    // Support TopologyValidatorBuilder in here.
}
TopologyChangesBuilder is an internal helper class that accumulates change entries from multiple rules (not shown here).
Applications will typically construct a validator through the TopologyValidatorBuilder:
TopologyValidationResult result = TopologyValidator.builder()
    .parser(new DefaultTopologyParser())
    .addRule(new ChangeLogTopicChangeRule())
    .addRule(new RepartitionTopicChangeRule())
    .addRule(new SubscriptionTopicChangeRule())
    .build()
    .validate(previousTopologyDescription, currentTopologyDescription);
Rules
This KIP introduces several built-in rule implementations:
public final class ChangeLogTopicChangeRule implements TopologyValidationRule {
    @Override
    public TopologyChanges validate(final ParsedTopologyContext previous,
                                    final ParsedTopologyContext current) {
        // compare previous.changelogTopics() and current.changelogTopics()
        // create TopologyChange entries with INTERNAL_CHANGELOG_TOPIC_ADDED,
        // INTERNAL_CHANGELOG_TOPIC_REMOVED, or INTERNAL_CHANGELOG_TOPIC_CHANGED as needed
    }
}
public final class RepartitionTopicChangeRule implements TopologyValidationRule {
    @Override
    public TopologyChanges validate(final ParsedTopologyContext previous,
                                    final ParsedTopologyContext current) {
        // compare previous.repartitionTopics() and current.repartitionTopics()
        // create TopologyChange entries with INTERNAL_REPARTITION_TOPIC_ADDED,
        // INTERNAL_REPARTITION_TOPIC_REMOVED, or INTERNAL_REPARTITION_TOPIC_CHANGED as needed
    }
}
public final class SubscriptionTopicChangeRule implements TopologyValidationRule {
    @Override
    public TopologyChanges validate(final ParsedTopologyContext previous,
                                    final ParsedTopologyContext current) {
        // compare previous.subscriptionTopics() and current.subscriptionTopics()
        // create TopologyChange entries with INTERNAL_SUBSCRIPTION_TOPIC_ADDED,
        // INTERNAL_SUBSCRIPTION_TOPIC_REMOVED, or INTERNAL_SUBSCRIPTION_TOPIC_CHANGED as needed
    }
}
public final class SourceTopicChangeRule implements TopologyValidationRule { /* ... */ }
public final class SinkTopicChangeRule implements TopologyValidationRule { /* ... */ }
Proposed Changes
High Level Flow
The validator is intended for offline use in tests and CI. A typical usage flow is:
- Generate a baseline topology description from a known stable version:
  - Build the topology in a test.
  - Call topology.describe().toString().
  - Store the string under test resources, for example in a text file.
- In a validation test or build step:
  - Load the baseline description.
  - Build the current topology and obtain its description string.
  - Run the validator with both strings.
  - Decide how to handle detected changes based on the TopologyValidationResult.
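The baseline handling in this flow could be sketched as follows. TopologyBaseline and loadOrCreate are hypothetical names, not part of the proposed API, and creating the baseline automatically on first run is an assumption made for illustration; teams may prefer to generate and commit the file explicitly.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: writes the baseline on first run, otherwise loads the stored one.
final class TopologyBaseline {

    static String loadOrCreate(final Path baselineFile, final String currentDescription) {
        try {
            if (Files.notExists(baselineFile)) {
                final Path parent = baselineFile.getParent();
                if (parent != null) {
                    Files.createDirectories(parent);
                }
                // First run: the current description becomes the baseline.
                Files.write(baselineFile, currentDescription.getBytes(StandardCharsets.UTF_8));
                return currentDescription;
            }
            // Later runs: the stored snapshot is what the validator compares against.
            return new String(Files.readAllBytes(baselineFile), StandardCharsets.UTF_8);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The returned string would be passed as the previous description and the freshly built description as the current one.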
Parsing
The TopologyParser implementation will:
- Receive a topology description string that was obtained via topology.describe().toString().
- Parse the string line by line.
- Based on the existing format of TopologyDescription.toString() and the naming convention for internal topics, extract:
  - Internal changelog topics
  - Internal repartition topics
  - Internal subscription / response topics
  - Source topics
  - Sink topics
- Populate a ParsedTopologyContext instance with these sets.
This KIP explicitly limits the first version to the fields shown in ParsedTopologyContext.
Additional information (for example, store type or window configuration) can be added in the future if needed, provided that TopologyDescription.toString() first exposes that information.
Rule based change detection
Each TopologyValidationRule is responsible for comparing aspects of the previous and current ParsedTopologyContext and returning a TopologyChanges object.
The initial set of built in rules covers:
Internal changelog topics
Internal repartition topics
Internal subscription / response topics
Source topics
Sink topics
Each rule is focused and simple:
Determine the relevant sets (for example previous changelog topics and current changelog topics).
Compute added, removed, and potentially renamed entries.
Create TopologyChange instances with an appropriate TopologyChangeType and a detail string.
Return these as TopologyChanges.
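The comparison steps above can be sketched as a plain set difference. The rename detection shown here is an assumed heuristic, not something this KIP specifies: a removed and an added name are paired when they are identical after masking a ten-digit index. TopicSetDiff is a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of a set-based comparison for one topic category.
final class TopicSetDiff {
    final Set<String> added = new TreeSet<>();
    final Set<String> removed = new TreeSet<>();
    // Each entry reads "old -> new" for a pair that differs only in its index.
    final List<String> renamed = new ArrayList<>();

    static TopicSetDiff between(final Set<String> previous, final Set<String> current) {
        final TopicSetDiff diff = new TopicSetDiff();
        diff.added.addAll(current);
        diff.added.removeAll(previous);
        diff.removed.addAll(previous);
        diff.removed.removeAll(current);
        // Assumed heuristic: names equal after masking the index count as one rename.
        for (final String oldName : new ArrayList<>(diff.removed)) {
            for (final String newName : new ArrayList<>(diff.added)) {
                if (mask(oldName).equals(mask(newName))) {
                    diff.renamed.add(oldName + " -> " + newName);
                    diff.removed.remove(oldName);
                    diff.added.remove(newName);
                    break;
                }
            }
        }
        return diff;
    }

    // Replaces every run of exactly ten digits with a placeholder.
    private static String mask(final String name) {
        return name.replaceAll("\\d{10}", "#");
    }
}
```

A rule could then map the added, removed, and renamed entries to the corresponding ADDED, REMOVED, and CHANGED change types. The heuristic can mispair topics when several unnamed operators of the same kind exist, so it is only a starting point.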
The validator aggregates changes from all rules into a TopologyValidationResult.
Result handling and policy
The validator itself does not impose any policy. It only returns a structured TopologyValidationResult.
Applications and CI pipelines decide how strict to be.
Typical patterns include:
- Fail the test or build if any internal changelog topic change is detected:
result.exceptionIfHasChangeTypeOf(TopologyChangeType.INTERNAL_CHANGELOG_TOPIC_CHANGED,
                                  TopologyChangeType.INTERNAL_CHANGELOG_TOPIC_ADDED,
                                  TopologyChangeType.INTERNAL_CHANGELOG_TOPIC_REMOVED);
- Log a warning if internal subscription topics changed:
result.warnIfHasChangeTypeOf(TopologyChangeType.INTERNAL_SUBSCRIPTION_TOPIC_CHANGED);
- Treat source and sink topic changes as an explicit signal to review the topology change:
if (result.hasChangeTypeOf(TopologyChangeType.SOURCE_TOPIC_CHANGED,
                           TopologyChangeType.SINK_TOPIC_CHANGED)) {
    // maybe fail in CI or require manual approval
}
By separating detection from policy, the validator can be used in different ways depending on the needs and risk appetite of a given team.
Example
@Test
public void shouldNotChangeInternalTopicsAccidentally() throws IOException {
    final String previousDescription = Files.readString(Paths.get("src/test/resources/topology-baseline.txt"));
    final String currentDescription = buildTopology().describe().toString();
    final TopologyValidationResult result = TopologyValidator.builder()
        .parser(new DefaultTopologyParser())
        .addRule(new ChangeLogTopicChangeRule())
        .addRule(new RepartitionTopicChangeRule())
        .addRule(new SubscriptionTopicChangeRule())
        // source and sink rules are needed for the optional checks at the end
        .addRule(new SourceTopicChangeRule())
        .addRule(new SinkTopicChangeRule())
        .build()
        .validate(previousDescription, currentDescription);
    // strict on internal state topics
    result.exceptionIfHasChangeTypeOf(
        TopologyChangeType.INTERNAL_CHANGELOG_TOPIC_CHANGED,
        TopologyChangeType.INTERNAL_REPARTITION_TOPIC_CHANGED
    );
    // softer on subscription topics
    result.warnIfHasChangeTypeOf(TopologyChangeType.INTERNAL_SUBSCRIPTION_TOPIC_CHANGED);
    // optional checks on source and sink topic changes
    if (result.hasChangeTypeOf(TopologyChangeType.SOURCE_TOPIC_CHANGED,
                               TopologyChangeType.SINK_TOPIC_CHANGED)) {
        logger.warn("Source or sink topics changed: {}", result.changes());
    }
}
Compatibility, Deprecation, and Migration Plan
All new types are added in the streams test utils module (under streams/test-utils/src/main/java/org/apache/kafka/streams/).
No existing public APIs are changed.
Runtime behavior of Kafka Streams applications is not affected.
Adoption is fully opt in.
Applications can introduce the validator gradually by:
Capturing a baseline topology description.
Adding validation tests for selected topologies.
Tightening or relaxing policies over time.
Test Plan
Parsing tests for the default TopologyParser
Use representative outputs of Topology.describe().toString()
Cover topologies with internal changelog topics, repartition topics, subscription and response topics, multiple sources and sinks
Rule tests for each built in rule
Verify detection of added, removed, and changed internal changelog topics
Verify detection of changes in repartition topics
Verify detection of changes in subscription topics
Verify detection of changes in source and sink topics
Aggregation tests for TopologyValidationResult
Verify that changes from multiple rules are aggregated correctly
Verify that hasChangeTypeOf works as expected for different combinations of types
Verify that warnIfHasChangeTypeOf and exceptionIfHasChangeTypeOf behave as documented
Limitations and false positives
The TopologyValidator proposed in this KIP has several intentional limitations.
First, it only looks at information that is visible in the string representation of the topology, as returned by Topology.describe().toString(). It does not inspect:
Application specific semantics of processors
State store contents or schemas
External systems that consume from or produce to the topics
Second, the validator is rule based. The initial rules focus on:
Internal changelog topics
Internal repartition topics
Internal subscription and response topics
Source and sink topics
This means that the validator can produce both false positives and false negatives:
False positives
Some deployments may intentionally rename internal topics, for example as part of a controlled state migration.
Teams might add or remove source or sink topics as part of an expected rollout.
In such cases, the validator will still report TopologyChange entries, even though the changes are intentional and safe.
False negatives
Some incompatible changes may not be visible at the topic level at all, for example changes in record semantics, schemas, or processing logic that do not affect internal or external topics.
Future changes to the TopologyDescription string format, or highly custom topologies, might make it harder for the default parser to extract all relevant information.
Because of these limitations, the validator must be understood as:
A guardrail that highlights potentially risky topic level changes early, not a proof of safety or compatibility.
A tool that surfaces structured signals, leaving the final decision to fail builds or accept changes to the application owner.
To mitigate false positives, applications and CI pipelines can:
Tailor their policies per change type, for example failing on internal changelog topic changes while only logging warnings for subscription topics.
Inspect the detail messages included in TopologyChange entries to confirm whether a reported change is expected.
Optionally, maintain allow lists or baselines for known and accepted changes.
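An allow list of accepted changes could be applied as a simple filter before a policy is enforced. The sketch below is only an illustration: AllowListFilter is a hypothetical name, and matching on the exact detail string is an assumption; a real policy might match on change type and topic name instead.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: drops change details that were reviewed and accepted earlier.
final class AllowListFilter {

    // Returns only those details that are not present in the accepted set.
    static List<String> unexpected(final List<String> details, final Set<String> accepted) {
        return details.stream()
            .filter(detail -> !accepted.contains(detail))
            .collect(Collectors.toList());
    }
}
```

A test could then fail only when the returned list is non-empty, so that accepted changes do not need to be re-approved on every build.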
To mitigate false negatives, this KIP proposes a minimal and extensible design:
ParsedTopologyContext can be extended with additional fields in future work.
New TopologyValidationRule implementations can be added as we gain more experience with real world topologies and incident patterns.
Rejected Alternatives
Runtime validation at application startup
Another option would be to validate topology compatibility at runtime, when a Streams application starts, using the previously deployed topology as a baseline.
This KIP deliberately does not take that approach and limits itself to an offline tool in the streams-test-utils module.
Drawbacks:
- It would require additional infrastructure to persist and look up the “previous” topology in a production environment (for example, an extra topic, an external store, or custom metadata management). The proposed validator only needs a text snapshot that can be checked into version control and used in unit tests.
- Tying validation to application startup means that misconfigurations or unexpected topology changes can delay or prevent applications from starting, which is operationally sensitive. Many teams prefer a clear separation between “can this process start?” and “is this change operationally safe?”.
- Even with runtime validation, developers would still only discover problems at deployment time. The goal of this KIP is to surface signals earlier, in unit tests and CI, before a new version is rolled out to a cluster.
Using broker metadata instead of topology description
Deriving changes from actual broker metadata and topic listings was considered.
Drawbacks:
Requires access to a running cluster.
Couples validation to cluster state instead of logical topology.
Makes it difficult to run purely as a unit test.
The proposed approach is intentionally offline and only depends on the topology description string.
Full graph diff of topology structure
Reconstructing the full topology graph, including all nodes and edges, and performing a graph diff could detect deeper structural changes. However:
It is significantly more complex to implement and maintain.
It risks being misunderstood as a full logical compatibility checker, which is out of scope for this KIP.
For many real world incidents, internal topic and source / sink topic changes are already the critical signals.
This KIP focuses on a smaller, more targeted problem: detecting changes in internal and external topics that are directly visible in the topology description. More advanced graph analysis can be considered in future work if this validator proves useful.