Status
Current state: Under Discussion
Discussion thread:
JIRA: here
Motivation
Currently a client (e.g. producer, consumer) fetches metadata from the least loaded node. Because the Kafka controller sends UpdateMetadataRequest to brokers concurrently, and brokers may process the request at different times, a client can fetch metadata that is older than the metadata already in its cache. This can cause OffsetOutOfRangeException in the consumer even if there is no log truncation in the Kafka cluster (see KAFKA-6262 for more detail). For MirrorMaker, whose offset reset policy is oldest, it can cause MM to rewind and consume from the oldest offset. This increases the latency of transmitting data from the source to the destination cluster and duplicates a large amount of data in the destination cluster.
In this KIP we propose to add a version field to the MetadataResponse and UpdateMetadataRequest so that a client can refresh metadata if the incoming metadata is older than the existing metadata in its cache.
Public Interfaces
UpdateMetadataRequest => controller_id controller_epoch partition_states live_brokers
  controller_id => int32
  controller_epoch => int32
  controller_metadata_epoch => int32 <-- New. This is initialized to 0 after a broker becomes controller and increases monotonically over time.
  controller_metadata_version =>
  partition_states => [UpdateMetadataRequestPartitionState]
  live_brokers => [UpdateMetadataRequestBroker]
MetadataResponse => throttle_time_ms brokers cluster_id controller_id topic_metadata
  throttle_time_ms => int32
  controller_epoch => int32 <-- New. This is the same as the controller_epoch in UpdateMetadataRequest.
  controller_metadata_epoch => int32 <-- New. This is the same as the controller_metadata_epoch in UpdateMetadataRequest.
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => TopicMetadata
Proposed Changes
The controller maintains a controllerMetadataEpoch field in memory. This field is reset to 0 after the broker becomes controller. It is then incremented by 1 every time the controller broadcasts a new UpdateMetadataRequest. Together with the existing controllerEpoch field, this allows a client to determine whether an incoming MetadataResponse is newer or older than the metadata it has cached.
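The controller-side bookkeeping described above can be sketched as follows. This is a minimal illustration, not the proposed implementation: ControllerState, become_controller and next_update_metadata_request are hypothetical names, the request is modeled as a plain dict, and the sketch assumes the counter is incremented before the request is stamped (the KIP does not say whether the first broadcast carries 0 or 1).

```python
from dataclasses import dataclass


@dataclass
class ControllerState:
    """Hypothetical in-memory controller state; names are illustrative only."""
    controller_epoch: int
    controller_metadata_epoch: int = 0  # reset to 0 when the broker becomes controller

    def next_update_metadata_request(self) -> dict:
        # Assumption: increment first, then stamp the outgoing request,
        # so every broadcast carries a strictly larger value than the last.
        self.controller_metadata_epoch += 1
        return {
            "controller_epoch": self.controller_epoch,
            "controller_metadata_epoch": self.controller_metadata_epoch,
        }


def become_controller(previous_controller_epoch: int) -> ControllerState:
    # Assumption: a newly elected controller takes the next controller_epoch
    # and starts its metadata epoch from 0 again.
    return ControllerState(controller_epoch=previous_controller_epoch + 1)
```

Because the counter restarts at 0 on every controller change, it is only meaningful when compared together with controller_epoch.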
After a client receives a MetadataResponse from a broker, it compares the controller_epoch and controller_metadata_epoch of the currently cached metadata with those of the incoming MetadataResponse. If (newMetadataResponse.controller_epoch > existingMetadata.controller_epoch || (newMetadataResponse.controller_epoch == existingMetadata.controller_epoch && newMetadataResponse.controller_metadata_epoch >= existingMetadata.controller_metadata_epoch)), then the client accepts the new MetadataResponse. Otherwise, the client rejects the new MetadataResponse and refreshes metadata again using the existing retry procedure.
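The acceptance rule above can be written out as a small function. This is a sketch under stated assumptions: MetadataVersion and should_accept are hypothetical names that do not exist in the Kafka client, and only the two epoch fields from the KIP are modeled.

```python
from typing import NamedTuple


class MetadataVersion(NamedTuple):
    """Hypothetical pair of epochs carried by a MetadataResponse."""
    controller_epoch: int
    controller_metadata_epoch: int


def should_accept(existing: MetadataVersion, incoming: MetadataVersion) -> bool:
    # A response from a newer controller always wins.
    if incoming.controller_epoch > existing.controller_epoch:
        return True
    # Same controller: accept only if the metadata epoch has not gone backwards.
    return (
        incoming.controller_epoch == existing.controller_epoch
        and incoming.controller_metadata_epoch >= existing.controller_metadata_epoch
    )


cached = MetadataVersion(controller_epoch=5, controller_metadata_epoch=10)
stale = MetadataVersion(controller_epoch=5, controller_metadata_epoch=9)
after_failover = MetadataVersion(controller_epoch=6, controller_metadata_epoch=0)
```

With these values, should_accept(cached, stale) is False, so the client would refresh again, while should_accept(cached, after_failover) is True even though the metadata epoch restarted from 0. The rule is equivalent to a lexicographic comparison of the (controller_epoch, controller_metadata_epoch) pairs.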
Compatibility, Deprecation, and Migration Plan
This KIP changes the inter-broker protocol, so the migration requires two rolling bounces. In the first rolling bounce we deploy the new code, but brokers still communicate using the existing protocol. In the second rolling bounce we change the config so that brokers start communicating with each other using the new protocol.
Rejected Alternatives