You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

Status

Current state: Draft

Discussion thread: TBD

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When creating topics or partitions, the Kafka controller has to pick brokers to host the new partitions. The current placement logic is based on a round robin algorithm and supports rack awareness. This works well in most cases, but it does not take into account any operations planned, in progress, or recently completed on the cluster.

For example if we have a 3 broker cluster that is at capacity and add 3 new brokers, the placement logic will assign partitions on all 6 brokers. Ideally we’d like new partitions to be placed in priority on the new brokers.

Another issue happens if we want to scale down a cluster. Before decommissioning a broker an operator reassigns its partitions onto other brokers. But if new partitions are created, there isn’t a mechanism to prevent Kafka from placing them on that broker. In clusters with frequent topic creations, this makes the process of decommissioning brokers painful.

Similar scenarios happen when scaling storage in JBOD environments. Unlike the partition assignment, the selection of the log directory to use to host a new partition does not use round robin but instead the directory with the least amount of partitions is picked. If the disk sizes are not homogeneous, or if some partitions are much larger than others, this forces operators to regularly shuffle partitions between log directories. This also makes the process of removing a disk painful as removing partitions makes the log directory more likely to host any new partitions being created.


This KIP aims at improving the following scenarios:

  • When adding brokers to a cluster, Kafka currently does not always place new partitions on new brokers
  • When removing brokers/log directories, there is no way to prevent Kafka from placing new replicas on these brokers/log directories

It will also serve as a basis for the decommissioning brokers process. For that operation our documentation states that “we plan to add tooling support for decommissioning brokers in the future.” since 2015.

Proposed Changes

This KIP proposes introducing the concept of “cordoned” log directories. This is reusing the “cordon” terminology from Kubernetes. When a log directory is cordoned, it still fully functions but no new partitions can be allocated on it. If all the log directories of a broker are cordoned, no new partitions can be assigned to the broker.

Cluster operators will be able to set the cordoned.log.dirs configuration on each broker. This can be set in the broker properties files when it is started and updated at runtime via the Admin client or using the kafka-configs.sh tool.

For example, if broker 0 has the following configuration:
log.dirs=/tmp/kraft-broker0-logs0,/tmp/kraft-broker0-logs1

You can cordon /tmp/kraft-broker0-logs0 using:
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config cordoned.log.dirs=/tmp/kraft-broker0-logs0 --entity-type brokers --entity-name 0

When the configuration is set on a broker, the broker will include the Uuids of cordoned log directories when it registers to the controller and also include them in each heartbeat request it sends to the controller.

The controller uses the ClusterDescriber interface to let the replica placer determine the usable brokers and the default log directory partitions should be placed on. This interface has 2 methods whose behavior will be updated:
- usableBrokers(): This method will only return brokers that have log directories not cordoned.
- defaultDir(): This method will not return the Uuid of a log directory that is cordoned.

Public Interfaces

This KIP introduces a new broker configuration, new API versions and new metadata records versions.

Configurations

This KIP proposes adding a new broker configuration: cordoned.log.dirs.

Documentation: A comma-separated list of the directories that are cordoned. Entries in this list must be entries in log.dirs or log.dir configuration.
Type: string
Default: empty string
Importance: low
Update Mode: per-broker

API Requests/Responses

The following requests and responses will be updated:

  • BrokerRegistrationRequest v4 adds the CordonedLogDirs field


{
  "apiKey":62,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerRegistrationRequest",
  "validVersions": "0-4",
  "flexibleVersions": "0+",
  "fields": [      
     ...
     { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "4+", "about": "Log directories that are cordoned.", "ignorable": true }
  ]
}
  • BrokerRegistrationResponse v4 adds no new fields

  • BrokerHeartbeatRequest v1 adds the CordonedLogDirs flexible field
{
  "apiKey": 63,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
     ...
     { "name": "CordonedLogDirs", "type":  "[]uuid", "versions": "1+", "taggedVersions": "2+", "tag": "1", "about": "Log directories that are cordoned." }
  ]
}
  • BrokerHeartbeatResponse v1 adds no new fields

Metadata Records

  • BrokerRegistrationChangeRecord v3 adds the CordonedLogDirs field
{
  "apiKey": 17,
  "type": "metadata",
  "name": "BrokerRegistrationChangeRecord",
  "validVersions": "0-3",
  "flexibleVersions": "0+",
  "fields": [
     ...
     { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "3+", "taggedVersions": "3+", "tag": "3", "about": "Log directories configured in this broker which are cordoned." }
  ]
}
  • RegisterBrokerRecord v4 adds the CordonedLogDirs field
{
  "apiKey": 0,
  "type": "metadata",
  "name": "RegisterBrokerRecord",
  "validVersions": "0-4",
  "flexibleVersions": "0+",
  "fields": [
     ...
     { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "4+", "taggedVersions": "4+", "tag": "1", "about": "Log directories configured in this broker which are cordoned." }
  ]
}

Compatibility, Deprecation, and Migration Plan

This feature is enabled by setting the new configuration, cordoned.log.dirs. If it is not set there are no changes.

Test Plan

This KIP will be tested using unittest, integration tests.

Rejected Alternatives


  • No labels