Current state: WIP - DRAFT
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Searching offsets by timestamp in Kafka has very coarse granularity (log segment level), and it does not work well when a replica is reassigned. This KIP introduces a time-based log index to allow searching messages in Kafka by timestamp at a finer granularity.
This KIP depends on KIP-32.
Search message by Timestamp.
New time-based log index
In order to enable timestamp-based search at a finer granularity, we need to add the timestamp to the log indices as well. The broker will build the time index based on the LogAppendTime of messages.
Because all the index files are memory-mapped files, the main consideration here is to avoid significantly increasing memory consumption.
The time index file needs to be built just like the log index file based on each log segment file.
Use a time index for each log segment to save the timestamp -> log offset at minute granularity
For each log segment, create another index file named SegmentBaseOffset.time.index that holds an index at minute level. The time index entry format is:
The time index granularity does not change the actual timestamp search granularity. It only affects the time needed for searching. It works the same way as offset search: find the closest timestamp and its corresponding offset, then start a linear scan over the log until the target message is found. We prefer minute-level indexing because timestamp-based search is usually rare, so it is probably not worth investing a significant amount of memory in it.
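The lookup described above can be sketched as follows. This is a minimal illustration, not broker code: the in-memory lists, the function names, and the (timestamp, offset) tuple layout are all assumptions made for the example. It also assumes timestamps are non-decreasing within a segment, as they would be with LogAppendTime, and it starts the scan from the last index entry strictly earlier than the target so that messages sharing the target timestamp are not skipped.

```python
from bisect import bisect_left


def find_scan_start(time_index, base_offset, target_ts):
    """Pick the offset to start scanning from (hypothetical helper).

    time_index: list of (timestamp_ms, offset) tuples sorted by timestamp.
    Returns the offset of the last entry whose timestamp is strictly
    less than target_ts, or base_offset if no such entry exists.
    """
    timestamps = [ts for ts, _ in time_index]
    pos = bisect_left(timestamps, target_ts) - 1
    if pos < 0:
        return base_offset
    return time_index[pos][1]


def search_by_timestamp(time_index, log, base_offset, target_ts):
    """Return the first offset whose timestamp is >= target_ts, or None.

    log: list of (offset, timestamp_ms) tuples sorted by offset; this
    stands in for the log segment that the linear scan walks over.
    """
    start = find_scan_start(time_index, base_offset, target_ts)
    for offset, ts in log:
        if offset < start:
            continue  # skipped thanks to the time index
        if ts >= target_ts:
            return offset
    return None


# Example segment: six messages, one time index entry per minute.
log = [(100, 60_000), (101, 75_000), (102, 119_999),
       (103, 120_000), (104, 150_000), (105, 181_000)]
time_index = [(60_000, 100), (120_000, 103), (181_000, 105)]
```

A coarser index only lengthens the linear scan; the offset returned is the same for any granularity.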
The time index will be built based on the log index file. Every time a new entry is inserted into the log index file, we look at the timestamp of the message, and if it falls into the next minute, we insert an entry into the time index as well. The following table gives a summary of memory consumption at different granularities. The numbers are calculated based on a broker with 3500 partitions.
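The build rule can be illustrated with a small sketch. The class and method names here are hypothetical, and the decision to always index the first entry of a segment is an assumption of this example rather than something the proposal states.

```python
MINUTE_MS = 60_000


class TimeIndexBuilder:
    """Hypothetical in-memory stand-in for a segment's time index."""

    def __init__(self):
        self.entries = []          # (timestamp_ms, offset) tuples
        self._last_minute = None   # minute bucket of the last entry added

    def on_log_index_entry(self, offset, timestamp_ms):
        """Call whenever a new entry is inserted into the log index.

        Adds a time index entry only if the message falls into a later
        minute than the last indexed one. Returns True if one was added.
        """
        minute = timestamp_ms // MINUTE_MS
        # Assumption: the very first entry of a segment is always indexed.
        if self._last_minute is None or minute > self._last_minute:
            self.entries.append((timestamp_ms, offset))
            self._last_minute = minute
            return True
        return False


builder = TimeIndexBuilder()
for offset, ts in [(100, 60_000), (110, 90_000), (120, 120_001),
                   (130, 125_000), (140, 300_000)]:
    builder.on_log_index_entry(offset, ts)
```

Because entries are only added when the log index itself gets a new entry, the time index can never be denser than the log index.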
Users don't typically need to look up offsets with seconds granularity.
Use case discussion
Search message by timestamp
There could be a few reasons people want to search messages by timestamp. One important use case is for disaster recovery.
Imagine people have cluster 1 and cluster 2 at different locations. The two clusters have the same data. When cluster 1 goes down, the consumers of cluster 1 will try switching to consume from cluster 2. The problem is which offset those consumers should resume consuming from. The goals here are:
- Not lose messages.
- Reconsume as few duplicate messages as possible that have already been consumed from cluster 1.
The challenge is that there is little cross-reference between cluster 1 and cluster 2. The offset checkpoint for cluster 1 does not apply to cluster 2. It is hard to get an accurate resuming point, because the message order and content for each partition could be significantly different between the two clusters. An approximate approach in this case is to look at the timestamp T of the last message consumed from cluster 1, and then start consuming from the message produced to cluster 2 at (T - X), where X is some safety buffer based on the latency difference between cluster 1 and cluster 2, say 15 minutes.
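As a small worked example of the (T - X) rule, the sketch below computes the timestamp a consumer would search for on cluster 2. The function name and the millisecond units are assumptions for illustration; the result would then be handed to whatever timestamp search the cluster offers.

```python
def resume_timestamp_ms(last_consumed_ts_ms, safety_buffer_ms):
    """Return T - X, clamped at zero so the result stays a valid timestamp."""
    return max(0, last_consumed_ts_ms - safety_buffer_ms)


FIFTEEN_MINUTES_MS = 15 * 60 * 1000

# T is the timestamp of the last message consumed from cluster 1.
resume_at = resume_timestamp_ms(1_000_000_000, FIFTEEN_MINUTES_MS)
```

A larger X lowers the risk of losing messages at the cost of reconsuming more duplicates.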
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Add a timestamp field to log index entry
The most straightforward approach to having a time index is to let the log index files have a timestamp associated with each entry.
Because the index entry size becomes 16 bytes instead of 8 bytes, the index file size also needs to be doubled. As an example, one of our brokers has ~3500 partitions, and its index files take about 16 GB of memory. With this new format, the memory consumption would be 32 GB.
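The doubling follows directly from the entry sizes, as this short calculation shows. The helper name is hypothetical, and treating 1 GB as 1024^3 bytes is an assumption that does not affect the ratio.

```python
GB = 1024 ** 3


def index_memory_after_resize(current_bytes, old_entry_bytes, new_entry_bytes):
    """Memory needed to hold the same number of entries at a new entry size."""
    entry_count = current_bytes // old_entry_bytes
    return entry_count * new_entry_bytes


# 16 GB of 8-byte entries rewritten as 16-byte entries.
new_total = index_memory_after_resize(16 * GB, 8, 16)
```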