Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-20029
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Internal topics in Kafka, specifically __transaction_state, __consumer_offsets, and the recently introduced __share_group_state (KIP-932),
rely on a specific hashing mechanism to route keys (transaction IDs, group IDs) to partitions.
For example, the routing logic for the Transaction Coordinator maps a Transaction ID to a specific partition of __transaction_state using the following logic:
Utils.abs(transactionId.hashCode()) % transactionTopicPartitionCount
This logic creates a strict dependency on the transactionTopicPartitionCount.
If the number of partitions changes (e.g., expanding from 50 to 150), the mapping logic changes immediately.
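To illustrate, the routing above can be sketched in Java (a minimal re-implementation for this discussion; the `abs` helper stands in for Kafka's utility that guarantees a non-negative hash, and the partition counts are the example values from this KIP):

```java
public class TxnPartitionRouting {
    // Stand-in for Kafka's non-negative hash helper: masking the sign bit
    // guarantees a non-negative result, even for Integer.MIN_VALUE.
    static int abs(int n) {
        return n & 0x7fffffff;
    }

    // Maps a transactional ID to a __transaction_state partition.
    static int partitionFor(String transactionalId, int transactionTopicPartitionCount) {
        return abs(transactionalId.hashCode()) % transactionTopicPartitionCount;
    }

    public static void main(String[] args) {
        String txnId = "A";
        // The same ID routes to different partitions once the count changes.
        System.out.println(partitionFor(txnId, 50));
        System.out.println(partitionFor(txnId, 150));
    }
}
```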
While the Kafka documentation advises against modifying these internal topics,
there is currently no hard guardrail preventing this operation via the Admin API CreatePartitions.
The Problem: Split-Brain and Data Inconsistency
Allowing dynamic partition expansion for these internal topics leads to critical availability and correctness issues, including split-brain scenarios, orphan transactions, and hanging transactions.
Here is a breakdown of the race condition identified during an expansion scenario (e.g., 50 -> 150 partitions):
Metadata Inconsistency: During the expansion, the metadata cache updates are not atomic across all nodes.
Split-Brain View:
- Old Broker A (stale metadata): Sees 50 partitions. Calculates hash("A") % 50 -> Partition 27.
- Newly added Broker B (updated metadata): Sees 150 partitions. Calculates hash("A") % 150 -> Partition 130.
Routing Divergence: A producer sending FindCoordinator for Transaction ID "A" may be routed to Partition 130 or Partition 27, depending on which broker handles the request.
Orphan/Hanging Transactions:
If a client interacts with the new coordinator (Partition 130), it creates state there.
If the client later interacts with a broker using the old metadata (50 partitions), it searches Partition 27.
As a result, the transaction state may be written to an unexpected partition, which can lead to orphan transactions that can no longer be correctly committed or aborted.
This issue is not limited to __transaction_state:
- __consumer_offsets: Partition expansion of __consumer_offsets can cause group IDs to be mapped to different partitions across brokers while metadata is converging. As a result, offset commits, offset fetches, and group coordination operations may fail in a non-deterministic manner depending on which broker handles the request. During this window, it may be impossible to reliably read and write committed offsets, and some groups may become effectively unfindable until the cluster state fully converges.
- __share_group_state: Similar routing logic applies to share groups, risking state inconsistency.
Currently, this is only prevented by documentation "advice," which is insufficient to prevent critical human errors or automated misconfigurations.
Public Interfaces
- Configuration
  - Name: controller.unstable.allow.internal.coordinator.partition.increase
  - Type: boolean
  - Default: false
  - Description: When set to true, the controller allows partition expansion for internal topics. This is unsafe.
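For example, an operator who explicitly accepts the risk could enable the override in the controller configuration (a sketch of a controller properties file; the property name is the one proposed by this KIP):

```properties
# Unsafe: permits CreatePartitions on internal coordinator topics.
controller.unstable.allow.internal.coordinator.partition.increase=true
```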
- Error Codes
  - INVALID_REQUEST: Returned for the affected internal topics within a CreatePartitions response when expansion is attempted without the unsafe config enabled.
- Classes
  - /kafka/core/src/main/scala/kafka/server/ControllerApis.scala → reject the request
  - /kafka/core/src/main/scala/kafka/server/KafkaConfig.scala → add a configuration field for controller.unstable.allow.internal.coordinator.partition.increase for ControllerApis.scala to access
  - /kafka/raft/src/main/java/org/apache/kafka/raft/KRaftConfigs.java → add controller.unstable.allow.internal.coordinator.partition.increase
Proposed Changes
We propose to enforce a validation check in the Controller API layer to reject CreatePartitions requests targeting internal topics,
while allowing other valid requests in the same batch to proceed.
1. Configuration
We will introduce a new configuration to control this guardrail.
- Config Name: controller.unstable.allow.internal.coordinator.partition.increase
- Definition: Defined in KRaftConfigs.java and propagated to KafkaConfig.scala.
- Default: false
2. Validation in ControllerApis
The validation logic will be implemented in kafka.server.ControllerApis to "fail fast" before the request reaches the Raft layer or Metadata cache.
In ControllerApis.handleCreatePartitions:
- The controller will iterate through the list of topics in the CreatePartitionsRequest.
- It will identify internal topics using Topic.isInternal(name).
- If controller.unstable.allow.internal.coordinator.partition.increase is false, requests for internal topics will be immediately marked with an INVALID_REQUEST error in the response.
- Valid (non-internal) topics in the same request will be passed to controller.createPartitions for processing.
- The final response will merge the results, returning per-topic success or failure codes.
This approach ensures that an accidental inclusion of an internal topic in a bulk operation does not fail the entire batch (supporting partial failures),
aligning with standard Kafka Admin API behavior.
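The validation flow can be sketched as follows (a simplified Java illustration written for this discussion, not the actual Scala implementation in ControllerApis; the internal-topic set, result type, and error-code constant are assumptions for the sketch):

```java
import java.util.*;

public class CreatePartitionsGuardrail {
    // Assumption: mirrors the names matched by Topic.isInternal(name).
    static final Set<String> INTERNAL_TOPICS = Set.of(
            "__consumer_offsets", "__transaction_state", "__share_group_state");

    static final short NONE = 0;             // per-topic success
    static final short INVALID_REQUEST = 42; // INVALID_REQUEST in the Kafka protocol

    // Partitions the request: internal topics are failed fast with INVALID_REQUEST
    // before reaching the Raft layer; everything else is forwarded (marking it NONE
    // here stands in for controller.createPartitions). Results are merged per topic.
    static Map<String, Short> handleCreatePartitions(List<String> topics, boolean allowUnsafe) {
        Map<String, Short> response = new LinkedHashMap<>();
        List<String> forwardable = new ArrayList<>();
        for (String topic : topics) {
            if (!allowUnsafe && INTERNAL_TOPICS.contains(topic)) {
                response.put(topic, INVALID_REQUEST); // fail fast
            } else {
                forwardable.add(topic);
            }
        }
        for (String topic : forwardable) {
            response.put(topic, NONE); // stand-in for the real createPartitions result
        }
        return response;
    }
}
```

Note how a batch mixing a valid topic with an internal topic still succeeds for the valid topic, matching the partial-failure behavior described above.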
Compatibility, Deprecation, and Migration Plan
Compatibility
- Kafka 4.0 Context: This KIP targets KRaft-only clusters (Zookeeper is deprecated in 4.0).
Behavioral Change (Operational Breaking Change):
This KIP introduces a breaking change in behavior. Previously, expanding internal topics was technically allowed (though unsafe). After this change, such requests will fail with INVALID_REQUEST. This is a deliberate design choice to prioritize correctness and cluster stability over backward compatibility of an unsafe operation.
- Justification & Mitigation: This change is considered acceptable because Kafka documentation already explicitly advises against modifying internal topics. Furthermore, the impact is mitigated by the opt-in configuration (
controller.unstable.allow.internal.coordinator.partition.increase). Operators who strictly require this capability can explicitly enable it, ensuring that no critical workflows are permanently blocked.
Client Compatibility:
Existing AdminClient implementations (Java, Go, Python, etc.) do not need to be updated. They will simply receive a standard INVALID_REQUEST error code, which is already handled by standard error handling routines. No binary incompatibility is introduced.
Deprecation
No features are being deprecated. This KIP strictly serves as a guardrail.
Migration Plan
No Data Migration Required: This change applies only to metadata operations. Existing data in internal topics is unaffected.
For Administrators:
- Operators who explicitly need to perform this operation may opt in by setting the unsafe controller configuration to true and ensuring it is applied consistently across controllers. Depending on the deployment and configuration management approach, applying this setting may require a controller restart before it takes effect.
Test Plan
Unit Tests (ControllerApisTest):
- Batch Handling: Verify that a request containing [valid-topic, __transaction_state] returns NONE (success) for the valid topic and INVALID_REQUEST for the internal topic.
- Config Override: Verify that expansion succeeds when controller.unstable.allow.internal.coordinator.partition.increase is true.
Integration Tests:
- Use AdminClient against a KRaft cluster fixture to verify end-to-end blocking behavior.
Rejected Alternatives
1. Alternative Partitioning Strategies (e.g., Consistent Hashing, Rendezvous Hashing)
A theoretically more robust approach would be to adopt a partitioning strategy that minimizes key redistribution, such as Consistent Hashing. It was rejected for the following reasons:
- Client Dependency: The partitioning logic (hashCode % partitionCount) is embedded in client libraries (FindCoordinator). Changing this algorithm requires updating the entire client ecosystem, creating a massive compatibility challenge.
- Incomplete Solution: While Consistent Hashing reduces the scope of data redistribution, it does not eliminate it. Expanding the cluster would still require migrating a subset of keys (transaction states) to new partitions. As noted in the "Automatic Data Redistribution" section, implementing safe state migration for active, log-compacted internal topics is complex and risky, regardless of the hashing algorithm used.
- Scope: The primary goal of this KIP is to prevent immediate availability risks in Kafka 4.0, not to redesign the core coordination and partitioning protocol.
2. Automatic Data Redistribution
We considered allowing the expansion and automatically redistributing data to new partitions. It was rejected for the following reasons:
- Thundering Herd Risk (Metadata Storm): Since the routing logic is modular (hashCode % N), changing N re-maps almost all keys. This forces every active client to rediscover its coordinator simultaneously. This "Thundering Herd" would cause a massive spike in FindCoordinator requests, likely leading to request timeouts and a cluster-wide outage.
- Complexity: Implementing a safe rebalancer for append-only, log-compacted topics involves complex concurrency controls that outweigh the benefits.
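The scale of the re-mapping can be demonstrated with a small sketch (an illustration written for this discussion, reusing the modulo routing and the 50 -> 150 expansion from the example above; with uniform hashes, roughly two thirds of keys move):

```java
public class RemapFraction {
    // Same sign-bit-masking abs used in the routing sketch: always non-negative.
    static int abs(int n) {
        return n & 0x7fffffff;
    }

    static int partitionFor(String key, int partitionCount) {
        return abs(key.hashCode()) % partitionCount;
    }

    // Counts sample keys whose partition changes when the partition count
    // changes from oldCount to newCount.
    static int countRemapped(int sampleSize, int oldCount, int newCount) {
        int remapped = 0;
        for (int i = 0; i < sampleSize; i++) {
            String key = "txn-" + i;
            if (partitionFor(key, oldCount) != partitionFor(key, newCount)) {
                remapped++;
            }
        }
        return remapped;
    }

    public static void main(String[] args) {
        int remapped = countRemapped(100_000, 50, 150);
        // With modulo routing, a large majority of keys land on a new partition.
        System.out.printf("%d of 100000 keys remapped%n", remapped);
    }
}
```

Every remapped key corresponds to a client that must rediscover its coordinator, which is the source of the FindCoordinator spike described above.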
3. Validation in ReplicationControlManager (Deep Validation)
We considered validating deep inside the metadata layer. It was rejected for the following reason:
- Processing invalid requests consumes slots in the Controller Event Queue. Rejecting them at the RPC layer (
ControllerApis) is more efficient and protects the critical path of the KRaft controller.
4. Client-Side or Tool-Level Validation Only
We considered implementing the restriction solely in client-side tools (e.g., kafka-topics.sh) or the AdminClient SDK. It was rejected for the following reasons:
- Lack of Universal Coverage: The Kafka ecosystem includes diverse client libraries (Java, C/C++, Go, Python, Rust, etc.). Enforcing this logic in every client implementation is impractical and prone to inconsistency.
- Version Compatibility: Even if the Java AdminClient were updated to block this request, the cluster would remain vulnerable to older client versions that do not have this check.
- Server Authority: To guarantee cluster stability, the server must act as the ultimate source of truth and safety. Relying on client-side behavior to prevent critical metadata corruption violates the principle of defensive programming.