

Status

Current state: WIP - DRAFT

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Searching for an offset by timestamp in Kafka currently has very coarse granularity (log segment level), and it also does not work well when a replica is reassigned. This KIP introduces a time-based log index to allow searching for messages in Kafka by timestamp at a finer granularity.

This KIP depends on KIP-32.

Public Interfaces

Search for messages by timestamp.

Proposed Changes

 

New time-based log index

 

In order to enable timestamp-based search at a finer granularity, we need to add the timestamp to the log indices as well. The broker will build the time index based on the LogAppendTime of messages.

Because all the index files are memory-mapped, the main consideration here is to avoid significantly increasing the memory consumption.

The time index file will be built per log segment, just like the log index file.

For each log segment, use a time index that stores the timestamp -> log offset mapping at minute granularity. Concretely, create another index file for each log segment, named SegmentBaseOffset.time.index, that indexes at the minute level. The time index entry format is:

 

Time Index Entry => Timestamp Offset
  Timestamp => int64
  Offset => int32
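
Each entry is thus 12 bytes: an int64 timestamp plus an int32 offset (presumably relative to the segment base offset, as in the existing log index). As a rough sketch of the layout in Java, with hypothetical names that are not part of the proposal:

import java.nio.ByteBuffer;

// Hypothetical codec for the 12-byte time index entry described above.
public final class TimeIndexEntryCodec {
    public static final int ENTRY_SIZE = 12; // int64 timestamp + int32 offset

    public static void write(ByteBuffer buf, long timestampMs, int relativeOffset) {
        buf.putLong(timestampMs);   // minute-granularity timestamp
        buf.putInt(relativeOffset); // offset assumed relative to the segment base offset
    }

    public static long readTimestamp(ByteBuffer buf, int entryIndex) {
        return buf.getLong(entryIndex * ENTRY_SIZE);
    }

    public static int readOffset(ByteBuffer buf, int entryIndex) {
        return buf.getInt(entryIndex * ENTRY_SIZE + 8);
    }
}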

The time index granularity does not change the actual timestamp search granularity. It only affects the time needed for a search. The search works the same way as an offset search: find the closest indexed timestamp and its corresponding offset, then do a linear scan over the log from that offset until the target message is found. The reason we prefer minute-level indexing is that timestamp-based search is usually rare, so it is probably not worth investing a significant amount of memory in it.
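
To make the search concrete, here is a minimal Java sketch of the two-phase lookup, assuming the time index entries are sorted by timestamp (the class and method names are illustrative, not part of the proposal):

import java.util.List;

// Illustrative two-phase lookup: binary search over the minute-level time
// index for the last entry with timestamp <= target; the linear scan of the
// log segment then starts from that entry's offset.
public final class TimeIndexSearch {

    public static final class Entry {
        public final long timestampMs;
        public final int relativeOffset;
        public Entry(long timestampMs, int relativeOffset) {
            this.timestampMs = timestampMs;
            this.relativeOffset = relativeOffset;
        }
    }

    // Returns the log offset at which the linear scan should start.
    public static int scanStartOffset(List<Entry> index, long targetTimestampMs) {
        int lo = 0, hi = index.size() - 1, start = 0;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index.get(mid).timestampMs <= targetTimestampMs) {
                start = index.get(mid).relativeOffset; // candidate, keep looking right
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return start;
    }
}

A coarser index only means the linear scan starts further from the target message; the message eventually found is the same.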

The time index will be built alongside the log index. Every time a new entry is inserted into the log index file, we look at the timestamp of the message; if it falls into the next minute, we insert an entry into the time index as well. The following table summarizes the memory consumption at different index granularities, calculated for a broker with 3500 partitions and one day of log per partition.

Granularity    Index entries per partition    Memory consumption
Second         86400                          3.4 GB
Minute         1440                           57 MB

Each index entry is 12 bytes (an int64 timestamp plus an int32 offset), so second-level indexing costs about 3500 x 86400 x 12 bytes ≈ 3.4 GB, while minute-level indexing costs about 3500 x 1440 x 12 bytes ≈ 57 MB.

Users don't typically need to look up offsets at second-level granularity.
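
A minimal sketch of the minute-boundary check described above; the TimeIndex interface and all names here are hypothetical stand-ins for broker internals:

// Hypothetical sketch: piggyback on log index inserts and append a time
// index entry only when a message's timestamp crosses into a new minute.
public final class TimeIndexBuilder {

    // Minimal stand-in for the on-disk time index file.
    public interface TimeIndex {
        void append(long timestampMs, int relativeOffset);
    }

    private static final long MINUTE_MS = 60_000L;
    private long lastIndexedMinute = -1L;

    // Called whenever a new entry is inserted into the log index.
    public void maybeAppend(TimeIndex timeIndex, long messageTimestampMs, int relativeOffset) {
        long minute = messageTimestampMs / MINUTE_MS;
        if (minute > lastIndexedMinute) {
            timeIndex.append(minute * MINUTE_MS, relativeOffset);
            lastIndexedMinute = minute;
        }
    }
}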

Use case discussion

 

Search message by timestamp

 

There are a few reasons why people may want to search messages by timestamp. One important use case is disaster recovery.

 

Imagine people have cluster 1 and cluster 2 at different locations. The two clusters have the same data. When cluster 1 goes down, the consumers of cluster 1 will try to switch to consuming from cluster 2. The problem is which offset those consumers should resume consuming from. The goals here are:

 

  1. Do not lose messages.
  2. Reconsume as few as possible of the duplicate messages that have already been consumed from cluster 1.

 

The challenge is that there is little cross-reference between cluster 1 and cluster 2. The offset checkpoint for cluster 1 does not apply to cluster 2. It is hard to get an accurate resuming point, because the message order and content of each partition could be significantly different between the two clusters. An approximate approach in this case is to take the timestamp T of the last message consumed from cluster 1, and then start consuming from the messages produced to cluster 2 at (T - X), where X is some safety buffer based on the latency difference between cluster 1 and cluster 2, say 15 minutes.
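
For illustration only, here is what the failover could look like from the consumer side once timestamp search is available. This sketch uses offsetsForTimes from today's Java consumer, a client API that was added after this KIP and is not part of this proposal:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public final class FailoverSeek {

    // Seek every assigned partition of the cluster-2 consumer to the first
    // offset with a timestamp >= (T - X). Partitions with no such message
    // return null and are left at their current position here.
    public static void seekToFailoverPoint(KafkaConsumer<byte[], byte[]> consumer,
                                           long lastConsumedTimestampMs, // T from cluster 1
                                           long safetyBufferMs) {        // X, e.g. 15 minutes
        long target = lastConsumedTimestampMs - safetyBufferMs;
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : consumer.assignment())
            query.put(tp, target);
        Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : result.entrySet()) {
            if (e.getValue() != null)
                consumer.seek(e.getKey(), e.getValue().offset());
        }
    }
}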

 

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
