
Status

Current state: Under Discussion

Discussion thread: Thread

JIRA: KAFKA-8855

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Operators of Apache Kafka clusters currently have no information about the clients connected to their clusters. Basic information about the connected clients, such as their name and version, would greatly help them to 1) troubleshoot misbehaving clients; and 2) understand the impact of a broker upgrade on their clients and proactively reach out to them.

Public Interfaces

ApiVersions Request/Response

ApiVersionsRequest is bumped to version 3 with two new fields.

{
  "apiKey": 18,
  "type": "request",
  "name": "ApiVersionsRequest",
  "validVersions": "0-3",
  // Versions 0 through 2 of ApiVersionsRequest are the same.
  // Starting in version 3, ClientName and ClientVersion are present.
  "fields": [
    { "name": "ClientName", "type": "string", "versions": "3+", "about": "The name of the client." },
    { "name": "ClientVersion", "type": "string", "versions": "3+", "about": "The version of the client." }
  ]
}
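The following minimal Java sketch makes the new wire format concrete. It encodes only the body of a version 3 ApiVersionsRequest and omits the request header. It assumes the classic Kafka STRING encoding, which is a big-endian int16 length followed by UTF-8 bytes. The schema above does not state whether version 3 uses a flexible (compact) encoding, so treat that as an assumption. The class name `AvrV3Encoder` and the version string `2.4.0` are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: encodes the body (not the header) of ApiVersionsRequest v3,
// assuming the classic STRING encoding (int16 length + UTF-8 bytes, big-endian).
public class AvrV3Encoder {

    // Size of a STRING field on the wire: 2-byte length prefix plus the UTF-8 bytes.
    static int stringSize(String s) {
        return 2 + s.getBytes(StandardCharsets.UTF_8).length;
    }

    static void writeString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    public static byte[] encodeV3Body(String clientName, String clientVersion) {
        ByteBuffer buf = ByteBuffer.allocate(stringSize(clientName) + stringSize(clientVersion));
        writeString(buf, clientName);    // ClientName, versions 3+
        writeString(buf, clientVersion); // ClientVersion, versions 3+
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] body = encodeV3Body("apache-kafka-java", "2.4.0");
        System.out.println(body.length); // 26 = (2 + 17) + (2 + 5)
    }
}
```

Under this assumption, the two fields cost 4 bytes of length prefixes plus the string bytes. This overhead is paid once per connection rather than once per request.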

ApiVersionsResponse is bumped to version 3 but does not have any changes in the schema.

{
  "apiKey": 18,
  "type": "response",
  "name": "ApiVersionsResponse",
  // Version 1 adds throttle time to the response.
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  // Version 3 is similar to version 2. 
  "validVersions": "0-3",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "Index", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }
    ]},
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
  ]
}

Metrics

We will add a few metrics to the broker to surface information about the connected clients.

| Metric | Type | Description | Can be plotted? |
|---|---|---|---|
| kafka.server:type=ClientMetrics,name=ConnectedClients | Gauge<Integer> | The total number of connected clients. | Yes |
| kafka.server:type=ClientMetrics,name=ConnectedClients,clientname=([\.\-_a-zA-Z0-9])+,clientversion=([\.\-_a-zA-Z0-9])+ | Gauge<Integer> | The number of connected clients, broken down by clientname and clientversion. It gives an overview of the clients. The metric will be removed when it goes back to zero, i.e. when all the clients with a given name and version are disconnected. | Yes |
| kafka.server:type=ClientMetrics,name=Connections | Gauge<List<Map<String, String>>> | The clients connected to the broker, where each Map represents a connection with the following metadata: ClientId, ClientName, ClientVersion, ClientAddress, Principal, Listener, SecurityProtocol. | No. Operators can get the active connections via JMX by using a tool such as jmxterm. |
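The validation pattern only admits letters, digits, `.`, `-` and `_`. As a result, the client name and version can be used as unquoted JMX ObjectName values. Here is a minimal sketch that uses only the standard `javax.management.ObjectName` class. The helper class and method names are hypothetical, and the example values are illustrative.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper that builds the per-client ConnectedClients metric name
// and a JMX pattern matching all ConnectedClients gauges.
public class ClientMetricNames {

    static ObjectName connectedClients(String clientName, String clientVersion) {
        return parse("kafka.server:type=ClientMetrics,name=ConnectedClients"
                + ",clientname=" + clientName + ",clientversion=" + clientVersion);
    }

    // The trailing ",*" is a property-list wildcard: it also matches the total gauge,
    // which has no clientname/clientversion keys.
    static final ObjectName ALL_CONNECTED_CLIENTS =
            parse("kafka.server:type=ClientMetrics,name=ConnectedClients,*");

    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid metric name: " + name, e);
        }
    }

    public static void main(String[] args) {
        ObjectName on = connectedClients("apache-kafka-java", "2.4.0");
        System.out.println(on.getKeyProperty("clientname")); // apache-kafka-java
        System.out.println(ALL_CONNECTED_CLIENTS.apply(on)); // true
    }
}
```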

Request Log

While the Request Log is not a public interface, it is worth mentioning that we will enrich it with the Client Name and the Client Version.

[2019-07-02 14:11:16,137] DEBUG Completed request:RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-1, correlationId=11) -- {coordinator_key=console-consumer-17661,coordinator_type=0},response:{throttle_time_ms=0,error_code=15,error_message=null,coordinator={node_id=-1,host=,port=-1}} from connection 192.168.12.241:9092-192.168.12.241:52149-3;totalTime:3.187,requestQueueTime:0.137,localTime:2.899,remoteTime:0.0,throttleTime:0.098,responseQueueTime:0.048,sendTime:0.124,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientName:java,clientVersion:2.2.0 (kafka.request.logger)

Proposed Changes

The idea is to reuse the existing ApiVersionsRequest to provide the name and the version of the client to the broker. Clients are responsible for providing their name and version.

Broker

ApiVersions Request/Response Handling

A client cannot know in advance which ApiVersions versions a broker supports, because ApiVersions is precisely the request used to discover them. Today, the client sends an ApiVersionsRequest (AVR) using the latest schema it knows. When the broker receives it, the broker deserializes the AVR with the request's version if it knows that version, or falls back to version 0 otherwise. It then sends back a response with the corresponding version. As a consequence, the broker ignores any additional information carried by a newer version that it does not know yet. For example, suppose version 4 of the AVR were released and used by a client. A broker that only knows up to version 3 would not get the client name and the client version, even though they would be sent on the wire.

To circumvent this, we propose to change the broker so that it falls back to the latest version it knows instead of version 0. This works only if every new version of the request (version N) always starts with all the fields of its predecessor (version N-1). Under that rule, a version N request can be read as a version N-1 request by ignoring the trailing fields. Conversely, the client continues to fall back to the oldest version it knows (version 0).
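The difference between the two fallback strategies can be sketched in Java as follows. This is not broker code. It assumes the prefix rule described in the previous paragraph and the classic STRING encoding (int16 length plus UTF-8 bytes). The class, its methods, and the "version 4" body with an extra int32 field are all hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical sketch of the broker-side fallback for ApiVersionsRequest bodies.
// Assumes every version N starts with all the fields of version N-1, so a newer
// body can be read with an older schema by leaving the trailing bytes unread.
public class AvrFallback {

    static final short LATEST_KNOWN_VERSION = 3;

    static final class ClientInfo {
        final String name;
        final String version;
        ClientInfo(String name, String version) { this.name = name; this.version = version; }
        @Override public String toString() { return name + "/" + version; }
    }

    static String readString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getShort()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Versions 0-2 carry no client metadata; version 3 carries ClientName and ClientVersion.
    static Optional<ClientInfo> parse(short version, ByteBuffer body) {
        if (version < 3) {
            return Optional.empty();
        }
        String name = readString(body);
        String clientVersion = readString(body);
        return Optional.of(new ClientInfo(name, clientVersion));
    }

    // Current behavior: an unknown (newer) version is parsed as version 0.
    static Optional<ClientInfo> parseFallingBackToZero(short requestVersion, ByteBuffer body) {
        return parse(requestVersion > LATEST_KNOWN_VERSION ? (short) 0 : requestVersion, body);
    }

    // Proposed behavior: an unknown (newer) version is parsed as the latest known version.
    static Optional<ClientInfo> parseFallingBackToLatest(short requestVersion, ByteBuffer body) {
        return parse((short) Math.min(requestVersion, LATEST_KNOWN_VERSION), body);
    }

    // A hypothetical version 4 body: the version 3 fields followed by an extra int32 field.
    static ByteBuffer sampleV4Body() {
        byte[] name = "apache-kafka-java".getBytes(StandardCharsets.UTF_8);
        byte[] version = "2.4.0".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + name.length + 2 + version.length + 4);
        buf.putShort((short) name.length).put(name);
        buf.putShort((short) version.length).put(version);
        buf.putInt(42);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer body = sampleV4Body();
        System.out.println(parseFallingBackToZero((short) 4, body.duplicate()));   // Optional.empty
        System.out.println(parseFallingBackToLatest((short) 4, body.duplicate())); // Optional[apache-kafka-java/2.4.0]
    }
}
```

The extra int32 is simply left unread. Ignoring trailing bytes is safe only because of the prefix rule; without it, reading a newer body with an older schema could misinterpret fields.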

Metadata

We propose to attach the captured metadata to the connection, alongside existing metadata such as the principal or the listener. A registry will be created to store the metadata of all the active connections. Connections will be removed from the registry when they are closed.
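The connection registry might be shaped like the minimal Java sketch below. It has one entry per active connection, keyed by a connection id, holding the metadata listed for the Connections gauge. The class and method names are hypothetical. The id format is borrowed from the sample request log line, and a concurrent map is assumed because connections may be registered and removed from several threads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the connection registry: one entry per active connection,
// keyed by the connection id and removed when the connection is closed.
public class ConnectionRegistry {

    private final Map<String, Map<String, String>> connections = new ConcurrentHashMap<>();

    static Map<String, String> metadata(String clientId, String clientName, String clientVersion,
                                        String clientAddress, String principal,
                                        String listener, String securityProtocol) {
        Map<String, String> md = new LinkedHashMap<>();
        md.put("ClientId", clientId);
        md.put("ClientName", clientName);
        md.put("ClientVersion", clientVersion);
        md.put("ClientAddress", clientAddress);
        md.put("Principal", principal);
        md.put("Listener", listener);
        md.put("SecurityProtocol", securityProtocol);
        return Collections.unmodifiableMap(md);
    }

    void register(String connectionId, Map<String, String> metadata) {
        connections.put(connectionId, metadata);
    }

    void unregister(String connectionId) {
        connections.remove(connectionId);
    }

    // Could back a gauge such as the total number of connected clients.
    int connectedClients() {
        return connections.size();
    }

    // Point-in-time copy, in the List<Map<String, String>> shape of the Connections gauge.
    List<Map<String, String>> snapshot() {
        return new ArrayList<>(connections.values());
    }

    public static void main(String[] args) {
        ConnectionRegistry registry = new ConnectionRegistry();
        String id = "192.168.12.241:9092-192.168.12.241:52149-3";
        registry.register(id, metadata("consumer-1", "apache-kafka-java", "2.4.0",
                "192.168.12.241", "User:ANONYMOUS", "PLAINTEXT", "PLAINTEXT"));
        System.out.println(registry.connectedClients()); // 1
        registry.unregister(id);
        System.out.println(registry.connectedClients()); // 0
    }
}
```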

Validation

We propose to validate the client name and the client version against the regular expression ([\.\-_a-zA-Z0-9])+, and to close the connection and log an error if either of them is invalid.
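The following minimal Java sketch applies the proposed check with `java.util.regex`. Note that `matches()` requires the whole value to match, so a value with a single disallowed character (or an empty value) is rejected. The class and method names are hypothetical, and the actual logging and connection-closing calls are left out.

```java
import java.util.regex.Pattern;

// Hypothetical sketch of the proposed validation of ClientName and ClientVersion.
public class ClientInfoValidator {

    // Same pattern as in the proposal: ([\.\-_a-zA-Z0-9])+
    private static final Pattern VALID = Pattern.compile("([\\.\\-_a-zA-Z0-9])+");

    static boolean isValid(String value) {
        return value != null && VALID.matcher(value).matches();
    }

    // true: keep the connection; false: log an error and close the connection.
    static boolean accept(String clientName, String clientVersion) {
        return isValid(clientName) && isValid(clientVersion);
    }

    public static void main(String[] args) {
        System.out.println(accept("apache-kafka-java", "2.4.0-SNAPSHOT")); // true
        System.out.println(accept("my client", "1.0"));                    // false (space)
        System.out.println(accept("apache-kafka-java", ""));               // false (empty)
    }
}
```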

Metrics & Log

The metrics described above will be created based on the metadata available in the connection registry. Metrics will be removed when they become inactive (when the gauge reaches zero). The request log will be extended to include the collected metadata.
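The "remove at zero" rule for the per-client gauges maps naturally onto `ConcurrentHashMap.computeIfPresent`. Returning `null` from the remapping function removes the entry. The following is a minimal Java sketch; the class and method names are hypothetical, and the sketch does not register anything with a real metrics library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: per (clientname, clientversion) connection counts backing the
// per-client ConnectedClients gauges. An entry disappears when its count reaches zero.
public class ClientVersionCounts {

    private final Map<String, Integer> counts = new ConcurrentHashMap<>();

    // "/" cannot appear in a validated name or version, so the key is unambiguous.
    private static String key(String clientName, String clientVersion) {
        return clientName + "/" + clientVersion;
    }

    void onConnect(String clientName, String clientVersion) {
        counts.merge(key(clientName, clientVersion), 1, Integer::sum);
    }

    void onDisconnect(String clientName, String clientVersion) {
        // Returning null from the remapping function removes the entry (the gauge goes away).
        counts.computeIfPresent(key(clientName, clientVersion),
                (k, n) -> n > 1 ? Integer.valueOf(n - 1) : null);
    }

    // null means no gauge exists for this name and version.
    Integer gauge(String clientName, String clientVersion) {
        return counts.get(key(clientName, clientVersion));
    }

    public static void main(String[] args) {
        ClientVersionCounts c = new ClientVersionCounts();
        c.onConnect("apache-kafka-java", "2.4.0");
        c.onConnect("apache-kafka-java", "2.4.0");
        c.onDisconnect("apache-kafka-java", "2.4.0");
        System.out.println(c.gauge("apache-kafka-java", "2.4.0")); // 1
        c.onDisconnect("apache-kafka-java", "2.4.0");
        System.out.println(c.gauge("apache-kafka-java", "2.4.0")); // null
    }
}
```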

Client

ApiVersions Request/Response Handling

The client continues to use the latest version of the ApiVersionsRequest it knows, and continues to fall back to version 0 when it cannot parse the ApiVersionsResponse it receives from the broker.

Name and Version

The client uses the version provided in the `kafka/kafka-version.properties` file and the name `apache-kafka-java`.
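A Java client could read this file from the classpath, as in the minimal sketch below. The property key `version`, the fallback value `unknown`, and the class name are assumptions made for illustration, not taken from this proposal.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical sketch of how a Java client could obtain its name and version.
// The "version" key inside kafka-version.properties is an assumption.
public class ClientSoftwareInfo {

    static final String CLIENT_NAME = "apache-kafka-java";
    static final String VERSION_RESOURCE = "/kafka/kafka-version.properties";

    static String versionFrom(Properties props) {
        return props.getProperty("version", "unknown").trim();
    }

    static String loadVersion() {
        Properties props = new Properties();
        // getResourceAsStream returns null when the file is not on the classpath;
        // try-with-resources skips close() for a null resource.
        try (InputStream in = ClientSoftwareInfo.class.getResourceAsStream(VERSION_RESOURCE)) {
            if (in != null) {
                props.load(in);
            }
        } catch (IOException e) {
            // Keep the default below if the file cannot be read.
        }
        return versionFrom(props);
    }

    public static void main(String[] args) {
        System.out.println(CLIENT_NAME + " " + loadVersion());
    }
}
```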

Compatibility, Deprecation, and Migration Plan

What impact (if any) will there be on existing users?

Existing users extracting and parsing the Request Log may have to update their parsing logic to accommodate the new fields.
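One way to keep such parsing robust is to look fields up by key and treat the new ones as optional, since older lines lack them. The Java sketch below takes its field layout from the sample line in the Request Log section. The class and method names are hypothetical, and the request log format is not a public interface, so the layout may change.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of a tolerant extractor for "key:value" fields in request log lines.
// Lines written before this change simply lack clientName/clientVersion.
public class RequestLogFields {

    static Optional<String> field(String line, String key) {
        // The key must not be preceded by a word character (so "Time" does not match
        // "totalTime"); the value runs until the next ',', ';' or whitespace.
        Pattern p = Pattern.compile("(?<!\\w)" + Pattern.quote(key) + ":([^,;\\s]+)");
        Matcher m = p.matcher(line);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String line = "...securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,"
                + "clientName:java,clientVersion:2.2.0 (kafka.request.logger)";
        System.out.println(field(line, "clientName").orElse("n/a"));    // java
        System.out.println(field(line, "clientVersion").orElse("n/a")); // 2.2.0
        String oldLine = "...listener:PLAINTEXT (kafka.request.logger)";
        System.out.println(field(oldLine, "clientName").orElse("n/a")); // n/a
    }
}
```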

Rejected Alternatives

Put clientName and clientVersion in the RequestHeader

clientName and clientVersion could be sent in every request alongside the clientId in the header. While this would be fairly simple to implement once KIP-482 is implemented, it would make adding more metadata in the future hard, and it would waste a few bytes in every request for information that does not change within a session.

Put clientName and clientVersion in the RequestHeader but provide it only once

clientName and clientVersion could be added to the RequestHeader but sent only in the first request, to save bytes in subsequent requests. Concretely, this means sending them in the ApiVersionsRequest in order to have the information in the broker as soon as possible. If so, why not put them in the ApiVersionsRequest directly? Moreover, it would make the implementation of a client ambiguous.

Add a new request to communicate the client metadata to the broker

Instead of piggybacking on the ApiVersionsRequest, we could implement a new Request/Response only for this purpose. This request would need to be sent as early as possible after the connection is established in order to have the information in the broker. Concretely, it would be sent right after the ApiVersionsRequest/Response round trip and before any other request. It would add another round trip to the broker before the client can proceed with its regular requests. It would also have to be sent before authentication (TLS authentication aside), and would thus require specific handling, similarly to the ApiVersionsRequest.
