

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Searching for an offset by timestamp in Kafka currently has very coarse granularity (log segment level), and it also does not work well when a replica is reassigned. This KIP introduces a time-based log index to allow searching for messages in Kafka by timestamp at a finer granularity.

This KIP depends on KIP-32.

Public Interfaces

No actual public interface change. The search-by-timestamp function will still be provided by OffsetRequest.
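For reference, the sketch below shows how a client searches for an offset by timestamp today through the old consumer's OffsetRequest API (which, as noted in the Motivation, only resolves timestamps at log segment granularity). The broker host, topic name, and client id are placeholders.

import java.util.Collections;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetByTimestamp {
    public static void main(String[] args) {
        SimpleConsumer consumer =
            new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "offset-lookup");
        TopicAndPartition tp = new TopicAndPartition("my-topic", 0);
        long target = System.currentTimeMillis() - 3 * 24 * 60 * 60 * 1000L; // e.g. 3 days ago

        // Ask for at most one offset at or before the target timestamp.
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            Collections.singletonMap(tp, new PartitionOffsetRequestInfo(target, 1));
        OffsetRequest request = new OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "offset-lookup");
        OffsetResponse response = consumer.getOffsetsBefore(request);

        long[] offsets = response.offsets("my-topic", 0); // segment-granularity answer
        System.out.println(offsets.length > 0 ? offsets[0] : -1);
        consumer.close();
    }
}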

Proposed Changes

New time-based log index

Option 1 - Time based index using LogAppendTime

In order to enable timestamp-based search at a finer granularity, we need to add the timestamp to log indices as well. The broker will build the time index based on the LogAppendTime of messages.

Because all the index files are memory-mapped files, the main consideration here is to avoid significantly increasing the memory consumption.

The time index file needs to be built per log segment, just like the existing log index file.

Use a time index for each log segment to store the timestamp -> log offset mapping at minute granularity

Create another index file for each log segment, named SegmentBaseOffset.time.index, to provide minute-level indexing. The time index entry format is:

Time Index Entry => Timestamp Offset
  Timestamp => int64
  Offset => int32
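
As a rough illustration, here is a minimal Java sketch of reading and writing this 12-byte entry layout. The class and method names are ours, not from the Kafka code base, and the int32 offset is presumably relative to the segment base offset, as in the existing offset index.

import java.nio.ByteBuffer;

public final class TimeIndexEntry {
    public static final int ENTRY_SIZE = 12; // 8-byte timestamp + 4-byte offset

    // Appends one entry to a (memory-mapped) time index buffer.
    public static void write(ByteBuffer index, long timestampMs, int relativeOffset) {
        index.putLong(timestampMs);
        index.putInt(relativeOffset);
    }

    // Reads the timestamp of the entry in the given slot.
    public static long timestampAt(ByteBuffer index, int slot) {
        return index.getLong(slot * ENTRY_SIZE);
    }

    // Reads the offset of the entry in the given slot.
    public static int offsetAt(ByteBuffer index, int slot) {
        return index.getInt(slot * ENTRY_SIZE + 8);
    }
}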

The time index granularity does not change the accuracy of timestamp search; it only affects the time a search takes. Lookups work the same way as offset search: find the closest indexed timestamp and its corresponding offset, then linearly scan the log from there until the target message is found. The reason we prefer minute-level indexing is that timestamp-based search is usually rare, so it is probably not worth investing a significant amount of memory in it.
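
A sketch of that lookup, assuming the 12-byte entry layout above (names are illustrative): binary-search the time index for the last entry whose timestamp is at or before the target, then let the caller scan the log linearly from the returned offset.

import java.nio.ByteBuffer;

public final class TimeIndexSearch {
    private static final int ENTRY_SIZE = 12; // int64 timestamp + int32 offset

    // Returns the offset stored in the last entry with timestamp <= target, or -1 if none.
    public static int lookup(ByteBuffer index, int entries, long targetTimestamp) {
        int lo = 0, hi = entries - 1, found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index.getLong(mid * ENTRY_SIZE) <= targetTimestamp) {
                found = mid;   // candidate; a later entry may still qualify
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return found < 0 ? -1 : index.getInt(found * ENTRY_SIZE + 8);
    }
}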

The time index will be built alongside the log index file. Every time a new entry is inserted into the log index file, we look at the timestamp of the message; if it falls into the next minute, we insert an entry into the time index as well.
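
As an illustration, a minimal sketch of this rule; the class name and the choice to stamp entries on minute boundaries are ours.

import java.nio.ByteBuffer;

public final class TimeIndexBuilder {
    private static final long MINUTE_MS = 60000L;
    private long lastIndexedMinute = -1L;

    // Called alongside each log index append; returns true if a time index entry was written.
    public boolean maybeAppend(ByteBuffer timeIndex, long logAppendTimeMs, int relativeOffset) {
        long minute = logAppendTimeMs / MINUTE_MS;
        if (minute <= lastIndexedMinute) {
            return false;                      // still within an already-indexed minute
        }
        lastIndexedMinute = minute;
        timeIndex.putLong(minute * MINUTE_MS); // minute-boundary timestamp
        timeIndex.putInt(relativeOffset);      // first message that falls into this minute
        return true;
    }
}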

The following table summarizes the memory consumption at different index granularities. The numbers are calculated for a broker with 3,500 partitions and 12-byte index entries (roughly entries per day x 12 bytes x 3,500 partitions):

Granularity   Index entries per day   Memory consumption
Second        86,400                  3.4 GB
Minute        1,440                   57 MB

Users don't typically need to look up offsets with second-level granularity.

Option 2 - Time based index using CreateTime

The biggest problem with indexing using CreateTime is that CreateTime can be out of order.

One solution is as below:

  1. Each broker keeps an in-memory timestamp index map - Map[TopicPartitionSegment, Map[TimestampByMinute, Offset]]
    1. The timestamp is on a minute boundary
    2. The offset is the offset of the first message in the log segment that falls into that minute
  2. Create a timestamp index file for each log segment. The entry in the file is as below:

    Time Index Entry => Timestamp Offset
      Timestamp => int64
      Offset => int32

    The timestamp index file is thus simply a persistent copy of the timestamp index map. The broker will load the timestamp index map from the file on startup.

  3. When a broker (regardless of whether it is a leader or a follower) receives a message, it does the following:
    1. Find which minute MIN the message with offset OFFSET falls into
    2. Check whether MIN is already in the in-memory timestamp index map for the current log segment. If it is not, the broker adds [MIN -> OFFSET] to both the in-memory timestamp index map and the timestamp index file (as sketched below).
  4. When a log segment is deleted, the broker:
    1. Removes the TopicPartitionSegment key from the in-memory map
    2. Removes the log segment's timestamp index file
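
A minimal Java sketch of the bookkeeping in steps 1-4 above; all names are illustrative, and appending entries to the per-segment index file and reloading it on startup are elided.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

public final class CreateTimeIndex {
    private static final long MINUTE_MS = 60000L;

    // Step 1: Map[TopicPartitionSegment, Map[TimestampByMinute, Offset]].
    private final Map<String, ConcurrentSkipListMap<Long, Long>> index = new ConcurrentHashMap<>();

    // Step 3: messages arrive in offset order, so putIfAbsent keeps the offset
    // of the first message whose CreateTime falls into each minute.
    public void onMessage(String topicPartitionSegment, long createTimeMs, long offset) {
        long minute = (createTimeMs / MINUTE_MS) * MINUTE_MS;
        index.computeIfAbsent(topicPartitionSegment, k -> new ConcurrentSkipListMap<>())
             .putIfAbsent(minute, offset);
    }

    // Step 4: drop the per-segment map when the segment is deleted.
    public void onSegmentDeleted(String topicPartitionSegment) {
        index.remove(topicPartitionSegment);
    }

    // Lookup: start from the closest indexed minute at or before the target
    // CreateTime, then scan the log forward from the returned offset.
    public Long lookup(String topicPartitionSegment, long targetTimestampMs) {
        ConcurrentSkipListMap<Long, Long> byMinute = index.get(topicPartitionSegment);
        if (byMinute == null) return null;
        Map.Entry<Long, Long> e = byMinute.floorEntry((targetTimestampMs / MINUTE_MS) * MINUTE_MS);
        return e == null ? null : e.getValue();
    }
}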

Comparison between Option 1 and Option 2

Accuracy of searching by time
  Option 1: Millisecond.
  Option 2: Locates the first message in the log that falls into the minute.

Order of timestamps in the actual log
  Option 1: Monotonically increasing.
  Option 2: Out of order.

Broker log retention / rolling policy enforcement
  Option 1: Simple to implement.
  Option 2: Needs to be implemented separately.

Exposure of LogAppendTime to users
  Option 1: Yes.
  Option 2: Not necessarily needed.

Memory consumption
  Option 1: Uses memory-mapped files; typically needs less memory than Option 2.
  Option 2: All entries are in memory; the footprint is higher than Option 1.

Complexity
  Option 1: Both options are similar for indexing.
  Option 2: Similar to Option 1, but needs a separate design to honor log retention / rolling.

Application friendliness
  Option 1: Users need to track both CreateTime (assuming we include it in the message) and LogAppendTime. (See the Use case discussion section below.)
  Option 2: Users only need to track CreateTime.

Use case discussion

Search by timestamp
  Goal: Do not lose messages.
  With LogAppendTime index: If a user wants to search for a message with CreateTime CT, they can use CT to search the LogAppendTime index, because LogAppendTime > CT for the same message (assuming no clock skew). If clocks are skewed, they can search with CT - X, where X is the maximum skew. If a user wants to search for a message with LogAppendTime LAT, they can search with LAT directly and get millisecond accuracy.
  With CreateTime index: The user can search with CT directly and get an offset at minute-level granularity.
  Comparison: If the latency in the pipeline is greater than one minute, the user might consume fewer messages by using the CreateTime index; otherwise, the LogAppendTime index is probably preferable. Consider the following case:
    1. A message m1 with CreateTime CT arrives at the broker at LAT1.
    2. Some time later, at LAT2, another message m2 with the same CreateTime CT arrives at the broker.
  If a user searches with CT after consuming m2, they will have to re-consume from m1. Depending on how large LAT2 - LAT1 is, the number of messages to be re-consumed can be very large.

Search by timestamp (bootstrap)
  Goals: 1. Do not lose messages. 2. Consume fewer duplicate messages.
  With LogAppendTime index: In the bootstrap case, all the LATs will be close together. For example, suppose a user wants to process the data from the last 3 days and does the following: 1) dump a big database into Kafka; 2) reprocess the messages from the last 3 days. In this case, the LogAppendTime index does not help much, which means the user needs to filter out the data older than 3 days before dumping it into Kafka.
  With CreateTime index: In the bootstrap case, the CreateTime will not change, so if the user follows the same procedure stated for the LogAppendTime index, searching by timestamp will work.
  Comparison: The LogAppendTime index needs further attention from the user.

Failover from cluster 1 to cluster 2
  Goals: 1. Do not lose messages. 2. Consume fewer duplicate messages.
  With LogAppendTime index: Similar to search by timestamp. The user can choose to use the CT or the LAT of cluster 1 to search on cluster 2. In this case, searching with CT - MaxLatencyOfCluster provides a strong guarantee of not losing messages, but might yield some duplicates depending on the difference in latency between cluster 1 and cluster 2.
  With CreateTime index: The user can search with CT and get minute-level granularity. Duplicates are still unavoidable.
  Comparison: In the cross-cluster failover case, both solutions can provide a strong guarantee of not losing messages. Under conditions of

Get lag for consumers
  Goal: Alert when a consumer starts to lag.
  With LogAppendTime index: With LogAppendTime in the message, a consumer can easily find out its lag by time and estimate how long it might take to catch up to the log end.
  With CreateTime index: Not supported.

Compatibility, Deprecation, and Migration Plan

The change is backward compatible once KIP-31 and KIP-32 are checked in.

Rejected Alternatives

Add a timestamp field to log index entry

The most straightforward approach to a time index is to let the log index files have a timestamp associated with each entry:

Log Index Entry => Offset Position Timestamp
  Offset => int32
  Position => int32
  Timestamp => int64

Because the index entry size becomes 16 bytes instead of 8, the index file size doubles as well. As an example, one of our brokers has ~3,500 partitions, and its index files take about 16 GB of memory; with this new format, the memory consumption would be 32 GB.

 
