Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: DraftUnder Discussion

Discussion thread: TBD here

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This KIP proposes introducing the concept of “cordoned” log directories. This is reusing the “cordon” terminology from Kubernetes. When a log directory is cordoned, it still fully functions but no new partitions can be allocated on it. If all the log directories of a broker are cordoned, the broker is effectively cordoned and no new partitions can be assigned to the that broker.

Cluster operators will be able to set the cordoned.log.dirs configuration on each broker. This can be set in the broker properties files when it is started and updated at runtime via the Admin client or using the kafka-configs.sh tool.

...

You can cordon /tmp/kraft-broker0-logs0 using:
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config cordoned.log.dirs=/tmp/kraft-broker0-logs0 --entity-type brokers --entity-name 0

You can then use the kafka-configs.sh tool to described the cordoned log directories:
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers --entity-name 0
Dynamic configs for broker 0 are:
  cordoned.log.dirs=/tmp/kraft-broker0-logs0 sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:cordoned.log.dirs=/tmp/kraft-broker0-logs0}

You can use the same mechanisms to uncordon log directories. Either update the broker's properties file and restart the broker, or use the kafka-configs.sh tool to update the cordoned.log.dirs configuration.


When the configuration is set on a broker, the broker will include the Uuids of cordoned log directories when it registers to the controller and also include them in each heartbeat request it sends to the controller to reflect potential dynamic updates.

The controller uses the ClusterDescriber interface to let the replica placer determine the usable brokers and the default log directory partitions should be placed on. This interface has 2 methods whose behavior will be updated:
- usableBrokers(): This method will only return brokers that have at least one log directory not cordoned.
- defaultDir(): This method will not return the Uuid of a log directory that is not cordoned.

In the assignment the controller computes, the target log directory is only a hint. As mentioned in the motivation section, brokers actually pick actual log directory to use. Even if a broker does not pick the directory hinted by the controller, only a log directory not currently cordoned will be selected.

Error handling

CreateTopics/CreatePartitions requests

When creating a new topic or partitions without explicit partition assignments, if not enough brokers are usable to satisfy the requested replication factor, the INVALID_REPLICATION_FACTOR error will be returned. This error is already returned today is not enough brokers are registered. The error message will be adjusted to mention this could also be due to brokers not having uncordoned log directories as the current error message only mentioned the number of registered brokers.

When creating a new topic or partitions with explicit partition assignments, if any of the specified brokers don't have uncordoned log directories, the INVALID_REPLICA_ASSIGNMENT error will be returned. This error is already returned if the broker Ids are not registered in the cluster.

...

The error message will be adjusted to specify the reason (not available log directories).

AlterPartitionReassignments request

When reassigning partitions between brokers, if the request places partitions on brokers with no uncordoned log directories, the INELIGIBLE_REPLICA will be returned. Currently the controller also logs an INFO message with the reason. This log will be updated to provide the correct reason in case the request is rejected because of cordoned log directories.

...

Code Block
languagejs
{
  "apiKey": 63,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0-1",
  "flexibleVersions": "10+",
  "fields": [
     ...
     { "name": "CordonedLogDirs", "type":  "[]uuid", "versions": "1+", "taggedVersions": "2+", 
       "tag": "1", "about": "Log directories that are cordoned." }
  ]
}
  • BrokerHeartbeatResponse v1 adds no new fields

Metadata Records

Updating these metadata records ensures that the cordoned log directories of each broker are persisted in the metadata log. So if a broker is offline, its cordoned log directories are still available to the controller when assigning partitions.

  • BrokerRegistrationChangeRecord v3 adds the CordonedLogDirs field

...

Code Block
languagejs
{
  "apiKey": 0,
  "type": "metadata",
  "name": "RegisterBrokerRecord",
  "validVersions": "0-4",
  "flexibleVersions": "0+",
  "fields": [
     ...
     { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "4+", "taggedVersions": "4+", 
       "tag": "1", "about": "Log directories that are cordoned." }
  ]
}
  • DescribeLogDirsRequest v5 adds no new fields
  • DescribeLogDirsResponse v5 adds the IsCordoned field
Code Block
languagejs
{
  "apiKey": 35,
  "type": "response",
  "name": "DescribeLogDirsResponse",
  // Starting in version 1, on quota violation, brokers send out responses before throttling.
  "validVersions": "0-5",
  // Version 2 is the first flexible version.
  // Version 3 adds the top-level ErrorCode field
  // Version 4 adds the TotalBytes and UsableBytes fields
  // Version 5 adds the IsCordoned fields
  "flexibleVersions": "2+",
  "fields": [
      ...
      { "name": "IsCordoned", "type": "bool", "versions": "5+", "ignorable": true, "default": false,
        "about": "True if this log directory is cordoned."
      }
    ]}
  ]
}

Admin API

When describing log directories via the Admin.describeLogDirs() method, you get LogDirDescription objects. A new method will be added to LogDirDescription to tell whether the log directory is cordoned:

Code Block
languagejava
/**
 * A description of a log directory on a particular broker.
 */
public class LogDirDescription {

    ...

    /**
     * Whether this log directory is cordoned or not.
     */
    public boolean isCordoned() {
       ...
    }
}

The output of the LogDirsCommand tool will also be updated to show if a log directory is cordoned or not.

Compatibility, Deprecation, and Migration Plan

...