Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-10120
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Classes in org.apache.kafka.common.requests are not considered part of the supported public API. Unfortunately, the DescribeLogDirsResponse.LogDirInfo class from this package appears in the declared return types of the all() and values() methods of DescribeLogDirsResult, which is in the public API. Furthermore, the DescribeLogDirsResponse.ReplicaInfo and Errors classes are reachable from the public API of LogDirInfo.
This means that clients are exposed to classes which are supposed to be an implementation detail, and the Javadoc for these classes is not published (e.g. see https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/admin/DescribeLogDirsResult.html).
To address this, new types (in org.apache.kafka.clients.admin) need to be created to represent the same information, and either:
- use the LogDirInfo replacement in the return type of two new methods, deprecating the existing methods and eventually removing the deprecated methods in some future Kafka X.0 release, or
- change the return type of the two existing methods in some future Kafka X.0 release. In other words, break binary and source compatibility and give users no notice of the change until they upgrade to X.0.
The former is preferable from a compatibility point of view, but ultimately results in method names which are inconsistent with the rest of the *Result classes in the Admin client (which are otherwise quite consistent in their use of all() and values()).
It should be noted that DescribeLogDirsResult is annotated with @InterfaceStability.Evolving and described in Admin as follows:
This client was introduced in 0.11.0.0 and the API is still evolving. We will try to evolve the API in a compatible manner, but we reserve the right to make breaking changes in minor releases, if necessary. We will update the InterfaceStability annotation and this notice once the API is considered stable.
That warning notwithstanding, this KIP proposes to follow the former route, preferring client compatibility.
Public Interfaces
The existing DescribeLogDirsResult methods values() and all() will be deprecated, and DescribeLogDirsResult will get two new methods:
public class DescribeLogDirsResult {
    // ...

    // Deprecated:

    /**
     * Return a map from brokerId to future which can be used to check the information of partitions on each individual broker.
     * @deprecated Since Kafka 2.7. Use {@link #descriptions()}.
     */
    @Deprecated
    public Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> values() {
        // ...
    }

    /**
     * Return a future which succeeds only if all the brokers have responded without error.
     * @deprecated Since Kafka 2.7. Use {@link #allDescriptions()}.
     */
    @Deprecated
    public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all() {
        // ...
    }

    // Added:

    /**
     * Return a map from brokerId to future which can be used to check the information of partitions on each individual broker.
     */
    public Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions() {
        return futures;
    }

    /**
     * Return a future which succeeds only if all the brokers have responded without error.
     */
    public KafkaFuture<Map<Integer, Map<String, LogDirDescription>>> allDescriptions() {
        // ...
    }
}
Two new classes will be added to org.apache.kafka.clients.admin:
public class LogDirDescription {
    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
    private final Errors error;

    LogDirDescription(Errors error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
        this.error = error;
        this.replicaInfos = replicaInfos;
    }

    /**
     * A KafkaStorageException if this log directory is offline,
     * possibly some other exception if there were problems describing the log directory
     * or null if the directory is online.
     */
    public ApiException error() {
        // ...
    }

    /**
     * A map from topic partition to replica information for that partition
     * in this log directory.
     */
    public Map<TopicPartition, ReplicaInfo> replicaInfos() {
        return unmodifiableMap(replicaInfos);
    }

    @Override
    public String toString() {
        // ...
    }
}

public class ReplicaInfo {
    private final long size;
    private final long offsetLag;
    private final boolean isFuture;

    public ReplicaInfo(long size, long offsetLag, boolean isFuture) {
        this.size = size;
        this.offsetLag = offsetLag;
        this.isFuture = isFuture;
    }

    /**
     * The total size of the log segments in this replica in bytes.
     */
    public long size() {
        return size;
    }

    /**
     * The lag of the log's LEO with respect to the partition's
     * high watermark (if it is the current log for the partition)
     * or the current replica's LEO (if it is the {@linkplain #isFuture() future log}
     * for the partition).
     */
    public long offsetLag() {
        return offsetLag;
    }

    /**
     * Whether this replica has been created by an AlterReplicaLogDirsRequest
     * but not yet replaced the current replica on the broker.
     * @return true if this log is created by AlterReplicaLogDirsRequest and will replace the current log of the replica in the future.
     */
    public boolean isFuture() {
        return isFuture;
    }

    @Override
    public String toString() {
        // ...
    }
}
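As a rough illustration of how a client might consume the new types, the sketch below sums replica sizes per log directory. It does not depend on kafka-clients: the nested ReplicaInfo and LogDirDescription classes are simplified stand-ins that mirror the accessors proposed above, with String keys in place of TopicPartition and RuntimeException in place of ApiException. The class name LogDirUsageSketch, the helper bytesPerLogDir, and the choice to skip future replicas are assumptions made for this example only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LogDirUsageSketch {

    // Stand-in for the proposed ReplicaInfo (same accessors as in the KIP).
    public static final class ReplicaInfo {
        private final long size;
        private final long offsetLag;
        private final boolean isFuture;

        public ReplicaInfo(long size, long offsetLag, boolean isFuture) {
            this.size = size;
            this.offsetLag = offsetLag;
            this.isFuture = isFuture;
        }

        public long size() { return size; }
        public long offsetLag() { return offsetLag; }
        public boolean isFuture() { return isFuture; }
    }

    // Stand-in for the proposed LogDirDescription. Assumption: String keys
    // replace TopicPartition and RuntimeException replaces ApiException.
    public static final class LogDirDescription {
        private final RuntimeException error;
        private final Map<String, ReplicaInfo> replicaInfos;

        public LogDirDescription(RuntimeException error, Map<String, ReplicaInfo> replicaInfos) {
            this.error = error;
            this.replicaInfos = replicaInfos;
        }

        public RuntimeException error() { return error; }
        public Map<String, ReplicaInfo> replicaInfos() { return replicaInfos; }
    }

    /** Sums replica sizes for each log directory without an error, skipping future replicas. */
    public static Map<String, Long> bytesPerLogDir(Map<String, LogDirDescription> logDirs) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (Map.Entry<String, LogDirDescription> entry : logDirs.entrySet()) {
            LogDirDescription description = entry.getValue();
            if (description.error() != null) {
                continue; // directory is offline or could not be described
            }
            long total = 0L;
            for (ReplicaInfo replica : description.replicaInfos().values()) {
                if (!replica.isFuture()) {
                    total += replica.size();
                }
            }
            totals.put(entry.getKey(), total);
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<String, ReplicaInfo> replicas = new LinkedHashMap<>();
        replicas.put("orders-0", new ReplicaInfo(1024L, 0L, false));
        replicas.put("orders-1", new ReplicaInfo(2048L, 5L, false));
        replicas.put("orders-2", new ReplicaInfo(512L, 100L, true));

        Map<String, LogDirDescription> logDirs = new LinkedHashMap<>();
        logDirs.put("/data/kafka-1", new LogDirDescription(null, replicas));
        logDirs.put("/data/kafka-2",
                new LogDirDescription(new RuntimeException("log dir offline"), new LinkedHashMap<>()));

        System.out.println(bytesPerLogDir(logDirs));
    }
}
```

With the real client, the per-broker maps would come from descriptions() or allDescriptions() rather than being built by hand.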
Proposed Changes
In addition to what's described in the changes to the public interface, kafka.admin.LogDirsCommand will be updated to use the new methods.
Compatibility, Deprecation, and Migration Plan
This change is source and binary compatible.
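The compatibility claim rests on the existing methods keeping their signatures while the new ones are purely additive. One way this could be arranged is for each deprecated accessor to derive its result from the new accessor. The sketch below shows that pattern with hypothetical stand-ins: LegacyInfo plays the role of the leaked internal type, Description the new public type, and CompletableFuture replaces KafkaFuture. None of these names come from Kafka, and the KIP does not specify how the deprecated methods are implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class DeprecationSketch {

    // Hypothetical stand-in for the internal type exposed by the old methods.
    public static final class LegacyInfo {
        public final long totalBytes;
        public LegacyInfo(long totalBytes) { this.totalBytes = totalBytes; }
    }

    // Hypothetical stand-in for the new public replacement type.
    public static final class Description {
        private final long totalBytes;
        public Description(long totalBytes) { this.totalBytes = totalBytes; }
        public long totalBytes() { return totalBytes; }
    }

    public static final class Result {
        private final Map<Integer, CompletableFuture<Description>> futures;

        public Result(Map<Integer, CompletableFuture<Description>> futures) {
            this.futures = futures;
        }

        /** New accessor: returns the public type directly. */
        public Map<Integer, CompletableFuture<Description>> descriptions() {
            return futures;
        }

        /** Old accessor: signature unchanged, result converted from the new accessor. */
        @Deprecated
        public Map<Integer, CompletableFuture<LegacyInfo>> values() {
            Map<Integer, CompletableFuture<LegacyInfo>> converted = new HashMap<>();
            for (Map.Entry<Integer, CompletableFuture<Description>> e : descriptions().entrySet()) {
                converted.put(e.getKey(),
                        e.getValue().thenApply(d -> new LegacyInfo(d.totalBytes())));
            }
            return converted;
        }
    }

    public static void main(String[] args) {
        Map<Integer, CompletableFuture<Description>> futures = new HashMap<>();
        futures.put(0, CompletableFuture.completedFuture(new Description(4096L)));
        Result result = new Result(futures);

        long viaNew = result.descriptions().get(0).join().totalBytes();
        long viaOld = result.values().get(0).join().totalBytes;
        System.out.println("old and new accessors agree: " + (viaNew == viaOld));
    }
}
```

Callers compiled against the old accessor keep working unchanged and only see a deprecation warning when they recompile.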