...
In this KIP we propose to add version field in the MetadataResponse and UpdateMetadataRequest so that client can refresh metadata if the incoming metadata is older than the existing metadata in its cache.
Public Interfaces
Zookeeper
Add znode /metadata_epoch with the following json format
Code Block |
---|
{
"version" : int32,
"metadata_epoch" : int64
} |
Request/Response protocol
1) Add metadata_epoch field to UpdateMetadataRequest
1) Add leader_epoch field to MetadataResponse
Code Block |
---|
MetadataResponse => throttle_time_ms brokers cluster_id controller_id topic_metadata
throttle_time_ms => int32
brokers => [MetadatBroker]
cluster_id => nullable_str |
Code Block |
UpdateMetadataRequest => controller_id controller_epoch metadata_epoch partition_states live_brokers controller_id => int32 controllertopic_epochmetadata => int32[TopicMetadata] TopicMetadata => topic_error_code topic is_internal partition_metadata_epoch topic_error_code => int64int16 topic <-- New=> str partitionis_statesinternal => [UpdateMetadataRequestPartitionState]boolean livepartition_brokersmetadata => [UpdateMetadataRequestBroker] |
2) Add metadata_epoch field to MetadataResponse
Code Block |
---|
MetadataResponsePartitionMetadata] PartitionMetadata => throttlepartition_timeerror_mscode brokers clusterpartition_id controller_id metadata_epoch topic_metadata throttle_time_ms leader replicas isr offline_replicas partition_error_code => int16 partition_id => int32 leader => int32 brokersreplicas => [MetadatBrokerint32] cluster_idisr => nullable_str[int32] controlleroffline_idreplicas => [int32] metadataleader_epoch => int64int32 <-- New topic_metadata => [TopicMetadata] <-- NEW |
2) Add leader3) Add metadata_epoch field to OffsetCommitRequest
Code Block |
---|
OffsetCommitRequest => group_id generation_id memeber_id retention_time topics group_id => str generation_id => int32 member_id => str retention_time => int64 topics => [OffsetCommitRequestTopic] OffsetCommitRequestTopic => topic partitions topic => str partitions => [OffsetCommitRequestPartition] OffsetCommitRequestPartition => partition offset metadata_epoch metadata partition => int32 offset => int64 metadataleader_epoch => int64int32 <-- NEW metadata => nullable_str |
43) Add metadataleader_epoch field epoch field to OffsetFetchResponse
Code Block |
---|
OffsetFetchResponse => throttle_time_ms response error_code throttle_time_ms => int32 responses => [OffsetFetchResponseTopic] error_code => int16 OffsetFetchResponseTopic => topic partition_responses topic => str partition_responses => [OffsetFetchResponsePartition] OffsetFetchResponsePartition => partition offset metadata_epoch metadata error_code partition => int32 offset => int64 metadataleader_epoch => int64int32 <-- NEW metadata => nullable_str error_code => int16 |
Offset topic schema
Add metadata_epoch field 4) Add leader_epoch field to the schema of the offset topic value.
Code Block |
---|
OFFSET_COMMIT_VALUE_SCHEMA => offset metadataleader_epoch metadata commit_timestamp expire_timestamp offset => int64 metadataleader_epoch => int64int32 <-- NEW metadata => str commit_timestamp => int64 expire_timestamp => int64 |
...
15) Add error INVALID_METADATALEADER_EPOCH. This is will be a non-retriable error and it which may be thrown from consumer's API.
2) Add the following two methods to the interface org.apache.kafka.clients.consumer.Consumer
Code Block |
---|
// This method returns the current metadata_epoch in the Consumer. It will be -1 if consumer has never received any MetadataResponse.
public long metadataEpoch()
// This method will block until either the timeout has reached or the consumer has received a MetadataResponse whose metadata_epoch >= minMetadataEpoch
public boolean waitForMetadataUpdate(long minMetadataEpoch, long timeout) |
Proposed Changes
1) Controller metadata update
Every time controller sends UpdateMetadataRequest, the controller will read the metadata_epoch in the znode /metadata_epoch by 1 and includes the new value in UpdateMetadataRequest
2) Client's Metadata refresh
After client receives MetadataResponse from a broker, it compares metadata_epoch in the MetadataResponse with the metadata_epoch in the last-accepted MetadataResponse. Client will consider the MetadataResponse as outdated and re-fetch Metadata again with the exiting backoff mechanism if the metadata_epoch in the MetadataResponse is smaller than the metadata_epoch in the last-accepted MetadataResponse.
Proposed Changes
1) Metadata refresh
After client receives MetadataResponse from a broker, it compares with the MetadataResponse with the cached metadata to check whether the MetadataResponse is outdated. The MetadataResponse is outdated if any of the following conditions are true:
- Across all those partitions that the client is interested in, there exists a partition whose leader_epoch in the MetadataResposne < the leader_epoch in the cached metadata
- Across all those partitions that the client is interested in, there exists a partition A which is found in the cached metadata but not in the MetadataResponse. And there exists another partition B in MetadataResponse where B.topic == A.topic. This is needed for the partition expansion scenario.
The client will be forced to refresh metadata again with the existing backoff mechanism if the MetadataResponse is determined to be outdated.
Note that producer is interested in all partitions. Consumers can potentially be interested in only partitions that it has explicitly subscribed to. The purpose of checking only a subset of partitions is an optimization which aim to avoid unnecessary metadata refresh when the metadata is only outdated for partitions not needed by client.
23) Offset commit
When consumer commits offset, it looks up leader_epoch of the partition in the cached metadata and includes the metadatathis value in the OffsetCommitRequest. The leader_epoch from the last-accepted MetadataResponse in the OffsetCommitRequestwill included in the message appended to the offset topic.
When coordinator receives the OffsetCommitRequest, for each partition in the OffsetCommitRequest, it will additionally check whether the metadataleader_epoch in epoch in the request >= metadata_epoch in leader epoch in the last commit. If not, the offset for this partition is not committed and the error for that partition in the OffsetCommitResponse will be INVALID_METADATALEADER_EPOCH. If yes, both the offset and the metadata_epoch for this partition will be written to the offset topic.
43) Offset fetch
After consumer receives OffsetFetchResponse, it remembers the metadataleader_epoch for each partition it needs to consume. Then the consumer needs to refresh metadata repeatedly until the metadataleader_epoch from MetadataResponse >in the cached metadata >= the metadataleader_epoch in OffsetFetchResponse for all partitions in the OffsetFetchResponspartitions it wants to consume. Note that these logic are all hidden from user and the leader_epoch will not be exposed to user via consumer's public API (e.g. OffsetAndMetadata).
For existing version of the offset topic, metadataleader_epoch will epoch will not be available in the value of the offset topic message. We will use metadataleader_epoch epoch = -1 to indicate the missing metadataleader_epoch. In this case metadataleader_epoch in epoch in any MetadataResponse will be larger than the metadataleader_epoch epoch = -1 and the consumer behavior will be the same as it is now.
5) Consumer initialization if offset is stored externally.
After getting the offset to be stored externally, user needs to additionally use the newly-added API metadataEpoch()
to get the metadata_epoch in the consumer. This metadata_epoch should be stored externally as well. Before the next consumer seeks to the offset that is stored externally, this consumer should first read the metadata_epoch from the external store and call waitForMetadataUpdate()
to make sure that the consumer receives MetadataResponse whose metadata_epoch >= previously-stored metadata_epoch.
Compatibility, Deprecation, and Migration Plan
The KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounce. In the first rolling bounce we will deploy the new code but broker will still communicate using the existing protocol. In the second rolling bounce we will change the config so that broker will start to communicate with each other using the new protocol.
Rejected Alternatives
- Use a global per-metadata version.
This can be a bit more complicated by introducing a new state in Kafka. leader_epoch is an existing state we already maintain in zookeeper. By using per-partition leader_epoch the client will only be forced to re-fresh metadata if the MetadataResponse contains out-dated metadata for those partitions that the client is interested in.