Status

Current state: Accepted

Discussion thread: here 

JIRA: KAFKA-20618

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams applications are opaque to cluster operators. When an application hits performance degradation or a protracted rebalance, operators cannot inspect the processing graph — the individual source, processor, and sink nodes, their predecessor/successor relationships, and the state stores they touch — without access to the application source code or logs. KIP-1071 (Streams Rebalance Protocol) already has clients send a topology to the broker, but only the subset the coordinator needs for assignment: subtopologies, source topics, changelog topics, and copartition groups. The full processing graph is not part of that assignment topology and is not currently available on the broker.

This KIP proposes a mechanism to send the full topology description from the client to the broker, where a pluggable backend can store and expose it for operational tooling and topology visualization in management UIs. The design follows the pattern established by KIP-714 (Client Metrics and Observability): the broker acts as a conduit, receiving data from clients and delegating storage and presentation to a plugin implementation. This keeps the broker itself simple and the feature extensible.

Public Interfaces

New Broker Configuration

Configuration name | Description | Values
group.streams.topology.description.plugin.class | The fully qualified class name of a StreamsGroupTopologyDescriptionPlugin implementation. When not set, the feature is disabled. | Type: class, Default: null, Importance: MEDIUM


New Client Configuration

Configuration name | Description | Values
topology.description.push.enabled | Controls whether the Kafka Streams client sends topology descriptions to the broker when requested. When set to false, the client ignores TopologyDescriptionRequired=true in heartbeat responses. | Type: boolean, Default: true
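As an illustration, a Streams application that should not push its topology can disable the feature in its client properties. This is a minimal sketch: the KIP does not define a StreamsConfig constant for the new key, so the literal configuration name is used, and the application ID and bootstrap address are placeholders.

```java
import java.util.Properties;

class TopologyPushConfigExample {

    // Builds client properties for a Streams application that opts out of
    // topology description pushes. The key is the literal name from this KIP;
    // no StreamsConfig constant is assumed.
    static Properties optOutProperties() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");       // placeholder
        props.put("bootstrap.servers", "localhost:9092");    // placeholder
        props.put("topology.description.push.enabled", "false");
        return props;
    }

    // Mirrors the documented default: an absent key means pushes are enabled.
    static boolean pushEnabled(Properties props) {
        return Boolean.parseBoolean(
            props.getProperty("topology.description.push.enabled", "true"));
    }
}
```

Because the default is true, applications that leave the key unset push whenever a plugin-configured broker asks for it.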

StreamsGroupHeartbeatResponse Change

StreamsGroupHeartbeatResponse is bumped to the next version (N) and gains a new field at that version:

{ "name": "TopologyDescriptionRequired", "type": "bool", "versions": "N+", "default": "false",
  "about": "True if the client should send the topology description via StreamsGroupTopologyDescriptionUpdate." }


The broker sets this field to true when a topology description plugin is configured and the broker expects the client to send its current version of the topology description.

New RPC: StreamsGroupTopologyDescriptionUpdate (API Key TBD)

A new RPC is introduced for setting the topology description for a streams group. Like StreamsGroupHeartbeat, the request is sent to the group coordinator for the group.

Request:

{
  "apiKey": "TBD",
  "type": "request",
  "listeners": ["broker"],
  "name": "StreamsGroupTopologyDescriptionUpdateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The streams group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The ID of the streams group member sending the push." },
    { "name": "TopologyEpoch", "type": "int32", "versions": "0+",
      "about": "The epoch of the topology being described." },
    { "name": "TopologyDescription", "type": "TopologyDescription", "versions": "0+",
      "about": "The topology description." }
  ],
  "commonStructs": [
    { "name": "TopologyDescription", "versions": "0+", "fields": [
      { "name": "Subtopologies", "type": "[]TopologyDescriptionSubtopology", "versions": "0+",
        "about": "The subtopologies that make up this topology." },
      { "name": "GlobalStores", "type": "[]TopologyDescriptionGlobalStore", "versions": "0+",
        "about": "Global state stores used by this topology." }
    ]},
    { "name": "TopologyDescriptionSubtopology", "versions": "0+", "fields": [
      { "name": "SubtopologyId", "type": "string", "versions": "0+",
        "about": "The subtopology identifier, unique within the topology." },
      { "name": "Nodes", "type": "[]TopologyDescriptionNode", "versions": "0+",
        "about": "The processing nodes in this subtopology." }
    ]},
    { "name": "TopologyDescriptionNode", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." },
      { "name": "NodeType", "type": "int8", "versions": "0+",
        "about": "The type of this node: 1=SOURCE, 2=PROCESSOR, 3=SINK." },
      { "name": "SourceTopics", "type": "[]string", "versions": "0+", "entityType": "topicName",
        "about": "The source topics this node reads from. Defined only for source nodes, may be empty if source topics are dynamically determined." },
      { "name": "SinkTopic", "type": "string", "versions": "0+", "entityType": "topicName",
        "nullableVersions": "0+", "default": "null",
        "about": "The topic this node writes to. Defined only for sink nodes, may be null if sink topic is dynamically determined." },
      { "name": "Stores", "type": "[]string", "versions": "0+",
        "about": "The state store names accessed by this node. Defined only for processor nodes." },
      { "name": "Successors", "type": "[]string", "versions": "0+",
        "about": "The names of successor nodes in the processing graph. Predecessor relationships are reconstructed from this field." }
    ]},
    { "name": "TopologyDescriptionGlobalStore", "versions": "0+", "fields": [
      { "name": "Source", "type": "TopologyDescriptionNode", "versions": "0+",
        "about": "The source node providing data to the global store." },
      { "name": "Processor", "type": "TopologyDescriptionNode", "versions": "0+",
        "about": "The processor node that populates the global store." }
    ]}
  ]
}
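The NodeType codes and the "Defined only for ..." notes in the schema imply a simple consistency rule between a node's type and the fields it populates. The following is a hypothetical sketch of such a check; the KIP does not say that the broker or client performs it, and WireNodeType and WireNode are simplified stand-ins for the generated TopologyDescriptionNode data class, not the real generated code.

```java
import java.util.List;

// Hypothetical mirror of the wire-level NodeType int8 codes.
enum WireNodeType {
    SOURCE((byte) 1), PROCESSOR((byte) 2), SINK((byte) 3);

    final byte code;

    WireNodeType(byte code) {
        this.code = code;
    }

    static WireNodeType fromCode(byte code) {
        for (WireNodeType type : values()) {
            if (type.code == code) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown NodeType " + code);
    }
}

// Simplified stand-in for one TopologyDescriptionNode; sinkTopic may be null.
record WireNode(String name, byte nodeType, List<String> sourceTopics,
                String sinkTopic, List<String> stores, List<String> successors) {

    // True if only the fields defined for this node type are populated.
    // Empty source topics and a null sink topic stay legal (dynamic routing).
    boolean fieldsMatchType() {
        return switch (WireNodeType.fromCode(nodeType)) {
            case SOURCE -> sinkTopic == null && stores.isEmpty();
            case PROCESSOR -> sourceTopics.isEmpty() && sinkTopic == null;
            case SINK -> sourceTopics.isEmpty() && stores.isEmpty();
        };
    }
}
```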

Response:


{
  "apiKey": "TBD",
  "type": "response",
  "name": "StreamsGroupTopologyDescriptionUpdateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." }
  ]
}

Authorization: Requires READ ACL on the GROUP resource for the given group ID. As with offset commits, pushing a topology description is not considered a modification of the group, so applications can be deployed with only READ ACLs on their group.


Error codes:

  • GROUP_AUTHORIZATION_FAILED — the client is not authorized
  • INVALID_REQUEST — the request is malformed (including an empty MemberId)
  • UNSUPPORTED_VERSION — the coordinator cannot serve this RPC because no topology description plugin is configured
  • STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED — the plugin failed to process the request. The accompanying ErrorMessage carries the plugin's exception message. The broker's response shape is identical for both permanent and transient plugin failures; the distinction is broker-internal state that determines whether subsequent heartbeats at the same topology epoch will re-solicit (see Broker Side).
  • UNKNOWN_MEMBER_ID — the member named in MemberId is no longer in the group; the client should treat itself as fenced and rejoin

  • GROUP_ID_NOT_FOUND — the specified group does not exist
  • NOT_COORDINATOR — the broker is not the coordinator for this group
  • COORDINATOR_NOT_AVAILABLE — the coordinator is not available
  • COORDINATOR_LOAD_IN_PROGRESS — the coordinator is loading


Client-side retry behavior for each code is described in Client Side below.

StreamsGroupDescribeRequest Change

StreamsGroupDescribeRequest is bumped to the next version (N) and gains a new field at that version:

{ "name": "IncludeTopologyDescription", "type": "bool", "versions": "N+", "default": "false",
  "about": "Whether to include the full topology description from the topology description plugin in the response." }

Clients that send older request versions cannot set the flag, so their behavior is unchanged.

StreamsGroupDescribeResponse Change

StreamsGroupDescribeResponse is bumped to the next version (N). Two new fields are added to each DescribedGroup at that version:

{ "name": "TopologyDescription", "type": "TopologyDescription", "versions": "N+",
  "nullableVersions": "N+", "default": "null",
  "about": "The topology description for this group. Null if not available; see TopologyDescriptionStatus for the reason." },
{ "name": "TopologyDescriptionStatus", "type": "int8", "versions": "N+", "default": "0",
  "about": "The status of the topology description for this group: 0=NOT_REQUESTED (client did not set IncludeTopologyDescription), 1=NOT_STORED (no topology description has been recorded for this group), 2=ERROR (the broker failed to fetch the topology description; check broker logs), 3=AVAILABLE (a topology description is present in the TopologyDescription field). The broker MUST set this field to AVAILABLE whenever it attaches a TopologyDescription." }

The TopologyDescription common struct mirrors the struct used by StreamsGroupTopologyDescriptionUpdateRequest (same field names and shape). The nested struct names are prefixed TopologyDescription to avoid collision with the existing Subtopology struct already defined for the describe response. Because Kafka RPC schemas do not share common structs across message files, the struct is duplicated in StreamsGroupDescribeResponse.json. Setting these fields does not change the ErrorCode on the DescribedGroup: a group with a successful describe but a missing or failed topology fetch still returns ErrorCode=NONE. The TopologyDescriptionStatus field tells the caller why TopologyDescription is null, so that "waiting for first push" (NOT_STORED) can be distinguished from "broker-side fetch failed" (ERROR) without an error-level change to the describe result.

DeleteGroupsResponse Change


DeleteGroupsRequest and DeleteGroupsResponse are both bumped to the next version (3). The request shape is unchanged at the new version; the response adds an ErrorMessage field to each per-group DeletableGroupResult:

{ "name": "ErrorMessage", "type": "string", "versions": "3+", "nullableVersions": "3+", "ignorable": true, "default": "null",  
  "about": "The error message, or null if there was no error." }  

A new generic error code is added to the per-group ErrorCode slot:

  • GROUP_DELETION_FAILED — the delete operation could not complete; the accompanying ErrorMessage describes the underlying cause. The group is not tombstoned, and the caller may retry once the underlying condition is resolved. For streams groups configured with a topology description plugin this is returned when plugin.deleteTopology fails; other group types may adopt the same code in the future.


This is the only deletion-blocking failure mode introduced by this KIP. Consumer and share groups are unaffected. See Broker Side for the full ordering rule and the recovery path.
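The per-group ordering (plugin delete first, tombstone only on success) can be sketched as follows. This is a simplified model under stated assumptions: error codes are plain strings, the plugin's deleteTopology is a Function returning a CompletableFuture, a set stands in for tombstone writes, and only streams groups are passed in. DeleteResult and DeleteGroupsSketch are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Hypothetical per-group result; the real response uses DeletableGroupResult.
record DeleteResult(String groupId, String errorCode, String errorMessage) {}

class DeleteGroupsSketch {

    static Map<String, CompletableFuture<DeleteResult>> deleteGroups(
            List<String> groupIds,
            Map<String, Integer> storedDescriptionEpochs,              // groupId -> StoredDescriptionTopologyEpoch
            Function<String, CompletableFuture<Void>> deleteTopology,  // stands in for plugin.deleteTopology
            Set<String> tombstoned) {                                   // records the tombstones "written"
        Map<String, CompletableFuture<DeleteResult>> results = new LinkedHashMap<>();
        for (String groupId : groupIds) {
            int stored = storedDescriptionEpochs.getOrDefault(groupId, -1);
            // Nothing stored in the plugin: skip the plugin call entirely.
            CompletableFuture<Void> pluginStep = stored == -1
                ? CompletableFuture.<Void>completedFuture(null)
                : deleteTopology.apply(groupId);
            results.put(groupId, pluginStep.handle((ignored, error) -> {
                if (error != null) {
                    Throwable cause = error instanceof CompletionException && error.getCause() != null
                        ? error.getCause() : error;
                    // Group is kept; the caller may retry once the plugin recovers.
                    return new DeleteResult(groupId, "GROUP_DELETION_FAILED", cause.getMessage());
                }
                tombstoned.add(groupId); // tombstone only after the plugin succeeded
                return new DeleteResult(groupId, "NONE", null);
            }));
        }
        return results;
    }
}
```

Each group gets its own future, so one plugin failure does not affect the other groups in the batch.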

Plugin Interface

A new interface is introduced in org.apache.kafka.coordinator.group.api.streams:

package org.apache.kafka.coordinator.group.api.streams;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.concurrent.CompletableFuture;

/**
 * A broker-side plugin that stores, forwards, or exposes topology descriptions pushed
 * by Kafka Streams clients.
 *
 * <p>Implementations must be thread-safe. {@link #setTopology} may be called
 * concurrently by multiple members of the same group; calls with the same
 * {@code (groupId, topologyEpoch)} carry identical data and must be idempotent.
 * {@link #deleteTopology} must also be idempotent: it may be called more than once
 * for the same {@code groupId}, including when nothing is stored.
 */
@InterfaceStability.Evolving
public interface StreamsGroupTopologyDescriptionPlugin extends Configurable, AutoCloseable {

    /**
     * Store the topology description for a streams group.
     *
     * <p>The returned future completes when the topology has been persisted or forwarded.
     * Failures must be signalled by completing the future exceptionally; implementations
     * must not throw synchronously. A synchronous throw is treated as a permanent failure
     * with a generic client-visible error message. The completion exception drives
     * broker-side behaviour:
     *
     * <ul>
     *   <li>{@link StreamsTopologyDescriptionPermanentFailureException}: the description will never
     *       be accepted at this topology epoch (e.g. too large, semantically rejected). The broker
     *       ratchets {@code FailedDescriptionTopologyEpoch} and stops re-soliciting until the
     *       epoch advances.</li>
     *   <li>{@link StreamsTopologyDescriptionTransientFailureException} or any other exception:
     *       treated as transient. The broker arms or extends the per-group back-off (30 s → 1 h,
     *       exponential) and re-solicits on a later heartbeat.</li>
     * </ul>
     *
     * <p>In both cases the caller receives error code
     * {@code STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED} with the exception's message in
     * {@code ErrorMessage}; the permanent-vs-transient split is broker-internal state.
     */
    CompletableFuture<Void> setTopology(String groupId, int topologyEpoch,
                                        StreamsGroupTopologyDescription description);

    /**
     * Remove any topology description stored for this group. Called when the group is
     * deleted or expires. A failure (future completed exceptionally) is reported to the
     * caller of {@code DeleteGroups} as {@code GROUP_DELETION_FAILED} with the exception message
     * in the per-group {@code ErrorMessage}, and the broker does not tombstone the group;
     * a retry of {@code DeleteGroups} re-invokes this method idempotently. The
     * periodic-cleanup path treats a failure identically: the group's tombstone is
     * deferred to a future cycle.
     */
    CompletableFuture<Void> deleteTopology(String groupId);

    /**
     * Return the stored topology description for {@code (groupId, topologyEpoch)}, or
     * {@code null} if the plugin no longer has the data (e.g. backend wipe). If the future
     * completes exceptionally, the broker reports a read error for the group.
     */
    CompletableFuture<StreamsGroupTopologyDescription> getTopology(String groupId, int topologyEpoch);
}


The plugin uses a StreamsGroupTopologyDescription POJO that mirrors org.apache.kafka.streams.TopologyDescription but lives in the org.apache.kafka.coordinator.group.api.streams package; plugin implementations only need to depend on group-coordinator-api. The wire schema only carries Successors, so the broker-side POJO omits the predecessor relation — plugins that need both directions reconstruct predecessors in a single pass. The admin-side POJO does reconstruct them, since it is user-facing.

package org.apache.kafka.coordinator.group.api.streams;

import java.util.Collection;
import java.util.Optional;
import java.util.Set;

// API outline: constructors, fields, and method bodies are omitted.
public class StreamsGroupTopologyDescription {
    public Collection<Subtopology> subtopologies();
    public Collection<GlobalStore> globalStores();

    public static final class Subtopology {
        public String id();
        public Collection<Node> nodes();
    }

    /**
     * A processing node in the topology. Predecessor nodes can be inferred from the successor relation.
     */
    public sealed interface Node {
        String name();
        Set<String> successors();
    }

    public static final class Source implements Node {
        public Set<String> topics();
    }

    public static final class Processor implements Node {
        public Set<String> stores();
    }

    public static final class Sink implements Node {
        public Optional<String> topic();
    }

    public static final class GlobalStore {
        public Source source();
        public Processor processor();
    }
}
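A plugin that needs predecessor edges can rebuild them from the Successors field in a single pass over the nodes, as described before the POJO listing. The sketch below works on a plain map from node name to successor names rather than on the POJO types, which is a simplification; PredecessorIndex is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class PredecessorIndex {

    // Inverts successor edges: for every edge a -> b, records a as a predecessor of b.
    // Every node gets an entry, so nodes without predecessors map to an empty set.
    static Map<String, Set<String>> predecessors(Map<String, ? extends Iterable<String>> successors) {
        Map<String, Set<String>> result = new HashMap<>();
        for (var entry : successors.entrySet()) {
            String node = entry.getKey();
            result.computeIfAbsent(node, k -> new TreeSet<>());
            for (String successor : entry.getValue()) {
                result.computeIfAbsent(successor, k -> new TreeSet<>()).add(node);
            }
        }
        return result;
    }
}
```

Applied to the example topology from the command-line section, the source node has no predecessors, my-processor has the source as its predecessor, and the sink has my-processor.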


Two new exception classes in the same package let the plugin signal the permanent-vs-transient distinction. Any other exception completed on the future is treated as transient:

// StreamsTopologyDescriptionPermanentFailureException.java
package org.apache.kafka.coordinator.group.api.streams;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;

/** Signals that the topology description for the current epoch will never be accepted (e.g. too large, semantically rejected). */
@InterfaceStability.Evolving
public class StreamsTopologyDescriptionPermanentFailureException extends ApiException {
    public StreamsTopologyDescriptionPermanentFailureException(String message) { super(message); }
    public StreamsTopologyDescriptionPermanentFailureException(String message, Throwable cause) { super(message, cause); }
}

// StreamsTopologyDescriptionTransientFailureException.java
package org.apache.kafka.coordinator.group.api.streams;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;

/** Signals a transient backend failure; the broker re-solicits on a later heartbeat. Any other exception completed on the future is treated identically. */
@InterfaceStability.Evolving
public class StreamsTopologyDescriptionTransientFailureException extends ApiException {
    public StreamsTopologyDescriptionTransientFailureException(String message) { super(message); }
    public StreamsTopologyDescriptionTransientFailureException(String message, Throwable cause) { super(message, cause); }
}
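A broker consuming the setTopology future has to map its completion exception onto the permanent and transient cases. The sketch below shows one way to do that, assuming dependent stages may wrap the plugin's exception in CompletionException; the two exception classes are redeclared locally as RuntimeException subclasses so the example compiles without Kafka on the classpath, and FailureKind and PluginFailureClassifier are hypothetical names. It covers only exceptional completion; per the interface Javadoc, a synchronous throw from setTopology is always treated as permanent.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Local stand-ins for the KIP exception classes (the real ones extend ApiException).
class StreamsTopologyDescriptionPermanentFailureException extends RuntimeException {
    StreamsTopologyDescriptionPermanentFailureException(String message) { super(message); }
}

class StreamsTopologyDescriptionTransientFailureException extends RuntimeException {
    StreamsTopologyDescriptionTransientFailureException(String message) { super(message); }
}

enum FailureKind { PERMANENT, TRANSIENT }

class PluginFailureClassifier {

    // Strips the wrappers CompletableFuture adds for dependent stages and get()/join().
    static Throwable unwrap(Throwable t) {
        while ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Only the permanent exception is permanent; the explicit transient exception
    // and every other exception are treated as transient.
    static FailureKind classify(Throwable failure) {
        return unwrap(failure) instanceof StreamsTopologyDescriptionPermanentFailureException
            ? FailureKind.PERMANENT
            : FailureKind.TRANSIENT;
    }
}
```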

Broker-Side Persistence

Two new tagged fields are added to the persisted StreamsGroupMetadataValue record (a broker-internal record written to __consumer_offsets, not a wire-level change visible to clients):

{ "name": "StoredDescriptionTopologyEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 3,
  "default": -1, "type": "int32",
  "about": "The topology epoch whose description is currently stored in the topology description plugin, or -1 if none is stored." },
{ "name": "FailedDescriptionTopologyEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 4,
  "default": -1, "type": "int32",
  "about": "The topology epoch whose description push the plugin permanently rejected (signalled by StreamsTopologyDescriptionPermanentFailureException), or -1 if none. Heartbeat-path solicitation is suppressed while this equals the current topology epoch, to avoid hot-looping." }

The two fields together drive the broker-side gating decision on the heartbeat and describe paths. Both are tagged fields, so older brokers reading the record skip them as unknown tags, and records written before the upgrade decode with the default of -1. Streams groups that existed before this KIP was deployed therefore carry both fields as -1; on the first post-upgrade heartbeat the broker solicits a push, and the regular setTopology path persists the new value.
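The persisted-state part of the heartbeat decision, and the describe-path check, reduce to a pair of epoch comparisons. This hypothetical record (TopologyDescriptionState is not a name defined by the KIP) models only the two tagged fields; the in-memory back-off and the STALE_TOPOLOGY member exclusion described under Broker Side are deliberately left out.

```java
// Simplified view of the two persisted tagged fields; -1 means "none".
record TopologyDescriptionState(int storedDescriptionTopologyEpoch,
                                int failedDescriptionTopologyEpoch) {

    // What a group written before the upgrade decodes to.
    static final TopologyDescriptionState PRE_UPGRADE = new TopologyDescriptionState(-1, -1);

    // Heartbeat path: solicit only if nothing is stored for the current epoch
    // and the plugin has not permanently rejected that epoch.
    boolean needsDescription(int currentTopologyEpoch) {
        return storedDescriptionTopologyEpoch != currentTopologyEpoch
            && failedDescriptionTopologyEpoch != currentTopologyEpoch;
    }

    // Describe path: the plugin is consulted only when the stored epoch is current;
    // otherwise the broker reports NOT_STORED without a plugin call.
    boolean shouldCallGetTopology(int currentTopologyEpoch) {
        return storedDescriptionTopologyEpoch == currentTopologyEpoch;
    }
}
```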

Admin Client Interface


DescribeStreamsGroupsOptions gains an includeTopologyDescription(boolean) setter. When set to true, the admin client sets IncludeTopologyDescription on the StreamsGroupDescribeRequest (at the new version) and the coordinator consults the plugin.

StreamsGroupDescription gains two accessors:

  • Optional<StreamsGroupTopologyDescription> topologyDescription() — empty unless the field was requested and the plugin returned a description.
  • StreamsGroupTopologyDescriptionStatus topologyDescriptionStatus() — a new enum { NOT_REQUESTED, NOT_STORED, ERROR, AVAILABLE }. Each enum value's ordinal matches the wire-level TopologyDescriptionStatus int8 (0, 1, 2, 3). AVAILABLE is reported when TopologyDescription is non-null.
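Because each ordinal is defined to match the wire code, decoding can rely on declaration order. A minimal sketch follows; fromWire is a hypothetical helper, and the KIP does not say how an admin client should treat a status code it does not recognise (this sketch throws).

```java
// Declaration order fixes the ordinals so they line up with the wire codes 0..3.
enum StreamsGroupTopologyDescriptionStatus {
    NOT_REQUESTED, // 0
    NOT_STORED,    // 1
    ERROR,         // 2
    AVAILABLE;     // 3

    // Hypothetical decoder for the TopologyDescriptionStatus int8 field.
    static StreamsGroupTopologyDescriptionStatus fromWire(byte code) {
        StreamsGroupTopologyDescriptionStatus[] values = values();
        if (code < 0 || code >= values.length) {
            throw new IllegalArgumentException("Unknown TopologyDescriptionStatus: " + code);
        }
        return values[code];
    }
}
```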

The Admin client exposes a POJO hierarchy in org.apache.kafka.clients.admin that mirrors org.apache.kafka.streams.TopologyDescription but lives in the clients module. The admin client converts the wire-format struct into this hierarchy.

package org.apache.kafka.clients.admin;

import java.util.Collection;
import java.util.Optional;
import java.util.Set;

// API outline: constructors, fields, and method bodies are omitted.
public class StreamsGroupTopologyDescription {
    public Collection<Subtopology> subtopologies();
    public Collection<GlobalStore> globalStores();

    public static final class Subtopology {
        public String id();
        public Collection<Node> nodes();
    }

    public interface Node {
        String name();

        /** Direct predecessor nodes. */
        Set<String> predecessors();

        /** Direct successor nodes. */
        Set<String> successors();
    }

    public static final class Source implements Node {
        public Set<String> topics();
    }

    public static final class Processor implements Node {
        public Set<String> stores();
    }

    public static final class Sink implements Node {
        public Optional<String> topic();
    }

    public static final class GlobalStore {
        public Source source();
        public Processor processor();
    }
}

Command-Line Tool

kafka-streams-groups.sh gains a new --topology sub-action under --describe, parallel to --members, --offsets, and --state:

kafka-streams-groups.sh --bootstrap-server <broker> --describe --topology --group <group-id>

The output mirrors the format produced by Topology#describe() in the Kafka Streams API:

Topologies:
   Sub-topology: 0
     Source:  KSTREAM-SOURCE-0000000000 (topics: [input-topic])
       --> my-processor
     Processor: my-processor (stores: [my-store])
       <-- KSTREAM-SOURCE-0000000000
       --> KSTREAM-SINK-0000000002
     Sink: KSTREAM-SINK-0000000002 (topic: output-topic)
       <-- my-processor

Global stores, if present, are printed in a trailing Global Stores: block. The command calls the admin client with includeTopologyDescription(true).

When the coordinator returns no topology description, the command picks its output from the TopologyDescriptionStatus on the response:

  • NOT_STORED → "No topology description has been recorded for group '<group-id>'."
  • ERROR → "The broker failed to retrieve the topology description for group '<group-id>' (check broker logs)."
  • AVAILABLE → normal pretty-printed topology.

Exit code. The command exits 0 for AVAILABLE. It exits 1 for NOT_STORED, for ERROR, and when the DescribedGroup.ErrorCode is non-zero (authorization failure, group not found, coordinator unavailable, etc.). NOT_REQUESTED does not occur for this command because it always sets IncludeTopologyDescription=true.
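The output and exit-code rules can be summarised in a small decision function. This is an illustrative sketch, not the tool's code: TopologyStatus, CommandResult, and DescribeTopologyCommand are hypothetical names, and the message for a non-zero group ErrorCode is placeholder wording; only the NOT_STORED and ERROR messages come from this KIP.

```java
enum TopologyStatus { NOT_REQUESTED, NOT_STORED, ERROR, AVAILABLE }

record CommandResult(String output, int exitCode) {}

class DescribeTopologyCommand {

    // Chooses the tool's output and exit code for one described group.
    static CommandResult render(short groupErrorCode, TopologyStatus status,
                                String groupId, String prettyTopology) {
        if (groupErrorCode != 0) {
            // Placeholder wording: any group-level error exits with 1.
            return new CommandResult("Error describing group '" + groupId
                + "' (error code " + groupErrorCode + ").", 1);
        }
        return switch (status) {
            case AVAILABLE -> new CommandResult(prettyTopology, 0);
            case NOT_STORED -> new CommandResult(
                "No topology description has been recorded for group '" + groupId + "'.", 1);
            case ERROR -> new CommandResult(
                "The broker failed to retrieve the topology description for group '"
                    + groupId + "' (check broker logs).", 1);
            // Cannot happen: the command always sets IncludeTopologyDescription=true.
            case NOT_REQUESTED -> throw new IllegalStateException("Topology description was not requested");
        };
    }
}
```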

Proposed Changes

End-to-end Flow

The sequence below traces the four user-visible interactions — a successful push, a describe, an explicit DeleteGroups, and broker-driven cleanup of a naturally-expired group. Plugin calls return CompletableFutures; the broker awaits them asynchronously.

Broker Side

  1. The plugin is instantiated at broker startup if group.streams.topology.description.plugin.class is configured. A broker without a plugin returns UNSUPPORTED_VERSION for StreamsGroupTopologyDescriptionUpdate and never sets TopologyDescriptionRequired, so clients only send the RPC to coordinators that have a plugin configured.

  2. After a successful StreamsGroupHeartbeat, the broker decides whether to set TopologyDescriptionRequired=true purely from the group's persisted state — no plugin RPC is involved. Members with STALE_TOPOLOGY status are skipped. For all other members the broker sets the flag iff StoredDescriptionTopologyEpoch != currentTopologyEpoch AND FailedDescriptionTopologyEpoch != currentTopologyEpoch AND no per-group back-off is in its window. The back-off is in-memory state (keyed by groupId, carrying topologyEpoch + nextAttemptMs) that arms or extends every time the flag is set and additionally on a transient setTopology failure; consecutive arms double the window from 30 s up to 1 h. It clears on a successful push, on a permanent failure (where FailedDescriptionTopologyEpoch ratchets), and implicitly on any topology-epoch advance. The same mechanism covers unresponsive plugins and clients that never push (for example, with topology.description.push.enabled=false).

  3. On StreamsGroupTopologyDescriptionUpdate, the broker checks the READ ACL on the group and that a plugin is configured, then validates the group and member: an empty MemberId is rejected with INVALID_REQUEST, a streams group that does not exist with GROUP_ID_NOT_FOUND, and a MemberId not matching any current member with UNKNOWN_MEMBER_ID. The broker enforces no size limit; the plugin decides what it is willing to store. The broker then calls setTopology on the plugin. On success it writes a metadata record setting StoredDescriptionTopologyEpoch = pushedEpoch and the response carries NONE. On StreamsTopologyDescriptionPermanentFailureException it writes FailedDescriptionTopologyEpoch = pushedEpoch so subsequent heartbeats at the same epoch do not re-solicit. On StreamsTopologyDescriptionTransientFailureException or any other exception it writes no metadata record, arms the per-group back-off, and the next heartbeat re-solicits once the window elapses. In both failure cases the response carries STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED with the plugin's exception message in ErrorMessage; the permanent-vs-transient split is broker-internal state. If the plugin call succeeds but the metadata-record write fails, the next heartbeat sees the drift, re-solicits, and the idempotent re-push closes the gap.

  4. On DeleteGroups, the broker calls deleteTopology on the plugin before writing the group tombstone, for each requested streams group with StoredDescriptionTopologyEpoch != -1. On plugin success the group is tombstoned and the per-group ErrorCode is NONE. On plugin failure the group is not tombstoned and the per-group ErrorCode is set to GROUP_DELETION_FAILED with the plugin's exception message in ErrorMessage (also logged at WARN); the operator retries the request once the plugin recovers, or unsets the plugin config to bypass it. Other groups in the same batch are unaffected — the failure is reported per group. This ordering matches the natural-expiration cleanup below, which also defers tombstoning until plugin.deleteTopology succeeds.

  5. On StreamsGroupDescribe with IncludeTopologyDescription=true, the broker calls getTopology on the plugin only when StoredDescriptionTopologyEpoch == currentTopologyEpoch for that group; otherwise it reports NOT_STORED without making a plugin call. Calls across groups run in parallel. Authorization is unchanged — the existing DESCRIBE ACL on the GROUP resource covers the topology description. DescribedGroup.ErrorCode is never modified by topology-related outcomes; the TopologyDescriptionStatus field carries the reason when TopologyDescription is null (NOT_REQUESTED, NOT_STORED, or ERROR; the last is also logged at WARN). With no plugin configured the broker returns NOT_STORED. A getTopology call returning null (plugin-side data loss) surfaces as NOT_STORED and is logged at WARN; subsequent describes keep returning NOT_STORED until the topology epoch advances or an operator clears plugin state.

  6. When a plugin is configured, the broker runs a periodic topology-description cleanup every offsets.retention.check.interval.ms. Each run fans out a read-only query across the broker's hosted __consumer_offsets partitions to identify streams groups eligible for cleanup: isEmpty && allOffsetsExpired && StoredDescriptionTopologyEpoch != -1. This is the same eligibility predicate the shard's offset-expiration sweep uses to delete consumer/share groups, with the additional StoredDescriptionTopologyEpoch != -1 filter. For each eligible group, the broker calls plugin.deleteTopology(groupId) and, on success, writes a metadata record setting StoredDescriptionTopologyEpoch = -1. Plugin failures leave the field set; the same group is retried on the next cycle. Once StoredDescriptionTopologyEpoch = -1, the shard's offset-expiration sweep tombstones the (now flag-cleared) group on a subsequent cycle. When no plugin is configured, the periodic cleanup does not run and the shard's offset-expiration sweep expires streams groups normally, ignoring StoredDescriptionTopologyEpoch; operators that disable a previously-configured plugin are responsible for cleaning up plugin-side state out-of-band.
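The in-memory back-off from step 2 can be modelled as a per-group window that doubles on every arm and resets when the topology epoch changes. The following is one possible shape under stated assumptions: the class and method names are hypothetical, time is passed in explicitly as milliseconds, and the class is not thread-safe (it assumes access from a single coordinator thread). It complements, and does not replace, the persisted-epoch checks and the STALE_TOPOLOGY exclusion.

```java
import java.util.HashMap;
import java.util.Map;

class SolicitationBackoff {
    static final long INITIAL_MS = 30_000L;   // 30 s
    static final long MAX_MS = 3_600_000L;    // 1 h

    // Per-group state: the epoch the window applies to, its length, and when it ends.
    private record Window(int topologyEpoch, long windowMs, long nextAttemptMs) {}

    private final Map<String, Window> windows = new HashMap<>();

    // True if the back-off does not block solicitation for this group at this epoch.
    boolean mayRequest(String groupId, int topologyEpoch, long nowMs) {
        Window w = windows.get(groupId);
        if (w == null || w.topologyEpoch() != topologyEpoch) {
            return true; // no window, or the epoch advanced (implicit clear)
        }
        return nowMs >= w.nextAttemptMs();
    }

    // Called whenever the flag is set and on a transient setTopology failure:
    // the first arm at an epoch opens a 30 s window, each later arm doubles it up to 1 h.
    void arm(String groupId, int topologyEpoch, long nowMs) {
        Window w = windows.get(groupId);
        long windowMs = (w == null || w.topologyEpoch() != topologyEpoch)
            ? INITIAL_MS
            : Math.min(w.windowMs() * 2, MAX_MS);
        windows.put(groupId, new Window(topologyEpoch, windowMs, nowMs + windowMs));
    }

    // Called on a successful push or a permanent failure.
    void clear(String groupId) {
        windows.remove(groupId);
    }
}
```

Tying the window to the topology epoch means a group that deploys a new topology is solicited right away, without waiting out a long window accumulated for the previous epoch.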

Client Side

  1. At startup, if topology.description.push.enabled=true, the Streams client converts the topology returned by Topology#describe() to the wire format and stores it internally. The mapping is one-to-one with the RPC schema; predecessor edges are not sent on the wire and the read side reconstructs them by inverting each node's successor list. When topology.description.push.enabled=false, no description is stored and the feature is disabled on this client.
  2. The Streams client records the TopologyDescriptionRequired flag from each heartbeat response.
  3. On each consumer background-thread poll, the client sends StreamsGroupTopologyDescriptionUpdate to the coordinator when a coordinator is known, the flag is set, a stored topology description is available, the client has a non-empty member ID assigned by the coordinator, and no prior request is in flight. The member ID populated on the request is the same one carried on StreamsGroupHeartbeat. The push runs on the consumer background thread and never blocks user-facing Kafka Streams APIs; the push is best-effort.
  4. Completion handling on the push response is keyed on the error code. NOT_COORDINATOR and COORDINATOR_NOT_AVAILABLE trigger coordinator rediscovery and leave the flag set. COORDINATOR_LOAD_IN_PROGRESS and network exceptions leave the flag set for retry on the next poll. UNKNOWN_MEMBER_ID means the member has been dropped from the group: the client clears the flag and relies on the existing membership-management path to trigger a clean rejoin on the next heartbeat. All other errors (STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, INVALID_REQUEST, UNSUPPORTED_VERSION, GROUP_ID_NOT_FOUND, GROUP_AUTHORIZATION_FAILED) clear the flag and log at WARN with the response's ErrorMessage; the client does not retry on its own, and a re-attempt happens only when the broker re-sets the flag via a subsequent heartbeat. A non-zero ThrottleTimeMs on the response delays the next push attempt by that amount, as with other request managers.
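Steps 3 and 4 amount to a small state machine per client. The sketch below is a simplified, hypothetical model: error codes are passed as their names, a null error stands for a network exception, WARN logging and coordinator rediscovery are reduced to returned outcomes, and clearing the flag on a successful push is an assumption, since step 4 only spells out the error cases.

```java
enum PushOutcome { SUCCEEDED, REDISCOVER_COORDINATOR, RETRY_NEXT_POLL, FENCED, GAVE_UP }

class TopologyPushState {
    private boolean required;        // last TopologyDescriptionRequired seen on a heartbeat
    private boolean inFlight;        // at most one push at a time
    private long throttledUntilMs;   // honours ThrottleTimeMs from the last response

    void onHeartbeatResponse(boolean topologyDescriptionRequired) {
        required = topologyDescriptionRequired;
    }

    // Step 3: every condition must hold before a push is sent.
    boolean shouldSend(boolean coordinatorKnown, boolean descriptionStored, String memberId, long nowMs) {
        return coordinatorKnown && required && descriptionStored
            && memberId != null && !memberId.isEmpty()
            && !inFlight && nowMs >= throttledUntilMs;
    }

    void markSent() {
        inFlight = true;
    }

    // Step 4: errorName is the response error code's name; null models a network exception.
    PushOutcome onResponse(String errorName, int throttleTimeMs, long nowMs) {
        inFlight = false;
        throttledUntilMs = nowMs + throttleTimeMs;
        if (errorName == null) {
            return PushOutcome.RETRY_NEXT_POLL; // flag stays set
        }
        return switch (errorName) {
            case "NONE" -> {
                required = false; // assumption: a successful push satisfies the request
                yield PushOutcome.SUCCEEDED;
            }
            case "NOT_COORDINATOR", "COORDINATOR_NOT_AVAILABLE" -> PushOutcome.REDISCOVER_COORDINATOR;
            case "COORDINATOR_LOAD_IN_PROGRESS" -> PushOutcome.RETRY_NEXT_POLL;
            case "UNKNOWN_MEMBER_ID" -> {
                required = false; // membership management triggers the rejoin
                yield PushOutcome.FENCED;
            }
            default -> {
                required = false; // logged at WARN; wait for the broker to set the flag again
                yield PushOutcome.GAVE_UP;
            }
        };
    }
}
```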

Plugin Implementation Guidelines

A correct plugin implementation should:

  • Treat setTopology (on (groupId, topologyEpoch)) and deleteTopology (on (groupId)) as idempotent; the broker may re-issue an identical call when an earlier call's bookkeeping write failed.
  • Be thread-safe under concurrent invocation: setTopology may be called by multiple members of the same group in the same heartbeat cycle, and the periodic-cleanup path may invoke deleteTopology while a member is mid-push.
  • Reject payloads the plugin will not accept by completing the setTopology future with StreamsTopologyDescriptionPermanentFailureException; the broker persists the rejection at the epoch level via FailedDescriptionTopologyEpoch and stops re-soliciting at the same epoch. The exception message reaches the client in ErrorMessage.
  • Signal transient storage-layer failures by completing the setTopology future with StreamsTopologyDescriptionTransientFailureException (or any other exception); the broker's per-group back-off (30 s → 1 h, exponential) throttles re-solicitation.
  • Return null from getTopology when the plugin has lost the description; the broker reports NOT_STORED on the describe response. Exceptions are reserved for transient backend failures and surface as ERROR.
  • Avoid blocking coordinator threads (plugin methods may be invoked on them) and complete futures within seconds, not minutes; the broker applies no wall-clock deadline to plugin calls, so the coordinator's responsiveness is bounded by what the plugin does. Bound plugin-side state explicitly — the plugin shares the broker heap.
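To make the guidelines concrete, here is a minimal in-memory sketch. It assumes a simplified, hypothetical stand-in interface (TopologyDescriptionStore) in which descriptions are opaque strings rather than StreamsGroupTopologyDescription and configure()/close() are omitted; a real plugin would implement StreamsGroupTopologyDescriptionPlugin directly. It shows idempotent set and delete, a permanent rejection for oversized payloads, a transient failure when the group bound is reached, explicit bounds on heap use, and null from getTopology when nothing matching is stored.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in for the KIP exception (the real class extends ApiException).
class StreamsTopologyDescriptionPermanentFailureException extends RuntimeException {
    StreamsTopologyDescriptionPermanentFailureException(String message) { super(message); }
}

// Hypothetical, simplified stand-in for StreamsGroupTopologyDescriptionPlugin.
interface TopologyDescriptionStore {
    CompletableFuture<Void> setTopology(String groupId, int topologyEpoch, String description);
    CompletableFuture<Void> deleteTopology(String groupId);
    CompletableFuture<String> getTopology(String groupId, int topologyEpoch);
}

class InMemoryTopologyDescriptionStore implements TopologyDescriptionStore {

    private record Entry(int topologyEpoch, String description) {}

    // ConcurrentHashMap keeps concurrent pushes and cleanup calls safe.
    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final int maxDescriptionChars; // per-description bound
    private final int maxGroups;           // approximate bound on stored groups

    InMemoryTopologyDescriptionStore(int maxDescriptionChars, int maxGroups) {
        this.maxDescriptionChars = maxDescriptionChars;
        this.maxGroups = maxGroups;
    }

    @Override
    public CompletableFuture<Void> setTopology(String groupId, int topologyEpoch, String description) {
        if (description.length() > maxDescriptionChars) {
            // Never acceptable at this epoch: fail the future (do not throw) with the permanent exception.
            return CompletableFuture.failedFuture(new StreamsTopologyDescriptionPermanentFailureException(
                "Topology description of " + description.length() + " chars exceeds " + maxDescriptionChars));
        }
        if (!entries.containsKey(groupId) && entries.size() >= maxGroups) {
            // Capacity may free up after cleanup, so this is reported as a transient failure.
            return CompletableFuture.failedFuture(new IllegalStateException("Topology store is full"));
        }
        // Idempotent for identical (groupId, topologyEpoch) calls; a late push for an
        // older epoch never replaces a newer one (a design choice of this sketch).
        entries.merge(groupId, new Entry(topologyEpoch, description),
            (current, pushed) -> pushed.topologyEpoch() >= current.topologyEpoch() ? pushed : current);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> deleteTopology(String groupId) {
        entries.remove(groupId); // no-op when nothing is stored, so repeated calls are safe
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<String> getTopology(String groupId, int topologyEpoch) {
        Entry entry = entries.get(groupId);
        // null (not an exception) signals missing data, which the broker reports as NOT_STORED.
        return CompletableFuture.completedFuture(
            entry != null && entry.topologyEpoch() == topologyEpoch ? entry.description() : null);
    }
}
```

Keeping the newer epoch when a late push for an older one arrives goes beyond what the KIP requires, which is only idempotency for identical (groupId, topologyEpoch) calls.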

Security Considerations

The topology description contains user-defined processor names, state-store names, and topic names. This KIP does not treat these as inherently sensitive and does not introduce any client-side redaction. Operators who consider node, store, or topic names sensitive should scope DESCRIBE ACLs on the GROUP resource accordingly: the same ACL that guards StreamsGroupDescribe guards the TopologyDescription field on the response. The StreamsGroupTopologyDescriptionUpdate path is guarded by READ on the GROUP, identical to the existing heartbeat ACL.

Metrics

The following broker-side metrics are added under the existing group-coordinator-metrics group (JMX type kafka.server:type=group-coordinator-metrics). Each sensor produces both a rate and a cumulative count.


MBean | Type | Description
kafka.server:type=group-coordinator-metrics,name=streams-group-topology-description-set-success-{rate,count} | Meter | Successful plugin.setTopology calls.
kafka.server:type=group-coordinator-metrics,name=streams-group-topology-description-set-error-{rate,count} | Meter | Failed plugin.setTopology calls. An error increments this sensor regardless of whether it was StreamsTopologyDescriptionPermanentFailureException, StreamsTopologyDescriptionTransientFailureException, or any other exception.
kafka.server:type=group-coordinator-metrics,name=streams-group-topology-description-delete-success-{rate,count} | Meter | Successful plugin.deleteTopology calls (covers both the explicit DeleteGroups and periodic-cleanup paths).
kafka.server:type=group-coordinator-metrics,name=streams-group-topology-description-delete-error-{rate,count} | Meter | Failed plugin.deleteTopology calls.
kafka.server:type=group-coordinator-metrics,name=streams-group-topology-description-get-success-{rate,count} | Meter | Successful plugin.getTopology calls.
kafka.server:type=group-coordinator-metrics,name=streams-group-topology-description-get-error-{rate,count} | Meter | Failed plugin.getTopology calls.
kafka.server:type=group-coordinator-metrics,name=streams-group-topology-description-cleanup-cycle-{rate,count} | Meter | Periodic topology-description cleanup cycles that actually ran.
kafka.server:type=group-coordinator-metrics,name=streams-group-topology-description-cleanup-eligible-{rate,count} | Meter | Streams group IDs identified as eligible for topology-description cleanup, summed across partitions.
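The {rate,count} shorthand in the MBean column stands for two metric names per sensor. The helper below is a hypothetical illustration, not broker code. It expands the eight sensors listed in the table into the sixteen resulting names, which can help when configuring dashboards or alerts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: expands the {rate,count} shorthand from the metrics table.
public class TopologyDescriptionMetricNames {

    static final String PREFIX = "streams-group-topology-description-";
    static final String[] SENSORS = {
        "set-success", "set-error", "delete-success", "delete-error",
        "get-success", "get-error", "cleanup-cycle", "cleanup-eligible"
    };

    static List<String> names() {
        List<String> out = new ArrayList<>();
        for (String sensor : SENSORS) {
            out.add(PREFIX + sensor + "-rate");   // windowed rate of events
            out.add(PREFIX + sensor + "-count");  // cumulative total
        }
        return out;
    }

    public static void main(String[] args) {
        names().forEach(System.out::println);
    }
}
```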


No client-side metrics are introduced.

Compatibility, Deprecation, and Migration Plan



This KIP bumps StreamsGroupHeartbeatResponse, StreamsGroupDescribeRequest, and StreamsGroupDescribeResponse to the next available version of each RPC and adds the new fields (TopologyDescriptionRequired, IncludeTopologyDescription, TopologyDescription, TopologyDescriptionStatus) at that version. In detail:

  • A pre-KIP-1331 client talking to a KIP-1331 broker never sees the new heartbeat-response flag — the broker only emits TopologyDescriptionRequired on responses at the new version, so the client behaves as before and is never asked to push.
  • A KIP-1331 admin client that requests includeTopologyDescription(true) against a pre-KIP-1331 broker fails fast with UnsupportedVersionException. The user sees a clear error.
  • A pre-KIP-1331 admin client talking to a KIP-1331 broker doesn't request the topology description, so the broker doesn't populate the new response fields — the older client sees the same describe response shape it always did.

Without a configured plugin, no flags are set and no topology descriptions are sent. There is no behavioral change for existing deployments.


DeleteGroupsRequest and DeleteGroupsResponse are bumped to version 3, which adds a per-group ErrorMessage field on the response and introduces the new GROUP_DELETION_FAILED error code. Older admin clients negotiate version 2 and never see the new ErrorMessage field; they continue to receive only the per-group ErrorCode.

DeleteGroups can now fail for streams groups when a topology description plugin is configured and its deleteTopology call fails; brokers without a configured plugin keep today's behaviour. The broker gates the new error code on the request version. For v3 requests, the per-group result carries GROUP_DELETION_FAILED with the cause string in ErrorMessage. For v2 requests, it downgrades to UNKNOWN_SERVER_ERROR with no ErrorMessage, matching the convention used by KIP-1043 for GROUP_ID_NOT_FOUND on DescribeGroups (v6) and OffsetCommit (v9). In either case the group is not tombstoned, so an idempotent retry of DeleteGroups converges once the plugin recovers. No client-side change is required.
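The version gate for the plugin-failure case can be sketched as follows. The enum, record, and method names are hypothetical stand-ins for the broker's response-building code. Only the v3 threshold and the two error outcomes come from the text above.

```java
// Hypothetical sketch of the DeleteGroups error downgrade; not the broker's actual types.
public class DeleteGroupsErrorGate {

    enum ErrorCode { GROUP_DELETION_FAILED, UNKNOWN_SERVER_ERROR }

    record GroupResult(String groupId, ErrorCode error, String errorMessage) {}

    static final short FIRST_VERSION_WITH_DELETION_FAILED = 3;

    // Builds the per-group result when plugin.deleteTopology fails.
    // In both branches the group is left in place (no tombstone), so a retry can converge.
    static GroupResult pluginDeleteFailed(String groupId, short requestVersion, String cause) {
        if (requestVersion >= FIRST_VERSION_WITH_DELETION_FAILED) {
            return new GroupResult(groupId, ErrorCode.GROUP_DELETION_FAILED, cause);
        }
        // Pre-v3 clients know neither the new error code nor the ErrorMessage field.
        return new GroupResult(groupId, ErrorCode.UNKNOWN_SERVER_ERROR, null);
    }
}
```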

Rolling Upgrades

During a rolling upgrade of brokers, some brokers may have the plugin configured and some may not. The TopologyDescriptionRequired flag is only set by plugin-equipped coordinators; a client whose current coordinator lacks the plugin never sees the flag. If the coordinator migrates mid-push to a plugin-less broker, the client receives UNSUPPORTED_VERSION, clears the topologyDescriptionRequired flag, and does not retry. The flag is set again only if a future heartbeat response from a plugin-equipped coordinator includes TopologyDescriptionRequired=true.
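A minimal sketch of the client-side flag handling described above follows. It assumes a single boolean that the heartbeat path arms and the push path consumes. The class and method names are hypothetical. Two behaviours are assumptions rather than anything stated here: the flag is cleared after a successful push, and an armed flag stays set when a later heartbeat carries false.

```java
// Hypothetical client-side sketch; not the actual Kafka Streams internals.
public class TopologyPushStateSketch {

    private final boolean pushEnabled; // mirrors topology.description.push.enabled
    private boolean topologyDescriptionRequired;

    public TopologyPushStateSketch(boolean pushEnabled) { this.pushEnabled = pushEnabled; }

    // Only TopologyDescriptionRequired=true arms a push; an opted-out client ignores it.
    public void onHeartbeatResponse(boolean topologyDescriptionRequiredFlag) {
        if (pushEnabled && topologyDescriptionRequiredFlag) {
            topologyDescriptionRequired = true;
        }
    }

    public boolean shouldPush() { return topologyDescriptionRequired; }

    // Coordinator moved to a plugin-less broker: drop the request and do not retry.
    public void onUnsupportedVersion() { topologyDescriptionRequired = false; }

    // Assumed: once the broker accepts the push, nothing is pending until it asks again.
    public void onPushSucceeded() { topologyDescriptionRequired = false; }
}
```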

During a rolling upgrade of the Streams application (topology epoch change), the broker skips the heartbeat-path gating for STALE_TOPOLOGY members — the response flag is not set, regardless of the persisted StoredDescriptionTopologyEpoch. Only members running the new topology reach the gating comparison. If every active member is stale during the rollout, the current-epoch topology remains uncaptured until at least one member heartbeats with the new epoch.
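Combined with the plugin rules in the contract above, the heartbeat-path gating can be sketched as a single predicate plus a back-off helper. StoredDescriptionTopologyEpoch and FailedDescriptionTopologyEpoch are KIP names. The method names are hypothetical, and so is the use of equality for both epoch comparisons. The back-off is also an assumption: a pure doubling from 30 s up to the 1 h cap (30 s, 60 s, 120 s, and so on), with no jitter.

```java
// Hypothetical sketch of the broker-side gating; comparisons and back-off shape are assumptions.
public class TopologyDescriptionGateSketch {

    static final long INITIAL_BACKOFF_MS = 30_000L;   // 30 s
    static final long MAX_BACKOFF_MS = 3_600_000L;    // 1 h

    // Exponential back-off after consecutive transient failures, capped at one hour.
    static long backoffMs(int consecutiveFailures) {
        if (consecutiveFailures <= 0) return 0L;
        long delay = INITIAL_BACKOFF_MS;
        // Stop doubling once the cap is reached so large failure counts cannot overflow.
        for (int i = 1; i < consecutiveFailures && delay < MAX_BACKOFF_MS; i++) {
            delay *= 2;
        }
        return Math.min(delay, MAX_BACKOFF_MS);
    }

    static boolean topologyDescriptionRequired(
            boolean pluginConfigured,
            boolean memberIsStale,                 // member reported STALE_TOPOLOGY
            int memberTopologyEpoch,
            int storedDescriptionTopologyEpoch,
            int failedDescriptionTopologyEpoch,
            long nowMs,
            long backoffUntilMs) {
        if (!pluginConfigured) return false;       // no plugin: the flag is never set
        if (memberIsStale) return false;           // stale members skip gating entirely
        if (memberTopologyEpoch == storedDescriptionTopologyEpoch) return false; // already stored
        if (memberTopologyEpoch == failedDescriptionTopologyEpoch) return false; // rejected for good
        return nowMs >= backoffUntilMs;            // respect the per-group back-off
    }
}
```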

During this transition the assignment topology (advanced synchronously when a new-epoch member heartbeats) and the description topology (advanced asynchronously via the plugin) can briefly disagree: StreamsGroupDescribe may report the new topologyEpoch while TopologyDescription is still null with status NOT_STORED until the first push for the new epoch succeeds. The two reconverge once any member at the new epoch pushes its description.

Future Work

Hash-based mismatch detection. A future enhancement could introduce a topology hash to detect clients on different topology descriptions reporting the same topology epoch.

Multi-version describe. The describe response surfaces only the topology under the group's current topologyEpoch. During a rolling topology upgrade, the previous epoch's description is still stored by the plugin but is no longer reachable via describe. A natural extension is a descriptions[] array on the response, tagged by epoch, allowing operators to view both the previous and the in-flight new topology while the rollout completes.

Node grouping. A nodeGroup field on TopologyNode for UI grouping, requiring an extension to the public TopologyDescription.Node interface in Kafka Streams — best addressed in a follow-up KIP.

Test Plan

Integration Tests

Broker

  • StreamsGroupTopologyDescriptionUpdateRequestTest (new) — push happy path; StreamsTopologyDescriptionPermanentFailureException (ratchets FailedDescriptionTopologyEpoch) and StreamsTopologyDescriptionTransientFailureException (arms back-off) both surface as STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED with the cause in ErrorMessage; heartbeat-flag gating; member/group/MemberId fencing; explicit DeleteGroups plugin-success path (tombstone written) and plugin-failure path (GROUP_DELETION_FAILED returned with ErrorMessage, group not tombstoned, retry converges after plugin recovery).
  • StreamsGroupTopologyDescriptionUpdateNoPluginRequestTest (new) — broker without plugin: pushes rejected, flag never set, describe returns NOT_STORED.
  • AuthorizerIntegrationTest: READ ACL on the GROUP resource for the new RPC.

Streams client

  • TopologyDescriptionPluginIntegrationTest (new) — end-to-end push against an in-memory plugin; describe status mapping; topology.description.push.enabled=false opt-out.

CLI

  • TopologyDescriptionFormatterTest (new) — each TopologyDescriptionStatus, the pretty-print format, and the exit-code mapping.
  • StreamsGroupCommandTest — the --topology sub-action against a populated group.

System Tests

New ducktape suite streams_topology_description_plugin_test.py driving the Java harness TopologyDescriptionPluginSystemTest:

  • StreamsTopologyDescriptionPluginTest — CLI describe of a running Kafka Streams app; client opt-out; periodic cleanup after retention. The last scenario lives here because realistic offsets.retention.minutes values range from minutes to hours; lowering it enough at the integration-test layer would affect unrelated tests on the shared embedded cluster.
  • StreamsTopologyDescriptionPluginNoPluginTest — broker without plugin reports NOT_STORED and never asks the client to push.

Rejected Alternatives

Embedding the topology description in the heartbeat

Adding the topology description to StreamsGroupHeartbeatRequest would bloat the heartbeat with potentially large payloads and mix observability concerns with the rebalance protocol. A separate RPC keeps the feature self-contained and allows the broker to apply independent size limits and authorization. See the discussion in the KIP for a full comparison of both approaches.

Storing topologies in an internal topic

Using a compacted internal topic (like __consumer_offsets) would require the broker to materialize all topology descriptions in memory from the topic's compacted log. For clusters with many streams groups, this could consume significant heap space on the coordinator broker. A plugin-based approach avoids this by delegating storage to an external system that can handle retention and retrieval independently.

Adding a separate read RPC

A dedicated read RPC was considered but rejected in favor of extending StreamsGroupDescribe at a new version with an IncludeTopologyDescription flag. This keeps the number of new API keys minimal, groups the topology description naturally with other group metadata, and avoids bloating the describe response when the caller does not need it. The plugin remains free to expose topologies directly via its own channels in addition to the describe RPC.

Adding compression support

KIP-714 supports compression (ZStd, LZ4, GZip, Snappy) for telemetry payloads because metrics are pushed repeatedly at high frequency. Topology descriptions are pushed infrequently (only on topology epoch changes) and the wire payload is expected to fit comfortably for typical applications. The Kafka protocol's flexible versions already provide efficient serialization. Adding compression would add complexity without meaningful benefit for this use case.

Appendix: Example RPC payload

For illustration, consider a Kafka Streams application that reads from orders, filters out null values, and writes the remaining records to valid-orders:

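import org.apache.kafka.streams.StreamsBuilder;

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("orders")
       .filter((key, value) -> value != null)   // drop records with null values
       .to("valid-orders");
System.out.println(builder.build().describe()); // Topology#describe()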

Topology#describe() produces the following text representation:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [orders])
      --> KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000001 (stores: [])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002 (topic: valid-orders)
      <-- KSTREAM-FILTER-0000000001

The corresponding StreamsGroupTopologyDescriptionUpdate request body, with the topology converted to the wire format, looks like this:

{
  "GroupId": "orders-app",
  "TopologyEpoch": 0,
  "TopologyDescription": {
    "Subtopologies": [
      {
        "SubtopologyId": "0",
        "Nodes": [
          {
            "Name": "KSTREAM-SOURCE-0000000000",
            "NodeType": 1,
            "SourceTopics": ["orders"],
            "SinkTopic": null,
            "Stores": [],
            "Successors": ["KSTREAM-FILTER-0000000001"]
          },
          {
            "Name": "KSTREAM-FILTER-0000000001",
            "NodeType": 2,
            "SourceTopics": [],
            "SinkTopic": null,
            "Stores": [],
            "Successors": ["KSTREAM-SINK-0000000002"]
          },
          {
            "Name": "KSTREAM-SINK-0000000002",
            "NodeType": 3,
            "SourceTopics": [],
            "SinkTopic": "valid-orders",
            "Stores": [],
            "Successors": []
          }
        ]
      }
    ],
    "GlobalStores": []
  }
}

Predecessor edges (<-- in the text representation) are not sent on the wire; the read side reconstructs them by inverting each node's Successors.
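The read-side inversion takes a single pass over the Successors lists. The sketch below is hypothetical. It works on a plain map from node name to successor names for one subtopology, not on the generated wire classes. Every node gets an entry, so source nodes end up with an empty predecessor list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: rebuilds predecessor lists from each node's Successors.
public class PredecessorInversion {

    static Map<String, List<String>> predecessors(Map<String, List<String>> successors) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        // Seed every node so nodes with no incoming edges still appear.
        successors.keySet().forEach(name -> result.put(name, new ArrayList<>()));
        // Each edge node -> succ becomes succ <-- node.
        successors.forEach((node, succs) -> {
            for (String succ : succs) {
                result.computeIfAbsent(succ, k -> new ArrayList<>()).add(node);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> successors = new LinkedHashMap<>();
        successors.put("KSTREAM-SOURCE-0000000000", List.of("KSTREAM-FILTER-0000000001"));
        successors.put("KSTREAM-FILTER-0000000001", List.of("KSTREAM-SINK-0000000002"));
        successors.put("KSTREAM-SINK-0000000002", List.of());
        predecessors(successors).forEach((node, preds) -> System.out.println(node + " <-- " + preds));
    }
}
```

Applied to the example payload, this yields the same <-- edges as the Topology#describe() text.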
