Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/bd6y65qro8fj548rpdrm3lxqomqo75x8

Vote thread: https://lists.apache.org/thread/prjobqr014y7q1v91o73zn56ks0vb9wr

JIRA: KAFKA-19368

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-405 introduced tiered storage, along with the RemoteLogMetadataManager interface for storing remote log metadata. This metadata includes each remote segment's start and end offsets, its size, and other attributes. It is currently used only internally by the broker, and no CLI tool exposes it. As a result, administrators who manage both local and remote logs often have to switch between different tools to gather the information they need.

Adding remote segment sizes to the output of the kafka-log-dirs.sh script would give administrators a single view of both local and remote log sizes, which simplifies monitoring and capacity management.

Public Interfaces

DescribeLogDirsRequest

Add a new field, IncludeRemoteInfo. Remote log metadata is obtained through the RemoteLogMetadataManager interface. Some implementations, such as the built-in TopicBasedRemoteLogMetadataManager, serve it from a local cache without sending further RPCs, but not every implementation is guaranteed to do so. The IncludeRemoteInfo option lets users decide whether to pay the potential extra cost of retrieving remote log information. It defaults to false.

{
  "apiKey": 35,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeLogDirsRequest",
  // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
  // Version 1 is the same as version 0.
  // Version 2 is the first flexible version.
  // Version 3 is the same as version 2 (new field in response).
  // Version 4 is the same as version 2 (new fields in response).
  // Version 5 adds IncludeRemoteInfo field.
  "validVersions": "1-5",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "Topics", "type": "[]DescribableLogDirTopic", "versions": "0+", "nullableVersions": "0+",
      "about": "Each topic that we want to describe log directories for, or null for all topics.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partition indexes." }
    ]},
    { "name": "IncludeRemoteInfo", "type": "bool", "versions": "5+", "default": false,
      "about": "Whether to include remote partition information." }
  ]
}
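The point of IncludeRemoteInfo is that the broker can skip a potentially expensive remote metadata lookup unless the client asks for it. The following is a minimal sketch of that gating, not the broker's actual code: `IncludeRemoteInfoSketch`, `PartitionSizes`, and the two `LongSupplier` lookups are hypothetical stand-ins for whatever the broker does through RemoteLogMetadataManager.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch: how a broker might honour IncludeRemoteInfo when filling
 * one DescribeLogDirsPartition entry. The suppliers stand in for whatever lookup
 * the broker performs (for example via RemoteLogMetadataManager); they are
 * assumptions of this sketch, not Kafka APIs.
 */
public class IncludeRemoteInfoSketch {

    // Wire default of RemoteLogSize and OnlyLocalLogSize in DescribeLogDirsResponse v5.
    static final long UNKNOWN_SIZE = -1L;

    // The three per-partition size fields of the response.
    record PartitionSizes(long partitionSize, long remoteLogSize, long onlyLocalLogSize) { }

    static PartitionSizes describe(boolean includeRemoteInfo,
                                   long localSize,
                                   LongSupplier remoteSizeLookup,
                                   LongSupplier onlyLocalSizeLookup) {
        if (!includeRemoteInfo) {
            // Do not touch remote metadata at all; the new fields keep their defaults.
            return new PartitionSizes(localSize, UNKNOWN_SIZE, UNKNOWN_SIZE);
        }
        // Only now pay for the lookups, which may involve RPCs in some implementations.
        return new PartitionSizes(localSize, remoteSizeLookup.getAsLong(), onlyLocalSizeLookup.getAsLong());
    }

    public static void main(String[] args) {
        int[] remoteLookups = {0}; // counts how often the remote lookup runs
        LongSupplier remote = () -> {
            remoteLookups[0]++;
            return 1_538_476L;
        };
        LongSupplier onlyLocal = () -> 5_272_726L;

        System.out.println(describe(false, 6_811_202L, remote, onlyLocal));
        System.out.println(describe(true, 6_811_202L, remote, onlyLocal));
        System.out.println("remote lookups performed: " + remoteLookups[0]);
    }
}
```

In this sketch a request without the flag never invokes the remote lookup, so the cost is paid only on demand, and the response carries -1 for both new fields.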

DescribeLogDirsResponse

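Add new fields RemoteLogSize and OnlyLocalLogSize to DescribeLogDirsPartition, and update the PartitionSize description to state that it refers to the size of the local log segments. Both new fields default to -1, which is what the broker returns when IncludeRemoteInfo is false.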

The RemoteLogManager periodically copies data to remote storage, and a log segment may be uploaded before it reaches the local retention threshold. A segment can therefore exist in both local and remote storage at the same time. For example, suppose local.retention.ms is 12 hours and retention.ms is 48 hours. Remote storage would then contain about 47 hours of data, while local storage would hold about 11 hours.

  • PartitionSize is the total size of all local data, roughly 11 hours in this example.

  • RemoteLogSize is the total size of all remote data, roughly 47 hours in this example.

  • OnlyLocalLogSize is the size of the local data that has not yet been copied to remote storage. It is a subset of PartitionSize and is not included in RemoteLogSize.

It follows that PartitionSize minus OnlyLocalLogSize is the amount of data present in both local and remote storage.
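The relationship between the three sizes can be illustrated by deriving them from segment lists. This is a minimal sketch, assuming segments are copied to remote storage whole and keep their offset boundaries; `SizeSemanticsSketch`, `Segment`, and `Sizes` are hypothetical illustration types, not Kafka classes, and the broker may compute these values differently.

```java
import java.util.List;

/**
 * Hypothetical sketch of the size semantics of PartitionSize, RemoteLogSize and
 * OnlyLocalLogSize. Assumes segments are copied to remote storage whole, keeping
 * their offset boundaries. Segment and Sizes are illustration types, not Kafka classes.
 */
public class SizeSemanticsSketch {

    // A log segment covering offsets [baseOffset, endOffset] (inclusive).
    record Segment(long baseOffset, long endOffset, long sizeInBytes) { }

    record Sizes(long partitionSize, long remoteLogSize, long onlyLocalLogSize) {
        // Bytes held in both tiers: local data that has already been copied.
        long inBothTiers() {
            return partitionSize - onlyLocalLogSize;
        }

        // Bytes of distinct data across both tiers.
        long distinctTotal() {
            return remoteLogSize + onlyLocalLogSize;
        }
    }

    static Sizes compute(List<Segment> localSegments, List<Segment> remoteSegments) {
        long partitionSize = localSegments.stream().mapToLong(Segment::sizeInBytes).sum();
        long remoteLogSize = remoteSegments.stream().mapToLong(Segment::sizeInBytes).sum();
        // Highest offset already in remote storage, or -1 if nothing has been copied yet.
        long highestRemoteOffset = remoteSegments.stream()
                .mapToLong(Segment::endOffset)
                .max()
                .orElse(-1L);
        // Local segments that start after that offset have not been copied yet.
        long onlyLocalLogSize = localSegments.stream()
                .filter(s -> s.baseOffset() > highestRemoteOffset)
                .mapToLong(Segment::sizeInBytes)
                .sum();
        return new Sizes(partitionSize, remoteLogSize, onlyLocalLogSize);
    }

    public static void main(String[] args) {
        // Offsets 0-99 exist only remotely (already deleted locally), 100-299 exist in
        // both tiers, and 300-349 (the active segment) exist only locally.
        List<Segment> local = List.of(
                new Segment(100, 199, 1_000),
                new Segment(200, 299, 1_000),
                new Segment(300, 349, 500));
        List<Segment> remote = List.of(
                new Segment(0, 99, 1_000),
                new Segment(100, 199, 1_000),
                new Segment(200, 299, 1_000));

        Sizes sizes = compute(local, remote);
        System.out.println(sizes);
        System.out.println("inBothTiers=" + sizes.inBothTiers() + ", distinctTotal=" + sizes.distinctTotal());
    }
}
```

For the sample segments this yields a PartitionSize of 2500, a RemoteLogSize of 3000, and an OnlyLocalLogSize of 500, so 2000 bytes are held in both tiers and the partition holds 3500 bytes of distinct data.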

{
  "apiKey": 35,
  "type": "response",
  "name": "DescribeLogDirsResponse",
  // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
  // Starting in version 1, on quota violation, brokers send out responses before throttling.
  // Version 2 is the first flexible version.
  // Version 3 adds the top-level ErrorCode field
  // Version 4 adds the TotalBytes and UsableBytes fields
  // Version 5 adds the RemoteLogSize and OnlyLocalLogSize fields
  "validVersions": "1-5",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "3+",
      "ignorable": true, "about": "The error code, or 0 if there was no error." },
    { "name": "Results", "type": "[]DescribeLogDirsResult", "versions": "0+",
      "about": "The log directories.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or 0 if there was no error." },
      { "name": "LogDir", "type": "string", "versions": "0+",
        "about": "The absolute log directory path." },
      { "name": "Topics", "type": "[]DescribeLogDirsTopic", "versions": "0+",
        "about": "The topics.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]DescribeLogDirsPartition", "versions": "0+",
          "about": "The partitions.", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },
          { "name": "PartitionSize", "type": "int64", "versions": "0+",
            "about": "The size of the local log segments in this partition in bytes." },
          { "name": "OffsetLag", "type": "int64", "versions": "0+",
            "about": "The lag of the log's LEO w.r.t. partition's HW (if it is the current log for the partition) or current replica's LEO (if it is the future log for the partition)." },
          { "name": "IsFutureKey", "type": "bool", "versions": "0+",
            "about": "True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of the replica in the future." },
          { "name": "RemoteLogSize", "type": "int64", "versions": "5+", "ignorable": true, "default": "-1",
            "about": "The size of the remote log segments for this partition, in bytes. Note that some of these segments may still be present in the broker’s local storage." },
          { "name": "OnlyLocalLogSize", "type": "int64", "versions": "5+", "ignorable": true, "default": "-1",
            "about": "The size of the log segments stored only in the broker’s local storage for this partition, in bytes. This excludes any data that has been offloaded to remote storage." }]}
      ]},
      { "name": "TotalBytes", "type": "int64", "versions": "4+", "ignorable": true, "default": "-1",
        "about": "The total size in bytes of the volume the log directory is in."
      },
      { "name": "UsableBytes", "type": "int64", "versions": "4+", "ignorable": true, "default": "-1",
        "about": "The usable size in bytes of the volume the log directory is in."
      }
    ]}
  ]
}

DescribeLogDirsOptions

Add a new field includeRemoteInfo.

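package org.apache.kafka.clients.admin;

import java.util.Collection;

/**
 * Options for {@link Admin#describeLogDirs(Collection)}
 */
public class DescribeLogDirsOptions extends AbstractOptions<DescribeLogDirsOptions> {
    private boolean includeRemoteInfo = false;

    /**
     * Set whether to include remote log information (remote size and local-only size) for each replica.
     * Defaults to false, because retrieving this information may incur extra cost depending on the
     * RemoteLogMetadataManager implementation.
     */
    public DescribeLogDirsOptions includeRemoteInfo(boolean includeRemoteInfo) {
        this.includeRemoteInfo = includeRemoteInfo;
        return this;
    }

    /**
     * Whether remote log information is requested.
     */
    public boolean includeRemoteInfo() {
        return includeRemoteInfo;
    }
}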

ReplicaInfo

Add new fields remoteSize and onlyLocalSize.

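package org.apache.kafka.clients.admin;

import java.util.OptionalLong;

import static org.apache.kafka.common.requests.DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES;

/**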
 * A description of a replica on a particular broker.
 */
public class ReplicaInfo {

    private final long size;
    private final long offsetLag;
    private final boolean isFuture;
    private final OptionalLong remoteSize;
    private final OptionalLong onlyLocalSize;

    public ReplicaInfo(long size, long offsetLag, boolean isFuture, long remoteSize, long onlyLocalSize) {
        this.size = size;
        this.offsetLag = offsetLag;
        this.isFuture = isFuture;
        // -1 (the wire default) means the broker did not return the value.
        this.remoteSize = (remoteSize == UNKNOWN_VOLUME_BYTES) ? OptionalLong.empty() : OptionalLong.of(remoteSize);
        this.onlyLocalSize = (onlyLocalSize == UNKNOWN_VOLUME_BYTES) ? OptionalLong.empty() : OptionalLong.of(onlyLocalSize);
    }

    /**
     * The total size of the local log segments in this replica in bytes.
     */
    public long size() {
        return size;
    }

    /**
     * The lag of the log's LEO with respect to the partition's
     * high watermark (if it is the current log for the partition)
     * or the current replica's LEO (if it is the {@linkplain #isFuture() future log}
     * for the partition).
     */
    public long offsetLag() {
        return offsetLag;
    }

    /**
     * Whether this replica has been created by an AlterReplicaLogDirsRequest
     * but has not yet replaced the current replica on the broker.
     *
     * @return true if this log is created by AlterReplicaLogDirsRequest and will replace the current log
     * of the replica at some time in the future.
     */
    public boolean isFuture() {
        return isFuture;
    }

    /**
     * The size of the remote log segments for this partition, in bytes.
     * Note that some of these segments may still be present in the broker’s local storage.
     * Empty if remote information was not requested or was not returned by the broker.
     */
    public OptionalLong remoteSize() {
        return remoteSize;
    }

    /**
     * The size of the log segments stored only in the broker’s local storage for this partition, in bytes.
     * This excludes any data that has been offloaded to remote storage.
     * Empty if remote information was not requested or was not returned by the broker.
     */
    public OptionalLong onlyLocalSize() {
        return onlyLocalSize;
    }

    @Override
    public String toString() {
        return "ReplicaInfo(" +
                "size=" + size +
                ", offsetLag=" + offsetLag +
                ", isFuture=" + isFuture +
                ", remoteSize=" + remoteSize +
                ", onlyLocalSize=" + onlyLocalSize +
                ')';
    }
}

Proposed Changes

kafka-log-dirs.sh

--describe --topic-list --include-remote-info

Add an --include-remote-info option. When it is set, the output includes remoteSize and onlyLocalSize for each partition.

> ./bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list my-topic --include-remote-info
{"brokers":[{"broker":1,"logDirs":[{"partitions":[{"partition":"my-topic-0","size":6811202,"offsetLag":0,"isFuture":false,"remoteSize":1538476,"onlyLocalSize":5272726}],"error":null,"logDir":"/tmp/kraft-combined-logs"}]}],"version":1}
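To make the expected output concrete, here is a minimal sketch of how the tool could render one partition entry with and without the option. It is not the actual kafka-log-dirs.sh implementation: `LogDirsJsonSketch`, `PartitionEntry`, and `render` are hypothetical names, and omitting a key when the broker returned no value (an empty `OptionalLong`) is an assumption of this sketch.

```java
import java.util.OptionalLong;

/**
 * Hypothetical sketch of how kafka-log-dirs.sh could render one partition entry.
 * PartitionEntry and render() are illustrative names, not the tool's real code.
 * Omitting a key whose value the broker did not return is an assumption here.
 */
public class LogDirsJsonSketch {

    record PartitionEntry(String partition, long size, long offsetLag, boolean isFuture,
                          OptionalLong remoteSize, OptionalLong onlyLocalSize) { }

    static String render(PartitionEntry p, boolean includeRemoteInfo) {
        // Topic names only use [a-zA-Z0-9._-], so the partition string needs no JSON escaping.
        StringBuilder sb = new StringBuilder()
                .append("{\"partition\":\"").append(p.partition()).append('"')
                .append(",\"size\":").append(p.size())
                .append(",\"offsetLag\":").append(p.offsetLag())
                .append(",\"isFuture\":").append(p.isFuture());
        if (includeRemoteInfo) {
            // The new keys appear only with --include-remote-info and only when a value is known.
            p.remoteSize().ifPresent(v -> sb.append(",\"remoteSize\":").append(v));
            p.onlyLocalSize().ifPresent(v -> sb.append(",\"onlyLocalSize\":").append(v));
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        PartitionEntry entry = new PartitionEntry("my-topic-0", 6_811_202L, 0L, false,
                OptionalLong.of(1_538_476L), OptionalLong.of(5_272_726L));
        System.out.println(render(entry, true));
        System.out.println(render(entry, false));
    }
}
```

With the option, the first line printed matches the partition object in the example output; without it, the entry stops after isFuture, which is the behaviour the test plan checks for.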

Compatibility, Deprecation, and Migration Plan

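This KIP adds new fields to the DescribeLogDirs RPC in a new version (5) of the request and response, so it does not break compatibility with older clients or brokers. The new response fields are ignorable and default to -1.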

Test Plan

  • Test that kafka-log-dirs.sh shows the remote and local-only sizes when the --include-remote-info option is given.

  • Test that kafka-log-dirs.sh does not show the remote and local-only sizes when the --include-remote-info option is not given.

  • Test that the DescribeLogDirs RPC returns RemoteLogSize and OnlyLocalLogSize when IncludeRemoteInfo is true.

  • Test that the DescribeLogDirs RPC returns the default value (-1) for RemoteLogSize and OnlyLocalLogSize when IncludeRemoteInfo is false.

Rejected Alternatives

N/A
