Status

Current state: Under Discussion

Discussion thread: [TBD]

JIRA: KAFKA-20436

Motivation

Background

The Model Context Protocol (MCP) is an open standard that Anthropic announced in November 2024 and donated in December 2025 to the Linux Foundation's Agentic AI Foundation, which was co-founded by Anthropic, Block, and OpenAI. MCP follows a client-host-server architecture: the host application (such as Claude Desktop or VS Code) creates one MCP client per MCP server connection, and all communication uses JSON-RPC 2.0 messages over stateful sessions, with capability negotiation at initialization.

The protocol defines three server-side primitives:

  • Tools - functions for the AI model to execute (state-changing actions)
  • Resources - context and data for the model (read-only)
  • Prompts - templated messages and workflows

MCP was designed to solve the "N×M integration problem" where developers previously had to build custom connectors for each combination of data source and AI tool. MCP adoption is accelerating across Claude Desktop, Claude Code, GitHub Copilot (VS Code), and Google's Agent Development Kit (ADK).

Motivating Question

Should Apache Kafka provide a first-party MCP server so that AI agents can interact with Kafka clusters through the standardized MCP protocol?

Yes. Apache Kafka has a rich operational surface spanning five core APIs (Producer, Consumer, Streams, Connect, Admin) with over 100 distinct operations. Today, interacting with Kafka programmatically requires either:

  1. Writing Java/Python client code against the Kafka client libraries
  2. Using CLI tools (kafka-topics.sh, kafka-consumer-groups.sh, kafka-acls.sh)
  3. Calling the Connect REST API directly via curl

None of these are accessible to AI agents through MCP. This means developers cannot ask their AI assistant to "create a topic with 12 partitions and 3-day retention," "show me the lag for consumer group X," or "restart the failed connector task" - operations that should be trivial in an AI-assisted workflow.

Existing Implementations and Gap Analysis

At least five open-source Kafka MCP server implementations already exist (Confluent's mcp-confluent, tuannvm/kafka-mcp-server, and several Python implementations), validating the demand. However, all share critical gaps:

  • No ACL management - security administration is excluded from AI-assisted workflows
  • No transactional produce semantics - no exactly-once guarantees
  • No Kafka Streams or Share Group operations - major APIs are entirely absent
  • Confluent coupling - the most feature-complete implementation (mcp-confluent) only works with Confluent Cloud REST APIs, not vanilla Apache Kafka

This KIP Fills All Gaps

New Capabilities

A first-party MCP server for Apache Kafka will allow:

  • AI agents to create, describe, alter, and delete topics through natural language
  • AI agents to produce and consume messages, including transactional exactly-once produce
  • AI agents to monitor consumer group lag, reset offsets, and manage group membership
  • AI agents to manage ACLs - the first MCP implementation to expose security administration
  • AI agents to manage Kafka Connect connectors (create, pause, resume, restart, delete)
  • AI agents to inspect cluster health, KRaft quorum status, and broker configurations
  • AI agents to manage transactions (list, describe, abort hanging transactions, fence producers)
  • Developers to run the same tool locally (stdio) or deploy it remotely (Streamable HTTP) with OAuth 2.1

By incorporating this feature within Apache Kafka specifically:

  • More operators will have access to this feature under the Apache 2.0 license
  • The community can maintain the feature, reducing dependence on vendor-specific implementations
  • The server can be released alongside Kafka itself, ensuring API compatibility with each Kafka version
  • Vanilla Apache Kafka deployments (not just Confluent Cloud) are supported

This KIP proposes a first-party, Apache-licensed MCP server that wraps Kafka's native Java client APIs directly, supporting both vanilla Apache Kafka and managed deployments, with zero new external dependencies.

Public Interfaces

This KIP introduces no changes to the Kafka protocol, public APIs, client behaviors, or broker metrics.

It adds a new standalone module (tools/mcp-server) that packages a JSON-RPC 2.0 server exposing Kafka operations as MCP Tools (state-changing actions) and MCP Resources (read-only data). The server is a separate process - it does not run inside the broker.

New Configuration Properties

The MCP server is configured via command-line arguments and/or a properties file:

Property | Default | Description
bootstrap.servers | (required) | Kafka broker addresses
mcp.transport | stdio | Transport mode: stdio or http
mcp.http.port | 9090 | HTTP port (when mcp.transport=http). Default avoids conflict with common services on 8080.
mcp.connect.url | (none) | Kafka Connect REST URL (optional, enables Connect tools). If Connect has authentication enabled, provide credentials via mcp.connect.auth.username and mcp.connect.auth.password.
security.protocol | PLAINTEXT | Kafka security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL)
sasl.mechanism | (none) | SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI)
sasl.jaas.config | (none) | JAAS configuration for SASL authentication
ssl.truststore.location | (none) | SSL truststore path
ssl.keystore.location | (none) | SSL keystore path (for mTLS)

All standard Kafka client properties (security.protocol, sasl.*, ssl.*) are passed through directly to the underlying Admin, KafkaProducer, and KafkaConsumer instances.
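One way to honor this pass-through is to split the loaded properties by prefix. The sketch below assumes, purely for illustration, that every server-specific key starts with mcp. and that all other keys are forwarded unchanged to the Kafka clients. The class and method names are hypothetical and are not part of this proposal.

```java
import java.util.Properties;

// Hypothetical helper: separates MCP server settings from Kafka client settings.
final class McpConfigSplitter {
    static final String MCP_PREFIX = "mcp.";

    /** Returns only the keys that would be forwarded to Admin, KafkaProducer, and KafkaConsumer. */
    static Properties clientProperties(Properties all) {
        Properties client = new Properties();
        for (String key : all.stringPropertyNames()) {
            if (!key.startsWith(MCP_PREFIX)) {
                client.setProperty(key, all.getProperty(key));
            }
        }
        return client;
    }

    public static void main(String[] args) {
        Properties all = new Properties();
        all.setProperty("bootstrap.servers", "localhost:9092");
        all.setProperty("security.protocol", "SASL_SSL");
        all.setProperty("mcp.transport", "http");
        // Prints the two Kafka client keys; mcp.transport is filtered out
        System.out.println(clientProperties(all).stringPropertyNames());
    }
}
```

Keeping the split in one place means a new mcp.* setting can be added without any risk of it leaking into the client configuration and triggering unknown-config warnings.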

New MCP Tools (State-Changing Operations)

Topic Management:

Tool Name | Description
create_topic | Create a new topic with partitions, replication factor, and optional configs
delete_topic | Delete a topic
alter_topic_config | Incrementally alter topic configuration
create_partitions | Increase partition count for a topic
delete_records | Delete records before a given offset

Message Operations:

Tool Name | Description
produce_message | Produce a single message with optional key, headers, partition, timestamp
produce_batch | Produce multiple messages in one call and flush before returning (not atomic; use produce_transactional for all-or-nothing writes)
produce_transactional | Produce messages within an exactly-once transaction
consume_messages | Consume up to N messages from a topic with configurable offset reset

Consumer Group Management:

Tool Name | Description
delete_consumer_group | Delete a consumer group
alter_consumer_group_offsets | Reset offsets for a consumer group (group must be empty)
remove_group_members | Force-remove members from a consumer group
delete_consumer_group_offsets | Delete committed offsets for specific partitions

ACL Management:

Tool Name | Description
create_acls | Create access control list entries
delete_acls | Delete access control list entries matching a filter

Cluster Operations:

Tool Name | Description
alter_broker_config | Incrementally alter broker configuration
elect_leaders | Trigger preferred or unclean leader election
alter_partition_reassignments | Reassign partition replicas across brokers
alter_client_quotas | Alter client quota configurations

Transaction Management:

Tool Name | Description
abort_transaction | Abort a hanging transaction by coordinator
fence_producers | Fence transactional producers to force epoch bump

Connect Operations (requires mcp.connect.url):

Tool Name | Description
create_connector | Create a new connector
update_connector_config | Update connector configuration
delete_connector | Delete a connector
restart_connector | Restart a connector (optionally including tasks)
pause_connector | Pause a running connector
resume_connector | Resume a paused connector
stop_connector | Stop a connector
restart_task | Restart a specific connector task

New MCP Resources (Read-Only Data)

Resource URIs use the kafka:// scheme as internal MCP identifiers. This is a custom scheme used only within the MCP protocol for resource discovery and is not registered with IANA. It is never exposed on the network - MCP clients resolve resources by calling the resources/read JSON-RPC method with the URI as a parameter. This follows the same pattern used by other MCP servers (e.g., github://, postgres://).

Resource URI | Description
kafka://topics | List all topics
kafka://topics/{name} | Describe topic (partitions, replicas, ISR, configs)
kafka://topics/{name}/offsets | Earliest and latest offsets per partition
kafka://groups | List all groups (consumer, streams, share, classic)
kafka://groups/{id} | Describe consumer group (members, assignments, state)
kafka://groups/{id}/offsets | Committed offsets per partition
kafka://groups/{id}/lag | Per-partition consumer lag (computed: end offset minus committed)
kafka://cluster | Cluster ID, controller, broker list
kafka://cluster/configs/{brokerId} | Broker configuration
kafka://cluster/log-dirs/{brokerId} | Log directory sizes and partition assignments
kafka://cluster/metadata-quorum | KRaft quorum status and voter lag
kafka://cluster/features | Supported and finalized feature versions
kafka://acls | All ACL bindings (filterable by resource type and name)
kafka://transactions | Active transactions
kafka://transactions/{id} | Transaction state, PID, epoch, partitions
kafka://streams-groups/{id} | Streams group topology, members, state
kafka://share-groups/{id} | Share group members and state
kafka://connectors | All connectors with status (requires mcp.connect.url)
kafka://connectors/{name}/status | Connector and task states
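To make the resources/read flow more concrete, the following sketch shows one possible way to turn a kafka:// URI into a handler decision. It covers only a subset of the URIs listed above, and the KafkaResourceRouter class, its method names, and the returned labels are hypothetical illustrations rather than the proposed implementation.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical router: maps a kafka:// resource URI to a label naming the handler.
final class KafkaResourceRouter {
    private static final String SCHEME = "kafka://";

    /** Splits "kafka://topics/orders/offsets" into [topics, orders, offsets]. */
    static List<String> segments(String uri) {
        if (uri == null || !uri.startsWith(SCHEME)) {
            throw new IllegalArgumentException("Not a kafka:// resource URI: " + uri);
        }
        String[] parts = uri.substring(SCHEME.length()).split("/");
        if (parts.length == 0 || parts[0].isEmpty()) {
            throw new IllegalArgumentException("Missing resource type: " + uri);
        }
        return Arrays.asList(parts);
    }

    /** Returns a label for the handler a real server would invoke for this URI. */
    static String route(String uri) {
        List<String> s = segments(uri);
        String root = s.get(0);
        if (root.equals("topics")) {
            if (s.size() == 1) return "listTopics";
            if (s.size() == 2) return "describeTopic(" + s.get(1) + ")";
            if (s.size() == 3 && s.get(2).equals("offsets")) return "topicOffsets(" + s.get(1) + ")";
        } else if (root.equals("groups")) {
            if (s.size() == 1) return "listGroups";
            if (s.size() == 2) return "describeGroup(" + s.get(1) + ")";
            if (s.size() == 3 && s.get(2).equals("lag")) return "groupLag(" + s.get(1) + ")";
        } else if (root.equals("cluster") && s.size() == 1) {
            return "describeCluster";
        }
        throw new IllegalArgumentException("Unknown resource: " + uri);
    }

    public static void main(String[] args) {
        System.out.println(route("kafka://topics/orders/offsets")); // topicOffsets(orders)
        System.out.println(route("kafka://groups/billing/lag"));    // groupLag(billing)
    }
}
```

Plain string splitting is used here instead of java.net.URI because topic and group names may contain characters (such as underscores in the first segment) that URI host parsing handles awkwardly.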

Proposed Changes

Architecture

The MCP server is a standalone Java process that wraps three existing Kafka client interfaces (Admin, Producer, and Consumer). It does not run inside the broker.

MCP Primitives Mapping

MCP defines three server-side primitives. This KIP uses two:

MCP Primitive | Kafka Mapping | Description
Tools | State-changing operations | Create topics, produce messages, alter configs, manage ACLs
Resources | Read-only data | List topics, describe groups, view offsets, cluster health
Prompts | Not used in Phase 1 | Templated workflows (future enhancement)


Tool Implementations - Kafka API Mapping

Each MCP Tool maps to a specific method on Kafka's Java client APIs. Below is the exact mapping with source file references verified against the Kafka codebase.

Topic Management Tools

create_topic wraps Admin.createTopics(Collection<NewTopic>) (Admin.java line 180).

Parameters: name (string, required), partitions (int, optional - broker default if omitted), replicationFactor (short, optional - broker default if omitted), configs (map, optional).

The MCP server constructs a NewTopic object using the Optional constructor so that omitted values fall back to broker defaults (num.partitions, default.replication.factor):


// NewTopic.java line 56:
// public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor)
NewTopic newTopic = new NewTopic(name,
    partitions != null ? Optional.of(partitions) : Optional.empty(),
    replicationFactor != null ? Optional.of(replicationFactor) : Optional.empty());
if (configs != null) {
    newTopic.configs(configs);  // e.g. {"retention.ms": "259200000"}
}
admin.createTopics(Collections.singleton(newTopic)).all().get();
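Before the NewTopic is built, the decoded tools/call arguments have to be validated and the optional values separated from the required one. The sketch below shows one way this could look, using only the JDK. CreateTopicArgs, fromArguments, and retentionMs are hypothetical names, and the assumption that JSON numbers arrive as java.lang.Number instances depends on the JSON library configuration.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical argument holder for the create_topic tool.
final class CreateTopicArgs {
    final String name;
    final Optional<Integer> partitions;
    final Optional<Short> replicationFactor;
    final Map<String, String> configs;

    private CreateTopicArgs(String name, Optional<Integer> partitions,
                            Optional<Short> replicationFactor, Map<String, String> configs) {
        this.name = name;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
        this.configs = configs;
    }

    /** Validates decoded JSON arguments; omitted optionals stay empty so broker defaults apply. */
    static CreateTopicArgs fromArguments(Map<String, Object> args) {
        Object name = args.get("name");
        if (!(name instanceof String) || ((String) name).isEmpty()) {
            throw new IllegalArgumentException("name is required");
        }
        Optional<Integer> partitions =
            Optional.ofNullable((Number) args.get("partitions")).map(Number::intValue);
        Optional<Short> replicationFactor =
            Optional.ofNullable((Number) args.get("replicationFactor")).map(Number::shortValue);
        if (partitions.isPresent() && partitions.get() < 1) {
            throw new IllegalArgumentException("partitions must be >= 1");
        }
        Map<String, String> configs = new HashMap<>();
        Object raw = args.get("configs");
        if (raw instanceof Map) {
            // Kafka topic configs are string-to-string, so normalize every value
            ((Map<?, ?>) raw).forEach((k, v) -> configs.put(String.valueOf(k), String.valueOf(v)));
        }
        return new CreateTopicArgs((String) name, partitions, replicationFactor, configs);
    }

    /** Converts a retention period in days to a retention.ms value. */
    static String retentionMs(long days) {
        return String.valueOf(Duration.ofDays(days).toMillis());
    }
}
```

For the "12 partitions and 3-day retention" request from the Motivation section, retentionMs(3) yields "259200000", which is the retention.ms value shown in the comment above.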

delete_topic wraps Admin.deleteTopics(Collection<String>) (Admin.java line 212).

alter_topic_config wraps Admin.incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>>) (Admin.java line 501). Uses AlterConfigOp.OpType.SET for each config entry.

create_partitions wraps Admin.createPartitions(Map<String, NewPartitions>) (Admin.java line 634). Note: partition count can only be increased, never decreased.

delete_records wraps Admin.deleteRecords(Map<TopicPartition, RecordsToDelete>) (Admin.java line 691). Deletes all records before the specified offset.

Message Operation Tools

produce_message wraps KafkaProducer.send(ProducerRecord) (KafkaProducer.java line 244).

Parameters: topic (string, required), value (string, required), key (string, optional), partition (int, optional), timestamp (long, optional), headers (map, optional).

The MCP server maintains a singleton KafkaProducer<String, String> with StringSerializer for both key and value:


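// ProducerRecord constructor from ProducerRecord.java line 50:
// ProducerRecord(String topic, Integer partition, Long timestamp,
//                K key, V value, Iterable<Header> headers)
// Convert the optional headers map (string to string) into Kafka Header objects
List<Header> recordHeaders = new ArrayList<>();
if (headers != null) {
    headers.forEach((k, v) ->
        recordHeaders.add(new RecordHeader(k, v.getBytes(StandardCharsets.UTF_8))));
}
ProducerRecord<String, String> record = new ProducerRecord<>(
    topic, partition, timestamp, key, value, recordHeaders);
// Block until the broker acknowledges the write
RecordMetadata metadata = producer.send(record).get();
// Returns: topic, partition, offset, timestamp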

produce_transactional wraps the full transactional lifecycle from Producer.java:


// Producer.java transactional methods:
// default void initTransactions()  - delegates to initTransactions(false)
// void beginTransaction() throws ProducerFencedException
// void commitTransaction() throws ProducerFencedException
// void abortTransaction() throws ProducerFencedException

producer.initTransactions();
try {
    producer.beginTransaction();
    for (Message msg : messages) {
        producer.send(new ProducerRecord<>(msg.topic, msg.key, msg.value));
    }
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: the producer cannot abort or continue and must be closed
    producer.close();
    throw e;
} catch (KafkaException e) {
    // Any other error: abort so that none of the messages become visible
    producer.abortTransaction();
    throw e;
}

This provides exactly-once semantics - a capability absent from all existing MCP implementations. The transactional producer is created on-demand with the user-supplied transactionalId set via ProducerConfig.TRANSACTIONAL_ID_CONFIG.

consume_messages creates a short-lived KafkaConsumer per request.

Parameters: topic (string, required), groupId (string, optional - random UUID if omitted), partitions (int[], optional), fromBeginning (boolean, default false), maxMessages (int, default 10), timeout (long, default 5000ms).


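// Consumer.java interface methods used:
// void subscribe(Collection<String> topics)           - line 46
// void assign(Collection<TopicPartition> partitions)  - line 56
// ConsumerRecords<K, V> poll(Duration timeout)        - line 92

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
    groupId != null ? groupId : "mcp-" + UUID.randomUUID());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
    fromBeginning ? "earliest" : "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxMessages));

List<ConsumerRecord<String, String>> result = new ArrayList<>();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props,
        new StringDeserializer(), new StringDeserializer())) {
    if (partitions != null) {
        // assign() takes TopicPartition objects, not raw partition numbers
        List<TopicPartition> assigned = Arrays.stream(partitions)
            .mapToObj(p -> new TopicPartition(topic, p))
            .collect(Collectors.toList());
        consumer.assign(assigned);
        if (fromBeginning) {
            consumer.seekToBeginning(assigned);
        }
    } else {
        // With subscribe(), partitions are known only after the group rebalance,
        // so seek from the rebalance callback instead of after a zero-length poll
        consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> revoked) { }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
                if (fromBeginning) {
                    consumer.seekToBeginning(assigned);
                }
            }
        });
    }
    // Poll until maxMessages records are collected or the timeout elapses;
    // a single poll() can return nothing while the consumer is still joining the group
    long deadline = System.currentTimeMillis() + timeout;
    while (result.size() < maxMessages && System.currentTimeMillis() < deadline) {
        long remaining = Math.max(0, deadline - System.currentTimeMillis());
        for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(remaining))) {
            if (result.size() < maxMessages) {
                result.add(r);
            }
        }
    }
}
// Convert result to JSON: [{topic, partition, offset, key, value, timestamp, headers}]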

Design decisions:

  • Short-lived consumers - each call creates and closes a consumer. KafkaConsumer is explicitly not thread-safe (documented in class Javadoc). Short-lived instances avoid concurrency issues.
  • enable.auto.commit=false - prevents the MCP server from committing offsets for groups it does not own.
  • Random group ID fallback - if no groupId is provided, a random one is generated so the consumer does not interfere with existing groups.

Consumer Group Management Tools

delete_consumer_group wraps Admin.deleteConsumerGroups(Collection<String>) (Admin.java line 986).

alter_consumer_group_offsets wraps Admin.alterConsumerGroupOffsets(String, Map<TopicPartition, OffsetAndMetadata>) (Admin.java line 1281). The group must be empty (no active members) for this to succeed.

remove_group_members wraps Admin.removeMembersFromConsumerGroup(String, RemoveMembersFromConsumerGroupOptions) (Admin.java line 1269). Can force-remove specific members or all members.

delete_consumer_group_offsets wraps Admin.deleteConsumerGroupOffsets(String, Set<TopicPartition>) (Admin.java line 1035).

ACL Management Tools

create_acls wraps Admin.createAcls(Collection<AclBinding>) (Admin.java line 393).

Parameters: acls[] - each entry contains resourceType (TOPIC, GROUP, CLUSTER, etc.), resourceName, patternType (LITERAL, PREFIXED), principal, host, operation (READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, etc.), permissionType (ALLOW, DENY).


// From clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java:
// AclBinding(ResourcePattern pattern, AccessControlEntry entry)
//   ResourcePattern: resourceType + name + patternType
//   AccessControlEntry: principal + host + operation + permissionType

AclBinding binding = new AclBinding(
    new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL),
    new AccessControlEntry("User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW)
);
admin.createAcls(Collections.singleton(binding)).all().get();

This is the first MCP implementation to expose ACL management - no existing project supports it.

delete_acls wraps Admin.deleteAcls(Collection<AclBindingFilter>) (Admin.java line 422).

Cluster Operation Tools

alter_broker_config wraps Admin.incrementalAlterConfigs(...) (Admin.java line 501) with ConfigResource.Type.BROKER.

elect_leaders wraps Admin.electLeaders(ElectionType, Set<TopicPartition>) (Admin.java line 1094). Supports PREFERRED and UNCLEAN election types.

alter_partition_reassignments wraps Admin.alterPartitionReassignments(...) (Admin.java line 1147).

alter_client_quotas wraps Admin.alterClientQuotas(Collection<ClientQuotaAlteration>) (Admin.java line 1398).

Transaction Management Tools

abort_transaction wraps Admin.abortTransaction(AbortTransactionSpec) (Admin.java line 1733). Used to abort hanging transactions that block LSO advancement.

fence_producers wraps Admin.fenceProducers(Collection<String>) (Admin.java line 1780). Forces a producer epoch bump, fencing old producers.

Connect Tools

Connect tools do NOT use the Java client libraries. They make HTTP calls to the Kafka Connect REST API, whose endpoints are defined in ConnectorsResource.java (connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java).

Tool | HTTP Method | Endpoint | ConnectorsResource Method
create_connector | POST | /connectors | createConnector() (line 103)
update_connector_config | PUT | /connectors/{name}/config | putConnectorConfig() (line 155)
delete_connector | DELETE | /connectors/{name} | destroyConnector() (line 268)
restart_connector | POST | /connectors/{name}/restart | restartConnector() (line 195)
pause_connector | PUT | /connectors/{name}/pause | pauseConnector() (line 243)
resume_connector | PUT | /connectors/{name}/resume | resumeConnector() (line 251)
stop_connector | PUT | /connectors/{name}/stop | stopConnector() (line 230)
restart_task | POST | /connectors/{name}/tasks/{id}/restart | restartTask() (line 280)
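As an illustration of the mapping above, the following sketch builds the HTTP requests for two of the Connect tools. It uses the JDK's java.net.http API, which is an assumption made for this example only; this KIP does not prescribe an HTTP client. ConnectRequests and its methods are hypothetical names.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical request builder for the pause_connector and restart_task tools.
final class ConnectRequests {
    /** Percent-encodes one path segment (URLEncoder emits '+' for spaces, which is wrong in a path). */
    static String segment(String raw) {
        return URLEncoder.encode(raw, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** PUT {connectUrl}/connectors/{name}/pause */
    static HttpRequest pauseConnector(String connectUrl, String name) {
        URI uri = URI.create(connectUrl + "/connectors/" + segment(name) + "/pause");
        return HttpRequest.newBuilder(uri)
            .PUT(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    /** POST {connectUrl}/connectors/{name}/tasks/{id}/restart */
    static HttpRequest restartTask(String connectUrl, String name, int taskId) {
        URI uri = URI.create(connectUrl + "/connectors/" + segment(name)
            + "/tasks/" + taskId + "/restart");
        return HttpRequest.newBuilder(uri)
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request = pauseConnector("http://localhost:8083", "jdbc sink");
        // Prints: PUT http://localhost:8083/connectors/jdbc%20sink/pause
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Encoding the connector name matters because the name comes from the AI agent and ends up in the URL path; an unencoded slash or space would otherwise address a different endpoint.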

Resource Implementations - Kafka API Mapping

Consumer Lag Computation

The kafka://groups/{id}/lag resource is expected to be one of the operations AI agents request most often. It is not a single API call - the MCP server computes it by combining two Admin API calls:


// Step 1: Get committed offsets
// Admin.listConsumerGroupOffsets(String) - Admin.java line 928
Map<TopicPartition, OffsetAndMetadata> committed =
    admin.listConsumerGroupOffsets(groupId)
         .partitionsToOffsetAndMetadata().get();

// Step 2: Get end offsets for same partitions
// Admin.listOffsets(Map<TopicPartition, OffsetSpec>) - Admin.java line 1334
Map<TopicPartition, OffsetSpec> latestSpec = committed.keySet().stream()
    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
    admin.listOffsets(latestSpec).all().get();

// Step 3: Compute lag per partition
// lag = endOffset - committedOffset
// Partitions without a committed offset have a null value and are skipped
Map<TopicPartition, Long> lag = committed.entrySet().stream()
    .filter(e -> e.getValue() != null)
    .collect(Collectors.toMap(
        Map.Entry::getKey,
        e -> endOffsets.get(e.getKey()).offset() - e.getValue().offset()
    ));
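The arithmetic in step 3 can be checked in isolation with plain maps. The sketch below is a worked example with made-up offsets; it keys partitions by a "topic-partition" string instead of TopicPartition so that it runs without the Kafka client library, and it clamps negative results to zero, which is an illustrative choice rather than something this KIP specifies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone version of the lag arithmetic.
final class LagExample {
    /** lag = end offset - committed offset, clamped at zero; partitions with no commit are skipped. */
    static Map<String, Long> lag(Map<String, Long> committed, Map<String, Long> endOffsets) {
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, Long> e : committed.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            if (e.getValue() == null || end == null) {
                continue; // no committed offset, or no end offset returned
            }
            result.put(e.getKey(), Math.max(0L, end - e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        committed.put("orders-0", 100L);
        committed.put("orders-1", 250L);
        committed.put("orders-2", null); // group never committed on this partition

        Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("orders-0", 150L);
        endOffsets.put("orders-1", 250L);
        endOffsets.put("orders-2", 40L);

        // Prints: {orders-0=50, orders-1=0}
        System.out.println(lag(committed, endOffsets));
    }
}
```

Because the committed offsets and end offsets are fetched by two separate calls, the result is a point-in-time estimate rather than an exact figure.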

Cluster Resources

kafka://cluster wraps Admin.describeCluster() (Admin.java line 342) - returns cluster ID, controller node, and broker list.

kafka://cluster/metadata-quorum wraps Admin.describeMetadataQuorum() (Admin.java line 1623) - returns KRaft quorum status including voter lag, last fetch timestamp, and log end offset per voter. This is critical for monitoring KRaft cluster health.

kafka://cluster/features wraps Admin.describeFeatures() (Admin.java line 1525) - returns supported and finalized feature versions.

Group Resources

The MCP server supports all four group types in Kafka 4.x:

Resource | Admin API Method | Admin.java Line
Consumer groups | describeConsumerGroups() | 866
Streams groups | describeStreamsGroups() | 2063
Share groups | describeShareGroups() | 1948
Classic groups | describeClassicGroups() | 2086

kafka://groups uses Admin.listGroups() (Admin.java line 1072) which returns all group types. The resource response includes the group type so the AI agent can distinguish between them.

Transport Mechanisms

MCP defines two standard transports. The server supports both:

stdio - The AI client launches the server as a subprocess. JSON-RPC 2.0 messages are exchanged over stdin/stdout as newline-delimited JSON. This is the default mode, used for local development with Claude Desktop, VS Code, etc.

{
  "mcpServers": {
    "kafka": {
      "command": "java",
      "args": ["-jar", "kafka-mcp-server.jar",
               "--bootstrap-servers", "localhost:9092"]
    }
  }
}
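The newline-delimited framing used by the stdio transport can be sketched as a simple read loop. The example below is a minimal illustration that assumes the JSON-RPC handling itself is supplied as a function from request line to response line; StdioLoop and serve are hypothetical names, and a real transport would also need error responses for malformed input.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.function.Function;

// Hypothetical read loop for newline-delimited JSON-RPC over stdin/stdout.
final class StdioLoop {
    /**
     * Reads one message per line until end of input and writes one response per line.
     * A null handler result means "no response" (as for JSON-RPC notifications).
     * Returns the number of messages handled.
     */
    static int serve(Reader in, Writer out, Function<String, String> handler) {
        BufferedReader reader = new BufferedReader(in);
        PrintWriter writer = new PrintWriter(out);
        int handled = 0;
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // ignore blank lines between messages
                }
                String response = handler.apply(line);
                if (response != null) {
                    writer.print(response);
                    writer.print('\n'); // explicit LF keeps framing identical on every platform
                    writer.flush();     // the client waits for each line, so never buffer
                }
                handled++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return handled;
    }
}
```

In a stdio deployment this would be called with readers and writers wrapping System.in and System.out, which is why log output has to go to stderr or a file rather than stdout.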

Streamable HTTP - The server runs as an independent HTTP service. Clients send JSON-RPC requests via HTTP POST. The server optionally uses Server-Sent Events (SSE) for streaming responses. This mode is suited for remote and production deployments.

java -jar kafka-mcp-server.jar \
  --transport http \
  --port 9090 \
  --bootstrap-servers broker1:9092,broker2:9092 \
  --security-protocol SASL_SSL \
  --sasl-mechanism SCRAM-SHA-512

Security Architecture


Layer 1: Kafka Broker Authentication. The server connects to Kafka as a regular client. All standard Kafka authentication mechanisms are supported via standard client configuration properties (security.protocol, sasl.mechanism, sasl.jaas.config, ssl.*). These properties are passed directly to Admin.create(props), new KafkaProducer(props), and new KafkaConsumer(props). No custom authentication code is needed.

Layer 2: MCP Protocol Authentication (Streamable HTTP only). For remote deployments, the server implements OAuth 2.1 with PKCE as specified in the MCP specification (March 2025 update). The MCP server acts as an OAuth resource server, validating bearer tokens on each request. For stdio transport (local development), no MCP-layer authentication is needed because the AI client launches the server as a local subprocess.

Authorization. The MCP server inherits Kafka's ACL-based authorization. If the configured Kafka user does not have DELETE permission on a topic, delete_topic will fail with TopicAuthorizationException. The MCP server surfaces this as a structured JSON-RPC error response to the AI agent. Operators should configure the MCP server's Kafka credentials with least-privilege ACLs.

Integration with KIP-1298 (Per-Resource-Type Authorization). Standard Kafka ACLs operate at the resource level (topic, group, cluster). KIP-1298 proposes finer-grained per-resource-type authorization, which is critical for AI agent access control. When KIP-1298 is available, the MCP server's Kafka credentials can be restricted to specific resource types - for example, allowing an AI agent to DESCRIBE topics but not ALTER them, or to READ consumer groups but not DELETE them. This provides defense-in-depth beyond the tool-level restrictions: even if an AI agent hallucinates or is prompt-injected into calling delete_topic, the broker-side authorization rejects it.

Tool Allow-List. As an additional guardrail against prompt injection and hallucination, the MCP server supports a configurable tool allow-list:

Property | Default | Description
mcp.tools.allowed | * (all) | Comma-separated list of tool names the server will register. Tools not in this list are never exposed to the AI agent.
mcp.tools.denied | (none) | Comma-separated list of tool names to explicitly block. Takes precedence over mcp.tools.allowed.

Example: to restrict an AI agent to read-only operations:

mcp.tools.allowed=consume_messages
mcp.tools.denied=delete_topic,delete_consumer_group,create_acls,delete_acls,alter_partition_reassignments


This is a deterministic hard control - denied tools are never registered in the MCP capability negotiation, so the AI agent cannot discover or invoke them regardless of prompt content.
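The precedence rule between the two properties can be expressed in a few lines. The sketch below is one possible reading of the semantics described above: an unset or "*" allow-list admits every tool, and the deny-list is applied last so that it always wins. ToolFilter and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical filter deciding which tools get registered at startup.
final class ToolFilter {
    /** Parses a comma-separated property value into a set of trimmed, non-empty names. */
    static Set<String> parse(String csv) {
        if (csv == null || csv.trim().isEmpty()) {
            return new LinkedHashSet<>();
        }
        return Arrays.stream(csv.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    /** Returns the tools to register: allowed (null or "*" means all) minus denied. */
    static Set<String> registered(Set<String> available, String allowedCsv, String deniedCsv) {
        Set<String> allowed = parse(allowedCsv == null ? "*" : allowedCsv);
        Set<String> denied = parse(deniedCsv);
        boolean allowAll = allowed.contains("*");
        return available.stream()
            .filter(tool -> allowAll || allowed.contains(tool))
            .filter(tool -> !denied.contains(tool)) // deny-list takes precedence
            .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    public static void main(String[] args) {
        Set<String> available = new LinkedHashSet<>(
            Arrays.asList("create_topic", "delete_topic", "consume_messages"));
        // Prints: [create_topic, consume_messages]
        System.out.println(registered(available, "*", "delete_topic"));
    }
}
```

Filtering at registration time, rather than at invocation time, is what keeps a denied tool out of the list the agent sees.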

Data Governance and PII Masking

Providing an AI model direct access to topic data is a security risk in regulated industries. Message payloads consumed via consume_messages may contain PII (credit card numbers, medical records, email addresses) that should never reach an LLM's context window.

The MCP server provides an extensible interceptor interface for payload redaction:

/**
 * Interceptor that processes consumed records before they are returned
 * to the AI agent via JSON-RPC response. Implementations can redact,
 * mask, or filter sensitive fields.
 */
public interface McpRecordInterceptor {
    /**
     * Process a consumed record before it is serialized to JSON-RPC response.
     * @param record The consumed record (topic, partition, offset, key, value, headers)
     * @return The redacted record, or null to exclude entirely
     */
    ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record);
}


Property | Default | Description
mcp.interceptor.classes | (none) | Comma-separated list of McpRecordInterceptor implementation classes. Applied in order.

Operators in regulated environments can implement interceptors that:

  • Regex-match and mask credit card numbers, SSNs, email addresses
  • Strip specific JSON fields from message values
  • Exclude entire topics from consume_messages responses
  • Integrate with external data classification services

The interceptor runs inside the MCP server process, before the payload is serialized into the JSON-RPC response - the LLM never sees the unredacted data.
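A regex-based masking step and the "applied in order, null excludes" chaining rule might look like the following. To stay runnable without the Kafka client library, this sketch operates on the record value as a String rather than on ConsumerRecord; the patterns are deliberately simple illustrations and are not sufficient for production PII detection. RedactionChain and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;

// Hypothetical value-level redaction, standing in for McpRecordInterceptor implementations.
final class RedactionChain {
    // 16 digits in groups of four, optionally separated by spaces or hyphens
    private static final Pattern CARD =
        Pattern.compile("\\b\\d{4}[- ]?\\d{4}[- ]?\\d{4}[- ]?\\d{4}\\b");
    private static final Pattern EMAIL =
        Pattern.compile("[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}");

    static String maskCards(String value) {
        return CARD.matcher(value).replaceAll("[REDACTED-CARD]");
    }

    static String maskEmails(String value) {
        return EMAIL.matcher(value).replaceAll("[REDACTED-EMAIL]");
    }

    /** Applies each step in order; a null result drops the record and stops the chain. */
    static String apply(List<UnaryOperator<String>> chain, String value) {
        String current = value;
        for (UnaryOperator<String> step : chain) {
            if (current == null) {
                return null;
            }
            current = step.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = new ArrayList<>();
        chain.add(RedactionChain::maskCards);
        chain.add(RedactionChain::maskEmails);
        // Prints: card [REDACTED-CARD] for [REDACTED-EMAIL]
        System.out.println(apply(chain, "card 4111 1111 1111 1111 for bob@example.com"));
    }
}
```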

Rate Limiting and Noisy Neighbor Controls

Autonomous AI agents are not predictable microservices - they can execute sudden, bursty metadata queries that could overwhelm the KRaft controller or saturate broker resources. The MCP server implements rate limiting at two levels:

Level 1: MCP Server-Side Rate Limiting. The server enforces configurable request rate limits before any Kafka API call is made:

Property | Default | Description
mcp.rate.limit.requests.per.second | 50 | Maximum JSON-RPC requests per second across all tools
mcp.rate.limit.produce.bytes.per.second | 10485760 (10 MB) | Maximum produce throughput per second
mcp.rate.limit.consume.bytes.per.second | 52428800 (50 MB) | Maximum consume throughput per second
mcp.rate.limit.admin.requests.per.second | 20 | Maximum Admin API calls per second (protects KRaft controller)

Requests exceeding the rate limit receive a JSON-RPC error response with code -32029 (rate limited) and a Retry-After header (for HTTP transport). The AI agent can retry after the specified interval.
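This KIP does not specify the limiting algorithm. As one possible approach, the sketch below uses a token bucket, a common choice for request-per-second limits because it tolerates short bursts up to the bucket size. TokenBucket is a hypothetical class, and the injected nanosecond clock exists only to make the behavior testable.

```java
import java.util.function.LongSupplier;

// Hypothetical token bucket: capacity equals one second's worth of permits.
final class TokenBucket {
    private final double capacity;
    private final double refillPerNano;
    private final LongSupplier nanoClock;
    private double tokens;
    private long lastNanos;

    TokenBucket(double permitsPerSecond, LongSupplier nanoClock) {
        this.capacity = permitsPerSecond;
        this.refillPerNano = permitsPerSecond / 1_000_000_000.0;
        this.nanoClock = nanoClock;
        this.tokens = permitsPerSecond; // start full so an initial burst is allowed
        this.lastNanos = nanoClock.getAsLong();
    }

    /** Takes one permit if available; returns false when the caller should be rate limited. */
    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        // Refill in proportion to elapsed time, never beyond capacity
        tokens = Math.min(capacity, tokens + (now - lastNanos) * refillPerNano);
        lastNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // 50 requests per second, matching the default of mcp.rate.limit.requests.per.second
        TokenBucket bucket = new TokenBucket(50, System::nanoTime);
        int accepted = 0;
        for (int i = 0; i < 200; i++) {
            if (bucket.tryAcquire()) {
                accepted++;
            }
        }
        System.out.println("accepted " + accepted + " of 200 back-to-back requests");
    }
}
```

A request for which tryAcquire() returns false would be answered with the rate-limit error described above instead of being forwarded to Kafka.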

Level 2: Kafka Client Quotas. The MCP server's Kafka user should be configured with broker-side client quotas (Admin.alterClientQuotas()) to enforce hard limits at the broker level:


# Recommended broker-side quotas for the MCP server's Kafka user
# Set via kafka-configs.sh or Admin.alterClientQuotas()
consumer_byte_rate=52428800    # 50 MB/s consume
producer_byte_rate=10485760    # 10 MB/s produce
request_percentage=10          # max 10% of broker request handler capacity


This dual-layer rate limiting ensures that even if the MCP server-side limits are misconfigured, the broker-side quotas provide a hard ceiling that protects the core streaming infrastructure.

Module Structure

tools/
└── mcp-server/
    ├── build.gradle
    └── src/
        ├── main/java/org/apache/kafka/tools/mcp/
        │   ├── KafkaMcpServer.java           # Entry point, transport selection
        │   ├── McpRequestHandler.java         # JSON-RPC dispatch
        │   ├── tools/
        │   │   ├── TopicTools.java            # create_topic, delete_topic, etc.
        │   │   ├── MessageTools.java          # produce, consume
        │   │   ├── GroupTools.java            # Consumer/Streams/Share group ops
        │   │   ├── AclTools.java              # create_acls, delete_acls
        │   │   ├── ClusterTools.java          # alter_broker_config, elect_leaders
        │   │   ├── TransactionTools.java      # abort_transaction, fence_producers
        │   │   └── ConnectTools.java          # Connect REST API wrapper
        │   ├── resources/
        │   │   ├── TopicResources.java        # kafka://topics/*
        │   │   ├── GroupResources.java        # kafka://groups/* + lag computation
        │   │   ├── ClusterResources.java      # kafka://cluster/*
        │   │   ├── AclResources.java          # kafka://acls
        │   │   ├── TransactionResources.java  # kafka://transactions/*
        │   │   └── ConnectResources.java      # kafka://connectors/*
        │   └── transport/
        │       ├── StdioTransport.java        # stdin/stdout JSON-RPC
        │       └── HttpTransport.java         # Streamable HTTP + SSE
        └── test/java/org/apache/kafka/tools/mcp/
            ├── TopicToolsTest.java
            ├── MessageToolsTest.java
            ├── GroupResourcesTest.java
            └── ...

Dependencies

Dependency | Version | Purpose | Already in Kafka?
jackson-databind | 2.20.1 | JSON-RPC serialization | Yes (gradle/dependencies.gradle)
jetty-server | 12.0.32 | HTTP transport | Yes (used by Kafka Connect)
Kafka clients module | Current | Admin, Producer, Consumer APIs | Yes (internal module)

Zero new external dependencies. The MCP server uses only libraries already present in the Kafka build. Jackson handles JSON-RPC serialization. Jetty (already used by Kafka Connect for its REST API) handles HTTP transport.

Gradle Integration

The new module must be registered in settings.gradle alongside the existing tools submodules:


'tools:mcp-server',


// tools/mcp-server/build.gradle
plugins {
    id 'java'
    id 'application'
    id 'com.gradleup.shadow'  // already in root build.gradle with 'apply false'
}

dependencies {
    implementation project(':clients')
    implementation libs.jacksonDatabind
    implementation libs.jettyServer
    implementation libs.slf4jApi
    runtimeOnly libs.log4j2Api
    runtimeOnly libs.log4j2Core

    testImplementation libs.junitJupiter
    testImplementation libs.mockitoCore
}

application {
    mainClass = 'org.apache.kafka.tools.mcp.KafkaMcpServer'
}

The module is built as part of ./gradlew build but packaged separately - it does not ship inside the broker distribution. It produces a standalone JAR via ./gradlew :tools:mcp-server:shadowJar.

Distribution

The shadow JAR is a self-contained fat JAR that includes the Kafka clients library, Jackson, Jetty, and all transitive dependencies. It does NOT require Kafka libraries on the classpath. The com.gradleup.shadow plugin is already declared in Kafka's root build.gradle (version 8.3.9, apply false) and is used by the clients module and jmh-benchmarks module - so it is already approved for use in the Kafka build.

The shadow JAR is distributed as a separate artifact (kafka-mcp-server-{version}.jar) alongside the main Kafka release tarball. It is NOT included inside the broker distribution to keep the broker footprint unchanged.

Observability

The Kafka MCP server exposes JMX metrics under the kafka.mcp domain, following the same pattern as Kafka's existing client metrics:

Metric | Type | Description
kafka.mcp:type=server,name=request-total | Counter | Total JSON-RPC requests received
kafka.mcp:type=server,name=request-error-total | Counter | Total requests that returned an error
kafka.mcp:type=server,name=request-latency-avg | Gauge | Average request latency in ms
kafka.mcp:type=server,name=request-latency-p99 | Gauge | 99th percentile request latency in ms
kafka.mcp:type=server,name=active-consumers | Gauge | Number of short-lived consumers currently open
kafka.mcp:type=tool,name={toolName}-total | Counter | Per-tool invocation count
kafka.mcp:type=tool,name={toolName}-error-total | Counter | Per-tool error count
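
As a rough illustration of how one of these counters could appear in JMX, the sketch below registers a counter under the kafka.mcp domain using only the JDK's javax.management classes. The class and method names (McpJmxSketch, register, read) are hypothetical, and a real implementation would more likely go through Kafka's own metrics classes than hand-written MBeans.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.LongAdder;
import javax.management.JMException;
import javax.management.ObjectName;

// Hypothetical sketch: plain JDK JMX, not Kafka's metrics classes.
final class McpJmxSketch {

    /** Management interface; JMX exposes getCount() as the attribute "Count". */
    public interface CounterMXBean {
        long getCount();
    }

    /** Thread-safe counter backing a single metric. */
    static final class Counter implements CounterMXBean {
        private final LongAdder value = new LongAdder();

        void increment() {
            value.increment();
        }

        @Override
        public long getCount() {
            return value.sum();
        }
    }

    private static ObjectName nameOf(String type, String name) throws JMException {
        return new ObjectName("kafka.mcp:type=" + type + ",name=" + name);
    }

    /** Registers a new counter as kafka.mcp:type=<type>,name=<name>. */
    static Counter register(String type, String name) {
        try {
            Counter counter = new Counter();
            ManagementFactory.getPlatformMBeanServer().registerMBean(counter, nameOf(type, name));
            return counter;
        } catch (JMException e) {
            throw new IllegalStateException("Could not register metric " + name, e);
        }
    }

    /** Reads the counter back through the MBean server, as a JMX client would. */
    static long read(String type, String name) {
        try {
            return (Long) ManagementFactory.getPlatformMBeanServer()
                    .getAttribute(nameOf(type, name), "Count");
        } catch (JMException e) {
            throw new IllegalStateException("Could not read metric " + name, e);
        }
    }
}
```

A request handler would call increment() on the counter returned by register("server", "request-total"), and any JMX client could then read the value from the same object name.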

Logging uses SLF4J with Log4j2 (same as all Kafka tools). Logging levels:

  • INFO: Server startup/shutdown, transport binding, connection events
  • DEBUG: Individual JSON-RPC request/response summaries (tool name, duration, success/error)
  • TRACE: Full JSON-RPC request/response bodies (for development debugging only - may contain message payloads)

Sensitive data (SASL credentials, SSL keystore passwords) is never logged at any level.
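
One way to enforce this rule is to mask sensitive configuration values before they reach any log statement. The sketch below is a minimal illustration under stated assumptions: the class name LogRedactionSketch, the marker list, and the "[hidden]" placeholder are chosen for this example and are not part of the proposal.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: masks config values whose keys look sensitive.
final class LogRedactionSketch {
    static final String HIDDEN = "[hidden]";

    // Assumed markers; they match keys such as ssl.keystore.password,
    // ssl.key.password, and sasl.jaas.config.
    private static final String[] SENSITIVE_MARKERS = {"password", "sasl.jaas.config"};

    /** Returns true if the key contains any sensitive marker (case-insensitive). */
    static boolean isSensitive(String key) {
        String lower = key.toLowerCase(Locale.ROOT);
        for (String marker : SENSITIVE_MARKERS) {
            if (lower.contains(marker)) {
                return true;
            }
        }
        return false;
    }

    /** Returns a copy of the config that is safe to log; the input is not modified. */
    static Map<String, String> redact(Map<String, String> config) {
        Map<String, String> safe = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            safe.put(entry.getKey(), isSensitive(entry.getKey()) ? HIDDEN : entry.getValue());
        }
        return safe;
    }
}
```

Logging only the result of redact(...) keeps non-secret settings such as bootstrap.servers visible for debugging while credentials never appear, regardless of the configured log level.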

Graceful Shutdown

On receiving SIGTERM or SIGINT, the Kafka MCP server performs an orderly shutdown:

  1. Stop accepting new requests (return 503 Service Unavailable for HTTP, close stdin for stdio)
  2. Wait for in-flight requests to complete (30-second timeout)
  3. Flush the singleton KafkaProducer to ensure pending records are delivered
  4. Close KafkaProducer, Admin, and any active KafkaConsumer instances with a 10-second timeout
  5. Stop the HTTP server (if running)
  6. Exit cleanly

Short-lived consumers created by consume_messages are closed within the request handler (try-with-resources), so they do not need special shutdown handling.
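
The shutdown sequence above can be sketched with JDK classes alone. This is an illustrative outline rather than the proposed implementation: ShutdownSketch, onClose, and installHook are hypothetical names, and the Kafka clients are represented by plain Runnable close steps so the example stays self-contained.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the ordered shutdown; Kafka clients are stand-in Runnables.
final class ShutdownSketch {
    private final AtomicBoolean accepting = new AtomicBoolean(true);
    private final ExecutorService requestPool;
    private final Duration drainTimeout;
    private final List<Runnable> closeSteps = new ArrayList<>();

    ShutdownSketch(ExecutorService requestPool, Duration drainTimeout) {
        this.requestPool = requestPool;
        this.drainTimeout = drainTimeout;
    }

    /** Registers a step (flush producer, close clients, stop HTTP) to run in order. */
    void onClose(Runnable step) {
        closeSteps.add(step);
    }

    /** Transports check this flag before dispatching a new request. */
    boolean isAccepting() {
        return accepting.get();
    }

    /** Runs the sequence; returns true if in-flight requests drained in time. */
    boolean shutdown() {
        accepting.set(false);      // step 1: stop accepting new requests
        requestPool.shutdown();    // running requests continue, new ones are rejected
        boolean drained;
        try {
            // step 2: wait for in-flight requests, bounded by the drain timeout
            drained = requestPool.awaitTermination(drainTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            drained = false;
        }
        for (Runnable step : closeSteps) {   // steps 3-5, in registration order
            try {
                step.run();
            } catch (RuntimeException e) {
                // A failing step must not prevent the remaining resources from closing.
                System.err.println("Shutdown step failed: " + e);
            }
        }
        return drained;
    }

    /** The JVM runs shutdown hooks on SIGTERM and SIGINT. */
    void installHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown, "mcp-shutdown"));
    }
}
```

In the real server the registered steps would presumably be calls such as producer.flush(), producer.close(Duration.ofSeconds(10)), and admin.close(Duration.ofSeconds(10)), with the drain timeout set to 30 seconds.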

Phased Rollout

Phase | Scope | Included
Phase 1 | Core operations | create_topic, delete_topic, produce_message, consume_messages, alter_consumer_group_offsets, delete_consumer_group, kafka://topics, kafka://groups, kafka://groups/{id}/lag, kafka://cluster
Phase 2 | Security + Connect | create_acls, delete_acls, all Connect tools, kafka://acls, kafka://connectors/*
Phase 3 | Advanced | produce_transactional, abort_transaction, fence_producers, elect_leaders, kafka://transactions/*, kafka://streams-groups/*, kafka://share-groups/*, kafka://cluster/metadata-quorum

Further Work

In addition to the phased rollout described above, the following are natural extensions that can be pursued after the core implementation is stable:

  • MCP Prompts: Templated workflows such as "troubleshoot consumer lag for group X," "set up a new topic with production-ready configs," or "diagnose why connector Y is failing." Prompts compose multiple tools and resources into guided multi-step workflows.
  • Multi-cluster support: Allow the MCP server to manage multiple Kafka clusters via cluster aliases (e.g., kafka://prod/topics, kafka://staging/topics). Requires a routing layer and per-cluster credential management.
  • Schema Registry integration: Expose Schema Registry operations (register schema, check compatibility, list subjects) as MCP tools and resources. Schema Registry is not part of Apache Kafka, so this would be an optional module wrapping the Schema Registry REST API.
  • AI-driven operations: Intelligent consumer lag remediation (detect lag → diagnose root cause → suggest scaling), automated dead-letter queue analysis (classify failure patterns → suggest fixes), and smart schema evolution (check compatibility before proposing changes).
  • Binary message support: Phase 1 uses StringSerializer/StringDeserializer. Future work can add Avro, Protobuf, and JSON Schema support via pluggable serializers, potentially integrated with Schema Registry.
  • Kafka Streams topology visualization: Expose Streams topology metadata as an MCP resource so AI agents can visualize and reason about stream processing pipelines.

These components are less defined and do not have KIPs attached. Contributions are welcome.

Relationships to Other Open KIPs

Because the MCP server exposes Kafka's operational surface to AI agents, it will naturally benefit from and interact with other features:


  • KIP-1150: Diskless Topics (Accepted) - The MCP server's create_topic tool can expose Diskless topic type configuration once KIP-1150's sub-KIPs (KIP-1163, KIP-1164) are implemented, allowing AI agents to provision cost-optimized topics on hyperscaler clouds.
  • KIP-1279: Cluster Mirroring - The MCP server can expose mirror link status and management operations once KIP-1279 adds the relevant Admin API methods, enabling AI-assisted DR management.
  • KIP-848: The Next Generation of the Consumer Rebalance Protocol - The MCP server's consumer group resources already use Admin.listGroups() which returns all group types including the new consumer group protocol. No changes needed.
  • KIP-932: Queues for Kafka (Share Groups) - The MCP server already includes kafka://share-groups/{id} resources wrapping Admin.describeShareGroups() and Admin.listShareGroupOffsets().

Compatibility, Deprecation, and Migration Plan

This KIP adds a new standalone tool module. It does not modify any existing Kafka code, protocol, or public API.

  • Backward compatibility: Not applicable - entirely new module.
  • Deprecation: None.
  • Migration: None - opt-in tool.

The MCP server targets Kafka 4.0+ (KRaft mode). It works with any Kafka cluster that the standard Java clients can connect to. The server uses only the public Admin, KafkaProducer, and KafkaConsumer APIs, so it is forward-compatible with future Kafka versions as long as these APIs remain stable (they are part of Kafka's public interface contract).

Test Plan

Unit Tests

Each tool and resource class has corresponding JUnit 5 tests using mocked Admin, KafkaProducer, and KafkaConsumer instances:

  • TopicToolsTest - verify create_topic constructs correct NewTopic, handles TopicExistsException gracefully
  • MessageToolsTest - verify produce_message constructs correct ProducerRecord, returns RecordMetadata fields (topic, partition, offset, timestamp)
  • MessageToolsTest - verify produce_transactional calls initTransactions() → beginTransaction() → send() → commitTransaction() in order, and calls abortTransaction() on failure
  • AclToolsTest - verify create_acls constructs correct AclBinding objects from JSON parameters
  • GroupResourcesTest - verify lag computation correctly subtracts committed offsets from end offsets
  • ConnectToolsTest - verify HTTP calls to Connect REST endpoints with correct method, path, and body
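
The lag computation that GroupResourcesTest verifies can be illustrated as follows. This is a sketch under stated assumptions: partitions are keyed by a plain "topic-partition" string instead of Kafka's TopicPartition to keep the example self-contained, and the handling of missing committed offsets and negative differences is an assumption of this example, not something the KIP specifies.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: per-partition lag = end offset - committed offset.
final class LagSketch {

    /**
     * Computes lag for every partition that has a committed offset.
     * Assumption: partitions without a committed offset are skipped, and a
     * negative difference (stale end offset) is clamped to zero.
     */
    static Map<String, Long> computeLag(Map<String, Long> endOffsets,
                                        Map<String, Long> committedOffsets) {
        Map<String, Long> lag = new TreeMap<>();
        for (Map.Entry<String, Long> end : endOffsets.entrySet()) {
            Long committed = committedOffsets.get(end.getKey());
            if (committed == null) {
                continue; // the group has not committed anything for this partition
            }
            lag.put(end.getKey(), Math.max(0L, end.getValue() - committed));
        }
        return lag;
    }

    /** Sums the per-partition lag into a single group-level figure. */
    static long totalLag(Map<String, Long> lagByPartition) {
        long total = 0L;
        for (long value : lagByPartition.values()) {
            total += value;
        }
        return total;
    }
}
```

For example, end offsets of 100 and 50 with committed offsets of 90 and 50 give a per-partition lag of 10 and 0, and a total lag of 10.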

Integration Tests

Integration tests use Kafka's existing kafka.test.ClusterTestExtensions test infrastructure:

  1. Start embedded Kafka cluster
  2. Start MCP server pointing at cluster via stdio transport
  3. Send JSON-RPC requests (e.g., create_topic, produce_message, consume_messages)
  4. Verify Kafka state changes (topic exists, message retrievable, offsets committed)
  5. Verify error handling (create duplicate topic returns structured error, produce to non-existent topic returns structured error)

Manual Validation

# Build
./gradlew :tools:mcp-server:shadowJar

# Test with stdio transport (pipe JSON-RPC)
echo '{"jsonrpc":"2.0","id":1,"method":"tools/list"}' | \
  java -jar tools/mcp-server/build/libs/kafka-mcp-server.jar \
  --bootstrap-servers localhost:9092

# Or configure in Claude Desktop's claude_desktop_config.json and test interactively

Documentation Plan

The following documentation will be added or updated:

  • New page: MCP Server - A dedicated documentation page under the Kafka Tools section describing the MCP server, its configuration properties, supported tools and resources, transport mechanisms, security setup, and usage examples with Claude Desktop, VS Code, and Google ADK.
  • Update: CONTRIBUTING.md - Add a note about the MCP server for contributors who want to test Kafka operations via AI agents.
  • Update: Kafka Tools documentation - Add the MCP server to the list of available Kafka tools alongside kafka-topics.sh, kafka-consumer-groups.sh, etc.
  • Inline Javadoc - All public classes in tools/mcp-server will include Javadoc describing their purpose, parameters, and relationship to the underlying Kafka API methods.

Rejected Alternatives

Embedding MCP in the broker process. The MCP server is a separate process. Embedding it in the broker would couple AI tool concerns with broker stability, increase the broker's attack surface, and require broker restarts to update MCP capabilities. Kafka's existing tools (kafka-topics.sh, kafka-consumer-groups.sh) follow the same pattern of being standalone client-side processes.

Using Confluent's mcp-confluent. mcp-confluent is tightly coupled to Confluent Cloud REST APIs and does not work with vanilla Apache Kafka. It is also not Apache-licensed, which rules out including it in the Kafka project.

Python or Go implementation. Kafka's client libraries are Java-native. A Java MCP server avoids serialization overhead and version skew between client library wrappers in other languages. It also integrates naturally with Kafka's existing Gradle build and test infrastructure, and can be released alongside Kafka itself.

Extending the broker's REST API instead of MCP. Kafka brokers do not have a REST API (only Kafka Connect does). Adding a REST API to the broker would be a much larger change requiring its own KIP. MCP is the standardized protocol that AI agents already speak - meeting agents where they are is more practical than inventing a new broker-side API.

Exposing Kafka Streams topology management. Kafka Streams applications are user-space programs, not cluster-side resources. The Admin API provides describeStreamsGroups() (Admin.java line 2063) and listStreamsGroupOffsets() (Admin.java line 965) for monitoring, but starting/stopping Streams apps is outside the broker's control. The MCP server exposes what the Admin API provides.

Adding MCP Prompts in Phase 1. MCP Prompts are templated workflows (e.g., "troubleshoot consumer lag step by step"). These are valuable but can be added incrementally after the core Tools and Resources are stable. Deferring reduces initial scope and controversy.

Supporting multiple clusters in Phase 1. Multi-cluster support (with cluster aliases and routing) adds significant complexity. Phase 1 targets a single cluster via one bootstrap.servers configuration. Multi-cluster support can be added in a follow-up KIP.

FAQ

Q: Why not extend Kafka Connect's REST API instead? A: Kafka Connect's REST API manages connectors, not Kafka itself. It cannot create topics, manage ACLs, produce/consume messages, or inspect consumer groups. The MCP server wraps the Admin, Producer, and Consumer APIs - a fundamentally different surface. Connect integration is included as one category among eight, not the foundation.

Q: Should this be a separate Apache project instead of part of Kafka? A: No. The MCP server's value comes from tight coupling with Kafka's client APIs and release cycle. Shipping it alongside Kafka ensures API compatibility with each version. Kafka's existing tools/ module already contains CLI tools (kafka-topics.sh, kafka-consumer-groups.sh) that wrap the same Admin API - the MCP server is the same pattern for a different protocol. A separate project would introduce version skew and maintenance fragmentation.

Q: What is the maintenance burden as MCP evolves? A: The MCP protocol surface used by this KIP is small: JSON-RPC 2.0 request dispatch, tool/resource registration, and capability negotiation. The bulk of the code is Kafka API wrappers, which change only when the Admin/Producer/Consumer APIs change (rare, and we control those). MCP protocol changes (new primitives, transport updates) are additive and backward-compatible by design. The maintenance burden is comparable to maintaining the existing CLI tools.

Q: Should we wait for MCP to stabilize further? A: MCP's core primitives (Tools, Resources, Prompts) and transports (stdio, Streamable HTTP) are stable as of the June 2025 specification. The protocol is governed by the Linux Foundation's Agentic AI Foundation with Anthropic, Block, and OpenAI as co-founders. Waiting risks ceding the ecosystem to vendor-specific implementations (Confluent's mcp-confluent already exists). Shipping now under Apache 2.0 ensures the community controls the canonical Kafka MCP integration.

Q: How does this handle binary/non-string message payloads? A: Phase 1 uses StringSerializer/StringDeserializer for simplicity. Binary payloads can be base64-encoded in the JSON request. Avro, Protobuf, and JSON Schema support via pluggable serializers (potentially integrated with Schema Registry) is listed under Further Work.
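
As a small illustration of the base64 approach, the sketch below round-trips a binary payload through the JDK's java.util.Base64. The class name Base64PayloadSketch is hypothetical. Whether the server decodes the text before producing, or writes the base64 string to the topic as-is through StringSerializer, is not specified here; in the latter case consumers would need to decode it themselves.

```java
import java.util.Base64;

// Hypothetical sketch: carrying binary bytes inside a JSON string field.
final class Base64PayloadSketch {

    /** Encodes raw bytes as base64 text that is safe to embed in a JSON string. */
    static String encode(byte[] payload) {
        return Base64.getEncoder().encodeToString(payload);
    }

    /** Recovers the original bytes; throws IllegalArgumentException on invalid input. */
    static byte[] decode(String text) {
        return Base64.getDecoder().decode(text);
    }
}
```

For instance, the three bytes 0x00 0xFF 0x10 encode to the string "AP8Q", and decoding that string returns the same three bytes.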
