Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Systems that interface with Kafka, such as management systems and proxies, often need to perform administrative actions. For example, they might need to be able to create or delete topics. Currently, they can't do this without relying on internal Kafka classes, or shell scripts distributed with Kafka. We would like to add a public, stable
AdminClient API that exposes this functionality to JVM-based clients in a well-supported way.
AdminClient will use the KIP-4 wire protocols to communicate with brokers. Because we avoid direct communication with ZooKeeper, the client does not need a ZooKeeper dependency. In fact, once this KIP is implemented, we will be able to lock down Zookeeper security further, by enforcing the invariant that only brokers need to communicate with ZK.
By using the
AdminClient API, clients will avoid being tightly coupled to the implementation details of a particular Kafka version. They will not need to access internal Kafka classes, or parse the output of scripts. They will also gain the benefits of cross-version client compatibility as implemented in KIP-97.
AdminClient will be distributed as part of
kafka-clients.jar. It will provide a Java API for managing Kafka.
AdminClient interface will be in the
org.apache.kafka.clients.admin namespace. The implementation will be in the
KafkaAdminClient class, in the same namespace. The separation between interface and implementation is intended to make the difference between public API and private implementation clearer, and make developing mocks in unit tests easier. This is similar to the divide between
Users will configure the
AdminClient the same way they configure the
Consumer: by supplying a map of keys to values to its constructor. As much as possible, we should reuse the same configuration key names, such as bootstrap.servers, client.id, etc. We should also offer the ability to configure timeouts, buffer sizes, and other networking settings.
AdminClient will provide
CompletableFuture-based APIs that closely reflect the requests which the brokers can handle. The client will be multi-threaded; multiple threads will be able to safely make calls using the same
AdminClient object. When a future fails, its
get() method will throw an
ExceutionException which wraps the underlying exception.
We want to handle errors fully and cleanly in
AdminClient. APIs that require communication with multiple brokers should allow for the possbiility that some brokers will respond and others will not. Any possible return value from the API should be handled. Note that API functions do not throw exceptions. Instead, the
CompletableFuture objects contain the exceptions when necessary.
In general, we want to avoid using internal Kafka classes in the
AdminClient interface. For example, most RPC classes should be considered internal, such as
MetadaResponse. We should be able to change those classes in the future without worrying about breaking users of
AdminClient. Inner classes such as
MetadataResponse#TopicMetadata should also be considered internal, and not exposed in the API of
As mentioned earlier, the
AdminClient will use the KIP-4 wire protocol. This mainly means using
NetworkClient and related RPC classes for the implementation.
This KIP will add only APIs that can be implemented with the existing server-side RPCs. See "New or Changed Public Interfaces" for details. The intention is that we will continue to extend
AdminClient with further KIPs that also add the appropriate server-side functionality is added (such as ACL management.)
New or Changed Public Interfaces
Clients use the administrative client by creating an instance of class
AdminClient, via the
AdminClient#Factory#create function. When the user is done with the
AdminClient, they must call close to release the network sockets and other associated resources of the client.
Just like the consumer and the producer, the admin client will have its own
AdminClientConfig configuration class which extends
Initially, the supported configuration keys will be:
- The boostrap servers as a list of host:port pairs.
- An ID string to pass to the server when making requests.
- The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
- The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used
- The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
- The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
The amount of time to wait before attempting to retry a failed request.
The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
Close idle connections after the number of milliseconds specified by this config
The security protocol used to communicate with brokers
Migration Plan and Compatibility
AdminClient will use KIP-97 API version negotiation to communicate with older or newer brokers. In cases where an API is not available on an older or newer broker, we will throw an
We should avoid making incompatible changes to the
AdminClient function and class signatures. So for example, if we need to add an additional argument to an API, we should add a new function rather than changing an existing function signature.
We should have an
AdminClientTest integration test which tests creating topics, deleting topics, and listing topics through the API. We can use the
KafkaServerTestHarness to test this efficiently with multiple brokers. For methods which support batches, we should test passing a batch of items to be processed. We should test error conditions as well. We should test the node listing and version getting APIs as well.
Instead of having a futures-based API, we could have a synchronous API. In this API, each function would block rather than returning a Future. However, the Futures-based API can easily be used as a blocking API, simply by calling get() on the Futures which get returned.
We would like to add more functionality to the
AdminClient as soon as it becomes available on the brokers. For example, we would like to add a way of altering topics that already exist. We would also like to add management APIs for ACLs, or the functionality of