...

Certain Kafka operations are implemented by sending the request to the least loaded broker. The broker handles these requests by reading its in-memory metadata cache and returning the information stored in the cache. The content of the metadata cache is backed by the state in the cluster metadata partition (__cluster_metadata-0). The content of the metadata cache and the cluster metadata partition is consistent for a given log offset and only includes committed data. Committed data is data that is guaranteed not to be truncated due to log divergence.

Since some read operations are sent to the least loaded node, there is no guarantee that future requests from the same client will see a state that is at least as recent (a larger log offset) than the previously seen state. This is because there is no guarantee that future read operations will be sent to a node that contains the previously seen log offsets.

Proposed Changes

The general solution to this problem is to allow clients to only receive RPC responses with monotonically increasing cluster metadata log offsets.

Server

The Kafka public API will be extended to allow clients to provide the latest metadata offset known to the client. This information will be encoded in the RequestHeader using the cluster metadata offset. When a server handles a request it will check that its own cluster metadata offset is as up to date as the client's. If the server is behind the client it may wait up to the request timeout for its own cluster metadata to be as up to date as the client's, or it may return a STALE_METADATA error.

After handling a request, Kafka servers will include their latest cluster metadata offset in the ResponseHeader.
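As a sketch, the broker-side decision described above could look like the following. The type and method names (`ConsistencyCheckSketch`, `decide`) are illustrative, not actual Kafka broker code:

```java
// Sketch of the broker-side check: compare the client's last seen
// cluster metadata offset against the broker's own committed offset.
// Illustrative only; not part of the actual Kafka broker.
public class ConsistencyCheckSketch {
    enum Decision { PROCESS, WAIT_FOR_METADATA, STALE_METADATA }

    /**
     * @param clientOffset offset from the RequestHeader, or -1 if absent
     * @param brokerOffset the broker's last committed metadata offset
     * @param withinTimeout whether the request timeout still allows waiting
     */
    static Decision decide(long clientOffset, long brokerOffset, boolean withinTimeout) {
        if (clientOffset <= brokerOffset) {
            return Decision.PROCESS; // broker is at least as up to date as the client
        }
        // Broker is behind the client: either wait for replication to catch
        // up (bounded by the request timeout) or fail with STALE_METADATA.
        return withinTimeout ? Decision.WAIT_FOR_METADATA : Decision.STALE_METADATA;
    }
}
```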

Clients

TODO: explain this...

Public Interfaces

Remote Procedure Calls

Error

Kafka will add a new retriable error named STALE_METADATA. This error will be returned by the Kafka servers if their cluster metadata version is not as up to date as the cluster metadata version provided by the client. This error is not fatal and it is retriable. The client should retry the operation if it receives a STALE_METADATA error.

RequestHeader

The Kafka clients will be updated to include in the RequestHeader the latest cluster metadata offset seen by the client. The client will update its latest seen cluster metadata offset by reading it from the ResponseHeader. The client will also need to handle the new STALE_METADATA error by retrying the operation.

The Java Kafka client will allow users to share and update the latest cluster metadata offset across multiple client instances in the same JVM. This feature will be supported by the Admin, Producer and Consumer clients. This will be done by introducing a few new objects. The ConsistencyContextStore object will be used for storing and reading the latest cluster metadata offset across all configured clients. The Factory object will be responsible for creating clients so that they share the same ConsistencyContextStore object and hence the same cluster metadata offset. Finally, the ConsistencyContext object will abstract the cluster metadata offset.

If the user is interested in sharing the ConsistencyContext across process boundaries, they are responsible for reading the latest consistency context, serializing it, sending it across the process boundary, deserializing it and updating the remote consistency context store.
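For cross-process sharing, the serialized form could be as small as the clusterId plus the 64-bit consistency token. A hypothetical codec is sketched below; the wire format is an assumption for illustration, not part of the proposal:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoding of a consistency context: a length-prefixed
// clusterId followed by the 64-bit consistency token.
public final class ConsistencyContextCodec {
    public static byte[] serialize(String clusterId, long token) {
        byte[] id = clusterId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + id.length + Long.BYTES);
        buf.putInt(id.length);
        buf.put(id);
        buf.putLong(token);
        return buf.array();
    }

    /** Returns {clusterId, token} decoded from {@code bytes}. */
    public static Object[] deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] id = new byte[buf.getInt()];
        buf.get(id);
        return new Object[] { new String(id, StandardCharsets.UTF_8), buf.getLong() };
    }
}
```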



RequestHeader

Code Block
{
  "type": "header",
  "name": "RequestHeader",
  // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
  //
  // Version 0 of the RequestHeader is only used by v0 of ControlledShutdownRequest.
  //
  // Version 1 is the first version with ClientId.
  //
  // Version 2 is the first flexible version.
  "validVersions": "1-2",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "RequestApiKey", "type": "int16", "versions": "0+",
      "about": "The API key of this request." },
    { "name": "RequestApiVersion", "type": "int16", "versions": "0+",
      "about": "The API version of this request." },
    { "name": "CorrelationId", "type": "int32", "versions": "0+",
      "about": "The correlation ID of this request." },

    // The ClientId string must be serialized with the old-style two-byte length prefix.
    // The reason is that older brokers must be able to read the request header for any
    // ApiVersionsRequest, even if it is from a newer version.
    // Since the client is sending the ApiVersionsRequest in order to discover what
    // versions are supported, the client does not know the best version to use.
    { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "flexibleVersions": "none",
      "about": "The client ID string." },
    { "name": "ConsistencyState", "type": "ConsistencyState", "versions": "2+", "taggedVersions": "2+", "tag": 0,
      "about": "Consistency context for the request.", "fields": [
      { "name": "ClusterId", "type": "string", "versions": "2+", "nullableVersions": "2+", "default": "null",
        "about": "The clusterId if known. This is used to validate request against the expected cluster." },
      { "name": "ConsistencyToken", "type": "int64", "versions": "2+", "default": "-1",
        "about": "The latest consistency token seen by the client." }
    ]}
  ]
}

ResponseHeader

Code Block
{
  "type": "header",
  "name": "ResponseHeader",
  // Version 1 is the first flexible version.
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "CorrelationId", "type": "int32", "versions": "0+",
      "about": "The correlation ID of this response." },
    { "name": "ConsistencyState", "type": "ConsistencyState", "versions": "1+", "taggedVersions": "1+", "tag": 0,
      "about": "Consistency context for the request.", "fields": [
      { "name": "ClusterId", "type": "string", "versions": "1+", "nullableVersions": "1+", "default": "null",
        "about": "The clusterId if known. This is used to validate request against the expected cluster." },
      { "name": "ConsistencyToken", "type": "int64", "versions": "1+", "default": "-1",
        "about": "The latest consistency token seen by the client." }
    ]}
  ]
}

Handling

If the client provides a ConsistencyState in the request header, the server will handle it by:

  1. Handling the request as currently defined and implemented.
  2. Checking that the cluster id matches the cluster id of the server. If the cluster id doesn't match, it will send a failed response with the error INCONSISTENT_CLUSTER_ID.
  3. If the operation needs to access the metadata cache, checking that the consistency token is as old as (less than or equal to) the metadata cache's last contained log offset. If the consistency token is newer, it will send a failed response with the new error STALE_METADATA.
  4. When constructing the ResponseHeader, reading the metadata cache's last contained log offset and including it in the consistency token. It is important to read the metadata cache's offset after the response has been constructed, since the metadata cache is immutable and doesn't provide any synchronization support.
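The handling steps above can be sketched as follows, with the validation checks applied before the request body is processed. Only the error names come from this proposal; the type and parameter names are illustrative:

```java
// Sketch of the handling steps; illustrative names only.
public class HandlerSketch {
    static final long NO_TOKEN = -1L;

    static String handle(String serverClusterId, long cacheLastOffset,
                         String clientClusterId, long clientToken) {
        // Step 2: validate the cluster id, if the client supplied one.
        if (clientClusterId != null && !clientClusterId.equals(serverClusterId)) {
            return "INCONSISTENT_CLUSTER_ID";
        }
        // Step 3: the client's token must be at or below the metadata
        // cache's last contained log offset.
        if (clientToken != NO_TOKEN && clientToken > cacheLastOffset) {
            return "STALE_METADATA";
        }
        // Step 1: handle the request as currently implemented (elided).
        // Step 4: after the response body is built, read the immutable
        // cache's offset and stamp it into the ResponseHeader.
        return "OK:" + cacheLastOffset;
    }
}
```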

All of the operations that read the metadata cache are:

  1. Metadata with API key 3
  2. DescribeAcls with API key 29
  3. DescribeConfigs with API key 32
  4. DescribeLogDirs with API key 35
  5. DescribeDelegationToken with API key 41
  6. DescribeClientQuotas with API key 48
  7. DescribeUserScramCredentials with API key 50
  8. DescribeCluster with API key 60
  9. ListConfigResources with API key 74
  10. DescribeTopicPartitions with API key 75

Sending

When client TODO

Clients

Factory

Code Block
package org.apache.kafka.clients;

/**
 * Object for creating Kafka clients with a shared consistency context.
 *
 * This object allows the user to create Admin clients, Producer clients and Consumer clients with
 * a shared consistency context.
 *
 * For example, if you would like to create an Admin client to create ACLs and a topic, and have
 * the producer and consumer see a consistent view of the cluster metadata, then use the same
 * factory to create all of the associated clients.
 *
 * This object implements three important methods. The method {@code admin} can be used to create
 * Admin clients. The method {@code producer} can be used to create Producer clients. The
 * method {@code consumer} can be used to create Consumer clients.
 */
public final class Factory {
    private final ConsistencyContextStore store;

    /**
     * Creates a Factory object.
     *
     * @param store the store for storing the latest consistency context
     */
    Factory(ConsistencyContextStore store) {
        this.store = store;
    }

    /**
     * Creates an Admin client.
     *
     * @param config the admin client configuration
     */
    public Admin admin(Map<String, Object> config) {
        ...
    }

    /**
     * Creates a Producer client.
     *
     * @param config the producer configuration
     * @param keySerializer the serializer for the key
     * @param valueSerializer the serializer for the value
     */
    public <K, V> Producer<K, V> producer(
        Map<String, Object> config,
        Serializer<K> keySerializer,
        Serializer<V> valueSerializer
    ) {
        ...
    }

    /**
     * Creates a Consumer client.
     *
     * @param config the consumer configuration
     * @param keyDeserializer the deserializer for the key
     * @param valueDeserializer the deserializer for the value
     */
    public <K, V> Consumer<K, V> consumer(
        Map<String, Object> config,
        Deserializer<K> keyDeserializer,
        Deserializer<V> valueDeserializer
    ) {
        ...
    }
}
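A minimal sketch of the shared store such a factory could hand to each client it creates is shown below. The shape (`SharedOffsetStore`, `update`, `latest`) is an assumption for illustration, not the proposed ConsistencyContextStore API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of the shared-store idea: every client created by the
// same factory writes the offsets it observes into one store, and reads
// the current maximum before sending a request. Illustrative shape only.
public final class SharedOffsetStore {
    // Latest cluster metadata offset seen by any client; -1 means "none yet".
    private final AtomicLong latestOffset = new AtomicLong(-1L);

    /** Called by any client after reading a ResponseHeader. */
    public void update(long observedOffset) {
        // Only move forward: a response from a lagging broker must not
        // roll the shared offset back.
        latestOffset.accumulateAndGet(observedOffset, Math::max);
    }

    /** Called by any client when building a RequestHeader. */
    public long latest() {
        return latestOffset.get();
    }
}
```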

...