Status

Current state: Under Discussion

Discussion thread: here

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When creating topics or partitions, the Kafka controller has to pick brokers to host the new partitions. The current placement logic is based on a round robin algorithm and supports rack awareness. This works well in most cases, but it does not take into account any operations planned, in progress, or recently completed on the cluster.

For example, if we have a 3-broker cluster that is at capacity and we add 3 new brokers, the placement logic will assign new partitions across all 6 brokers. Ideally new partitions would be placed preferentially on the new brokers.

Another issue happens if we want to scale down a cluster. Before decommissioning a broker an operator reassigns its partitions onto other brokers. But if new partitions are created, there isn’t a mechanism to prevent Kafka from placing them on that broker. In clusters with frequent topic creations, this makes the process of decommissioning brokers painful.

Similar scenarios happen when scaling storage in JBOD environments. Unlike the partition assignment, the selection of the log directory to host a new partition does not use round robin: instead, the directory with the fewest partitions is picked. If the disk sizes are not homogeneous, or if some partitions are much larger than others, this forces operators to regularly shuffle partitions between log directories. It also makes the process of removing a disk painful, because moving partitions off a log directory makes it more likely to be picked for any new partitions being created.


This KIP aims at improving the following scenarios:

  • When adding brokers to a cluster, Kafka currently does not always place new partitions on new brokers
  • When removing brokers/log directories, there is no way to prevent Kafka from placing new replicas on these brokers/log directories

It will also serve as a basis for a broker decommissioning process; since 2015 our documentation has stated that “we plan to add tooling support for decommissioning brokers in the future.”

Proposed Changes

This KIP proposes introducing the concept of “cordoned” log directories. This is reusing the “cordon” terminology from Kubernetes. When a log directory is cordoned, it still fully functions but no new partitions can be allocated on it. If all the log directories of a broker are cordoned, the broker is effectively cordoned and no new partitions can be assigned to that broker.

Cluster operators will be able to set the cordoned.log.dirs configuration on each broker. It can be set in the broker's properties file before startup, and updated at runtime via the Admin client or the kafka-configs.sh tool.

For example, if broker 0 has the following configuration:
log.dirs=/tmp/kraft-broker0-logs0,/tmp/kraft-broker0-logs1

You can cordon /tmp/kraft-broker0-logs0 using:
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config cordoned.log.dirs=/tmp/kraft-broker0-logs0 --entity-type brokers --entity-name 0

You can then use the kafka-configs.sh tool to describe the cordoned log directories:
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers --entity-name 0
Dynamic configs for broker 0 are:
  cordoned.log.dirs=/tmp/kraft-broker0-logs0 sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:cordoned.log.dirs=/tmp/kraft-broker0-logs0}

You can use the same mechanisms to uncordon log directories. Either update the broker's properties file and restart the broker, or use the kafka-configs.sh tool to update the cordoned.log.dirs configuration.
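
For completeness, here is a minimal sketch of doing the same thing programmatically with the Admin client, assuming the new cordoned.log.dirs configuration is applied as a regular dynamic broker config via the existing incrementalAlterConfigs API (the class name and directory paths below are only illustrative):

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class CordonLogDirExample {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");

            // Cordon one log directory on broker 0 by setting the dynamic config.
            AlterConfigOp cordon = new AlterConfigOp(
                new ConfigEntry("cordoned.log.dirs", "/tmp/kraft-broker0-logs0"),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(broker0, List.of(cordon))).all().get();

            // Uncordon it again by deleting the dynamic config.
            AlterConfigOp uncordon = new AlterConfigOp(
                new ConfigEntry("cordoned.log.dirs", ""),
                AlterConfigOp.OpType.DELETE);
            admin.incrementalAlterConfigs(Map.of(broker0, List.of(uncordon))).all().get();
        }
    }
}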


When the configuration is set on a broker, the broker will include the Uuids of the cordoned log directories when it registers with the controller, and also include them in each heartbeat request it sends to the controller to reflect potential dynamic updates.

The controller uses the ClusterDescriber interface to let the replica placer determine the usable brokers and the default log directory each new partition should be placed in. This interface has 2 methods whose behavior will be updated:
  • usableBrokers(): this method will only return brokers that have at least one uncordoned log directory.
  • defaultDir(): this method will return the Uuid of a log directory that is not cordoned.

In the assignment the controller computes, the target log directory is only a hint. As mentioned in the motivation section, brokers pick the actual log directory to use. Even if a broker does not pick the directory hinted by the controller, it will only select a log directory that is not currently cordoned.
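
To make the intended filtering concrete, below is a simplified, illustrative sketch of the logic (it is not the actual ClusterDescriber implementation; the BrokerState type and the String-based directory names are hypothetical stand-ins for the controller's internal broker registrations and directory Uuids):

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CordonAwarePlacementSketch {

    // Hypothetical view of a broker registration: all its log directories and
    // the subset it reported as cordoned.
    record BrokerState(int id, Set<String> logDirs, Set<String> cordonedLogDirs) {
        boolean hasUncordonedDir() {
            return logDirs.stream().anyMatch(dir -> !cordonedLogDirs.contains(dir));
        }
    }

    // usableBrokers(): only brokers with at least one uncordoned log directory
    // remain candidates for hosting new replicas.
    static List<Integer> usableBrokers(Map<Integer, BrokerState> cluster) {
        return cluster.values().stream()
                .filter(BrokerState::hasUncordonedDir)
                .map(BrokerState::id)
                .collect(Collectors.toList());
    }

    // defaultDir(): the placement hint returned for a broker is always an
    // uncordoned directory.
    static String defaultDir(BrokerState broker) {
        return broker.logDirs().stream()
                .filter(dir -> !broker.cordonedLogDirs().contains(dir))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("All log directories are cordoned"));
    }
}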

Error handling

CreateTopics/CreatePartitions requests

When creating a new topic or partitions without explicit partition assignments, if not enough brokers are usable to satisfy the requested replication factor, the INVALID_REPLICATION_FACTOR error will be returned. This error is already returned today if not enough brokers are registered. The error message will be adjusted to mention that this could also be due to brokers not having uncordoned log directories, as the current error message only mentions the number of registered brokers.

When creating a new topic or partitions with explicit partition assignments, if any of the specified brokers don't have uncordoned log directories, the INVALID_REPLICA_ASSIGNMENT error will be returned. This error is already returned if the broker Ids are not registered in the cluster. The error message will be adjusted to specify the reason (no available log directories).
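
As an illustration, a client creating a topic on a cluster where too few brokers have uncordoned log directories would see the error surface as an InvalidReplicationFactorException; the sketch below assumes the existing Admin.createTopics() API and a hypothetical topic name:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;

public class CreateTopicOnCordonedCluster {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // Request replication factor 3: this fails if fewer than 3 brokers are usable.
            NewTopic topic = new NewTopic("demo-topic", 3, (short) 3);
            try {
                admin.createTopics(List.of(topic)).all().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof InvalidReplicationFactorException) {
                    // With this KIP the message can also indicate that some registered
                    // brokers have no uncordoned log directories.
                    System.err.println("Not enough usable brokers: " + e.getCause().getMessage());
                }
            }
        }
    }
}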

AlterPartitionReassignments request

When reassigning partitions between brokers, if the request places partitions on brokers with no uncordoned log directories, the INELIGIBLE_REPLICA error will be returned. Currently the controller also logs an INFO message with the reason. This log will be updated to provide the correct reason in case the request is rejected because of cordoned log directories.

AlterReplicaLogDirs request

When reassigning partitions onto a cordoned log directory, the KAFKA_STORAGE_ERROR error will be returned with a message indicating the target directory is cordoned. This error is already returned today if the target directory is offline.
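
For illustration, a replica move onto a cordoned directory issued through the Admin client would surface as a KafkaStorageException; the sketch below assumes the existing Admin.alterReplicaLogDirs() API, with hypothetical topic and directory values:

import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.errors.KafkaStorageException;

public class MoveReplicaToCordonedDir {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // Ask broker 0 to move demo-topic partition 0 onto a cordoned directory.
            TopicPartitionReplica replica = new TopicPartitionReplica("demo-topic", 0, 0);
            try {
                admin.alterReplicaLogDirs(Map.of(replica, "/tmp/kraft-broker0-logs0")).all().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof KafkaStorageException) {
                    // With this KIP the message indicates the target directory is cordoned,
                    // not only that it is offline.
                    System.err.println("Move rejected: " + e.getCause().getMessage());
                }
            }
        }
    }
}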

Public Interfaces

This KIP introduces a new broker configuration, new API versions and new metadata records versions.

Configurations

This KIP proposes adding a new broker configuration:

Name: cordoned.log.dirs
Documentation: A comma-separated list of the directories that are cordoned. Entries in this list must also be present in the log.dirs or log.dir configuration.
Type: string
Default: empty string
Importance: low
Update Mode: per-broker

API Requests/Responses

The following requests and responses will be updated:

  • BrokerRegistrationRequest v4 adds the CordonedLogDirs field
{
  "apiKey":62,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerRegistrationRequest",
  "validVersions": "0-4",
  "flexibleVersions": "0+",
  "fields": [      
     ...
     { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "4+", 
       "about": "Log directories that are cordoned.", "ignorable": true }
  ]
}
  • BrokerRegistrationResponse v4 adds no new fields

  • BrokerHeartbeatRequest v1 adds the CordonedLogDirs tagged field
{
  "apiKey": 63,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
     ...
     { "name": "CordonedLogDirs", "type":  "[]uuid", "versions": "1+", "taggedVersions": "2+", 
       "tag": "1", "about": "Log directories that are cordoned." }
  ]
}
  • BrokerHeartbeatResponse v1 adds no new fields

Metadata Records

Updating these metadata records ensures that the cordoned log directories of each broker are persisted in the metadata log. So if a broker is offline, its cordoned log directories are still available to the controller when assigning partitions.

  • BrokerRegistrationChangeRecord v3 adds the CordonedLogDirs field
{
  "apiKey": 17,
  "type": "metadata",
  "name": "BrokerRegistrationChangeRecord",
  "validVersions": "0-3",
  "flexibleVersions": "0+",
  "fields": [
     ...
     { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "3+", "taggedVersions": "3+", 
       "tag": "3", "about": "Log directories that are cordoned." }
  ]
}
  • RegisterBrokerRecord v4 adds the CordonedLogDirs field
{
  "apiKey": 0,
  "type": "metadata",
  "name": "RegisterBrokerRecord",
  "validVersions": "0-4",
  "flexibleVersions": "0+",
  "fields": [
     ...
     { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "4+", "taggedVersions": "4+", 
       "tag": "1", "about": "Log directories that are cordoned." }
  ]
}
  • DescribeLogDirsRequest v5 adds no new fields
  • DescribeLogDirsResponse v5 adds the IsCordoned field
{
  "apiKey": 35,
  "type": "response",
  "name": "DescribeLogDirsResponse",
  // Starting in version 1, on quota violation, brokers send out responses before throttling.
  "validVersions": "0-5",
  // Version 2 is the first flexible version.
  // Version 3 adds the top-level ErrorCode field
  // Version 4 adds the TotalBytes and UsableBytes fields
  // Version 5 adds the IsCordoned field
  "flexibleVersions": "2+",
  "fields": [
      ...
      { "name": "IsCordoned", "type": "bool", "versions": "5+", "ignorable": true, "default": false,
        "about": "True if this log directory is cordoned."
      }
    ]}
  ]
}

Admin API

When describing log directories via the Admin.describeLogDirs() method, you get LogDirDescription objects. A new method will be added to LogDirDescription to tell whether the log directory is cordoned:

/**
 * A description of a log directory on a particular broker.
 */
public class LogDirDescription {

    ...

    /**
     * Whether this log directory is cordoned or not.
     */
    public boolean isCordoned() {
       ...
    }
}
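
For illustration, here is a minimal sketch of listing cordoned log directories from a client, assuming the existing Admin.describeLogDirs() API and the new isCordoned() accessor proposed above:

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.LogDirDescription;

public class ListCordonedLogDirs {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            Map<Integer, Map<String, LogDirDescription>> descriptions =
                admin.describeLogDirs(List.of(0)).allDescriptions().get();
            descriptions.forEach((brokerId, dirs) ->
                dirs.forEach((path, description) -> {
                    // isCordoned() is the new accessor proposed by this KIP.
                    if (description.isCordoned()) {
                        System.out.println("Broker " + brokerId + ": " + path + " is cordoned");
                    }
                }));
        }
    }
}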

The output of the LogDirsCommand tool will also be updated to show if a log directory is cordoned or not.

Compatibility, Deprecation, and Migration Plan

This feature is enabled by setting the new configuration, cordoned.log.dirs. If it is not set, there are no changes in behavior.

Test Plan

This KIP will be tested using unit and integration tests.

Rejected Alternatives

  • KIP-660: Pluggable ReplicaPlacer: This proposed making the ReplicaPlacer pluggable so users could implement their own placement logic. The main issue was that, in order to make an intelligent placement, the placer would need a full view of the cluster, which is potentially very large. Implementations would potentially be complex and would also need to be fast so as not to block the controller thread. For these reasons, this proposal was rejected.

