Status

Current state: Under discussion

Discussion thread: here 

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams applications are opaque to cluster operators. When an application hits performance degradation or a protracted rebalance, operators cannot inspect the processing graph — the individual source, processor, and sink nodes, their predecessor/successor relationships, and the state stores they touch — without access to the application source code or logs. KIP-1071 (Streams Rebalance Protocol) already has clients send a topology to the broker, but only the subset the coordinator needs for assignment: subtopologies, source topics, changelog topics, and copartition groups. The full processing graph is not part of that assignment topology and is not currently available on the broker.

This KIP proposes a mechanism to send the full topology description from the client to the broker, where a pluggable backend can store and expose it for operational tooling and topology visualization in management UIs. The design follows the pattern established by KIP-714 (Client Metrics and Observability): the broker acts as a conduit, receiving data from clients and delegating storage and presentation to a plugin implementation. This keeps the broker itself simple and the feature extensible.

Public Interfaces

New Broker Configuration

Configuration name | Description | Values
group.streams.topology.description.class | The fully qualified class name of a StreamsGroupTopologyDescriptionPlugin implementation. When not set, the feature is disabled. | Type: class, Default: empty string

New Client Configuration

Configuration name | Description | Values
topology.description.push.enabled | Controls whether the Kafka Streams client sends topology descriptions to the broker when requested. When set to false, the client ignores any TopologyDescriptionId set in heartbeat responses. | Type: boolean, Default: true
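For illustration, a Streams application that should never push its topology could opt out as below. This is a minimal sketch assuming the usual java.util.Properties-based configuration; StreamsPushOptOutExample, optOutConfig, and pushEnabled are hypothetical helpers, and pushEnabled only models the documented default of true.

```java
import java.util.Properties;

// Hypothetical helpers; only the property names come from the KIP and Kafka Streams.
class StreamsPushOptOutExample {
    static Properties optOutConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // New client property proposed by this KIP; pushes are enabled when it is absent.
        props.put("topology.description.push.enabled", "false");
        return props;
    }

    // Models the documented default: true unless explicitly set to false.
    static boolean pushEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty("topology.description.push.enabled", "true"));
    }
}
```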

StreamsGroupHeartbeatResponse Change

StreamsGroupHeartbeatResponse is bumped to the next version (N) and gains a new field at that version:

{ "name": "TopologyDescriptionId", "type": "uuid", "versions": "N+",
  "nullableVersions": "N+", "default": "null",
  "about": "When non-null, the client should push its current topology description tagged with this id via UpdateStreamsGroupTopologyDescription. Null when no push is requested." }

The broker mints a fresh TopologyDescriptionId on group creation and on every topology epoch bump, and persists it on the streams group record. After each heartbeat from a member that is not in STALE_TOPOLOGY status, the broker calls plugin.requiresTopologyPush(requestContext, groupId, topologyDescriptionId) with the current id. If the plugin returns true, the broker includes that id in the heartbeat response; otherwise the field is left null.
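The per-heartbeat decision described above can be sketched as a small function. This is a simplified illustration, not broker code: PushPolicy is a hypothetical stand-in for the plugin that drops the AuthorizableRequestContext argument and uses java.util.UUID in place of Kafka's Uuid, and HeartbeatPushDecision is likewise a hypothetical name.

```java
import java.util.UUID;

// Hypothetical stand-in for StreamsGroupTopologyDescriptionPlugin#requiresTopologyPush.
interface PushPolicy {
    boolean requiresTopologyPush(String groupId, UUID topologyDescriptionId);
}

class HeartbeatPushDecision {
    // Returns the id to put in the heartbeat response, or null when no push is requested.
    static UUID topologyDescriptionIdForResponse(PushPolicy plugin, // null when no plugin is configured
                                                 String groupId,
                                                 UUID currentId,
                                                 boolean memberHasStaleTopology) {
        if (plugin == null || memberHasStaleTopology) {
            return null; // plugin-less brokers never solicit; STALE_TOPOLOGY members skip the plugin
        }
        try {
            return plugin.requiresTopologyPush(groupId, currentId) ? currentId : null;
        } catch (RuntimeException e) {
            // Defensive backstop: a throwing plugin counts as "false" (the broker would log at WARN).
            return null;
        }
    }
}
```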

New RPC: UpdateStreamsGroupTopologyDescription (API Key TBD)

A new RPC is introduced for setting the topology description for a streams group. Like StreamsGroupHeartbeat, the request is sent to the group coordinator for the group.

Request:

{
  "apiKey": "TBD",
  "type": "request",
  "listeners": ["broker"],
  "name": "UpdateStreamsGroupTopologyDescriptionRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The streams group identifier." },
    { "name": "TopologyDescriptionId", "type": "uuid", "versions": "0+",
      "about": "The topology description id received in the most recent heartbeat response, identifying which topology version this push corresponds to." },
    { "name": "TopologyDescription", "type": "TopologyDescription", "versions": "0+",
      "about": "The topology description." }
  ],
  "commonStructs": [
    { "name": "TopologyDescription", "versions": "0+", "fields": [
      { "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+",
        "about": "The subtopologies that make up this topology." },
      { "name": "GlobalStores", "type": "[]GlobalStore", "versions": "0+",
        "about": "Global state stores used by this topology." }
    ]},
    { "name": "Subtopology", "versions": "0+", "fields": [
      { "name": "SubtopologyId", "type": "string", "versions": "0+",
        "about": "The subtopology identifier, unique within the topology." },
      { "name": "Nodes", "type": "[]TopologyNode", "versions": "0+",
        "about": "The processing nodes in this subtopology." }
    ]},
    { "name": "TopologyNode", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." },
      { "name": "NodeType", "type": "int8", "versions": "0+",
        "about": "The type of this node: 1=SOURCE, 2=PROCESSOR, 3=SINK." },
      { "name": "SourceTopics", "type": "[]string", "versions": "0+",
        "about": "The source topics this node reads from. Populated for source nodes." },
      { "name": "SinkTopic", "type": "string", "versions": "0+",
        "nullableVersions": "0+", "default": "null",
        "about": "The topic this node writes to. Populated for sink nodes." },
      { "name": "Stores", "type": "[]string", "versions": "0+",
        "about": "The state store names accessed by this node. Populated for processor nodes." },
      { "name": "Successors", "type": "[]string", "versions": "0+",
        "about": "The names of successor nodes in the processing graph. Predecessor relationships are reconstructed from this field on the read side." }
    ]},
    { "name": "GlobalStore", "versions": "0+", "fields": [
      { "name": "Source", "type": "TopologyNode", "versions": "0+",
        "about": "The source node providing data to the global store." },
      { "name": "Processor", "type": "TopologyNode", "versions": "0+",
        "about": "The processor node that populates the global store." }
    ]}
  ]
}
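The NodeType codes in the request schema (1=SOURCE, 2=PROCESSOR, 3=SINK) could be decoded on the read side with a small enum such as the following. WireNodeType is a hypothetical name, and rejecting unknown codes with IllegalArgumentException is an assumption about how a reader might treat a malformed payload, not a rule stated by this KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical read-side mapping of the TopologyNode.NodeType int8 codes.
enum WireNodeType {
    SOURCE((byte) 1),
    PROCESSOR((byte) 2),
    SINK((byte) 3);

    private static final Map<Byte, WireNodeType> BY_CODE = new HashMap<>();
    static {
        // Enum constants are initialized before this block runs, so values() is safe here.
        for (WireNodeType type : values()) {
            BY_CODE.put(type.code, type);
        }
    }

    final byte code;

    WireNodeType(byte code) {
        this.code = code;
    }

    static WireNodeType fromCode(byte code) {
        WireNodeType type = BY_CODE.get(code);
        if (type == null) {
            throw new IllegalArgumentException("Unknown NodeType code: " + code);
        }
        return type;
    }
}
```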

Response:


{
  "apiKey": "TBD",
  "type": "response",
  "name": "UpdateStreamsGroupTopologyDescriptionResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." }
  ]
}

Authorization: Requires READ ACL on the GROUP resource for the given group ID. As with offset commits, pushing a topology description is not considered a modification of the group, so applications can be deployed with only READ ACLs on the group.


Error codes:

  • GROUP_AUTHORIZATION_FAILED — the client is not authorized
  • INVALID_REQUEST — the request is malformed, or the plugin semantically rejected the payload by completing its future with InvalidRequestException
  • UNSUPPORTED_VERSION — the coordinator cannot serve this RPC because no topology description plugin is configured
  • TOPOLOGY_DESCRIPTION_TOO_LARGE — the plugin rejected the description because it exceeds the size the plugin is willing to store
  • TOPOLOGY_DESCRIPTION_UPDATE_FAILED — the plugin failed to process the request for some other reason; the broker logs the underlying error
  • GROUP_ID_NOT_FOUND — the specified group does not exist
  • NOT_COORDINATOR — the broker is not the coordinator for this group
  • COORDINATOR_NOT_AVAILABLE — the coordinator is not available
  • COORDINATOR_LOAD_IN_PROGRESS — the coordinator is loading

Client-side retry behavior for each code is described in Error Handling and Retries.

StreamsGroupDescribeRequest Change

StreamsGroupDescribeRequest is bumped to the next version (N) and gains a new field at that version:

{ "name": "IncludeTopologyDescription", "type": "bool", "versions": "N+", "default": "false",
  "about": "Whether to include the full topology description from the topology description plugin in the response." }

Requests at older versions are handled exactly as before by any broker. The flag may only be set when version N or later is negotiated.

StreamsGroupDescribeResponse Change

StreamsGroupDescribeResponse is bumped to the next version (N). Two new fields are added to each DescribedGroup at that version:

{ "name": "TopologyDescription", "type": "TopologyDescription", "versions": "N+",
  "nullableVersions": "N+", "default": "null",
  "about": "The topology description for this group. Null if not available; see TopologyDescriptionStatus for the reason." },
{ "name": "TopologyDescriptionStatus", "type": "int8", "versions": "N+", "default": "0",
  "about": "When TopologyDescription is null, the reason: 0=NOT_REQUESTED (client did not set IncludeTopologyDescription), 1=NOT_STORED (no topology description has been recorded for this group), 2=ERROR (the broker failed to fetch the topology description; check broker logs). Ignored when TopologyDescription is non-null." }

The TopologyDescription common struct mirrors the struct used by UpdateStreamsGroupTopologyDescriptionRequest (same field names and shape). Because Kafka RPC schemas do not share common structs across message files, the struct is duplicated in StreamsGroupDescribeResponse.json.


Setting these fields does not change the ErrorCode on the DescribedGroup: a group with a successful describe but a missing or failed topology fetch still returns ErrorCode=NONE. The TopologyDescriptionStatus field tells the caller why TopologyDescription is null, so that "waiting for first push" (NOT_STORED) can be distinguished from "broker-side fetch failed" (ERROR) without an error-level change to the describe result.
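The broker-side choice of TopologyDescriptionStatus can be summarized in a few lines. This is a sketch under stated assumptions: DescribeStatus and resolve are hypothetical names, a CompletableFuture with a generic payload type T stands in for the plugin's getTopology result, and the codes are the wire values defined above.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper computing the TopologyDescriptionStatus int8 for one DescribedGroup.
class DescribeStatus {
    static final byte NOT_REQUESTED = 0;
    static final byte NOT_STORED = 1;
    static final byte ERROR = 2;
    // Schema default, left in place when a description is returned (readers ignore it then).
    static final byte DEFAULT_WHEN_PRESENT = 0;

    // pluginResult is the future from getTopology, or null when no plugin is configured.
    static <T> CompletableFuture<Byte> resolve(boolean requested, CompletableFuture<T> pluginResult) {
        if (!requested) {
            return CompletableFuture.completedFuture(NOT_REQUESTED);
        }
        if (pluginResult == null) {
            // No plugin: indistinguishable on the wire from "plugin has nothing stored".
            return CompletableFuture.completedFuture(NOT_STORED);
        }
        return pluginResult.handle((description, error) -> {
            if (error != null) {
                return ERROR; // the broker would also log this at WARN
            }
            return description == null ? NOT_STORED : DEFAULT_WHEN_PRESENT;
        });
    }
}
```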

Plugin Interface

A new interface is introduced in org.apache.kafka.coordinator.group.api.streams:

package org.apache.kafka.coordinator.group.api.streams;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import java.util.concurrent.CompletableFuture;

/**
 * A broker-side plugin that manages topology descriptions for streams groups.
 *
 * <p>Implementations receive topology descriptions pushed by Kafka Streams clients
 * and can store, forward, or expose them however they see fit.
 *
 * <p>The broker calls {@link #requiresTopologyPush} on every heartbeat to determine
 * whether the client should send its topology description. Implementations that need
 * to consult an external service should kick off that work asynchronously on the
 * first call and return {@code false} until the result is available.
 *
 * <p>Every method takes an {@link AuthorizableRequestContext} as its first argument.
 *
 * <p>Implementations must be thread-safe. {@link #setTopology} may be called
 * concurrently by multiple group members observing the same
 * {@code TopologyDescriptionId} in their heartbeat responses; concurrent
 * calls with the same {@code (groupId, topologyDescriptionId)} pair carry
 * identical data and must be treated as idempotent.
 */
public interface StreamsGroupTopologyDescriptionPlugin extends Configurable, AutoCloseable {

    /**
     * Returns whether the broker should request a topology push from the client.
     *
     * <p>Called on every successful heartbeat. Not called for members in
     * {@code STALE_TOPOLOGY} status. If this method returns {@code true}, the broker
     * includes the {@code topologyDescriptionId} in the heartbeat response.
     *
     * <p>This method should not throw. Failure modes should be handled internally
     * and converted to a return value of {@code false}. The broker defensively
     * catches any exception and treats it as {@code false}, logging at WARN; plugins
     * should not rely on this backstop.
     *
     * <p>This method is on the heartbeat path and must return quickly. Implementations
     * that need to consult an external service should return {@code false} until the
     * result is available.
     *
     * <p>See the KIP's <em>Plugin Implementation Guidelines</em> section for how
     * implementations should handle in-flight tracking, retries, and topology
     * expiration.
     *
     * @param requestContext the context of the heartbeat request
     * @param groupId the streams group ID
     * @param topologyDescriptionId the broker-minted id identifying the current topology
     *        version for this group; opaque to the plugin
     * @return true if the broker should request a topology push from the client
     */
    boolean requiresTopologyPush(AuthorizableRequestContext requestContext,
                                 String groupId, Uuid topologyDescriptionId);

    /**
     * Called when a client sends a topology description for a streams group.
     * This method may be called concurrently by multiple members of the same group;
     * all calls for the same (groupId, topologyDescriptionId) carry identical data.
     *
     * <p>The returned future completes when the topology has been persisted or
     * forwarded. Complete it exceptionally with
     * {@link org.apache.kafka.common.errors.InvalidRequestException} to signal
     * that the payload is semantically invalid, or with
     * {@link org.apache.kafka.common.errors.TopologyDescriptionTooLargeException}
     * to signal that the description is larger than the plugin is willing to
     * store. Any other exception is mapped to
     * {@code TOPOLOGY_DESCRIPTION_UPDATE_FAILED}.
     *
     * @param requestContext the context of the UpdateStreamsGroupTopologyDescription request
     * @param groupId the streams group ID
     * @param topologyDescriptionId the id this push is tagged with, as carried in the
     *        heartbeat response that asked for it; opaque to the plugin
     * @param description the topology description
     * @return a future that completes when the operation is done
     */
    CompletableFuture<Void> setTopology(AuthorizableRequestContext requestContext,
                                        String groupId, Uuid topologyDescriptionId,
                                        StreamsGroupTopologyDescription description);

    /**
     * Called when a group is explicitly deleted via DeleteGroups. Removes any topology
     * description stored for this group.
     *
     * <p>The returned future completes when the deletion has been processed.
     * If it completes exceptionally, the broker logs the error; the outcome does not
     * affect the DeleteGroups response returned to the caller.
     *
     * @param requestContext the context of the DeleteGroups request
     * @param groupId the streams group ID
     * @return a future that completes when the operation is done
     */
    CompletableFuture<Void> deleteTopology(AuthorizableRequestContext requestContext, String groupId);

    /**
     * Called to retrieve the stored topology description for a group. This is invoked
     * by the broker when a client calls StreamsGroupDescribe with
     * {@code IncludeTopologyDescription=true}.
     *
     * <p>Returns a future that resolves to the stored topology description for the
     * given {@code (groupId, topologyDescriptionId)} pair, or to {@code null} if no
     * topology is stored (e.g. no push has succeeded yet, or the stored description
     * is tagged with a different id). If the future completes exceptionally, the
     * plugin signals a read error for this group.
     *
     * @param requestContext the context of the StreamsGroupDescribe request
     * @param groupId the streams group ID
     * @param topologyDescriptionId the id of the topology version the caller is
     *        asking about; opaque to the plugin
     * @return a future resolving to the stored topology description, or null if none
     */
    CompletableFuture<StreamsGroupTopologyDescription>
        getTopology(AuthorizableRequestContext requestContext,
                    String groupId, Uuid topologyDescriptionId);
}

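The plugin uses a StreamsGroupTopologyDescription POJO that mirrors org.apache.kafka.streams.TopologyDescription but lives in the org.apache.kafka.coordinator.group.api.streams package of the group-coordinator-api module, so plugin implementations only need to depend on group-coordinator-api. The only difference from the Streams type is that nodes carry no predecessor relation; predecessors can be derived by inverting the successor lists.

package org.apache.kafka.coordinator.group.api.streams;

import java.util.Collection;
import java.util.Set;

// Outline of the public API; fields, constructors, and method bodies are omitted.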
public class StreamsGroupTopologyDescription {
    public Collection<Subtopology> subtopologies();
    public Collection<GlobalStore> globalStores();

    public static class Subtopology {
        public String id();
        public Collection<Node> nodes();
    }

    public interface Node {
        String name();
        Set<String> successors();
    }

    public static class Source implements Node {
        public Set<String> topics();
    }

    public static class Processor implements Node {
        public Set<String> stores();
    }

    public static class Sink implements Node {
        public String topic();
    }

    public static class GlobalStore {
        public Source source();
        public Processor processor();
    }
}

Admin Client Interface


DescribeStreamsGroupsOptions gains an includeTopologyDescription(boolean) setter. When set to true, the admin client sets IncludeTopologyDescription on the StreamsGroupDescribeRequest (at the new version) and the coordinator consults the plugin.

StreamsGroupDescription gains two accessors:

  • Optional<StreamsGroupTopologyDescription> topologyDescription() — empty unless the field was requested and the plugin returned a description.
  • StreamsGroupTopologyDescriptionStatus topologyDescriptionStatus() — a new enum { AVAILABLE, NOT_REQUESTED, NOT_STORED, ERROR }. AVAILABLE is reported when TopologyDescription is non-null; the remaining values mirror the wire-level TopologyDescriptionStatus int8.
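The admin-side conversion described above could look like the following sketch. The enum constants come from this KIP; the type is named TopologyDescriptionStatusSketch to make clear it is not the proposed public type, and both the fromWire method and its treatment of unknown codes as ERROR are assumptions.

```java
// Sketch of StreamsGroupTopologyDescriptionStatus and its conversion from the wire fields.
enum TopologyDescriptionStatusSketch {
    AVAILABLE, NOT_REQUESTED, NOT_STORED, ERROR;

    // descriptionPresent: whether TopologyDescription on the DescribedGroup was non-null.
    static TopologyDescriptionStatusSketch fromWire(boolean descriptionPresent, byte wireStatus) {
        if (descriptionPresent) {
            return AVAILABLE; // the wire status is ignored when a description is present
        }
        switch (wireStatus) {
            case 0:
                return NOT_REQUESTED;
            case 1:
                return NOT_STORED;
            case 2:
                return ERROR;
            default:
                // Assumption: codes unknown to this client are reported as ERROR.
                return ERROR;
        }
    }
}
```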

The Admin client exposes a POJO hierarchy in org.apache.kafka.clients.admin that mirrors org.apache.kafka.streams.TopologyDescription but lives in the clients module. The admin client converts the wire-format struct into this hierarchy.

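package org.apache.kafka.clients.admin;

import java.util.Collection;
import java.util.Set;

// Outline of the public API; fields, constructors, and method bodies are omitted.
// predecessors() is filled in by the admin client by inverting the wire-level successor lists.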
public class StreamsGroupTopologyDescription {
    public Collection<Subtopology> subtopologies();
    public Collection<GlobalStore> globalStores();

    public static class Subtopology {
        public String id();
        public Collection<Node> nodes();
    }

    public interface Node {
        String name();
        Set<String> predecessors();
        Set<String> successors();
    }

    public static class Source implements Node {
        public Set<String> topics();
    }

    public static class Processor implements Node {
        public Set<String> stores();
    }

    public static class Sink implements Node {
        public String topic();
    }

    public static class GlobalStore {
        public Source source();
        public Processor processor();
    }
}

Command-Line Tool

kafka-streams-groups.sh gains a new --topology sub-action under --describe, parallel to --members, --offsets, and --state:

kafka-streams-groups.sh --bootstrap-server <broker> --describe --topology --group <group-id>

The output mirrors the format produced by Topology#describe() in the Kafka Streams API:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
      --> my-processor
    Processor: my-processor (stores: [my-store])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002 (topic: output-topic)
      <-- my-processor

Global stores, if present, are printed in a trailing Global Stores: block. The command calls the admin client with includeTopologyDescription(true).

When the coordinator returns no topology description, the command picks its output from the TopologyDescriptionStatus on the response:

  • NOT_STORED → "No topology description has been recorded for group '<group-id>'."
  • ERROR → "The broker failed to retrieve the topology description for group '<group-id>' (check broker logs)."
  • AVAILABLE → normal pretty-printed topology.

Exit code. The command exits 0 for AVAILABLE. It exits 1 for NOT_STORED, for ERROR, and when the DescribedGroup.ErrorCode is non-zero (authorization failure, group not found, coordinator unavailable, etc.). NOT_REQUESTED does not occur for this command because it always sets IncludeTopologyDescription=true.
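The command's choice of message and exit code can be sketched as two pure functions. TopologyCliOutcome, exitCode, and message are hypothetical helpers, and the local Status enum stands in for the admin client's status type; the messages and exit codes are the ones listed above.

```java
// Hypothetical helpers for kafka-streams-groups.sh --describe --topology.
class TopologyCliOutcome {
    enum Status { AVAILABLE, NOT_REQUESTED, NOT_STORED, ERROR }

    // 0 only when the topology was returned; 1 for a group-level error or a missing topology.
    static int exitCode(short groupErrorCode, Status status) {
        if (groupErrorCode != 0) {
            return 1;
        }
        return status == Status.AVAILABLE ? 0 : 1;
    }

    // Message printed instead of a topology, or null when the topology should be pretty-printed.
    static String message(String groupId, Status status) {
        switch (status) {
            case AVAILABLE:
                return null;
            case NOT_STORED:
                return "No topology description has been recorded for group '" + groupId + "'.";
            case ERROR:
                return "The broker failed to retrieve the topology description for group '" + groupId
                    + "' (check broker logs).";
            default:
                // NOT_REQUESTED cannot occur: the command always sets IncludeTopologyDescription=true.
                throw new IllegalStateException("Unexpected status " + status);
        }
    }
}
```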

Proposed Changes

Broker Side

  1. The plugin is instantiated at broker startup if group.streams.topology.description.class is configured. A broker without a plugin does not advertise UpdateStreamsGroupTopologyDescription in its ApiVersions response and returns UNSUPPORTED_VERSION for any such request. Because a plugin-less broker never includes a TopologyDescriptionId in heartbeat responses, in normal operation the RPC is only sent against a broker that has a plugin configured.
  2. The broker mints a fresh TopologyDescriptionId (a UUID) on group creation and on every topology epoch bump, and persists it on the streams group record as a tagged field. After a successful StreamsGroupHeartbeat, the broker calls requiresTopologyPush on the plugin with the current id. STALE_TOPOLOGY members are skipped. If the plugin returns true, the broker includes the id in the heartbeat response and the client pushes its topology under that id; otherwise the field is left null. Per the plugin contract, requiresTopologyPush should not throw; the broker defensively catches any exception, logs at WARN, and treats the call as false. Beyond the STALE_TOPOLOGY skip, the broker does no other gating: deduplication, timeouts, and back-off are the plugin's responsibility (see Plugin Implementation Guidelines). The plugin sees only the opaque id and the group identity; it does not see topology epochs or wall-clock timestamps.
  3. On UpdateStreamsGroupTopologyDescription, the broker checks the READ ACL on the group and that a plugin is configured. The broker does not enforce a size limit on the topology description; the plugin decides what size it is willing to store. The broker calls setTopology on the plugin, passing the TopologyDescriptionId from the request unchanged. On success, the response carries NONE. InvalidRequestException from the plugin maps to INVALID_REQUEST, TopologyDescriptionTooLargeException maps to TOPOLOGY_DESCRIPTION_TOO_LARGE, and any other exception maps to TOPOLOGY_DESCRIPTION_UPDATE_FAILED; all three failure cases are logged at WARN.

  4. On DeleteGroups, the broker calls deleteTopology on the plugin only after the group deletion has returned no error for that group. deleteTopology failures are logged but do not affect the DeleteGroups response. For groups that expire naturally (all members leave), deleteTopology is not called; the plugin expires the topology via a wall-clock TTL (see Plugin Implementation Guidelines).
  5. On StreamsGroupDescribe with IncludeTopologyDescription=true, the broker reads the current TopologyDescriptionId from group metadata and, after assembling the rest of the response, calls getTopology on the plugin for each group. Calls for different groups run in parallel. Authorization is unchanged: the existing DESCRIBE ACL on the GROUP resource covers the topology description. DescribedGroup.ErrorCode is never modified by topology-related outcomes; the TopologyDescriptionStatus field carries the reason when TopologyDescription is null (NOT_REQUESTED, NOT_STORED, or ERROR; the last is also logged at WARN). When no plugin is configured, the broker returns NOT_STORED, because the wire format does not distinguish "no plugin" from "plugin has nothing stored".
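The mapping in step 3 from the outcome of setTopology to the response error can be sketched as follows. To keep the example self-contained it declares local stand-in exception classes instead of InvalidRequestException and the new TopologyDescriptionTooLargeException, and because the numeric error codes are not yet assigned it returns error names; SetTopologyErrorMapping and toErrorName are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Local stand-ins for InvalidRequestException and the proposed TopologyDescriptionTooLargeException.
class InvalidRequestStandIn extends RuntimeException {
    InvalidRequestStandIn(String message) { super(message); }
}

class TooLargeStandIn extends RuntimeException {
    TooLargeStandIn(String message) { super(message); }
}

class SetTopologyErrorMapping {
    // Maps the plugin's setTopology future to the error name carried in the response.
    static CompletableFuture<String> toErrorName(CompletableFuture<Void> setTopologyResult) {
        return setTopologyResult.handle((ignored, error) -> {
            if (error == null) {
                return "NONE";
            }
            // Failures propagated through dependent stages arrive wrapped in CompletionException.
            Throwable cause = error instanceof CompletionException && error.getCause() != null
                ? error.getCause() : error;
            // The broker would log each of the failure cases below at WARN.
            if (cause instanceof InvalidRequestStandIn) {
                return "INVALID_REQUEST";
            } else if (cause instanceof TooLargeStandIn) {
                return "TOPOLOGY_DESCRIPTION_TOO_LARGE";
            } else {
                return "TOPOLOGY_DESCRIPTION_UPDATE_FAILED";
            }
        });
    }
}
```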

Client Side

  1. The Streams client records the TopologyDescriptionId from each heartbeat response. A non-null id means "push your topology under this id"; a null/missing id means no push is needed.
  2. At startup, if topology.description.push.enabled=true, the Streams client converts the topology returned by Topology#describe() to the wire format and stores it internally. When topology.description.push.enabled=false, no description is stored and the feature is disabled on this client.
  3. On each consumer background-thread poll, the client sends UpdateStreamsGroupTopologyDescription to the coordinator (carrying the most recently received TopologyDescriptionId) when a coordinator is known, the id is non-null, a stored topology description is available, and no prior request is in flight. Completion handling is described in Error Handling and Retries.

The push runs on the consumer background thread and never blocks user-facing Kafka Streams APIs. The push is best-effort: a failure to send the topology description does not prevent the Streams application from running.

Topology Translation

TopologyDescription from Kafka Streams maps one-to-one to the wire types in the RPC schema. Predecessor edges are not sent on the wire; the read side reconstructs them by inverting each node's successor list.
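Reconstructing predecessors is a single pass that inverts the successor edges. The sketch below works on a plain map from node name to successor names, which is an assumption made for brevity (a real reader would walk the Nodes of each subtopology); PredecessorReconstruction is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class PredecessorReconstruction {
    // For each node, returns the set of nodes that list it as a successor.
    static Map<String, Set<String>> predecessors(Map<String, List<String>> successorsByNode) {
        Map<String, Set<String>> result = new LinkedHashMap<>();
        // Every node gets an entry, so source nodes end up with an empty predecessor set.
        for (String node : successorsByNode.keySet()) {
            result.put(node, new LinkedHashSet<>());
        }
        for (Map.Entry<String, List<String>> entry : successorsByNode.entrySet()) {
            for (String successor : entry.getValue()) {
                result.computeIfAbsent(successor, k -> new LinkedHashSet<>()).add(entry.getKey());
            }
        }
        return result;
    }
}
```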

Error Handling and Retries

NOT_COORDINATOR and COORDINATOR_NOT_AVAILABLE trigger coordinator rediscovery; the pending id is preserved. COORDINATOR_LOAD_IN_PROGRESS and network exceptions leave the pending id in place for retry on the next poll.

All other errors (TOPOLOGY_DESCRIPTION_TOO_LARGE, TOPOLOGY_DESCRIPTION_UPDATE_FAILED, INVALID_REQUEST, UNSUPPORTED_VERSION, GROUP_ID_NOT_FOUND, GROUP_AUTHORIZATION_FAILED) clear the pending id and log at WARN. The client does not retry on its own; re-attempts, if any, happen when the broker sends a non-null id on a subsequent heartbeat. See Plugin Implementation Guidelines for when the plugin should re-solicit.

A non-zero ThrottleTimeMs on the response delays the next push attempt by that amount, as with other request managers.
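The client-side rules above, together with the send conditions from Client Side step 3, can be sketched as a small state holder. Everything here is hypothetical: TopologyPushState, its methods, the string error names, and the caller-supplied millisecond clock. The sketch does not model coordinator rediscovery itself, only whether the pending id is kept, and it assumes a successful push clears the pending id.

```java
import java.util.Set;
import java.util.UUID;

class TopologyPushState {
    // Errors after which the pending id is kept and the push retried on a later poll.
    private static final Set<String> RETRIABLE = Set.of(
        "NOT_COORDINATOR", "COORDINATOR_NOT_AVAILABLE", "COORDINATOR_LOAD_IN_PROGRESS", "NETWORK_EXCEPTION");

    UUID pendingId;      // id from the latest heartbeat response that requested a push
    boolean inFlight;
    long nextAttemptMs;  // earliest time the next push may be sent (throttling)

    // Called on each background-thread poll; returns the id to push, or null if nothing is sent.
    UUID maybeSend(long nowMs, boolean coordinatorKnown, boolean descriptionStored) {
        if (!coordinatorKnown || pendingId == null || !descriptionStored || inFlight || nowMs < nextAttemptMs) {
            return null;
        }
        inFlight = true;
        return pendingId;
    }

    // errorName is "NONE" on success.
    void onResponse(String errorName, int throttleTimeMs, long nowMs) {
        inFlight = false;
        nextAttemptMs = nowMs + Math.max(0, throttleTimeMs);
        if ("NONE".equals(errorName)) {
            pendingId = null; // assumption: done until a later heartbeat asks again
        } else if (!RETRIABLE.contains(errorName)) {
            pendingId = null; // non-retriable: log at WARN and wait for a new id from the broker
        }
        // Retriable errors keep pendingId so the next eligible poll tries again.
    }
}
```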

Plugin Implementation Guidelines

Broker-side gating around the plugin is minimal. The plugin can expect requiresTopologyPush to be called roughly once per heartbeat interval for every member of each group, except members in STALE_TOPOLOGY status.

A correct plugin implementation should:

  • Implement requiresTopologyPush as a non-blocking, efficient call, since it is invoked frequently.
  • Expire and clean up stored topology descriptions for inactive groups and superseded ids internally. Incoming requiresTopologyPush calls can be used as an implicit keep-alive.
  • Decide and enforce a maximum stored description size. Reject pushes that exceed it by completing the setTopology future with TopologyDescriptionTooLargeException, and stop returning requiresTopologyPush=true for an id once that id has been confirmed too large.
  • Avoid concurrent or repetitive pushes: track in-flight pushes per id, back off with an exponential schedule on transient failures, and disable pushes permanently for the id on permanent failures (TOPOLOGY_DESCRIPTION_TOO_LARGE, plugin-semantic INVALID_REQUEST).
  • Not request repeated pushes from stable groups that have already pushed their topology under the current id.
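A plugin that follows these guidelines might keep per-group state along the lines of the sketch below. It is a simplified, in-memory illustration under several assumptions: it drops the AuthorizableRequestContext argument, uses java.util.UUID and a generic description type D, takes a caller-supplied clock and a caller-computed size, and omits TTL-based expiry. InMemoryTopologyStore and its constructor parameters are hypothetical. It solicits a push only when the current id is not stored, not permanently rejected, and outside the back-off window, which doubles as in-flight tracking.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified plugin store; not the proposed plugin interface.
class InMemoryTopologyStore<D> {
    private static final class GroupState<T> {
        UUID storedId;      // id of the stored description, if any
        T description;
        UUID rejectedId;    // id permanently rejected as too large
        UUID solicitedId;   // id for which pushes are currently being solicited
        int attempts;       // solicitations for solicitedId so far
        long retryAfterMs;  // no new solicitation before this time
    }

    private final Map<String, GroupState<D>> groups = new ConcurrentHashMap<>();
    private final long maxBytes;
    private final long baseBackoffMs;
    private final long maxBackoffMs;

    InMemoryTopologyStore(long maxBytes, long baseBackoffMs, long maxBackoffMs) {
        this.maxBytes = maxBytes;
        this.baseBackoffMs = baseBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    private static <T> GroupState<T> orNew(GroupState<T> state) {
        return state != null ? state : new GroupState<T>();
    }

    // Non-blocking: only in-memory state is consulted, under the map's per-key lock.
    boolean requiresTopologyPush(String groupId, UUID id, long nowMs) {
        AtomicBoolean ask = new AtomicBoolean(false);
        groups.compute(groupId, (g, existing) -> {
            GroupState<D> s = orNew(existing);
            if (id.equals(s.storedId) || id.equals(s.rejectedId) || nowMs < s.retryAfterMs) {
                return s; // already stored, permanently rejected, or inside the back-off window
            }
            if (!id.equals(s.solicitedId)) {
                s.solicitedId = id; // new topology version: restart the back-off schedule
                s.attempts = 0;
            }
            // Exponential window; at most one member is asked per window.
            long backoff = Math.min(maxBackoffMs, baseBackoffMs << Math.min(s.attempts, 20));
            s.attempts++;
            s.retryAfterMs = nowMs + backoff;
            ask.set(true);
            return s;
        });
        return ask.get();
    }

    CompletableFuture<Void> setTopology(String groupId, UUID id, D description, long sizeBytes) {
        if (sizeBytes > maxBytes) {
            groups.compute(groupId, (g, existing) -> {
                GroupState<D> s = orNew(existing);
                s.rejectedId = id; // stop soliciting this id for good
                return s;
            });
            // A real plugin would complete with TopologyDescriptionTooLargeException.
            return CompletableFuture.failedFuture(new IllegalArgumentException("topology description too large"));
        }
        groups.compute(groupId, (g, existing) -> {
            GroupState<D> s = orNew(existing);
            s.storedId = id; // idempotent: concurrent pushes for the same id carry identical data
            s.description = description;
            return s;
        });
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<D> getTopology(String groupId, UUID id) {
        AtomicReference<D> result = new AtomicReference<>();
        groups.computeIfPresent(groupId, (g, s) -> {
            if (id.equals(s.storedId)) {
                result.set(s.description); // only return a description tagged with the requested id
            }
            return s;
        });
        return CompletableFuture.completedFuture(result.get());
    }

    CompletableFuture<Void> deleteTopology(String groupId) {
        groups.remove(groupId);
        return CompletableFuture.completedFuture(null);
    }
}
```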

Security Considerations

The topology description contains user-defined processor names, state-store names, and topic names. This KIP does not treat these as inherently sensitive and does not introduce any client-side redaction. Operators who consider node, store, or topic names sensitive should scope DESCRIBE ACLs on the GROUP resource accordingly: the same ACL that guards StreamsGroupDescribe guards the TopologyDescription field on the response. The UpdateStreamsGroupTopologyDescription path is guarded by READ on the GROUP, identical to the existing heartbeat ACL.

Metrics

This KIP does not introduce any new broker-side or client-side metrics.

Compatibility, Deprecation, and Migration Plan

This KIP bumps StreamsGroupHeartbeatResponse, StreamsGroupDescribeRequest, and StreamsGroupDescribeResponse to the next available version of each RPC and adds the new fields (TopologyDescriptionId, IncludeTopologyDescription, TopologyDescription, TopologyDescriptionStatus) at that version. Pre-upgrade clients negotiate an older version and never see the new fields, so all three changes are wire-compatible.

The new RPC uses a new API key and is only sent by clients that understand the feature.

Without a configured plugin, brokers never set TopologyDescriptionId in heartbeat responses, so no topology descriptions are sent. There is no behavioral change for existing deployments.

Rolling Upgrades

During a rolling upgrade of brokers, some brokers may have the plugin configured and some may not. The TopologyDescriptionId is only set by plugin-equipped coordinators; a client whose current coordinator lacks the plugin never sees the id. If the coordinator migrates mid-push to a plugin-less broker, the client receives UNSUPPORTED_VERSION, clears the pending id, and does not retry. A new id is sent again only when a future heartbeat response from a plugin-equipped coordinator includes one.

During a rolling upgrade of the Streams application (topology epoch change), the broker mints a new TopologyDescriptionId and skips the plugin entirely for STALE_TOPOLOGY members. Only members running the new topology reach the plugin and receive the new id. If every active member is stale during the rollout, the current topology remains uncaptured until at least one member heartbeats at the new epoch.

Future Work

Hash-based mismatch detection. A future enhancement could introduce a topology hash to detect clients running different topology code at the same topology epoch (e.g., a partial deployment), which the broker-minted id alone cannot catch.

Node grouping. A nodeGroup field on TopologyNode could support grouping nodes in UIs. This requires extending the public TopologyDescription.Node interface in Kafka Streams and is best addressed in a follow-up KIP.

Test Plan

Integration Tests

  • A Streams client pushes its topology description after the broker requests it, and the configured plugin receives the description.
  • A broker without a topology description plugin never asks the client to push and rejects pushes outright.
  • Authorization is enforced on the new RPC.
  • Topology descriptions exceeding the plugin's size limit are rejected with TOPOLOGY_DESCRIPTION_TOO_LARGE; descriptions at the limit are accepted.
  • Permanent plugin-side validation failures stop the client from retrying; transient failures and throttling delay subsequent attempts.
  • Requesting a topology description via describe returns it for groups that have pushed, and otherwise surfaces the appropriate non-available status (no description stored, or read error) without turning the describe itself into an error.
  • The CLI prints the topology when it is available and reports a clear, distinct message for each non-available outcome, with exit code 0 only when the topology is actually returned.

System Tests

  • A running Kafka Streams application pushes its topology and an operator can retrieve and pretty-print it via the CLI end-to-end.
  • Disabling the feature on the client stops it from sending topology descriptions altogether.
  • During a rolling upgrade the feature degrades gracefully on either side: pre-upgrade clients are not asked to push, and pre-upgrade brokers cleanly reject pushes from upgraded clients.

Rejected Alternatives

Embedding the topology description in the heartbeat

Adding the topology description to StreamsGroupHeartbeatRequest would bloat the heartbeat with potentially large payloads and mix observability concerns with the rebalance protocol. A separate RPC keeps the feature self-contained and lets size limits (enforced by the plugin) and authorization be handled independently of the heartbeat. See the discussion thread for a fuller comparison of both approaches.

Storing topologies in an internal topic

Using a compacted internal topic (like __consumer_offsets) would require the broker to materialize all topology descriptions in memory from the topic's compacted log. For clusters with many streams groups, this could consume significant heap space on the coordinator broker. A plugin-based approach avoids this by delegating storage to an external system that can handle retention and retrieval independently.

Adding a separate read RPC

A dedicated read RPC was considered but rejected in favor of extending StreamsGroupDescribe at a new version with an IncludeTopologyDescription flag. This keeps the number of new API keys minimal, groups the topology description naturally with other group metadata, and avoids bloating the describe response when the caller does not need it. The plugin remains free to expose topologies directly via its own channels in addition to the describe RPC.

Adding compression support

KIP-714 supports compression (ZStd, LZ4, GZip, Snappy) for telemetry payloads because metrics are pushed repeatedly at high frequency. Topology descriptions are pushed infrequently (only on topology epoch changes) and the wire payload is expected to fit comfortably for typical applications. The Kafka protocol's flexible versions already provide efficient serialization. Adding compression would add complexity without meaningful benefit for this use case.

Appendix: Example RPC payload

For illustration, consider a Kafka Streams application that reads from orders, filters out null values, and writes the remaining records to valid-orders:

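import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
StreamsBuilder builder = new StreamsBuilder();
builder.stream("orders")
       .filter((key, value) -> value != null)
       .to("valid-orders");
Topology topology = builder.build();
System.out.println(topology.describe());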

Topology#describe() produces the following text representation:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [orders])
      --> KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000001 (stores: [])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002 (topic: valid-orders)
      <-- KSTREAM-FILTER-0000000001

The corresponding UpdateStreamsGroupTopologyDescription request body, with the topology converted to the wire format, looks like this:

{
  "GroupId": "orders-app",
  "TopologyDescriptionId": "8d7e3c2a-4f6b-4d9a-9c1b-7e5f3a8d2c4e",
  "TopologyDescription": {
    "Subtopologies": [
      {
        "SubtopologyId": "0",
        "Nodes": [
          {
            "Name": "KSTREAM-SOURCE-0000000000",
            "NodeType": 1,
            "SourceTopics": ["orders"],
            "SinkTopic": null,
            "Stores": [],
            "Successors": ["KSTREAM-FILTER-0000000001"]
          },
          {
            "Name": "KSTREAM-FILTER-0000000001",
            "NodeType": 2,
            "SourceTopics": [],
            "SinkTopic": null,
            "Stores": [],
            "Successors": ["KSTREAM-SINK-0000000002"]
          },
          {
            "Name": "KSTREAM-SINK-0000000002",
            "NodeType": 3,
            "SourceTopics": [],
            "SinkTopic": "valid-orders",
            "Stores": [],
            "Successors": []
          }
        ]
      }
    ],
    "GlobalStores": []
  }
}

Predecessor edges (<-- in the text representation) are not sent on the wire; the read side reconstructs them by inverting each node's Successors.
