...
In this KIP, we propose adding a version field to the MetadataResponse and UpdateMetadataRequest so that a client can refresh its metadata when the incoming metadata is older than the metadata already in its cache.
Public Interfaces
1) Add leader_epoch field to MetadataResponse

    MetadataResponse => throttle_time_ms brokers cluster_id controller_id topic_metadata
      throttle_time_ms => int32
      brokers => [MetadataBroker]
      cluster_id => nullable_str
      controller_id => int32
      topic_metadata => [TopicMetadata]

    TopicMetadata => topic_error_code topic is_internal partition_metadata
      topic_error_code => int16
      topic => str
      is_internal => boolean
      partition_metadata => [PartitionMetadata]

    PartitionMetadata => partition_error_code partition_id leader replicas isr offline_replicas leader_epoch
      partition_error_code => int16
      partition_id => int32
      leader => int32
      replicas => [int32]
      isr => [int32]
      offline_replicas => [int32]
      leader_epoch => int32          <-- NEW

2) Add leader_epoch field to OffsetCommitRequest

    OffsetCommitRequest => group_id generation_id member_id retention_time topics
      group_id => str
      generation_id => int32
      member_id => str
      retention_time => int64
      topics => [OffsetCommitRequestTopic]

    OffsetCommitRequestTopic => topic partitions
      topic => str
      partitions => [OffsetCommitRequestPartition]

    OffsetCommitRequestPartition => partition offset leader_epoch metadata
      partition => int32
      offset => int64
      leader_epoch => int32          <-- NEW
      metadata => nullable_str

3) Add leader_epoch field to OffsetFetchResponse

    OffsetFetchResponse => throttle_time_ms responses error_code
      throttle_time_ms => int32
      responses => [OffsetFetchResponseTopic]
      error_code => int16

    OffsetFetchResponseTopic => topic partition_responses
      topic => str
      partition_responses => [OffsetFetchResponsePartition]

    OffsetFetchResponsePartition => partition offset leader_epoch metadata error_code
      partition => int32
      offset => int64
      leader_epoch => int32          <-- NEW
      metadata => nullable_str
      error_code => int16

Proposed Changes
The controller maintains a controllerMetadataEpoch field in memory. This field is reset to 0 after a broker becomes the controller, and it is incremented by 1 every time the controller broadcasts a new UpdateMetadataRequest. Together with the existing controllerEpoch field, this allows a client to determine whether a new MetadataResponse is newer or older than its cached metadata.
1) Metadata refresh
After a client receives a MetadataResponse from a broker, it compares the MetadataResponse with the cached metadata to check whether the MetadataResponse is outdated. The MetadataResponse is outdated if any of the following conditions is true:
- Among the partitions that the client is interested in, there exists a partition whose leader_epoch in the MetadataResponse is less than its leader_epoch in the cached metadata.
- Among the partitions that the client is interested in, there exists a partition A that is found in the cached metadata but not in the MetadataResponse, and there exists another partition B in the MetadataResponse where B.topic == A.topic.
If the MetadataResponse is determined to be outdated, the client is forced to refresh metadata again using the existing backoff mechanism.
Note that a producer is interested in all partitions, whereas a consumer may be interested only in the partitions to which it has explicitly subscribed. Checking only a subset of partitions is an optimization that aims to avoid unnecessary metadata refreshes when the metadata is outdated only for partitions that the client does not need.
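The two conditions above can be sketched in a few lines. This is only an illustration, not client code: the function name `is_outdated`, the `(topic, partition_id)` tuple key, and the plain dictionaries mapping each partition to its leader_epoch are assumptions made for the example.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical key type for this sketch: (topic, partition_id).
TopicPartition = Tuple[str, int]


def is_outdated(
    cached: Dict[TopicPartition, int],
    response: Dict[TopicPartition, int],
    interested: Iterable[TopicPartition],
) -> bool:
    """Return True if `response` is outdated relative to `cached`.

    Both maps go from (topic, partition_id) to leader_epoch. Only the
    partitions listed in `interested` are checked.
    """
    topics_in_response = {topic for topic, _ in response}
    for tp in interested:
        if tp not in cached:
            continue  # nothing cached to compare against
        if tp in response:
            # Condition 1: the leader_epoch moved backwards.
            if response[tp] < cached[tp]:
                return True
        elif tp[0] in topics_in_response:
            # Condition 2: the topic is present in the response, but a
            # partition known from the cache is missing.
            return True
    return False
```

A caller that gets `True` would discard the response and schedule another refresh with its usual backoff. Passing only the subscribed partitions as `interested` gives the consumer-side optimization described above.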
2) Offset commit and fetch
When a consumer commits an offset, it looks up the leader_epoch of the partition in the cached metadata and includes this value in the OffsetCommitRequest.
After a consumer receives an OffsetFetchResponse, it needs to refresh metadata until the leader_epoch in the cached metadata is >= the leader_epoch in the OffsetFetchResponse for every partition in the OffsetFetchResponse.
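As a rough sketch of the commit and fetch steps, the helpers below are hypothetical: `build_commit_entry`, `needs_refresh`, `refresh_until_caught_up`, and the `fetch_metadata` callback are illustrative names. Treating a partition that is missing from the cache as epoch -1 is an assumption of this example, and backoff between refreshes is left out for brevity.

```python
from typing import Callable, Dict, Optional, Tuple

# Hypothetical key type for this sketch: (topic, partition_id).
TopicPartition = Tuple[str, int]
Epochs = Dict[TopicPartition, int]


def build_commit_entry(cached: Epochs, tp: TopicPartition, offset: int,
                       metadata: Optional[str] = None) -> dict:
    """Build one per-partition commit entry carrying the cached leader_epoch."""
    return {
        "partition": tp[1],
        "offset": offset,
        "leader_epoch": cached[tp],
        "metadata": metadata,
    }


def needs_refresh(cached: Epochs, fetched: Epochs) -> bool:
    """True if any fetched partition has a leader_epoch ahead of the cache.

    Assumption: a partition absent from the cache counts as epoch -1.
    """
    return any(cached.get(tp, -1) < epoch for tp, epoch in fetched.items())


def refresh_until_caught_up(cached: Epochs, fetched: Epochs,
                            fetch_metadata: Callable[[], Epochs],
                            max_attempts: int = 10) -> Epochs:
    """Refresh until the cache is at least as new as the fetched epochs."""
    for _ in range(max_attempts):
        if not needs_refresh(cached, fetched):
            return cached
        cached = fetch_metadata()  # hypothetical metadata refresh call
    if needs_refresh(cached, fetched):
        raise TimeoutError("cached metadata is still behind the fetched leader_epoch")
    return cached
```

The attempt limit is a design choice of the sketch so that the loop cannot spin forever against a broker that keeps returning stale metadata.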
After a client receives a MetadataResponse from a broker, it compares the controller_epoch and controller_metadata_epoch of the currently cached metadata with those of the incoming MetadataResponse. If (newMetadataResponse.controller_epoch > existingMetadata.controller_epoch || (newMetadataResponse.controller_epoch == existingMetadata.controller_epoch && newMetadataResponse.controller_metadata_epoch >= existingMetadata.controller_metadata_epoch)), then the client accepts newMetadataResponse. Otherwise, the client rejects newMetadataResponse and refreshes metadata again using the existing retry procedure.
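The acceptance condition above is a lexicographic comparison on the pair (controller_epoch, controller_metadata_epoch). A minimal sketch, assuming a hypothetical `MetadataVersion` tuple that holds just those two fields:

```python
from typing import NamedTuple


class MetadataVersion(NamedTuple):
    """Hypothetical holder for the two epochs compared in this sketch."""
    controller_epoch: int
    controller_metadata_epoch: int


def accepts(new: MetadataVersion, existing: MetadataVersion) -> bool:
    """Accept `new` unless it is strictly older than `existing`.

    Tuples compare field by field, so this is equivalent to:
    new.controller_epoch > existing.controller_epoch, or the controller
    epochs are equal and new.controller_metadata_epoch is >= the existing one.
    """
    return tuple(new) >= tuple(existing)
```

For example, `accepts(MetadataVersion(4, 0), MetadataVersion(3, 7))` holds because a newer controller always wins, regardless of the per-controller counter.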
Compatibility, Deprecation, and Migration Plan
The KIP changes the inter-broker protocol. Therefore, the migration requires two rolling bounces. In the first rolling bounce, we deploy the new code, but brokers still communicate using the existing protocol. In the second rolling bounce, we change the config so that brokers start communicating with each other using the new protocol.
Rejected Alternatives
- Use a global per-metadata version.
This would be somewhat more complicated because it introduces new state in Kafka. leader_epoch is existing state that we already maintain in ZooKeeper. With a per-partition leader_epoch, the client is forced to refresh metadata only if the MetadataResponse contains outdated metadata for the partitions that the client is interested in.