  KIP-511: Collect and Expose Client's Name and Version in the Brokers
Operators of Apache Kafka clusters currently have no information about the clients connected to their clusters. Basic information about the connected clients, such as their name and version, would help operators 1) troubleshoot misbehaving clients and 2) understand the impact of a broker upgrade on those clients and proactively reach out to their owners.

Public Interfaces

ApiVersions Request/Response

ApiVersionsRequest is bumped to version 3 with two new fields.

{
  "apiKey": 18,
  "type": "request",
  "name": "ApiVersionsRequest",
  "validVersions": "0-3",
  // Versions 0 through 2 of ApiVersionsRequest are the same.
  // Starting in version 3, ClientName and ClientVersion are present.
  "fields": [
    { "name": "ClientName", "type": "string", "versions": "3+", "about": "The name of the client." },
    { "name": "ClientVersion", "type": "string", "versions": "3+", "about": "The version of the client." }
  ]
}
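To make the two new fields concrete, the following is a minimal sketch of how a version 3 request body could be serialized. It assumes the classic Kafka STRING encoding (a big-endian int16 length followed by UTF-8 bytes); the KIP text here does not specify the wire encoding, and it may differ if flexible versions from KIP-482 apply. The function names are hypothetical and the request header is omitted.

```python
import struct


def encode_string(value: str) -> bytes:
    """Encode a classic Kafka STRING: int16 big-endian length, then UTF-8 bytes."""
    data = value.encode("utf-8")
    if len(data) > 0x7FFF:
        raise ValueError("string too long for an int16 length prefix")
    return struct.pack(">h", len(data)) + data


def decode_string(buf: bytes, offset: int = 0):
    """Decode a classic Kafka STRING and return (value, next_offset)."""
    (length,) = struct.unpack_from(">h", buf, offset)
    start = offset + 2
    return buf[start:start + length].decode("utf-8"), start + length


def encode_api_versions_v3_body(client_name: str, client_version: str) -> bytes:
    """Body only (assumption: ClientName first, then ClientVersion, as in the schema)."""
    return encode_string(client_name) + encode_string(client_version)


# Round trip with the values used in the request log example of this KIP.
body = encode_api_versions_v3_body("java", "2.2.0")
name, pos = decode_string(body)
version, _ = decode_string(body, pos)
```

Because both fields are declared with "versions": "3+", a client speaking version 0 through 2 would send an empty body, and the broker would have no name or version for that connection.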

ApiVersionsResponse is bumped to version 3, with no changes to the schema.

{
  "apiKey": 18,
  "type": "response",
  "name": "ApiVersionsResponse",
  // Version 1 adds throttle time to the response.
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  // Version 3 is similar to version 2.
  "validVersions": "0-3",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "Index", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }
    ]},
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
  ]
}


We will add a few metrics in the broker to surface information about the connected clients.

Metric: kafka.server:type=ClientMetrics,name=ConnectedClients
Type: Gauge<Integer>
Description: The number of clients connected.
Can be plotted? Yes

Metric: kafka.server:type=ClientMetrics,name=ConnectedClients,clientname=([-.\w]+),clientversion=([-.\w]+)
Type: Gauge<Integer>
Description: The number of clients connected, broken down by clientname and clientversion. It gives an overview of the clients.
Can be plotted? Yes

Metric: kafka.server:type=ClientMetrics,name=Connections
Type: Gauge<List<Map<String, String>>>
Description: The clients connected to the broker, where each Map represents a connection with the following metadata:

  • ClientId
  • ClientName
  • ClientVersion
  • ClientAddress
  • Principal
  • Listener
  • SecurityProtocol

Can be plotted? No. Operators can get the active connections via JMX by using a tool such as jmxterm.
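The relationship between the three metrics can be sketched as follows: the per-connection list is the raw data, and the two gauges are counts derived from it. This is only an illustration, not the broker implementation. The sample connections are made up (the "example-client" entry and the addresses are hypothetical), and the helper names are assumptions.

```python
from collections import Counter

# MBean name taken from the metrics listed above.
MBEAN_PREFIX = "kafka.server:type=ClientMetrics,name=ConnectedClients"


def count_by_client(connections):
    """Count connections per (ClientName, ClientVersion) pair."""
    return Counter((c["ClientName"], c["ClientVersion"]) for c in connections)


def mbean_name(client_name, client_version):
    """Build the tagged metric name for one (clientname, clientversion) pair."""
    return f"{MBEAN_PREFIX},clientname={client_name},clientversion={client_version}"


def connection(client_id, name, version, address):
    """Hypothetical helper: one entry of the Connections gauge as a Map."""
    return {
        "ClientId": client_id, "ClientName": name, "ClientVersion": version,
        "ClientAddress": address, "Principal": "User:ANONYMOUS",
        "Listener": "PLAINTEXT", "SecurityProtocol": "PLAINTEXT",
    }


# Made-up sample of what the Connections gauge might return.
connections = [
    connection("consumer-1", "java", "2.2.0", "192.0.2.10"),
    connection("producer-1", "java", "2.2.0", "192.0.2.11"),
    connection("app-7", "example-client", "0.1.0", "192.0.2.12"),
]

per_client = count_by_client(connections)   # tagged ConnectedClients gauges
total = sum(per_client.values())            # untagged ConnectedClients gauge
```

An operator planning a broker upgrade could read the tagged gauges to see at a glance which client versions are still connected, then use the Connections list to find the affected principals and addresses.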

Request Log

While the Request Log is not a public interface, it is worth mentioning that we will enrich it with the Client Name and the Client Version.

[2019-07-02 14:11:16,137] DEBUG Completed request:RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-1, correlationId=11) -- {coordinator_key=console-consumer-17661,coordinator_type=0},response:{throttle_time_ms=0,error_code=15,error_message=null,coordinator={node_id=-1,host=,port=-1}} from connection;totalTime:3.187,requestQueueTime:0.137,localTime:2.899,remoteTime:0.0,throttleTime:0.098,responseQueueTime:0.048,sendTime:0.124,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientName:java,clientVersion:2.2.0 (kafka.request.logger)

Proposed Changes

The idea is to re-use the existing ApiVersions Request to provide the name and the version of the client to the broker.

The client is responsible for providing its name and its version. For the Java client, the idea is to define two constants in the code to store its name and its version. If possible, the version will be set automatically based on metadata coming from gradle (or the repo itself) to avoid manual changes.

In the broker, the name and the version of the client will be attached to the connection alongside existing metadata such as the principal. This makes them reusable for later purposes (e.g. extending existing metrics).
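As a rough sketch of what attaching the metadata to the connection could look like, the example below keeps one record per connection and fills in the client name and version when the ApiVersionsRequest arrives. All class and method names are hypothetical and do not reflect the actual broker code; the ordering (ApiVersionsRequest before authentication) is assumed from the discussion in the rejected alternatives.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ConnectionMetadata:
    """Per-connection metadata; fields stay None until the broker learns them."""
    listener: str
    principal: Optional[str] = None
    client_name: Optional[str] = None
    client_version: Optional[str] = None


class ConnectionRegistry:
    """Hypothetical in-memory registry keyed by connection id."""

    def __init__(self) -> None:
        self._connections: Dict[str, ConnectionMetadata] = {}

    def on_connect(self, connection_id: str, listener: str) -> None:
        self._connections[connection_id] = ConnectionMetadata(listener=listener)

    def on_api_versions_request(self, connection_id: str,
                                client_name: str, client_version: str) -> None:
        # Name and version arrive once per connection, not with every request.
        meta = self._connections[connection_id]
        meta.client_name = client_name
        meta.client_version = client_version

    def on_authenticated(self, connection_id: str, principal: str) -> None:
        self._connections[connection_id].principal = principal

    def get(self, connection_id: str) -> ConnectionMetadata:
        return self._connections[connection_id]


registry = ConnectionRegistry()
registry.on_connect("conn-1", listener="PLAINTEXT")
registry.on_api_versions_request("conn-1", "java", "2.2.0")
registry.on_authenticated("conn-1", "User:ANONYMOUS")
```

Keeping the name and version on the connection record means that metrics and the request log can read them from one place for every subsequent request on that connection.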

Metrics will be added and the request log will be extended to make the metadata available to the operators.

Compatibility, Deprecation, and Migration Plan

What impact (if any) will there be on existing users?

Existing users extracting and parsing the Request Log may have to update their parsing logic to accommodate the new fields.
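For users who parse the Request Log, a tolerant parser is one way to handle both the old and the new line format. The sketch below assumes the new fields appear as clientName:<value> and clientVersion:<value>, as in the example line above, and that the values contain no commas or whitespace; the function name is hypothetical.

```python
import re

# Values are assumed to end at the next comma or whitespace character.
_CLIENT_NAME = re.compile(r"\bclientName:([^,\s]+)")
_CLIENT_VERSION = re.compile(r"\bclientVersion:([^,\s]+)")


def extract_client_info(line: str):
    """Return (client_name, client_version); each is None if absent from the line."""
    name = _CLIENT_NAME.search(line)
    version = _CLIENT_VERSION.search(line)
    return (
        name.group(1) if name else None,
        version.group(1) if version else None,
    )


# Tail of a request log line with the new fields.
new_line = ("securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,"
            "listener:PLAINTEXT,clientName:java,clientVersion:2.2.0 "
            "(kafka.request.logger)")
# Tail of a line without them, as an older broker would log it.
old_line = ("securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,"
            "listener:PLAINTEXT (kafka.request.logger)")

new_info = extract_client_info(new_line)
old_info = extract_client_info(old_line)
```

Searching for the named fields, rather than splitting on commas and relying on position, keeps the same parser working during a rolling upgrade when some brokers log the new fields and others do not.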

Rejected Alternatives

Put clientName and clientVersion in the RequestHeader

clientName and clientVersion could be sent in every request alongside the clientId in the header. While this would be fairly simple to implement once KIP-482 is implemented, it would make adding more metadata in the future hard and would waste a few bytes in every request on something that does not change within a session.

Put clientName and clientVersion in the RequestHeader but provide it only once

clientName and clientVersion could be added to the RequestHeader but sent only in the first request to save bytes in subsequent requests. Concretely, this means sending them in the ApiVersionsRequest so that the broker has the information as soon as possible. In that case, why not put them in the ApiVersionsRequest directly? Moreover, it would make the implementation of a client ambiguous.

Add a new request to communicate the client metadata to the broker

Instead of piggybacking on the ApiVersionsRequest, we could implement a new Request/Response only for this purpose. This request would need to be sent as early as possible once the connection is established so that the broker has the information. Concretely, it would be sent right after the ApiVersionsRequest/Response round trip and before any other request, which adds another round trip to the broker before the client can proceed with its regular work. It would also have to happen before authentication (TLS AuthN aside) and would thus require specific treatment, similar to the ApiVersionsRequest.

