Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

ApiVersions Request/Response

ApiVersionsRequest is bumped to version 3 with two new fields. ApiVersionsRequest version is a flexible version (KIP-482: The Kafka Protocol should Support Optional Tagged Fields).

Code Block
languagejs
{
  "apiKey": 18,
  "type": "request",
  "name": "ApiVersionsRequest",
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  // Versions 0 through 2 of ApiVersionsRequest are the same.
  // Version 3 is the first flexible version and adds ClientSoftwareName and ClientSoftwareVersion.
  "fields": [
	{"name": "ClientSoftwareName", "type": "string", "versions": "3+", "about": "The name of the client."},
	{"name": "ClientSoftwareVersion", "type": "string", "versions": "3+", "about": "The version of the client."}
  ]
}

ApiVersionsResponse is bumped to version 3 but does not have any changes in the schema. Note that ApiVersionsResponse is not a flexible version. This is necessary because the client must look at a fixed offset to find the error code, regardless of the response version, to remain backward compatible.

Code Block
languagejs
{
  "apiKey": 18,
  "type": "response",
  "name": "ApiVersionsResponse",
  // Version 1 adds throttle time to the response.
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  // Version 3 is the same as version 2
  "validVersions": "0-3",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "Index", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }
    ]},
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
  ]
}

`ErrorsErrors.INVALID_REQUEST` REQUEST is added.

Code Block
public enum Errors {
    ...
    INVALID_REQUEST(XX, "The validation of the request has failed.", InvalidRequestException::new);
    ...
}
 
public class InvalidRequestException extends ApiException {
    public InvalidRequestException(String message) {
        super(message);
    }
}

...

MetricTypeDescriptionCan be plotted?
kafka.server:type=ClientMetrics,name=ConnectedClientsGauge<Integer>The total number of client connected.Yes
kafka.server:type=ClientMetrics,name=ConnectedClients,softwarename=([\.\-_a-zA-Z0-9])+,softwareversion=([\.\-_a-zA-Z0-9])+Gauge<Integer>The number of client connected, broken down by softwarename and softwareversion. It gives an overview of the clients.
The metric will be removed when it goes back to zero - when the all the clients with a given name and version are disconnected.
Yes
kafka.server:type=ClientMetrics,name=ConnectionsGauge<List<Map<String, String>>

The clients connected to the broker where each Map represents a connection with the following metadata:

  • ClientId
  • ClientSoftwareName
  • ClientSoftwareVersion
  • ClientAddress
  • Principal
  • Listener
  • SecurityProtocol
No - Operator can get the active connections via JMX by using a tool such as jmxterm

Request Log

While the Request Log is not a public interface, it is worth mentioning that we will enrich it with the Client Name and the Client Version.

Code Block
languagetext
[2019-07-02 14:11:16,137] DEBUG Completed request:RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-1, correlationId=11) -- {coordinator_key=console-consumer-17661,coordinator_type=0},response:{throttle_time_ms=0,error_code=15,error_message=null,coordinator={node_id=-1,host=,port=-1}} from connection 192.168.12.241:9092-192.168.12.241:52149-3;totalTime:3.187,requestQueueTime:0.137,localTime:2.899,remoteTime:0.0,throttleTime:0.098,responseQueueTime:0.048,sendTime:0.124,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientSoftwareName:java,clientSoftwareVersion:2.2.0 (kafka.request.logger)

Proposed Changes

The idea is to re-use the existing ApiVersions Request ApiVersionsRequest to provide the name and the version of the client to the broker. Clients are responsible to provide their name and version.

Broker

ApiVersions Request/Response Handling

The client does not know which ApiVersions versions the broker supports as the ApiVersions is used for this purpose. Today, the client sends an ApiVersionsRequest (AVR) with  with the latest schema it is aware of. The broker handles it with the correct version if it knows it or sends back an ApiVersionsResponse an ApiVersionsResponse v0 with an `UNSUPPORTEDUNSUPPORTED_VERSION` error VERSION error to the client if it doesn't. When the client receives such error, it retries the whole process with the ApiVersionsRequest v0. It means that the broker won't get any additional information about the client if the client uses a newer version that the broker doesn't know about. To circumvent this, we propose to provide the supported version of the ApiVersionsRequest in the response sent back to the client by populating the existing `apiapi_versions` field versions field when the version is not supported (ErrorCode equals to `UNSUPPORTEDUNSUPPORTED_VERSION`VERSION). This enables the client to use the latest version supported by the broker instead of defaulting to version 0.

At the moment, the ApiVersionsRequest is handled in two different places in the broker: 1) in the SaslServerAuthenticator (when used); and 2) in the KafkaApis. Both places will be updated to ensure that all clients work. We have decided to not refactor the handling of the ApiVersionsRequest for now and to leave it for further improvements.  

...

We propose to validate the client name and the client version with the following regular expression: ([\.\-a-zA-Z0-9])+. The `INVALIDINVALID_REQUEST` REQUEST error is returned to the client if the validation fails. When the client receives an `INVALIDan INVALID_REQUEST`REQUEST, it must error out and close the connection.

...

As mentioned earlier, when the client receives an `UNSUPPORTEDUNSUPPORTED_VERSION` errorVERSION error, it will use the version provided in the `apiapi_versions` versions field of the ApiVersionsResponse and fail back to the higher version know by the broker  instead of defaulting to version 0. When the client receives an `INVALIDINVALID_REQUEST` errorREQUEST error, it will error out and close the connection.

When SASL is used, the (Java) client sends two ApiVersionsRequest to the broker. The first one is sent by the `SaslClientAuthenticator` and SaslClientAuthenticator and the second one is send by the NetworkClient when the `KafkaChannel` KafkaChannel is established. The `SaslClientAuthenticator` always SaslClientAuthenticator always sends version 0 of the AVR. We have decided to not change this for now and to only update the second call which always happens. The reasoning behind this choice is to avoid multiplying the round trip when an unknown version is used by the client, version 0 always works.

...