Current state: Under Discussion
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Kafka has a few timestamp based functions, including
- Searching messages by timestamp
- Time based log rolling
- Time based log retention.
Currently these operations depend on the create time / modification time of the log segment file. This has a few issues.
- Searching offset by timestamp has very coarse granularity (log segment level). It also does not work well when a replica is reassigned.
- Time based log rolling and retention do not work well when a replica is reassigned.
In this KIP we propose introducing a time based log index using the timestamp of the messages introduced in KIP-32.
This KIP introduces a new broker-side configuration, time.index.interval.ms, to control the granularity of the time based index.
Besides that, there will be some behavioral changes to time based log retention and log rolling.
- Log retention will be based on the time index of a log segment instead of the last modification time of the log segment file.
- Time based log rolling will change as follows: a log segment will be rolled out when log.roll.ms has elapsed since the largest timestamp of the messages in the log segment.
Add a new time-based log index
The broker will build the time index based on the timestamps of the messages. The time index works for both LogAppendTime and CreateTime.
Because all the index files are memory mapped files, the main consideration here is to avoid significantly increasing memory consumption.
Use a time index for each log segment to save the (timestamp -> log offset) at a configurable granularity
Create another index file for each log segment with the name SegmentBaseOffset.time.index. The density of the index is defined by the time.index.interval.ms and index.interval.bytes configurations.
The time index entry format is:
The time index granularity does not change the actual timestamp searching granularity. It only affects the time needed for searching. It works the same way as offset search: find the closest timestamp and its corresponding offset, then start a linear scan over the log until the target message is found. Although the granularity is configurable, minute level granularity is recommended, because timestamp based search is usually rare and is probably not worth a significant amount of memory.
The following table gives a summary of the memory consumption for one day at different granularities. The numbers are calculated based on a broker with 3500 partitions.
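The lookup described above can be sketched as follows. This is a minimal illustration, not the broker implementation: the function name, the in-memory list representation of the index and the log, and the assumption that each index entry holds the largest timestamp seen so far together with the offset of that message are all hypothetical choices made for the example.

```python
from bisect import bisect_left


def find_offset_by_timestamp(time_index, log, target_ts):
    """Return the offset of the first message with timestamp >= target_ts.

    time_index: sparse list of (timestamp, offset) with increasing timestamps.
                Assumption: each entry is (largest timestamp seen so far,
                offset of the message carrying it).
    log:        list of (offset, timestamp) in offset order.
    Returns None when no message qualifies.
    """
    if not log:
        return None
    index_timestamps = [ts for ts, _ in time_index]
    # Last index entry whose timestamp is strictly smaller than the target.
    pos = bisect_left(index_timestamps, target_ts) - 1
    # With no usable entry, fall back to the first offset of the segment.
    start_offset = time_index[pos][1] if pos >= 0 else log[0][0]
    # Linear scan from the indexed position until the target is found.
    for offset, ts in log:
        if offset >= start_offset and ts >= target_ts:
            return offset
    return None


# Example data: timestamps are not strictly ordered by offset.
example_log = [(0, 100), (1, 105), (2, 103), (3, 200), (4, 190), (5, 300)]
example_index = [(105, 1), (200, 3), (300, 5)]
```

A coarser index only lengthens the linear scan; the offset returned is the same, which is why the index granularity does not affect the search granularity.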
Add time.index.interval.ms configuration to broker
This configuration allows users to change the granularity of the time index.
Build the time index
Based on the proposal in KIP-32, the broker will build the time index in the following way:
- When the broker receives a message, the message is appended to the log unless it is rejected because its timestamp exceeds the threshold.
- The timestamp will either be LogAppendTime or CreateTime depending on the configuration.
- We will insert a time index entry in the following scenarios:
- If (the_largest_message_timestamp_ever_seen - the_timestamp_of_the_last_time_index_entry) >= time.index.interval.ms AND the broker has appended more than index.interval.bytes since last time index entry insertion.
- When a log segment is closed, if the message with largest timestamp is in this closed segment, the broker will insert a time index entry to the time index. The time index entry points to that message with largest timestamp.
- When a new log segment is created, the broker will insert a time index entry to the time index of the new log segment when appending the first message whose timestamp is greater than the timestamp of last time index entry.
- It is possible that a log segment does not have any time index entry if all of its messages have smaller timestamps than those in the previous log segments. In that case the time index would be empty.
- The default initial / max size of the time index files is the same as offset index files.
- If all the messages in a log segment have message.format.version before 0.10.0, broker will insert a time index entry (last_modification_time_of_the_segment -> offset_of_the_last_message_in_the_segment) to the time index.
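The insertion rules above can be sketched for a single log segment as follows. This is a simplified illustration under stated assumptions: the class and method names are hypothetical, the index is kept as an in-memory list, the timestamp of the last entry carried over from earlier segments is passed in as a constructor argument, and the handling of pre-0.10.0 messages is omitted.

```python
class TimeIndexBuilder:
    """Hypothetical sketch of the time index insertion rules for one segment."""

    def __init__(self, interval_ms, interval_bytes, last_entry_ts=-1):
        self.interval_ms = interval_ms        # time.index.interval.ms
        self.interval_bytes = interval_bytes  # index.interval.bytes
        self.last_entry_ts = last_entry_ts    # timestamp of the last index entry
        self.max_ts = last_entry_ts           # largest timestamp ever seen
        self.max_ts_offset = None             # offset of the message carrying max_ts
        self.bytes_since_last_entry = 0
        self.entries = []                     # (timestamp, offset) pairs

    def _insert(self):
        self.entries.append((self.max_ts, self.max_ts_offset))
        self.last_entry_ts = self.max_ts
        self.bytes_since_last_entry = 0

    def append(self, offset, timestamp, size_bytes):
        self.bytes_since_last_entry += size_bytes
        if timestamp > self.max_ts:
            self.max_ts, self.max_ts_offset = timestamp, offset
        # Both the time gap and the byte gap must be exceeded.
        if (self.max_ts_offset is not None
                and self.max_ts - self.last_entry_ts >= self.interval_ms
                and self.bytes_since_last_entry > self.interval_bytes):
            self._insert()

    def close_segment(self):
        # On close, record the largest timestamp if it lives in this segment
        # and has not been indexed yet.
        if self.max_ts_offset is not None and self.max_ts > self.last_entry_ts:
            self._insert()


builder = TimeIndexBuilder(interval_ms=1000, interval_bytes=100)
for offset, ts, size in [(0, 5000, 60), (1, 5500, 60), (2, 5600, 200),
                         (3, 7000, 10), (4, 6000, 500), (5, 7100, 10)]:
    builder.append(offset, ts, size)
entries_before_close = list(builder.entries)
builder.close_segment()
```

In this run an out-of-order message (offset 4) never produces an entry, because it does not raise the largest timestamp seen, so the index timestamps stay increasing even with CreateTime.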
On broker startup, the broker will need to find the latest timestamp of the current active log segment, because that timestamp may be needed for the next time index append. The broker will therefore scan from the current active log segment back through earlier log segments until it finds the latest message timestamp.
Enforce time based log retention
To enforce time based log retention, the broker will check the last time index entry of a log segment. Its timestamp is the latest timestamp of the messages in the log segment, so if that timestamp has expired, the broker will delete the log segment. If the time index of a segment is empty, the broker will check the time index of the previous log segment.
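A minimal sketch of this retention check, assuming each segment's time index is represented as a list of (timestamp, offset) entries ordered oldest segment first. The function names are hypothetical, and treating a segment with no usable timestamp anywhere before it as not expired is an assumption made for the example, not something the proposal states.

```python
def latest_timestamp(time_indexes, i):
    """Latest timestamp for segment i, walking back while the index is empty."""
    for j in range(i, -1, -1):
        if time_indexes[j]:
            # The last entry of a time index carries the largest timestamp.
            return time_indexes[j][-1][0]
    return None


def is_expired(time_indexes, i, now_ms, retention_ms):
    """True if segment i may be deleted under time based retention."""
    ts = latest_timestamp(time_indexes, i)
    # Assumption: without any timestamp to compare against, keep the segment.
    return ts is not None and now_ms - ts > retention_ms


# Three segments; the middle one has an empty time index.
example_indexes = [[(1000, 5)], [], [(9000, 40)]]
```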
Enforce time based log rolling
Currently time based log rolling is based on the creation time of the log segment. With this KIP, time based rolling would instead be based on the largest timestamp ever seen. A new log segment will be rolled out if the current time is greater than the largest timestamp ever seen + log.roll.ms. When message.timestamp.type=CreateTime, users should set max.message.time.difference.ms appropriately together with log.roll.ms to avoid frequent log segment roll out.
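The rolling condition is a single comparison, sketched below with hypothetical names. The example values are made up to show why stale CreateTime timestamps can cause frequent rolls: if every message carries an old timestamp, the condition is already true as soon as the segment is created.

```python
def should_roll(now_ms, largest_timestamp_seen_ms, log_roll_ms):
    """True when log.roll.ms has elapsed since the largest timestamp seen."""
    return now_ms > largest_timestamp_seen_ms + log_roll_ms


DAY_MS = 24 * 60 * 60 * 1000
now = 100 * DAY_MS          # arbitrary "current time" for the example
roll_ms = 7 * DAY_MS        # example value for log.roll.ms

# Messages stamped one hour ago: no roll yet.
fresh = should_roll(now, now - 60 * 60 * 1000, roll_ms)
# Messages whose CreateTime is 8 days old: the segment rolls immediately.
stale = should_roll(now, now - 8 * DAY_MS, roll_ms)
```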
Search message by timestamp
Searching by timestamp will have better accuracy. The guarantees provided are:
- Messages whose timestamps are after the searched timestamp will be consumed.
- Some messages with earlier timestamps might also be consumed.
Use case discussion
|Use case||Goal||Solution with LogAppendTime index||Solution with CreateTime index||Comparison|
|1||Search by timestamp|
Not lose messages
If a user wants to search for a message with CreateTime CT, they can use CT to search the LogAppendTime index, because LogAppendTime > CT for the same message (assuming no clock skew). If the clocks are skewed, they can search with CT - X, where X is the maximum skew.
If a user wants to search for a message with LogAppendTime LAT, they can simply search with LAT and get millisecond accuracy.
|Users can simply search with CT and get an offset at minute level granularity.|
If the latency in the pipeline is greater than one minute, users might consume fewer messages by using the CreateTime index. Otherwise, the LogAppendTime index is probably preferred.
Consider the following case:
If a user wants to search with CT after having consumed m2, they will have to reconsume from m1. Depending on how large LAT2 - LAT1 is, the number of messages to be reconsumed can be very large.
|2||Search by timestamp (bootstrap)|
In the bootstrap case, all the LATs would be close to each other. For example, suppose a user wants to process the data from the last 3 days and did the following:
In this case, the LogAppendTime index does not help much. That means the user needs to filter out data older than 3 days before dumping it into Kafka.
|In the bootstrap case, the CreateTime will not change. If users follow the same procedure described for the LogAppendTime index, searching by timestamp will work.||The LogAppendTime index needs further attention from the user.|
|3||Failover from cluster 1 to cluster 2|
Similar to search by timestamp. Users can choose to use the CT or LAT of cluster 1 to search on cluster 2. In this case, searching with CT - MaxLatencyOfCluster provides a strong guarantee of not losing messages, but might produce some duplicates depending on the difference in latency between cluster 1 and cluster 2.
Users can search with CT and get minute level granularity. Duplicates are still unavoidable.
There can be some tricky cases here. Consider the following case:
In this case, m1 is created before m2. Due to the latency difference, m1 arrives at cluster 1 before m2 does, but m2 arrives at cluster 2 before m1 does.
If a consumer consumed m2 in cluster 2 and fails over to cluster 1, simply searching by CT2 will miss m1, because m1 has a larger offset than m2 in cluster 2 but a smaller offset than m2 in cluster 1. So the same trick of searching with CT - MaxLatencyOfCluster is still needed.
|In the cross cluster failover case, both solutions can provide a strong guarantee of not losing messages, but both depend on knowing MaxLatencyOfCluster.|
|4||Get lag for consumers by time||Know how long a consumer is lagging by time.||With LogAppendTime in the message, consumer can easily find out the lag by time and estimate how long it might need to reach the log end.||Not supported.|
|5||Broker side latency metric||Let the broker report the latency of each topic, i.e. LAT - CT.||The latency can be reported as LAT - CT.||The latency can be reported as System.currentTimeMillis - CT.||The two solutions are the same. This latency information can be used for MaxLatencyOfCluster in use case 3.|
From the use cases listed above, having a LogAppendTime index is generally better than having a CreateTime index.
Compatibility, Deprecation, and Migration Plan
The change is backward compatible after KIP-31 and KIP-32 are checked in.
The broker will do the following for log retention during migration:
- The broker will rebuild the time based log index for each segment that does not have a time index.
- If the message.format.version of a topic is before 0.10.0, the time index will have only one entry (last_modification_time_of_the_segment -> offset_of_the_last_message_in_the_segment).
- If the message.format.version of a topic is 0.10.0, the broker will scan the messages in the log segment and rebuild the time index. If no message in the segment has a timestamp, the entry (last_modification_time_of_the_segment -> offset_of_the_last_message_in_the_segment) will be inserted into the time index. Otherwise the time index will be built in the normal way.
- After the entire cluster has migrated to the time based log index for log retention, the broker will enforce log retention using the time index. Given the rebuild described in the first step, the behavior is:
- For segments that only contain messages whose versions are before 0.10.0, the entry with the last modification time in the time index will be used for retention.
- For segments that contain at least one message whose version is 0.10.0 or later, the max timestamp of the messages will be used for log retention.
The broker will do the following for log rolling during migration:
- On startup, the broker will initially use the segment's last modification time as the max message timestamp.
- If a new message has version 0.10.0 or later and its timestamp is greater than the current max message timestamp, the broker updates the current max message timestamp.
- The broker always uses the difference between the current time and the max message timestamp to decide whether to roll out a new log segment.
Add a timestamp field to log index entry
The most straightforward way to provide a time index is to associate a timestamp with each entry in the log index files.
Because the index entry size becomes 16 bytes instead of 8 bytes, the index file size also needs to be doubled. As an example, one of our brokers has ~3500 partitions, and its index files take about 16GB of memory. With this new format, the memory consumption would be 32GB.
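The doubling follows directly from the entry sizes, as the short calculation below shows. The function name is hypothetical, and the 16GB figure is the approximate number quoted above, treated here as exactly 16 GiB for the sake of the arithmetic.

```python
GIB = 1024 ** 3


def index_memory_bytes(current_bytes, old_entry_size, new_entry_size):
    """Memory needed when the same number of entries use a larger entry size."""
    entry_count = current_bytes // old_entry_size
    return entry_count * new_entry_size


# 8-byte entries occupying ~16 GiB, widened to 16 bytes by the timestamp field.
new_bytes = index_memory_bytes(16 * GIB, old_entry_size=8, new_entry_size=16)
new_gib = new_bytes // GIB
```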
Option 1 - Time based index using LogAppendTime
In order to enable timestamp based search at finer granularity, we need to add the timestamp to log indices as well. Broker will build time index based on LogAppendTime of messages.
Because all the index files are memory mapped files, the main consideration here is to avoid significantly increasing memory consumption.
The time index file needs to be built just like the log index file based on each log segment file.
Use a time index for each log segment to save the timestamp -> log offset at minute granularity
Create another index file for each log segment with name SegmentBaseOffset.time.index to have index at minute level. The time index entry format is:
The time index granularity does not change the actual timestamp searching granularity. It only affects the time needed for searching. It works the same way as offset search: find the closest timestamp and its corresponding offset, then start a linear scan over the log until the target message is found. We prefer minute level indexing because timestamp based search is usually rare, so it is probably not worth investing a significant amount of memory in it.
The time index will be built based on the log index file. Every time a new entry is inserted into the log index file, we look at the timestamp of the message, and if it falls into the next minute, we insert an entry into the time index as well. The following table gives a summary of the memory consumption at different granularities. The numbers are calculated based on a broker with 3500 partitions.
Users don't typically need to look up offsets with seconds granularity.
Option 2 - Time based index using CreateTime
Another option is to build the index based on the CreateTime of messages. Similar to option 1, there would be one time index file per log segment.
The biggest challenge of indexing using CreateTime is that CreateTime can be out of order.
One solution is as below:
- Each broker keeps in memory a timestamp index map - Map[TopicPartitionSegment, Map[TimestampByMinute, Offset]]
- The timestamp is on minute boundary
- The offset is the offset of the first message in the log segment that falls into a minute
Create a timestamp index file for each log segment. The entry in the file is in the following format:
So the timestamp index file will simply become a persistent copy of the timestamp index map. The broker will load the timestamp map from the file on startup.
- When a broker (regardless of whether it is leader or follower) receives a message, it does the following:
- Find which minute MIN the message with offset OFFSET falls in
- Check whether MIN is already in the in memory timestamp map for the current log segment. If it is not, the broker adds [MIN->OFFSET] to both the in memory timestamp index map and the timestamp index file.
- When a log segment is deleted, the broker:
- Removes the TopicPartitionSegment key from the in memory map
- Removes the timestamp index file of the log segment
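The steps above can be sketched as follows. This is a minimal in-memory illustration: the class and method names are hypothetical, the segment key is an arbitrary hashable value standing in for TopicPartitionSegment, and writes to the timestamp index file are only indicated by comments.

```python
MINUTE_MS = 60 * 1000


class MinuteTimestampIndex:
    """Hypothetical sketch of Map[TopicPartitionSegment, Map[TimestampByMinute, Offset]]."""

    def __init__(self):
        self.index = {}  # segment key -> {minute boundary (ms) -> first offset}

    def on_message(self, segment, offset, create_time_ms):
        """Record the message; return True if a new [MIN->OFFSET] entry was added."""
        minute = (create_time_ms // MINUTE_MS) * MINUTE_MS
        segment_map = self.index.setdefault(segment, {})
        if minute in segment_map:
            return False
        # First message of this minute in the segment. A real broker would
        # also append the entry to the timestamp index file here.
        segment_map[minute] = offset
        return True

    def on_segment_deleted(self, segment):
        # A real broker would also delete the timestamp index file here.
        self.index.pop(segment, None)


idx = MinuteTimestampIndex()
seg = ("topic", 0, 0)  # stand-in for a TopicPartitionSegment key
results = [
    idx.on_message(seg, 10, 2 * MINUTE_MS + 5),   # first message in minute 2
    idx.on_message(seg, 11, 2 * MINUTE_MS + 900), # same minute, ignored
    idx.on_message(seg, 12, 1 * MINUTE_MS + 30),  # out-of-order CreateTime, minute 1
]
snapshot = dict(idx.index[seg])
idx.on_segment_deleted(seg)
```

Because the map is keyed by minute rather than kept as a sorted list, an out-of-order CreateTime simply adds an entry for an earlier minute with a later offset, which is the out-of-order behavior this option has to cope with.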