Status

Current state: Under discussion

Discussion thread: here

Voting thread (1 of 2): here - KIP reworked following initial vote

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Today, all Kafka protocol requests include the client ID. Users can set this identifier with the client configuration property client.id. This is a useful capability, but it has limitations. The client ID was intended to identify an application, not the individual instances of an application, so it is often not sufficient for identifying a particular client instance.

This KIP adds a UUID called the client instance ID into the request header of all Kafka protocol requests. Each client has a different client instance ID, so correlating requests from a particular client becomes much easier. An immediate benefit is that it can be included in request logging for troubleshooting. It is being added to improve traceability and problem determination.

In addition, the client ID is sent on every request. Users sometimes use quite long client IDs, even encoding metadata such as the availability zone into the string. Because the Kafka protocol is connection-oriented, it is really only necessary to send the string on the first request after the connection is established, so sending it on every request is wasteful.

Proposed Changes

This KIP proposes the following changes to the Kafka protocol.

Client Instance ID

This KIP proposes adding a UUID called the client instance ID into the request header of all Kafka protocol requests. The client uses the same client instance ID for its connections to every broker throughout its lifetime, even when it rebootstraps and makes new connections. The consistency and uniqueness of the identifiers are both important characteristics.

Version 3 of the RPC request header is introduced, which adds the client instance ID. A new version of every RPC is introduced that uses the v3 request header, with the exception of SaslHandshake, which cannot be bumped because of incorrect version negotiation in the past.

Those familiar with KIP-714 will be aware that it already introduced the client instance ID, so this KIP actually proposes elevating that concept into a universal unique identifier for client instances in all RPCs.

The client instance ID is calculated by the client in its constructor, before it makes its initial connection to the cluster. This differs slightly from how KIP-714 initialized the client instance ID. In KIP-714, the client instance ID was created by the broker that responded to the client's first GetTelemetrySubscriptions RPC. Similarly, in KIP-848, the member ID was created by the group coordinator in response to a heartbeat, but KIP-1082 subsequently changed this so that the client creates its own member ID. The protocol now favors client-generated IDs, so this KIP calculates the client instance ID on the client.
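
As a minimal sketch of this lifecycle, the hypothetical class below fixes its ID once in the constructor and returns the same value for the rest of its life, so every connection (including those made after rebootstrapping) would carry the same ID. It uses java.util.UUID to stay self-contained; the Java client itself would presumably use org.apache.kafka.common.Uuid, and the class name SketchClient is illustrative only.

```java
import java.util.UUID;

/** Hypothetical client that fixes its client instance ID before any connection is made. */
public class SketchClient {
    // Generated once per client instance and never changed afterwards.
    private final UUID clientInstanceId;

    public SketchClient() {
        this.clientInstanceId = newClientInstanceId();
    }

    // A random (version 4) UUID always has its version bits set, so it is never all-zero;
    // the loop simply makes the "non-zero" requirement explicit.
    static UUID newClientInstanceId() {
        UUID id;
        do {
            id = UUID.randomUUID();
        } while (id.getMostSignificantBits() == 0L && id.getLeastSignificantBits() == 0L);
        return id;
    }

    // No timeout is needed: the value already exists when the constructor returns.
    public UUID clientInstanceId() {
        return clientInstanceId;
    }

    public static void main(String[] args) {
        SketchClient client = new SketchClient();
        System.out.println("client instance ID: " + client.clientInstanceId());
    }
}
```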

If a connection uses the v3 request header in its first request, it must specify a non-zero client instance ID, and it must specify the same client instance ID on all subsequent requests. Note that SaslHandshake is excluded from this checking because it cannot carry a client instance ID. The broker caches the initial client instance ID for each connection so that it can perform this check (this is an implementation detail, but caching it in the ChannelMetadataRegistry is an option). Once a client has specified a client instance ID in the request header of its first request, any subsequent request (other than SaslHandshake) that is missing the client instance ID or that specifies a different value is rejected with error code INVALID_REQUEST.
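
The per-connection check described above can be sketched as a small state machine. This is a hypothetical illustration, not broker code: ClientInstanceIdValidator and its Result enum are invented names, one instance is assumed to exist per connection, and the handling of a rejected first request (it establishes nothing) is an assumption because the KIP does not specify it.

```java
import java.util.UUID;

/** Hypothetical per-connection validator; one instance per connection. */
public class ClientInstanceIdValidator {
    public enum Result { OK, INVALID_REQUEST }

    private static final UUID ZERO = new UUID(0L, 0L);

    private boolean firstRequestAccepted = false;
    // ID from the first request, or null if that request used a header older than v3.
    private UUID cachedId = null;

    /** clientInstanceId is null when the request header version is below 3. */
    public Result check(boolean isSaslHandshake, int headerVersion, UUID clientInstanceId) {
        // SaslHandshake cannot carry a client instance ID, so it is never checked.
        if (isSaslHandshake) {
            return Result.OK;
        }
        if (!firstRequestAccepted) {
            if (headerVersion >= 3) {
                // A v3 header on the first request must carry a non-zero ID.
                // Assumption: a rejected first request does not establish anything.
                if (clientInstanceId == null || ZERO.equals(clientInstanceId)) {
                    return Result.INVALID_REQUEST;
                }
                cachedId = clientInstanceId;
            }
            firstRequestAccepted = true;
            return Result.OK;
        }
        if (cachedId == null) {
            // The first request carried no ID, so there is nothing to enforce.
            return Result.OK;
        }
        // A missing (null) or different ID is rejected.
        return cachedId.equals(clientInstanceId) ? Result.OK : Result.INVALID_REQUEST;
    }

    public static void main(String[] args) {
        ClientInstanceIdValidator v = new ClientInstanceIdValidator();
        UUID id = UUID.randomUUID();
        System.out.println(v.check(false, 3, id));                // OK: establishes the ID
        System.out.println(v.check(false, 3, UUID.randomUUID())); // INVALID_REQUEST
    }
}
```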

Client ID

This KIP proposes sending the client ID only on the initial request on each connection, and then sending a null client ID for all subsequent requests. This eliminates the unnecessary overhead of repeatedly sending the same client ID string to the broker on every request. The initial client ID for each connection will be cached by the broker (this is an implementation detail, but caching it in the ChannelMetadataRegistry is an option).

This new behavior can only be completely adopted when both client and broker support it. It works as follows:

  • For the initial request, regardless of request header version, the client sends the client ID
  • For subsequent requests using the v3 request header (or higher), the client sends a null client ID
  • For subsequent requests using an earlier version of the request header, the client sends the client ID

After this KIP, the broker will assume that the client ID from the initial request applies to all subsequent requests on a connection, and it will ignore the client ID specified on any subsequent requests.

Alignment of identifiers

By adding the client instance ID to the request headers, we now have a unique application instance identifier that can also be used for other purposes, such as the client telemetry client instance ID and the member ID in the group protocols. The client instance ID from the request headers is used as the client telemetry client instance ID; new versions of the KIP-714 RPCs are introduced to ensure this. The alignment of other identifiers is by convention (and the Java client will follow the convention) rather than by mandate. In the language of standards, the client SHOULD use the same UUID in request headers as it uses for the member ID in the group protocols. This alignment simply makes traceability and problem determination more straightforward.

In the modern group protocol RPCs such as ConsumerGroupHeartbeat and ShareGroupHeartbeat, the member ID is a string. In practice, it is a UUID encoded as a string, but the encoding is not specified in the protocol and the broker really does treat it as a string. The Java client uses the org.apache.kafka.common.Uuid class to generate the member ID and convert it into a string. The client instance ID, by contrast, really is a UUID in the protocol. When the Java code in the broker converts it into a string, it uses the same Java code as the client does for the member ID. As a result, the member ID and client instance ID can trivially be the same when represented as strings, even though they have different data types in the protocol. For traceability and problem determination, this "conventional" alignment works well.

However, at least one other Kafka client implementation uses a slightly different string encoding of the member ID, which does not match the one generated by org.apache.kafka.common.Uuid. This is valid with regard to KIP-848 and KIP-932, but it does unfortunately mean that the member ID does not look the same as the client instance ID, even if both come from the same UUID value in the client.
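
To illustrate how the same UUID can produce two different-looking strings, the sketch below renders one value in two ways. It assumes that org.apache.kafka.common.Uuid renders a UUID as URL-safe base64 (no padding) of its 16 big-endian bytes; the "other encoding" is assumed, purely for illustration, to be the canonical hyphenated hex form, since the text does not say which encoding the other client uses.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

public class UuidStrings {
    // Assumed Kafka-style form: URL-safe base64 of the 16 big-endian bytes, no padding (22 chars).
    static String kafkaStyle(UUID id) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(id.getMostSignificantBits());
        buf.putLong(id.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    // Canonical RFC 4122 form: 32 hex digits with hyphens (36 chars).
    static String canonical(UUID id) {
        return id.toString();
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        // Same 128-bit value, two different strings: fine for the protocol, awkward for log correlation.
        System.out.println("Kafka-style: " + kafkaStyle(id));
        System.out.println("canonical:   " + canonical(id));
    }
}
```

For the all-zero UUID, the first form is 22 "A" characters while the second is "00000000-0000-0000-0000-000000000000", which is why identical underlying values may not look alike in logs.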

The following shows the behavior of the KIP-714 RPCs before and after this KIP. A client implementing this KIP always calculates its own client instance ID, which it sends to the broker in the GetTelemetrySubscriptions request body for v0, or in the request header for v1.

Key:

  • UUID-H - client instance ID sent by client in header, generated by client
  • UUID-R - client instance ID sent by client in request, generated by client
  • UUID-B - client instance ID sent by broker in response, generated by broker

In summary, for GetTelemetrySubscriptions, here are the combinations:


Old client, old broker (GetTelemetrySubscriptions v0):

  • Initial request: request.ClientInstanceId=0, response.ClientInstanceId=UUID-B
  • Subsequent requests: request.ClientInstanceId=UUID-B, response.ClientInstanceId=0

Old client, new broker (GetTelemetrySubscriptions v0):

  • Initial request: request.ClientInstanceId=0, response.ClientInstanceId=UUID-B
  • Subsequent requests: request.ClientInstanceId=UUID-B, response.ClientInstanceId=0

New client, old broker (GetTelemetrySubscriptions v0):

  • Initial request: request.ClientInstanceId=UUID-R, response.ClientInstanceId=0
  • Subsequent requests: request.ClientInstanceId=UUID-R, response.ClientInstanceId=0

New client, new broker (GetTelemetrySubscriptions v1):

  • Initial request: header.ClientInstanceId=UUID-H
  • Subsequent requests: header.ClientInstanceId=UUID-H
Changes to the Kafka protocol message schemas

In the JSON schema files that describe the RPC requests and responses, the "flexibleVersions" property in the request schemas indicates which versions of the requests use the v2 request header. For example, consider the DeleteGroups request schema prior to this KIP.

{
  "apiKey": 42,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteGroupsRequest",
  "validVersions": "0-2",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "GroupsNames", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The group names to delete." }
  ]
}

This schema describes 3 versions of the request. Request versions 0 and 1 use v1 of the request header, and request version 2 uses v2 of the request header. Note that v0 of the request header is no longer used.
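
For comparison with the explicit property introduced next, the implicit rule that applied before this KIP can be sketched as follows. This is a simplified, hypothetical illustration that derives the header version purely from the first flexible version and ignores any per-API special cases.

```java
public class ImplicitHeaderVersions {
    /** Pre-KIP rule: flexible request versions use request header v2, the rest use v1. */
    static int requestHeaderVersion(int apiVersion, int firstFlexibleVersion) {
        return apiVersion >= firstFlexibleVersion ? 2 : 1;
    }

    /** Same idea for responses: flexible versions use response header v1, the rest v0. */
    static int responseHeaderVersion(int apiVersion, int firstFlexibleVersion) {
        return apiVersion >= firstFlexibleVersion ? 1 : 0;
    }

    public static void main(String[] args) {
        int firstFlexible = 2; // DeleteGroups: "flexibleVersions": "2+"
        for (int v = 0; v <= 2; v++) {
            System.out.println("DeleteGroups v" + v + " -> request header v"
                    + requestHeaderVersion(v, firstFlexible));
        }
    }
}
```

Because the rule only distinguishes "flexible" from "not flexible", it has no way to express a third header version, which is what motivates an explicit mapping.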

This KIP introduces a new property for the request schemas called "headerVersions", which contains a map from request version ranges to request header versions. The keys of the map can be single versions like "0" or closed version ranges like "1-2", and the last key must be an open version range like "3+". The ranges of the keys must be non-overlapping and contiguous, and together they must cover the range of valid versions. For example, the DeleteGroups request schema becomes the following.

{
  "apiKey": 42,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteGroupsRequest",
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "headerVersions": {
    "0-1": "1",
    "2": "2",
    "3+": "3"
  },
  "fields": [
    { "name": "GroupsNames", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The group names to delete." }
  ]
}

The modified schema describes 4 versions of the request. Request versions 0 and 1 use v1 of the request header. Request version 2 uses v2 of the request header. Request version 3 and higher use v3 of the request header, which includes the client instance ID.
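
The following is a minimal sketch of how a schema tool might parse and validate a "headerVersions" map and then resolve the header version for a given API version. HeaderVersionMap is a hypothetical name, not the Kafka message generator. Two interpretations of "cover the range of valid versions" are assumptions: the first key must start at the lowest valid version, and the open range must start at or below the highest valid version.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeaderVersionMap {
    private static final int OPEN = Integer.MAX_VALUE; // upper bound of an "N+" key

    private static final class Range {
        final int lo;
        final int hi;
        final int headerVersion;

        Range(int lo, int hi, int headerVersion) {
            this.lo = lo;
            this.hi = hi;
            this.headerVersion = headerVersion;
        }
    }

    private final List<Range> ranges = new ArrayList<>();

    /** Parses keys such as "0", "1-2" and "3+" and enforces the rules described above. */
    public HeaderVersionMap(Map<String, String> headerVersions, int minValid, int maxValid) {
        for (Map.Entry<String, String> e : headerVersions.entrySet()) {
            String key = e.getKey();
            int lo;
            int hi;
            if (key.endsWith("+")) {
                lo = Integer.parseInt(key.substring(0, key.length() - 1));
                hi = OPEN;
            } else if (key.contains("-")) {
                String[] parts = key.split("-");
                lo = Integer.parseInt(parts[0]);
                hi = Integer.parseInt(parts[1]);
            } else {
                lo = Integer.parseInt(key);
                hi = lo;
            }
            if (hi < lo) {
                throw new IllegalArgumentException("Empty range: " + key);
            }
            ranges.add(new Range(lo, hi, Integer.parseInt(e.getValue())));
        }
        // JSON object keys carry no guaranteed order, so sort by lower bound first.
        ranges.sort(Comparator.comparingInt(r -> r.lo));
        if (ranges.isEmpty() || ranges.get(0).lo != minValid) {
            throw new IllegalArgumentException("Ranges must start at version " + minValid);
        }
        for (int i = 1; i < ranges.size(); i++) {
            Range prev = ranges.get(i - 1);
            if (prev.hi == OPEN || ranges.get(i).lo != prev.hi + 1) {
                throw new IllegalArgumentException("Ranges must be contiguous, non-overlapping, open range last");
            }
        }
        Range last = ranges.get(ranges.size() - 1);
        if (last.hi != OPEN || last.lo > maxValid) {
            throw new IllegalArgumentException("Last key must be an open range starting at or below " + maxValid);
        }
    }

    public int headerVersionFor(int apiVersion) {
        for (Range r : ranges) {
            if (apiVersion >= r.lo && apiVersion <= r.hi) {
                return r.headerVersion;
            }
        }
        throw new IllegalArgumentException("No header version for API version " + apiVersion);
    }

    public static void main(String[] args) {
        Map<String, String> deleteGroups = new LinkedHashMap<>();
        deleteGroups.put("0-1", "1");
        deleteGroups.put("2", "2");
        deleteGroups.put("3+", "3");
        HeaderVersionMap map = new HeaderVersionMap(deleteGroups, 0, 3);
        System.out.println("DeleteGroups v3 -> request header v" + map.headerVersionFor(3));
    }
}
```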

In the same way, the "headerVersions" property is also added to the response schemas. Only versions 0 and 1 of the response header exist.

{
  "apiKey": 42,
  "type": "response",
  "name": "DeleteGroupsResponse",
  // Starting in version 1, on quota violation, brokers send out responses before throttling.
  //
  // Version 2 is the first flexible version.
  //
  // Version 3 is the same as version 2.
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "headerVersions": {
    "0-1": "0",
    "2+": "1"
  },
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Results", "type": "[]DeletableGroupResult", "versions": "0+",
      "about": "The deletion results.", "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+", "mapKey": true, "entityType": "groupId",
        "about": "The group id." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The deletion error, or 0 if the deletion succeeded." }
    ]}
  ]
}

Public Interfaces

Client API Changes

The client instance ID is calculated in the constructor of the Producer, Consumer, RebalanceConsumer, ShareConsumer and Admin implementations, so there is no need for a timeout parameter on the accessor method. The following method is added to these interfaces:

public Uuid clientInstanceId()

and then the following method is deprecated for removal in Apache Kafka 5.0:

public Uuid clientInstanceId(Duration timeout)

In a similar vein, the following method is added to KafkaStreams:

public ClientInstanceIds clientInstanceIds()

and then the following method is deprecated for removal in Apache Kafka 5.0:

public ClientInstanceIds clientInstanceIds(Duration timeout)
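
A minimal sketch of the accessor pair on a hypothetical interface is shown below. InstanceIdentified and DeprecatedAccessorSketch are invented names, java.util.UUID stands in for org.apache.kafka.common.Uuid, and having the deprecated timed overload simply delegate to the new accessor is an assumption about how the deprecation might be implemented.

```java
import java.time.Duration;
import java.util.UUID;

/** Hypothetical stand-in for the client interfaces (the real ones return org.apache.kafka.common.Uuid). */
interface InstanceIdentified {
    /** The ID is fixed in the constructor, so no timeout is needed. */
    UUID clientInstanceId();

    /** Assumed behavior: the timeout is no longer meaningful, so just delegate. */
    @Deprecated(forRemoval = true)
    default UUID clientInstanceId(Duration timeout) {
        return clientInstanceId();
    }
}

public class DeprecatedAccessorSketch implements InstanceIdentified {
    private final UUID id = UUID.randomUUID(); // assigned during construction

    @Override
    public UUID clientInstanceId() {
        return id;
    }

    public static void main(String[] args) {
        DeprecatedAccessorSketch client = new DeprecatedAccessorSketch();
        System.out.println(client.clientInstanceId());
    }
}
```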

Kafka Protocol Changes

This KIP introduces new versions of all Kafka RPCs with the exception of SaslHandshake. Apart from the 3 RPCs specifically called out below, the only change to each RPC request is the addition of a version that uses v3 of the request header, as illustrated earlier using the example of DeleteGroups.

Request Header

This KIP introduces version 3 of the request header which adds the ClientInstanceId field.

{
  "type": "header",
  "name": "RequestHeader",
  // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
  //
  // Version 0 of the RequestHeader is only used by v0 of ControlledShutdownRequest.
  //
  // Version 1 is the first version with ClientId.
  //
  // Version 2 is the first flexible version.
  //
  // Version 3 introduces ClientInstanceId. (KIP-1313)
  "validVersions": "1-3",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "RequestApiKey", "type": "int16", "versions": "0+",
      "about": "The API key of this request." },
    { "name": "RequestApiVersion", "type": "int16", "versions": "0+",
      "about": "The API version of this request." },
    { "name": "CorrelationId", "type": "int32", "versions": "0+",
      "about": "The correlation ID of this request." },

    // The ClientId string must be serialized with the old-style two-byte length prefix.
    // The reason is that older brokers must be able to read the request header for any
    // ApiVersionsRequest, even if it is from a newer version.
    // Since the client is sending the ApiVersionsRequest in order to discover what
    // versions are supported, the client does not know the best version to use.
    { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "flexibleVersions": "none",
      "about": "The client ID, for identifying an application." },
    { "name": "ClientInstanceId", "type": "uuid", "versions": "3+",
      "about": "The client instance ID, for identifying an instance of an application." }
  ]
}
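
As an illustration of the resulting wire layout, the sketch below serializes a v3 request header by hand. It assumes the standard Kafka encodings: big-endian integers, a nullable string with a two-byte length prefix (-1 for null, as the ClientId comment above requires), a uuid as two big-endian int64 values, and, because v3 is a flexible version, a trailing tagged-field section encoded as an unsigned varint count (a single 0 byte when empty). RequestHeaderV3Sketch is a hypothetical name; real clients use generated serialization code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class RequestHeaderV3Sketch {
    /** Serializes a v3 request header in schema field order (hypothetical helper). */
    static ByteBuffer write(short apiKey, short apiVersion, int correlationId,
                            String clientId, UUID clientInstanceId) {
        byte[] idBytes = clientId == null ? null : clientId.getBytes(StandardCharsets.UTF_8);
        int size = 2 + 2 + 4                                   // RequestApiKey, RequestApiVersion, CorrelationId
                 + 2 + (idBytes == null ? 0 : idBytes.length)  // ClientId with int16 length prefix
                 + 16                                          // ClientInstanceId: two int64 values
                 + 1;                                          // empty tagged-field section
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort(apiKey);
        buf.putShort(apiVersion);
        buf.putInt(correlationId);
        if (idBytes == null) {
            buf.putShort((short) -1); // null ClientId
        } else {
            buf.putShort((short) idBytes.length);
            buf.put(idBytes);
        }
        buf.putLong(clientInstanceId.getMostSignificantBits());
        buf.putLong(clientInstanceId.getLeastSignificantBits());
        buf.put((byte) 0); // no tagged fields
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer header = write((short) 42, (short) 3, 7, "app", UUID.randomUUID());
        System.out.println("v3 header size in bytes: " + header.remaining());
    }
}
```

Compared with a v2 header, the only difference is the fixed 16-byte ClientInstanceId before the tagged fields, so the per-request overhead of the new field is constant.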

OffsetDelete API

A new version of OffsetDelete is introduced.

Request

The v1 request uses the v3 request header and it is the first flexible version. None of the versions of this request use the v2 request header.

{
  "apiKey": 47,
  "type": "request",
  "listeners": ["broker"],
  "name": "OffsetDeleteRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "headerVersions": {
    "0": "1",
    "1+": "3"
  },
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The unique group identifier." },
    { "name": "Topics", "type": "[]OffsetDeleteRequestTopic", "versions": "0+",
      "about": "The topics to delete offsets for.", "fields": [
        { "name": "Name",  "type": "string",  "versions": "0+", "mapKey": true, "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]OffsetDeleteRequestPartition", "versions": "0+",
          "about": "Each partition to delete offsets for.", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." }
          ]
        }
      ]
    }
  ]
}

Response

The v1 response is the first flexible version.

{
  "apiKey": 47,
  "type": "response",
  "name": "OffsetDeleteResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "headerVersions": {
    "0": "0",
    "1+": "1"
  },
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ThrottleTimeMs",  "type": "int32",  "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]OffsetDeleteResponseTopic", "versions": "0+",
      "about": "The responses for each topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]OffsetDeleteResponsePartition", "versions": "0+",
          "about": "The responses for each partition in the topic.", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
              "about": "The partition index." },
            { "name": "ErrorCode", "type": "int16", "versions": "0+",
              "about": "The error code, or 0 if there was no error." }
          ]
        }
      ]
    }
  ]
}

GetTelemetrySubscriptions API

A new version of GetTelemetrySubscriptions is introduced which removes the ClientInstanceId field from the request and response.

Request

The v1 request uses the v3 request header, which contains the client instance ID. As a result, the ClientInstanceId field is removed from the v1 request body. With v1, the client is responsible for calculating the client instance ID.

{
  "apiKey": 71,
  "type": "request",
  "listeners": ["broker"],
  "name": "GetTelemetrySubscriptionsRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "headerVersions": {
    "0": "2",
    "1+": "3"
  },
  "fields": [
    { "name": "ClientInstanceId", "type": "uuid", "versions": "0",
      "about": "Unique id for this client instance, must be set to 0 on the first request." }
  ]
}

Response

The v1 response removes the ClientInstanceId field. With v1, the client is responsible for calculating the client instance ID.

{
  "apiKey": 71,
  "type": "response",
  "name": "GetTelemetrySubscriptionsResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "headerVersions": {
    "0+": "1"
  },
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ClientInstanceId", "type": "uuid", "versions": "0",
      "about": "Assigned client instance id if ClientInstanceId was 0 in the request, else 0." },
    { "name": "SubscriptionId", "type": "int32", "versions": "0+",
      "about": "Unique identifier for the current subscription set for this client instance." },
    { "name": "AcceptedCompressionTypes", "type": "[]int8", "versions": "0+",
      "about": "Compression types that broker accepts for the PushTelemetryRequest." },
    { "name": "PushIntervalMs", "type": "int32", "versions": "0+",
      "about": "Configured push interval, which is the lowest configured interval in the current subscription set." },
    { "name": "TelemetryMaxBytes", "type": "int32", "versions": "0+",
      "about": "The maximum bytes of binary data the broker accepts in PushTelemetryRequest." },
    { "name": "DeltaTemporality", "type": "bool", "versions": "0+",
      "about": "Flag to indicate monotonic/counter metrics are to be emitted as deltas or cumulative values." },
    { "name": "RequestedMetrics", "type": "[]string", "versions": "0+",
      "about": "Requested metrics prefix string match. Empty array: No metrics subscribed, Array[0] empty string: All metrics subscribed." }
  ]
}
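
On the broker side, the two versions differ in where the ID comes from and whether anything is returned. The hypothetical sketch below follows the v0 "about" text ("Assigned client instance id if ClientInstanceId was 0 in the request, else 0") and the table of combinations earlier; TelemetryIdResolution, Resolved and the generator parameter are invented names, and how a real broker generates UUID-B is not shown.

```java
import java.util.UUID;
import java.util.function.Supplier;

public class TelemetryIdResolution {
    static final UUID ZERO = new UUID(0L, 0L);

    /** The ID the broker uses, and the ClientInstanceId to return (v0 only). */
    static final class Resolved {
        final UUID effectiveId;
        final UUID responseId; // null for v1, where the response field no longer exists

        Resolved(UUID effectiveId, UUID responseId) {
            this.effectiveId = effectiveId;
            this.responseId = responseId;
        }
    }

    static Resolved resolve(int version, UUID headerId, UUID bodyId, Supplier<UUID> brokerGenerator) {
        if (version >= 1) {
            // v1: the ID comes from the v3 request header (UUID-H); nothing is returned.
            return new Resolved(headerId, null);
        }
        if (ZERO.equals(bodyId)) {
            // v0 from an old client's first request: the broker assigns UUID-B.
            UUID assigned = brokerGenerator.get();
            return new Resolved(assigned, assigned);
        }
        // v0 with a client-supplied ID (UUID-B echoed back, or UUID-R): respond with 0.
        return new Resolved(bodyId, ZERO);
    }

    public static void main(String[] args) {
        Resolved r = resolve(0, null, ZERO, UUID::randomUUID);
        System.out.println("assigned " + r.effectiveId);
    }
}
```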

PushTelemetry API

A new version of PushTelemetry is introduced which removes the ClientInstanceId field from the request.

Request

The v1 request uses the v3 request header, which contains the client instance ID. As a result, the ClientInstanceId field is removed from the v1 request body.

{
  "apiKey": 72,
  "type": "request",
  "listeners": ["broker"],
  "name": "PushTelemetryRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "headerVersions": {
    "0": "2",
    "1+": "3"
  },
  "fields": [
    { "name": "ClientInstanceId", "type": "uuid", "versions": "0",
      "about": "Unique id for this client instance." },
    { "name": "SubscriptionId", "type": "int32", "versions": "0+",
      "about": "Unique identifier for the current subscription." },
    { "name": "Terminating", "type": "bool", "versions": "0+",
      "about": "Client is terminating the connection." },
    { "name": "CompressionType", "type": "int8", "versions": "0+",
      "about": "Compression codec used to compress the metrics." },
    { "name": "Metrics", "type": "bytes", "versions": "0+", "zeroCopy": true,
      "about": "Metrics encoded in OpenTelemetry MetricsData v1 protobuf format." }
  ]
}

Response

Apart from the addition of version 1, no change is made to the schema of the response.

{
  "apiKey": 72,
  "type": "response",
  "name": "PushTelemetryResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "headerVersions": {
    "0+": "1"
  },
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}

Compatibility, Deprecation, and Migration Plan

The change follows the usual mechanism for versioning of protocol requests, so it should not introduce any compatibility or migration issues.

Test Plan

Unit tests will be added to ensure that the new behavior works as expected. The existing integration and system tests should pass without modification, which would show that the change has no behavioral impact.

Rejected Alternatives

It was originally planned to add a tagged field for the client instance ID to version 2 of the request header. Although this proposal passed a vote, discussion then resumed, centered on the concern that the field would be required even though a tagged field is by its very nature optional. Consequently, the KIP was revised to introduce version 3 of the request header.

It was also planned to send the client ID only on the initial request on each connection, and then send a null client ID for all subsequent requests. This would eliminate the unnecessary overhead of repeatedly sending the same client ID string to the broker on every request. However, making such a change without bumping the request versions was considered too risky. For example, if a new client was communicating with an older broker, the fact that the client ID was only present on the initial request could break client ID-based quotas. As a result, the client ID change was removed from this KIP, and it could potentially be introduced in the future alongside version bumps of the RPCs.
