...
Public Interfaces
Search message by timestamp.
Proposed Changes
New time-based log index
...
Option 1 - Time based index using LogAppendTime
To enable timestamp-based search at a finer granularity, we need to add timestamps to the log indices as well. The broker will build the time index based on the LogAppendTime of messages.
...
The time index file needs to be built just like the log index file based on each log segment file.
Use a time index for each log segment to save the timestamp -> log offset mapping at minute granularity.
Create another index file for each log segment, named SegmentBaseOffset.time.index, to hold the index at minute level. The time index entry format is:
...
Users don't typically need to look up offsets with seconds granularity.
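As a rough illustration of how a lookup against such a per-segment time index could work, the sketch below does a binary search for the last entry whose timestamp is not later than the target. It assumes the index timestamps are strictly increasing, which holds for LogAppendTime at minute granularity. The class TimeIndexLookup and its methods are hypothetical names for illustration, not Kafka's actual classes.

```java
import java.util.Arrays;

// Hypothetical in-memory view of one segment's time index (not a Kafka class).
final class TimeIndexLookup {
    private final long[] timestamps; // assumed strictly increasing
    private final long[] offsets;    // offsets[i] is the offset indexed at timestamps[i]

    TimeIndexLookup(long[] timestamps, long[] offsets) {
        if (timestamps.length != offsets.length) {
            throw new IllegalArgumentException("timestamps and offsets must have equal length");
        }
        this.timestamps = timestamps.clone();
        this.offsets = offsets.clone();
    }

    /** Returns the offset of the last entry with timestamp <= target, or -1 if there is none. */
    long lookup(long targetTimestampMs) {
        int pos = Arrays.binarySearch(timestamps, targetTimestampMs);
        if (pos < 0) {
            // binarySearch returns -(insertionPoint) - 1 on a miss;
            // the entry just before the insertion point is the floor entry.
            pos = -pos - 2;
        }
        return pos < 0 ? -1L : offsets[pos];
    }
}
```

A caller would then scan the log forward from the returned offset to reach the first message at or after the target time.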
Option 2 - Time based index using CreateTime
The biggest problem of indexing using CreateTime is that CreateTime can be out of order.
One solution is as below:
- Each broker keeps in memory a timestamp index map - Map[TopicPartitionSegment, Map[TimestampByMinute, Offset]]
- The timestamp is on minute boundary
- The offset is the offset of the first message in the log segment that falls into a minute
Create a timestamp index file for each log segment. The entry in the file is as below:
Time Index Entry => Timestamp Offset
  Timestamp => int64
  Offset => int32
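To make the entry layout concrete, here is a minimal sketch of writing and reading one 12-byte entry (an int64 timestamp followed by an int32 offset). The class name TimeIndexEntryCodec is hypothetical. The big-endian byte order (the java.nio.ByteBuffer default) is an assumption, and the sketch does not assume whether the int32 offset is absolute or relative to the segment base offset.

```java
import java.nio.ByteBuffer;

// Hypothetical codec for the entry layout above (not a Kafka class).
final class TimeIndexEntryCodec {
    // int64 timestamp + int32 offset = 12 bytes per entry
    static final int ENTRY_SIZE = Long.BYTES + Integer.BYTES;

    /** Encodes one entry into a fresh buffer that is ready for reading. */
    static ByteBuffer encode(long timestampMs, int offset) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_SIZE);
        buf.putLong(timestampMs).putInt(offset);
        buf.flip();
        return buf;
    }

    /** Reads the timestamp of the entry at the given index using an absolute get. */
    static long readTimestamp(ByteBuffer buf, int entryIndex) {
        return buf.getLong(entryIndex * ENTRY_SIZE);
    }

    /** Reads the offset of the entry at the given index using an absolute get. */
    static int readOffset(ByteBuffer buf, int entryIndex) {
        return buf.getInt(entryIndex * ENTRY_SIZE + Long.BYTES);
    }
}
```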
- When a broker, whether leader or follower, receives a message, it does the following:
- Find which minute MIN the message with offset OFFSET falls in.
- Check whether MIN is already in the in-memory timestamp map for the current log segment. If it is not, the broker adds [MIN->OFFSET] to both the in-memory timestamp index map and the timestamp index file.
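The per-message steps above can be sketched as follows for a single log segment. This is only an illustration under stated assumptions: the class CreateTimeIndexSketch is hypothetical, the outer Map keyed by TopicPartitionSegment is omitted, and an in-memory list stands in for the timestamp index file.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-segment sketch of the Option 2 index update (not a Kafka class).
final class CreateTimeIndexSketch {
    private static final long MINUTE_MS = 60_000L;

    // Inner map from the proposal: minute boundary -> offset of the first message seen in that minute.
    private final Map<Long, Long> minuteToOffset = new HashMap<>();
    // Stand-in for the on-disk timestamp index file; each element is {minute, offset}.
    private final List<long[]> indexFileEntries = new ArrayList<>();

    /** Handles one received message; returns true if a new index entry was added. */
    boolean onMessage(long createTimeMs, long offset) {
        // Round the CreateTime down to its minute boundary.
        long minute = Math.floorDiv(createTimeMs, MINUTE_MS) * MINUTE_MS;
        if (minuteToOffset.containsKey(minute)) {
            return false; // this minute is already indexed for the segment
        }
        minuteToOffset.put(minute, offset);
        indexFileEntries.add(new long[]{minute, offset});
        return true;
    }

    Long offsetForMinute(long minuteMs) {
        return minuteToOffset.get(minuteMs);
    }

    int fileEntryCount() {
        return indexFileEntries.size();
    }
}
```

Because the map is keyed by minute rather than by arrival order, a message with an older CreateTime that arrives late still gets an entry if its minute has not been seen yet.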
Comparison between Option 1 and Option 2
Criterion | Option 1 | Option 2 |
---|---|---|
Accuracy of searching by time | Millisecond | Locates the first message in the log that falls into the minute. |
Order of timestamp in actual log | monotonically increasing | out of order |
Broker log retention / rolling policy enforcement | Simple to implement | Need to implement separately |
Exposure of LogAppendTime to user? | Yes | Not necessarily needed |
Memory consumption | Uses a memory-mapped file. Typically needs less memory than Option 2 | All entries are in memory. Memory footprint is higher than Option 1 |
Complexity | Both options are similar for indexing | Similar to Option 1, but needs separate design to honor log retention / rolling |
Application friendliness | Users need to track CreateTime (assuming we include it in the message) and LogAppendTime (see further discussion in the Use case discussion section) | Users only need to track CreateTime |
Use case discussion
...
Use case | Goal | Solution with LogAppendTime index | Solution with CreateTime index | Comparison |
---|---|---|---|---|
Search |
...
by timestamp |
There could be a few reasons people want to search messages by timestamp. One important use case is for disaster recovery.
Imagine people have cluster 1 and cluster 2 at different locations. The two clusters have the same data. When cluster 1 goes down, the consumers of cluster 1 will try switching to consume from cluster 2. The problem is which offset those consumers should resume consuming from. The goals here are:
- Not lose messages.
- Reconsume as few duplicate messages as possible that have already been consumed from cluster 1.
The challenge is that there is little cross reference between cluster 1 and cluster 2. The offset checkpoint for cluster 1 does not apply to cluster 2. It is hard to get an accurate resuming point, because the message order and content for each partition could be significantly different between the two clusters. An approximate approach in this case is to look at the timestamp T of the last message consumed from cluster 1, and then start consuming from the message produced to cluster 2 at (T - X), where X is some safety buffer based on the latency difference between cluster 1 and cluster 2, say 15 min.
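The (T - X) computation can be sketched as below. The class and method names are hypothetical, and clamping the result at zero is an added assumption so that a large buffer never yields a negative timestamp.

```java
import java.time.Duration;

// Hypothetical helper for choosing a resume timestamp on the failover cluster (not a Kafka class).
final class FailoverResumePoint {
    /** Returns T - X in milliseconds, clamped at 0. */
    static long resumeTimestampMs(long lastConsumedTimestampMs, Duration safetyBuffer) {
        if (safetyBuffer.isNegative()) {
            throw new IllegalArgumentException("safety buffer must not be negative");
        }
        return Math.max(0L, lastConsumedTimestampMs - safetyBuffer.toMillis());
    }
}
```

For the 15 min buffer mentioned above, a consumer would call resumeTimestampMs(T, Duration.ofMinutes(15)) and search cluster 2 by the returned timestamp.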
...
Not lose messages | If a user wants to search for a message with CreateTime CT, they can use CT to search in the LogAppendTime index, because LogAppendTime > CT for the same message (assuming no clock skew). If the clock is skewed, they can search with CT - X, where X is the max skew. If a user wants to search for a message with LogAppendTime LAT, they can just search with LAT and get millisecond accuracy. | Users can just search with CT and get an offset at minute-level granularity. | If the latency in the pipeline is greater than one minute, users might consume fewer messages by using the CreateTime index. Otherwise, the LogAppendTime index is probably preferred. Consider the following case:
If a user wants to search with CT after they consumed m2, they will have to reconsume from m1. Depending on how big LAT2 - LAT1 is, the number of messages to be reconsumed can be very large. | |
Search by timestamp (bootstrap) |
| In the bootstrap case, all the LATs would be close. For example, if a user wants to process the data from the last 3 days and did the following:
In this case, the LogAppendTime index does not help much. That means the user needs to filter out data older than 3 days before dumping it into Kafka. | In the bootstrap case, the CreateTime will not change if the user follows the same procedure stated in the LogAppendTime index section. Searching by timestamp will work. | The LogAppendTime index needs further attention from the user. |
Failover from cluster 1 to cluster 2 |
| Similar to search by timestamp. Users can choose to use the CT or LAT of cluster 1 to search on cluster 2. In this case, searching with CT - MaxLatencyOfCluster provides a strong guarantee of not losing messages, but might produce some duplicates depending on the difference in latency between cluster 1 and cluster 2. | Users can use CT to search and get minute-level granularity. Duplicates are still unavoidable. | In the cross-cluster failover case, both solutions can provide a strong guarantee of not losing messages. Under conditions of |
Get lag for consumers | Alert when a consumer starts to lag. | With LogAppendTime in the message, consumer can easily find out the lag by time and estimate how long it might need to reach the log end. | Not supported. | |
Compatibility, Deprecation, and Migration Plan
The change is backward compatible after KIP-31 and KIP-32 are checked in.
...
Rejected Alternatives
Add a timestamp field to log index entry
...
Because the index entry size becomes 16 bytes instead of 8 bytes, the index file size also doubles. As an example, one of our brokers has ~3500 partitions. Its index files took about 16GB of memory. With this new format, the memory consumption would be 32GB.
Time Index Entry => Timestamp Offset
  Timestamp => int64
  Offset => int32