Status
Current state: Under Discussion
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: here [Change the link from KAFKA-1 to your own ticket]
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Certain Kafka operations are implemented by sending the request to the least loaded broker. The broker handles these requests by reading its in-memory metadata cache and returning the information stored in the cache. The content of the metadata cache is backed by the state in the cluster metadata partition (__cluster_metadata-0). The content of the metadata cache and the cluster metadata partition is consistent for a given log offset and only includes committed data. Committed data is data that is guaranteed not to be truncated due to log divergence.
Since some read operations are sent to the least loaded node, there is no guarantee that future requests from the same client will see a state that is more recent (has a larger log offset) than the previously seen state. This is because there is no guarantee that future read operations will be sent to a node that contains the previously seen log offsets.
Proposed Changes
The general solution to this problem is to allow clients to only receive RPC responses with monotonically increasing cluster metadata log offsets.
Server
The Kafka public API will be extended to allow clients to provide the latest metadata offset known to the client. This information will be encoded in the RequestHeader using the cluster metadata offset. When a server handles a request it will check that its own cluster metadata offset is as up to date as the client's. If the server is behind the client, it may wait up to the request timeout for its own cluster metadata to become as up to date as the client's, or it may return a STALE_METADATA error.
After handling a request, Kafka servers will include their latest cluster metadata offset in the ResponseHeader.
Clients
The Kafka clients will be updated to include in the RequestHeader the latest cluster metadata offset seen by the client. The client will update its latest seen cluster metadata offset by reading it from the ResponseHeader. The client will also need to handle the new STALE_METADATA error by retrying the operation.
The Java Kafka client will allow the users to share and update the latest cluster metadata offset across multiple client instances in the same JVM. This feature will be supported by the Admin, Producer and Consumer clients. This will be done by introducing a few new objects. The ConsistencyContextStore object will be used for storing and reading the latest cluster metadata offset across all configured clients. The Factory object will be responsible for creating clients so that they share the same ConsistencyContextStore object and hence the same cluster metadata offset. Finally, the ConsistencyContext object will abstract the cluster metadata offset.
If the user is interested in sharing the ConsistencyContext across process boundaries, they are responsible for reading the latest consistency context, serializing it, sending it across the process boundary, deserializing it and updating the remote consistency context store.
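Since the proposed ConsistencyContext extends Serializable, standard Java serialization is one way to move it across a process boundary. The sketch below is illustrative only: the Ctx record is a hypothetical stand-in for the proposed ConsistencyContext, and the helper class name is made up for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Ctx is a stand-in for the proposed ConsistencyContext, which extends Serializable.
record Ctx(String clusterId, long offset) implements Serializable {}

public class CrossProcessSharing {
    // Serialize the latest context so it can be sent to another process.
    static byte[] serialize(Ctx ctx) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(ctx);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserialize on the receiving side before updating the remote
    // consistency context store.
    static Ctx deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (Ctx) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```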
Public Interfaces
Remote Procedure Calls
Error
Kafka will add a new retriable error named STALE_METADATA. This error will be returned by the Kafka servers if their cluster metadata version is not as up to date as the cluster metadata version provided by the client. This error is not fatal and it is retriable. The client should retry the operation if it receives a STALE_METADATA error.
RequestHeader
{
  "type": "header",
  "name": "RequestHeader",
  // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
  //
  // Version 0 of the RequestHeader is only used by v0 of ControlledShutdownRequest.
  //
  // Version 1 is the first version with ClientId.
  //
  // Version 2 is the first flexible version.
  "validVersions": "1-2",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "RequestApiKey", "type": "int16", "versions": "0+",
      "about": "The API key of this request." },
    { "name": "RequestApiVersion", "type": "int16", "versions": "0+",
      "about": "The API version of this request." },
    { "name": "CorrelationId", "type": "int32", "versions": "0+",
      "about": "The correlation ID of this request." },
    // The ClientId string must be serialized with the old-style two-byte length prefix.
    // The reason is that older brokers must be able to read the request header for any
    // ApiVersionsRequest, even if it is from a newer version.
    // Since the client is sending the ApiVersionsRequest in order to discover what
    // versions are supported, the client does not know the best version to use.
    { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "flexibleVersions": "none",
      "about": "The client ID string." },
    { "name": "ConsistencyState", "type": "ConsistencyState", "versions": "2+", "taggedVersions": "2+", "tag": 0,
      "about": "Consistency context for the request.", "fields": [
        { "name": "ClusterId", "type": "string", "versions": "2+", "nullableVersions": "2+", "default": "null",
          "about": "The clusterId if known. This is used to validate the request against the expected cluster." },
        { "name": "ConsistencyToken", "type": "int64", "versions": "2+", "default": "-1",
          "about": "The latest consistency token seen by the client." }
    ]}
  ]
}
ResponseHeader
{
  "type": "header",
  "name": "ResponseHeader",
  // Version 1 is the first flexible version.
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "CorrelationId", "type": "int32", "versions": "0+",
      "about": "The correlation ID of this response." },
    { "name": "ConsistencyState", "type": "ConsistencyState", "versions": "1+", "taggedVersions": "1+", "tag": 0,
      "about": "Consistency context for the response.", "fields": [
        { "name": "ClusterId", "type": "string", "versions": "1+", "nullableVersions": "1+", "default": "null",
          "about": "The clusterId if known. This is used to validate the response against the expected cluster." },
        { "name": "ConsistencyToken", "type": "int64", "versions": "1+", "default": "-1",
          "about": "The latest consistency token seen by the broker." }
    ]}
  ]
}
Handling
If the client provides a ConsistencyState in the request header, the server will handle it by:
- Handling the request as currently defined and implemented.
- Checking that the cluster id matches the cluster id of the server. If the cluster id doesn't match, it will send a failed response with the error INCONSISTENT_CLUSTER_ID.
- If the operation needs to access the metadata cache, checking that the consistency token is as old as (less than or equal to) the metadata cache's last contained log offset. If the consistency token is newer, it will send a failed response with the new error STALE_METADATA.
- When constructing the ResponseHeader, reading the metadata cache's last contained log offset and including it in the consistency token. It is important to read the metadata cache's offset after the response has been constructed since the metadata cache is immutable and doesn't contain any synchronization support.
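The checks above can be sketched as follows. The names and method shape are illustrative, not the actual broker code; a null client cluster id means the client does not yet know the cluster id.

```java
// Sketch of the server-side consistency checks described above.
public class ConsistencyCheck {
    enum Outcome { INCONSISTENT_CLUSTER_ID, STALE_METADATA, OK }

    static Outcome check(String serverClusterId, long cacheLastOffset,
                         String clientClusterId, long clientToken) {
        // A mismatched cluster id fails the request outright.
        if (clientClusterId != null && !clientClusterId.equals(serverClusterId)) {
            return Outcome.INCONSISTENT_CLUSTER_ID;
        }
        // The client's token must be at most the last offset contained in the
        // server's metadata cache; otherwise the server's view is too stale.
        if (clientToken > cacheLastOffset) {
            return Outcome.STALE_METADATA;
        }
        return Outcome.OK;
    }
}
```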
All of the operations that read the metadata cache are:
- Metadata with API key 3
- DescribeAcls with API key 29
- DescribeConfigs with API key 32
- DescribeLogDirs with API key 35
- DescribeDelegationToken with API key 41
- DescribeClientQuotas with API key 48
- DescribeUserScramCredentials with API key 50
- DescribeCluster with API key 60
- ListConfigResources with API key 74
- DescribeTopicPartitions with API key 75
Sending
When a client sends a request, it will include the latest consistency context it has seen (the cluster id and consistency token) in the ConsistencyState field of the RequestHeader.
Clients
Factory
package org.apache.kafka.clients;

/**
 * Object for creating Kafka clients with a shared consistency.
 *
 * This object allows the user to create Admin clients, Producer clients and Consumer clients with
 * a shared consistency.
 *
 * For example, if you would like to create an Admin client to create ACLs and a topic, and have
 * the producer and consumer see a consistent view of the cluster metadata, then use the same
 * factory to create all of the associated clients.
 *
 * This object implements three important methods. The method {@code admin} can be used to create
 * Admin clients. The method {@code producer} can be used to create Producer clients. The
 * method {@code consumer} can be used to create Consumer clients.
 */
public final class Factory {
    private final ConsistencyContextStore store;

    /**
     * Creates a Factory object.
     *
     * @param store the store for storing the latest consistency context
     */
    Factory(ConsistencyContextStore store) {
        this.store = store;
    }

    /**
     * Creates an Admin client.
     *
     * @param config the admin client configuration
     */
    public Admin admin(Map<String, Object> config) {
        ...
    }

    /**
     * Creates a Producer client.
     *
     * @param config the producer configuration
     * @param keySerializer the serializer for the key
     * @param valueSerializer the serializer for the value
     */
    public <K, V> Producer<K, V> producer(
        Map<String, Object> config,
        Serializer<K> keySerializer,
        Serializer<V> valueSerializer
    ) {
        ...
    }

    /**
     * Creates a Consumer client.
     *
     * @param config the consumer configuration
     * @param keyDeserializer the deserializer for the key
     * @param valueDeserializer the deserializer for the value
     */
    public <K, V> Consumer<K, V> consumer(
        Map<String, Object> config,
        Deserializer<K> keyDeserializer,
        Deserializer<V> valueDeserializer
    ) {
        ...
    }
}
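A usage sketch follows. It is illustrative only, since Factory is the proposed API above and not an existing class; it also assumes the Factory is reachable through some public construction path, and that adminConfig, producerConfig and consumerConfig are pre-built configuration maps.

```java
// Hypothetical usage of the proposed Factory API.
ConsistencyContextStore store = ConsistencyContextStore.empty();
Factory factory = new Factory(store);

Admin admin = factory.admin(adminConfig);
Producer<String, String> producer =
    factory.producer(producerConfig, new StringSerializer(), new StringSerializer());
Consumer<String, String> consumer =
    factory.consumer(consumerConfig, new StringDeserializer(), new StringDeserializer());

// All three clients now share the same ConsistencyContextStore, so metadata
// changes made through the Admin client are reflected in the consistency token
// that the producer and consumer attach to their subsequent requests.
```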
Consistency Context Store
package org.apache.kafka.clients;

/**
 * An object that can be used to read and write the latest metadata consistency context for a Kafka
 * cluster.
 *
 * A {@code ConsistencyContextStore} exposes two operations. The method {@code storeLatest} can be
 * used to store the latest metadata consistency context. The method {@code read} can be used to
 * read the latest metadata consistency context.
 */
public interface ConsistencyContextStore {
    /**
     * Method for storing the latest consistency context.
     *
     * Overrides the stored metadata {@code ConsistencyContext} if the provided
     * {@code consistencyContext} is more up to date than the stored metadata consistency context.
     * Otherwise the already stored metadata {@code ConsistencyContext} is kept.
     *
     * Returns {@code true} if the stored metadata consistency context was updated. Otherwise, it
     * returns {@code false}.
     *
     * @param consistencyContext the new consistency context to attempt to store
     * @return true if the consistency context was updated, otherwise false
     * @throws IllegalArgumentException if the cluster ids do not match
     */
    boolean storeLatest(ConsistencyContext consistencyContext);

    /**
     * Method for reading the latest consistency context.
     *
     * @return the currently stored metadata consistency context
     */
    ConsistencyContext read();

    /**
     * Creates and returns a {@code ConsistencyContextStore} object with
     * {@code consistencyContext} as the starting value.
     *
     * @param consistencyContext the initial value for the consistency context store
     * @return a memory backed consistency context store
     */
    public static ConsistencyContextStore of(ConsistencyContext consistencyContext) {
        return MemoryConsistencyContextStore.of(consistencyContext);
    }

    /**
     * Creates and returns a {@code ConsistencyContextStore} object with no initial value.
     *
     * @return a memory backed consistency context store
     */
    public static ConsistencyContextStore empty() {
        return MemoryConsistencyContextStore.of(ConsistencyContext.unknown());
    }

    /**
     * Legacy metadata consistency context.
     *
     * This metadata consistency context store achieves eventual consistency by always
     * returning the empty consistency context.
     *
     * @return a consistency context store that always returns the empty consistency context
     */
    public static ConsistencyContextStore eventuallyConsistent() {
        return LegacyConsistencyContextStore.singleton();
    }
}
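A minimal in-memory sketch of the storeLatest contract, assuming a larger-offset-wins rule; the Ctx record and InMemoryStore class are hypothetical stand-ins for the proposed ConsistencyContext and memory-backed store, not the actual implementation:

```java
import java.util.concurrent.atomic.AtomicReference;

// Ctx stands in for the proposed ConsistencyContext; a larger offset is more
// up to date.
record Ctx(String clusterId, long offset) {}

public class InMemoryStore {
    private final AtomicReference<Ctx> latest;

    InMemoryStore(Ctx initial) {
        this.latest = new AtomicReference<>(initial);
    }

    // Atomically replaces the stored context only if the candidate is newer;
    // concurrent callers retry via compare-and-set.
    boolean storeLatest(Ctx candidate) {
        while (true) {
            Ctx current = latest.get();
            if (!current.clusterId().equals(candidate.clusterId())) {
                throw new IllegalArgumentException("cluster id mismatch");
            }
            if (candidate.offset() <= current.offset()) {
                return false;
            }
            if (latest.compareAndSet(current, candidate)) {
                return true;
            }
        }
    }

    Ctx read() {
        return latest.get();
    }
}
```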
Consistency Context
package org.apache.kafka.common.metadata;

/**
 * An object that represents the metadata consistency context for a given Kafka cluster.
 *
 * A {@code ConsistencyContext} exposes two operations. The method {@code later} compares
 * two consistency contexts and returns the more up to date context. The method {@code isUnknown}
 * returns true if the object represents an unknown metadata consistency context.
 */
public interface ConsistencyContext extends Serializable {
    /**
     * Compares two consistency contexts and returns the more up to date consistency context.
     *
     * The returned consistency context will have the greater of the two offsets. If the cluster
     * ids do not match an {@code IllegalArgumentException} is thrown.
     *
     * @param other the consistency context to compare
     * @return the more up to date consistency context
     * @throws IllegalArgumentException if neither consistency context is empty and the cluster
     *         ids do not match
     */
    public ConsistencyContext later(ConsistencyContext other);

    /**
     * Returns true if the consistency context is unknown.
     */
    public boolean isUnknown();

    /**
     * Returns the empty consistency context.
     *
     * This is the default consistency context when the value is unknown.
     */
    public static ConsistencyContext unknown() {
        return MetadataConsistencyContext.unknown();
    }

    /**
     * Returns a consistency context describing the given cluster id and offset.
     *
     * @param clusterId the cluster id
     * @param offset the metadata offset
     * @return a consistency context representing the cluster id and offset
     * @throws IllegalArgumentException if offset is negative
     * @throws NullPointerException if the cluster id is null
     */
    public static ConsistencyContext of(String clusterId, long offset) {
        return MetadataConsistencyContext.of(clusterId, offset);
    }
}
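The later() comparison can be sketched as below. SimpleContext is a hypothetical stand-in for the proposed ConsistencyContext, with a null cluster id representing the unknown context:

```java
// Minimal sketch of the later() semantics: the context with the greater
// offset wins, an unknown context always loses, and mismatched cluster ids
// are rejected.
public record SimpleContext(String clusterId, long offset) {
    static final SimpleContext UNKNOWN = new SimpleContext(null, -1L);

    boolean isUnknown() {
        return clusterId == null;
    }

    SimpleContext later(SimpleContext other) {
        if (isUnknown()) return other;
        if (other.isUnknown()) return this;
        if (!clusterId.equals(other.clusterId)) {
            throw new IllegalArgumentException("cluster id mismatch");
        }
        return offset >= other.offset ? this : other;
    }
}
```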
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.