Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-20029

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Internal topics in Kafka, specifically __transaction_state, __consumer_offsets, and the recently introduced __share_group_state (KIP-932),
rely on a specific hashing mechanism to route keys (transaction IDs, group IDs) to partitions.

For example, the routing logic for the Transaction Coordinator maps a Transaction ID to a specific partition of __transaction_state using the following logic:

Utils.abs(transactionId.hashCode()) % transactionTopicPartitionCount

This logic creates a strict dependency on the transactionTopicPartitionCount.
If the number of partitions changes (e.g., expanding from 50 to 150), the mapping logic changes immediately.
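
To make the fragility concrete, here is a minimal Java sketch of this modular routing rule. The `abs` helper approximates Kafka's `Utils.abs` (a non-negative hash; the exact Kafka implementation may differ in detail), and the transactional ID is illustrative:

```java
public class PartitionRouting {
    // Approximates Kafka's Utils.abs: guarantees a non-negative result,
    // including for Integer.MIN_VALUE (whose Math.abs is still negative).
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // The Transaction Coordinator's routing rule:
    // partition = abs(transactionalId.hashCode()) % partitionCount
    static int partitionFor(String transactionalId, int partitionCount) {
        return abs(transactionalId.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        String txnId = "example-txn-id"; // illustrative transactional ID
        // The same ID routes to a different partition once the count changes.
        System.out.println(" 50 partitions -> partition " + partitionFor(txnId, 50));
        System.out.println("150 partitions -> partition " + partitionFor(txnId, 150));
    }
}
```

Because the partition count is the modulus, any change to it immediately changes the output for most keys, with no migration step in between.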

While the Kafka documentation advises against modifying these internal topics,
there is currently no hard guardrail preventing this operation via the Admin API's CreatePartitions request.

The Problem: Split-Brain and Data Inconsistency

Allowing dynamic partition expansion for these internal topics leads to critical availability and correctness issues, including split-brain scenarios, orphan transactions, and hanging transactions.

Here is a breakdown of the race condition identified during an expansion scenario (e.g., 50 -> 150 partitions):

  1. Metadata Inconsistency: During the expansion, the metadata cache updates are not atomic across all nodes.

  2. Split-Brain View:

    • Old Broker A (stale metadata): Sees 50 partitions. Calculates hash("A") % 50 -> Partition 27.
    • Newly added Broker B (updated metadata): Sees 150 partitions. Calculates hash("A") % 150 -> Partition 130.
  3. Routing Divergence: A producer sending FindCoordinator for Transaction ID "A" may be routed to Partition 130 or Partition 27 depending on which broker handles the request.

  4. Orphan/Hanging Transactions:

    • If a client interacts with the new coordinator (Partition 130), it creates state there.

    • If the client later interacts with a broker using the old metadata (50 partitions), it searches Partition 27.

    • As a result, the transaction state may be written to an unexpected partition, which can lead to orphan transactions that can no longer be correctly committed or aborted.

This issue is not limited to __transaction_state:

  • __consumer_offsets: Partition expansion of __consumer_offsets can cause group IDs to be mapped to different partitions across brokers while metadata is converging. As a result, offset commits, offset fetches, and group coordination operations may fail in a non-deterministic manner depending on which broker handles the request. During this window, it may be impossible to reliably read and write committed offsets, and some groups may become effectively unfindable until the cluster state fully converges.

  • __share_group_state: Similar routing logic applies to share groups, risking state inconsistency.

Currently, this is prevented only by documentation advice, which is insufficient to guard against critical human errors or automated misconfigurations.


Public Interfaces

  • Configuration
    • controller.unstable.allow.internal.coordinator.partition.increase 
      • Type: boolean
      • Default: false
      • Description: When set to true, the controller allows partition expansion for internal topics. This is unsafe.
  • Error Codes
    • INVALID_REQUEST: Returned for the affected internal topics within a CreatePartitions response when expansion is attempted without the unsafe config enabled.
  • Classes
    • /kafka/core/src/main/scala/kafka/server/ControllerApis.scala → Reject CreatePartitions requests that target internal topics.

    • /kafka/core/src/main/scala/kafka/server/KafkaConfig.scala → Add a configuration field for controller.unstable.allow.internal.coordinator.partition.increase for ControllerApis.scala to access.

    • /kafka/raft/src/main/java/org/apache/kafka/raft/KRaftConfigs.java → Add controller.unstable.allow.internal.coordinator.partition.increase.

Proposed Changes

We propose to enforce a validation check in the Controller API layer to reject CreatePartitions requests targeting internal topics,
while allowing other valid requests in the same batch to proceed.

1. Configuration

We will introduce a new configuration to control this guardrail.

  • Config Name: controller.unstable.allow.internal.coordinator.partition.increase

  • Definition: Defined in KRaftConfigs.java and propagated to KafkaConfig.scala.

  • Default: false
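
As an illustration, an operator deliberately opting in would set the proposed property on each controller. A sketch of a controller properties file (the property name is the one proposed above; everything else is a placeholder):

```properties
# Unsafe: permits CreatePartitions on internal coordinator topics
# (__transaction_state, __consumer_offsets, __share_group_state).
# The default, false, keeps the guardrail active.
controller.unstable.allow.internal.coordinator.partition.increase=true
```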

2. Validation in ControllerApis

The validation logic will be implemented in kafka.server.ControllerApis to "fail fast" before the request reaches the Raft layer or Metadata cache.

In ControllerApis.handleCreatePartitions:

  1. The controller will iterate through the list of topics in the CreatePartitionsRequest.

  2. It will identify internal topics using Topic.isInternal(name).

  3. If controller.unstable.allow.internal.coordinator.partition.increase is false:

    • Requests for internal topics will be immediately marked with INVALID_REQUEST error in the response.

    • Valid (non-internal) topics in the same request will be passed to controller.createPartitions for processing.

    • The final response will merge the results, returning per-topic success or failure codes.

This approach ensures that an accidental inclusion of an internal topic in a bulk operation does not fail the entire batch (supporting partial failures),
aligning with standard Kafka Admin API behavior.
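
The per-topic filtering described above can be sketched as follows. This is plain Java with hypothetical stand-ins (`NONE`, `INVALID_REQUEST`, the topic set, the method shape) for the actual request/response types; the real implementation lives in the Scala ControllerApis:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CreatePartitionsGuard {
    // Hypothetical stand-ins for Kafka's error codes.
    static final short NONE = 0;
    static final short INVALID_REQUEST = 42;

    // Internal coordinator topics, as Topic.isInternal(name) would identify them.
    static final Set<String> INTERNAL_TOPICS = Set.of(
            "__transaction_state", "__consumer_offsets", "__share_group_state");

    /**
     * Splits a CreatePartitions batch: internal topics fail fast with
     * INVALID_REQUEST (unless the unsafe config is enabled); everything else
     * would be forwarded to controller.createPartitions. Here the forwarded
     * topics are simply marked successful to show the merged response shape.
     */
    static Map<String, Short> handleCreatePartitions(List<String> topics,
                                                     boolean allowInternalIncrease) {
        Map<String, Short> results = new LinkedHashMap<>();
        List<String> forwarded = new ArrayList<>();
        for (String topic : topics) {
            if (!allowInternalIncrease && INTERNAL_TOPICS.contains(topic)) {
                results.put(topic, INVALID_REQUEST); // rejected at the RPC layer
            } else {
                forwarded.add(topic);
            }
        }
        for (String topic : forwarded) {
            results.put(topic, NONE); // stand-in for the controller's real result
        }
        return results;
    }
}
```

The key property is per-topic granularity: a batch of [valid-topic, __transaction_state] returns success for the former and INVALID_REQUEST for the latter, rather than failing the whole request.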

Compatibility, Deprecation, and Migration Plan

Compatibility

  • Kafka 4.0 Context: This KIP targets KRaft-only clusters (ZooKeeper mode was removed in Kafka 4.0).
  • Behavioral Change (Operational Breaking Change):

    • This KIP introduces a breaking change in behavior. Previously, expanding internal topics was technically allowed (though unsafe). After this change, such requests will fail with INVALID_REQUEST.

    • This is a deliberate design choice to prioritize correctness and cluster stability over backward compatibility of an unsafe operation.

    • Justification & Mitigation: This change is considered acceptable because Kafka documentation already explicitly advises against modifying internal topics. Furthermore, the impact is mitigated by the opt-in configuration (controller.unstable.allow.internal.coordinator.partition.increase). Operators who strictly require this capability can explicitly enable it, ensuring that no critical workflows are permanently blocked.
  • Client Compatibility:

    • Existing AdminClient implementations (Java, Go, Python, etc.) do not need to be updated. They will simply receive a standard INVALID_REQUEST error code, which is already handled by standard error handling routines. No binary incompatibility is introduced.


Deprecation

  • No features are being deprecated. This KIP strictly serves as a guardrail.

Migration Plan

  • No Data Migration Required: This change applies only to metadata operations. Existing data in internal topics is unaffected.

  • For Administrators:

    • Operators who explicitly need to perform this operation may opt in by setting the unsafe controller configuration to true and ensuring it is applied consistently across controllers. Depending on the deployment and configuration management approach, applying this setting may require a controller restart before it takes effect.

Test Plan

  • Unit Tests (ControllerApisTest):

    • Batch Handling: Verify that a request containing [valid-topic, __transaction_state] returns NONE (success) for the valid topic and INVALID_REQUEST for the internal topic.

    • Config Override: Verify that expansion succeeds when controller.unstable.allow.internal.coordinator.partition.increase is true.

  • Integration Tests:

    • Use AdminClient against a KRaft cluster fixture to verify end-to-end blocking behavior.

Rejected Alternatives

1. Alternative Partitioning Strategies (e.g., Consistent Hashing, Rendezvous Hashing)

A theoretically more robust approach would be to adopt a partitioning strategy that minimizes key redistribution, such as Consistent Hashing. This was rejected for the following reasons:

  • Client Dependency: The partitioning logic (hashCode % partitionCount) is embedded in client libraries (FindCoordinator). Changing this algorithm requires updating the entire client ecosystem, creating a massive compatibility challenge.

  • Incomplete Solution: While Consistent Hashing reduces the scope of data redistribution, it does not eliminate it. Expanding the cluster would still require migrating a subset of keys (transaction states) to new partitions. As noted in the "Automatic Data Redistribution" section, implementing safe state migration for active, log-compacted internal topics is complex and risky, regardless of the hashing algorithm used.

  • Scope: The primary goal of this KIP is to prevent immediate availability risks in Kafka 4.0, not to redesign the core coordination and partitioning protocol.

2. Automatic Data Redistribution

We considered allowing the expansion and automatically redistributing data to new partitions. This was rejected for the following reasons:

  • Thundering Herd Risk (Metadata Storm): Since the routing logic is modular (hashCode % N), changing N re-maps almost all keys. This forces every active client to rediscover their coordinator simultaneously. This "Thundering Herd" would cause a massive spike in FindCoordinator requests, likely leading to request timeouts and a cluster-wide outage.

  • Complexity: Implementing a safe rebalancer for append-only, log-compacted topics involves complex concurrency controls that outweigh the benefits.
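
The scale of the re-mapping can be checked numerically. A small sketch (random integer hashes standing in for real transaction IDs): with modular routing, growing from 50 to 150 partitions moves roughly two-thirds of all keys, and expansions to a non-multiple of the old count move an even larger share:

```java
import java.util.Random;

public class RemapFraction {
    // Non-negative hash, as used by the modular routing rule.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Fraction of sampled keys whose target partition changes when the
    // partition count grows from oldCount to newCount.
    static double remappedFraction(int samples, int oldCount, int newCount, long seed) {
        Random rnd = new Random(seed);
        int moved = 0;
        for (int i = 0; i < samples; i++) {
            int hash = rnd.nextInt(); // stand-in for transactionalId.hashCode()
            if (abs(hash) % oldCount != abs(hash) % newCount) {
                moved++;
            }
        }
        return (double) moved / samples;
    }

    public static void main(String[] args) {
        System.out.printf("50 -> 150: %.1f%% of keys remapped%n",
                100 * remappedFraction(100_000, 50, 150, 42L));
    }
}
```

Every remapped key corresponds to a client that must rediscover its coordinator, which is the source of the FindCoordinator storm described above.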

3. Validation in ReplicationControlManager (Deep Validation)

We considered validating deep inside the metadata layer. This was rejected because:

  • Processing invalid requests consumes slots in the Controller Event Queue. Rejecting them at the RPC layer (ControllerApis) is more efficient and protects the critical path of the KRaft controller.

4. Client-Side or Tool-Level Validation Only

We considered implementing the restriction solely in client-side tools (e.g., kafka-topics.sh) or the AdminClient SDK. This was rejected for the following reasons:

  • Lack of Universal Coverage: The Kafka ecosystem includes diverse client libraries (Java, C/C++, Go, Python, Rust, etc.). Enforcing this logic in every client implementation is impractical and prone to inconsistency.

  • Version Compatibility: Even if the Java AdminClient were updated to block this request, the cluster would remain vulnerable to older client versions that do not have this check.

  • Server Authority: To guarantee cluster stability, the server must act as the ultimate source of truth and safety. Relying on client-side behavior to prevent critical metadata corruption violates the principle of defensive programming.



