Status

Current state: Voting

Discussion thread: here

Voting thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Today, all Kafka protocol requests include the client ID. Users set this identifier through the client configuration property client.id. This is a useful capability, but it has limitations. The client ID was meant to identify an application, not the individual instances of an application, so it is often insufficient for identifying a particular client instance.

This KIP adds a UUID called the client instance ID into the request header of all Kafka protocol requests. Each client has a different client instance ID, so correlating requests from a particular client becomes much easier. An immediate benefit is that it can be included in request logging for troubleshooting. It is being added to improve traceability and problem determination.

Proposed Changes

This KIP proposes the following changes to the Kafka protocol.

Client Instance ID

This KIP proposes adding a UUID called the client instance ID into the request header of all Kafka protocol requests. Those familiar with KIP-714 will be aware that it already introduced the client instance ID, so this KIP actually extends that concept into a universally unique identifier for client instances in all RPCs.

The client instance ID is now generated by the client in its constructor, before it makes its initial connection to the cluster. This differs slightly from how KIP-714 initialized the client instance ID. The client uses the same client instance ID for its connections to every broker throughout its lifetime, even when it rebootstraps and makes new connections. The consistency and uniqueness of the identifiers are both important characteristics.
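The lifecycle described above can be illustrated with a minimal sketch. SketchClient and headerInstanceIdFor are hypothetical names used only for illustration, and java.util.UUID stands in for the Kafka Uuid type so that the example is self-contained; this is not the actual client implementation.

```java
import java.util.UUID;

// Hypothetical stand-in for a Kafka client (assumption, not a real Kafka class).
final class SketchClient {
    // Generated once in the constructor, before any connection exists, and never changed.
    private final UUID clientInstanceId;

    SketchClient() {
        this.clientInstanceId = UUID.randomUUID();
    }

    // The value is known as soon as the constructor returns, so no timeout is needed.
    UUID clientInstanceId() {
        return clientInstanceId;
    }

    // Hypothetical hook: the ID placed in request headers for a given broker connection.
    // The same value is returned for every broker, including after rebootstrapping.
    UUID headerInstanceIdFor(String brokerAddress) {
        return clientInstanceId;
    }
}
```

Two instances of SketchClient get different IDs, while a single instance reports the same ID for every broker it connects to.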

In KIP-714, the client instance ID was created by the broker which responded to a client's first GetTelemetrySubscriptions RPC. In KIP-848, the member ID was created by the group coordinator in response to a heartbeat, but subsequently KIP-1082 changed this so that the client created its own member ID. As a result, this KIP calculates the client instance ID on the client.

The client instance ID is added as a tagged field in the RPC request header, so it is present on all RPCs that use the v2 request header, which is almost all of the current versions of the RPCs (only SaslHandshake and OffsetDelete use the v1 request header for their latest versions). The client is required to be consistent in its use of client instance ID in request headers. If it specifies the value in its request headers, every request on a connection must specify the same value. If it does not specify the value in its request headers, every request on a connection must not specify a value.

If a connection specifies a client instance ID in the request header of its first request which uses the v2 request header, it must specify the same client instance ID in the request header for all subsequent requests which use the v2 request header. The initial client instance ID for each connection will be cached by the broker for checking (this is an implementation detail, but caching it in the ChannelMetadataRegistry is an option). Once a client has specified a client instance ID in the request header of its first request, any subsequent requests which are missing the client instance ID (with the exception of requests using the v1 request header) or which specify a different value for the client instance ID will be rejected with error code INVALID_REQUEST.

If a client does not specify a client instance ID in the request header of its first request which uses the v2 request header, it must not specify a client instance ID in the request header of any subsequent requests. If it does so, the request will be rejected with error code INVALID_REQUEST.
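The per-connection rules above could be checked along the following lines. This is a sketch under stated assumptions: ConnectionInstanceIdCheck, its check method and the Result enum are hypothetical names, java.util.UUID stands in for the Kafka Uuid type, and the KIP leaves the actual caching location (for example the ChannelMetadataRegistry) as an implementation detail.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical per-connection state kept by the broker (assumption for illustration).
final class ConnectionInstanceIdCheck {
    enum Result { NONE, INVALID_REQUEST }

    private boolean sawFirstV2Request = false;
    private Optional<UUID> cached = Optional.empty();

    Result check(int headerVersion, Optional<UUID> headerInstanceId) {
        if (headerVersion < 2) {
            // Requests using the v1 request header cannot carry the tagged field and are exempt.
            return Result.NONE;
        }
        if (!sawFirstV2Request) {
            // The first v2 request fixes the expectation for this connection:
            // either a specific ID, or no ID at all.
            sawFirstV2Request = true;
            cached = headerInstanceId;
            return Result.NONE;
        }
        // Later v2 requests must match: same ID, or absent if it was absent initially.
        return cached.equals(headerInstanceId) ? Result.NONE : Result.INVALID_REQUEST;
    }
}
```

Comparing Optional values covers all three rejection cases with one test: a missing ID after one was sent, a different ID, and an ID that appears after the first v2 request omitted it.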

Alignment of identifiers

By adding client instance ID to the request headers, we now have a unique application instance identifier which we can use for other purposes such as the member ID in the group protocols and the client telemetry client instance ID. The alignment of these other identifiers is by convention (and the Java client will follow the convention) rather than mandate. In the language of standards, the client SHOULD use the same UUID in request headers as it uses for the member ID in the group protocols and the client telemetry client instance ID. This alignment just makes traceability and problem determination more straightforward.

It would be possible to go around the existing RPCs such as ConsumerGroupHeartbeat and GetTelemetrySubscriptions, and remove the fields containing the existing identifiers which are intended to be aligned. Doing so would be a bad idea though, because we would then have RPC versions which essentially depend upon the presence of a tagged field in the request header. This is a protocol-compatibility nightmare.

This KIP makes one change to the GetTelemetrySubscriptions behavior. If the client requests a new client instance ID on its initial GetTelemetrySubscriptions request and it sends a client instance ID in the request header, the broker will send back that client instance ID rather than generating a new UUID. This will automatically align the UUID in the request headers and client telemetry.

Key:

  • UUID-H - client instance ID sent by client in header, generated by client
  • UUID-R - client instance ID sent by client in request, which is not equal to UUID-H
  • UUID-B - client instance ID sent by broker in response, generated by broker

Pre-KIP-1313 - broker does not expect ClientInstanceId in request header and ignores it

| Client sends | Broker responds | Notes |
| --- | --- | --- |
| GetTelemetrySubscriptions v0; request.ClientInstanceId = 0 | response.ClientInstanceId = UUID-B; response.ErrorCode = NONE | This is the KIP-714 initial GetTelemetrySubscriptions request. The client is requesting a new client instance ID from the broker. |
| GetTelemetrySubscriptions v0; request.ClientInstanceId = UUID-R | response.ClientInstanceId = 0; response.ErrorCode = NONE | This is a KIP-714 non-initial GetTelemetrySubscriptions request. The client is using UUID-R for client telemetry. |

Post-KIP-1313 - broker is aware of optional ClientInstanceId in request header

| Client sends | Broker responds | Notes |
| --- | --- | --- |
| GetTelemetrySubscriptions v0; header.ClientInstanceId not present; request.ClientInstanceId = 0 | response.ClientInstanceId = UUID-B; response.ErrorCode = NONE | This is the KIP-714 initial GetTelemetrySubscriptions request from a pre-KIP-1313 client. The client is requesting a new client instance ID from the broker. |
| GetTelemetrySubscriptions v0; header.ClientInstanceId = UUID-H; request.ClientInstanceId = 0 | response.ClientInstanceId = UUID-H; response.ErrorCode = NONE | This is the KIP-714 initial GetTelemetrySubscriptions request from a post-KIP-1313 client. The client is requesting a new client instance ID from the broker, but it has sent UUID-H in the request header which the broker uses, thus aligning the client instance ID for the request headers and client telemetry. This is what we expect from the Apache Kafka Java client after this KIP. |
| GetTelemetrySubscriptions v0; header.ClientInstanceId not present; request.ClientInstanceId = UUID-R | response.ClientInstanceId = 0; response.ErrorCode = NONE | This is a KIP-714 non-initial GetTelemetrySubscriptions request from a pre-KIP-1313 client. The client is using UUID-R for client telemetry. |
| GetTelemetrySubscriptions v0; header.ClientInstanceId = UUID-H; request.ClientInstanceId = UUID-H | response.ClientInstanceId = 0; response.ErrorCode = NONE | This is a KIP-714 non-initial GetTelemetrySubscriptions request from a post-KIP-1313 client. The client is using UUID-H for request headers and client telemetry. This is what we expect from the Apache Kafka Java client after this KIP. |
| GetTelemetrySubscriptions v0; header.ClientInstanceId = UUID-H; request.ClientInstanceId = UUID-R | response.ClientInstanceId = 0; response.ErrorCode = NONE | This is also a KIP-714 non-initial GetTelemetrySubscriptions request from a post-KIP-1313 client. For some reason, the client is using UUID-H for request headers, but UUID-R for client telemetry. This is allowed, but not preferred. |

Public Interfaces

Client API Changes

The client instance ID is generated in the constructor of the Producer, Consumer, ShareConsumer and Admin implementations, so there is no need for a timeout parameter on the accessor method. The following method is added to these interfaces:

public Uuid clientInstanceId()

and then the following method is deprecated for removal in Apache Kafka 5.0:

public Uuid clientInstanceId(Duration timeout)

In a similar vein, the following method in KafkaStreams is added:

public ClientInstanceIds clientInstanceIds()

and then the following method is deprecated for removal in Apache Kafka 5.0:

public ClientInstanceIds clientInstanceIds(Duration timeout)

Kafka Protocol Changes

Request Header

This KIP introduces a tagged field ClientInstanceId into version 2 of the request header. This means it can be introduced without any other RPC changes.

{
  "type": "header",
  "name": "RequestHeader",
  // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
  //
  // Version 0 of the RequestHeader is only used by v0 of ControlledShutdownRequest.
  //
  // Version 1 is the first version with ClientId.
  //
  // Version 2 is the first flexible version.
  // Tagged field 0 introduces ClientInstanceId. (KIP-1313)
  "validVersions": "1-2",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "RequestApiKey", "type": "int16", "versions": "0+",
      "about": "The API key of this request." },
    { "name": "RequestApiVersion", "type": "int16", "versions": "0+",
      "about": "The API version of this request." },
    { "name": "CorrelationId", "type": "int32", "versions": "0+",
      "about": "The correlation ID of this request." },

    // The ClientId string must be serialized with the old-style two-byte length prefix.
    // The reason is that older brokers must be able to read the request header for any
    // ApiVersionsRequest, even if it is from a newer version.
    // Since the client is sending the ApiVersionsRequest in order to discover what
    // versions are supported, the client does not know the best version to use.
    { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "flexibleVersions": "none",
      "about": "The client ID, for identifying an application." },
    { "name": "ClientInstanceId", "type": "uuid", "versions": "2+", "taggedVersions": "2+", "tag": 0, "ignorable": true,
      "about": "The client instance ID, for identifying an instance of an application." }
  ]
}
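To make the wire-level effect of the tagged field concrete, the following sketch encodes only the tagged-field section that closes a v2 request header. It assumes the tagged-field layout introduced by KIP-482 (a field count, then tag, size and data for each field, with count, tag and size as unsigned varints) and assumes a UUID is written as 16 bytes, most significant half first. TaggedFieldSketch and its methods are hypothetical names, and java.util.UUID stands in for the Kafka Uuid type.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical helper (assumption for illustration, not Kafka's generated serialization code).
final class TaggedFieldSketch {
    // Tagged-field section of a v2 request header carrying only ClientInstanceId.
    static byte[] encodeTaggedFields(UUID clientInstanceId) {
        ByteBuffer buf = ByteBuffer.allocate(3 + 16); // big-endian by default
        buf.put((byte) 1);   // number of tagged fields; values below 128 fit in one varint byte
        buf.put((byte) 0);   // tag 0 = ClientInstanceId
        buf.put((byte) 16);  // size of the field data in bytes
        buf.putLong(clientInstanceId.getMostSignificantBits());
        buf.putLong(clientInstanceId.getLeastSignificantBits());
        return buf.array();
    }

    // Tagged-field section when the client sends no ClientInstanceId: a zero count.
    static byte[] encodeNoTaggedFields() {
        return new byte[] {0};
    }
}
```

Under these assumptions the field adds 18 bytes to each request header compared with an empty tagged-field section.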

GetTelemetrySubscriptions API

A very small behavioral change is made in the broker handling of GetTelemetrySubscriptions v0. If the client sends a zero ClientInstanceId in the request body and it also sends a ClientInstanceId in the request header, rather than generating a new UUID, the broker will respond with the ClientInstanceId from the request header. This naturally aligns the request header and client telemetry values for ClientInstanceId.
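The broker-side decision can be sketched as a small function. TelemetryInstanceIdSketch and responseClientInstanceId are hypothetical names, java.util.UUID stands in for the Kafka Uuid type, and the non-initial case returning zero mirrors the tables in this KIP rather than any confirmed broker code.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical helper (assumption for illustration, not the broker implementation).
final class TelemetryInstanceIdSketch {
    static final UUID ZERO = new UUID(0L, 0L);

    // Computes response.ClientInstanceId for GetTelemetrySubscriptions v0.
    static UUID responseClientInstanceId(Optional<UUID> headerId, UUID requestId) {
        if (!ZERO.equals(requestId)) {
            // Non-initial request: the client already has an ID, so nothing is assigned.
            return ZERO;
        }
        // Initial request: prefer the header ID (UUID-H), otherwise generate one (UUID-B).
        return headerId.orElseGet(UUID::randomUUID);
    }
}
```

A header ID is only consulted when the request body asks for a new ID, so a client that deliberately uses different values in the header and in telemetry is left alone.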

Compatibility, Deprecation, and Migration Plan

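The new field is a tagged field in the request header and is marked ignorable, so it should have no impact on compatibility: a broker that predates this KIP ignores the field, and a client that predates this KIP simply never sends it.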

Test Plan

Unit tests will be added to ensure that the new behavior works as expected. The existing integration and system tests should be entirely unaffected by the change, which would show that there was no behavioral impact.

Rejected Alternatives

It would be possible to add an untagged field to the request header and bump the version of the request header but this is expensive. Each version of the Kafka protocol RPCs has an associated request header version, so it would be necessary to bump the versions of all the other RPCs.

It was also planned to send the client ID only on the initial request on each connection, and then send a null client ID for all subsequent requests. This eliminates the unnecessary overhead of repeatedly sending the same client ID string to the broker on every request. However, making such a change without bumping the request versions was considered to be too risky. For example, if a new client was communicating with an older broker, the fact that client ID was only present on the initial request could break client ID-based quotas. As a result, the client ID change was removed from this KIP, and could potentially be introduced in the future alongside version bumps of the RPCs.
