Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Authors: Henry Cai, Thomas Thornton

Motivation

A common queue scheduling feature is delayed messages, where a message is not meant to be delivered or consumed right away.  One use case is a large influx of messages or other activity in the system: the producer wants the messages to be consumed/processed a little later, or wants consumption spread over a period of time.  Another common use is message retry handling (e.g. retries in the new Kafka Queue feature): when a consumer/worker cannot process a message due to a transient failure in an external system, it typically unacknowledges the message and retries it later, ideally scheduled with some exponential backoff interval.

Since Kafka lacks support for message scheduling or delayed message delivery, users have turned to other queuing systems for these features. For example, users have been using AWS SQS delayed messages / delayed topics to deliver messages within a short time frame (e.g. within 15 minutes), and DynamoDB or traditional database tables for longer delay durations.

We are proposing to implement delayed messages for Kafka to fill this feature gap. Similar to SQS, the proposal focuses on delayed delivery within a short time window (up to 15 minutes).  In our use cases, most delayed deliveries are within 1 minute.

Public Interfaces

Delayed Message

We are proposing to add a new top-level field, deliverAfter, in ProducerRecord:

    public class ProducerRecord {

        public long deliverAfter();

        …

    }

The producer will set the deliverAfter timestamp on the message, and Kafka will deliver the message to the consumer at or after the specified timestamp.  Note that the broker has the freedom to deliver the message some time after that specified timestamp.

When a Kafka consumer issues a fetch request, the broker will only return records whose deliverAfter constraint is satisfied.

Delayed Topic

In order not to affect regular message processing, and similar to the SQS design, all delayed messages need to be enqueued into a specialized topic: a DelayedTopic.

We propose adding a new boolean parameter in TopicConfig:

    delayed.delivery.enable=true

The default value of the parameter is false.  Setting the parameter to true will turn the topic into a delayed topic.

Proposed Changes

For regular Kafka topics, messages are enqueued and dequeued in FIFO order.  When the broker serves a consumer's FetchRequest, the response records are returned sorted by message offset.  For delayed messages, the messages are instead consumed/sorted by the message's deliverAfter field. Internally, the broker implementation will need to provide a deliverAfter time index to order messages by their deliverAfter field. For messages having the same deliverAfter, the order is based on the message's original offset.
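The (deliverAfter, offset) ordering described above can be sketched as a comparator. This is an illustrative sketch, not proposed code; the DelayedRecord type is a hypothetical stand-in for a record's metadata.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class DeliverAfterOrdering {
    // Hypothetical view of a record's scheduling metadata.
    record DelayedRecord(long offset, long deliverAfter) {}

    // Delayed topics sort by deliverAfter first, then by offset as the tie breaker.
    static final Comparator<DelayedRecord> DELIVERY_ORDER =
        Comparator.comparingLong(DelayedRecord::deliverAfter)
                  .thenComparingLong(DelayedRecord::offset);

    public static void main(String[] args) {
        List<DelayedRecord> records = new ArrayList<>(List.of(
            new DelayedRecord(0, 2_000L),   // enqueued first, but delivered last
            new DelayedRecord(1, 1_000L),
            new DelayedRecord(2, 1_000L))); // same deliverAfter as offset 1
        records.sort(DELIVERY_ORDER);
        for (DelayedRecord r : records) {
            System.out.println(r.offset()); // 1, 2, 0
        }
    }
}
```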

Consumer Fetching Behavior

Currently, a Kafka consumer usually bookkeeps and stores the offset it last fetched from the broker; when it needs to fetch again, the consumer sets that fetchOffset in FetchRequest.PartitionData, and the broker returns the records at and after that offset in increasing offset order.

With a delayed topic, the Kafka consumer will bookkeep and store the fetchOffset of the last message from the last FetchResponse stream.  The consumer sets this fetchOffset in the next FetchRequest.PartitionData.  The broker uses the fetchOffset to find the corresponding message it last sent and retrieves that message's deliverAfter.  Using the deliverAfter/fetchOffset pair, the broker processes the FetchRequest to find all records at or after that pair in deliverAfter order.  For messages having the same deliverAfter, the returned records are sorted by the message's offset. The FetchResponse stream terminates when a record's deliverAfter is later than the current system clock.
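The cursor-resume logic above can be sketched in a few lines. This is a simplified model, not broker code: the partition's records are assumed to be already sorted by (deliverAfter, offset), the cursor is the pair for the last record the consumer received, and `now` stands in for the system clock.

```java
import java.util.ArrayList;
import java.util.List;

public class DelayedFetch {
    record DelayedRecord(long offset, long deliverAfter) {}

    // Return records strictly after the (cursorDeliverAfter, cursorOffset) pair,
    // stopping at the first record whose deliverAfter is still in the future.
    static List<DelayedRecord> fetch(List<DelayedRecord> sorted,
                                     long cursorDeliverAfter, long cursorOffset,
                                     long now) {
        List<DelayedRecord> out = new ArrayList<>();
        for (DelayedRecord r : sorted) {
            boolean afterCursor = r.deliverAfter() > cursorDeliverAfter
                || (r.deliverAfter() == cursorDeliverAfter && r.offset() > cursorOffset);
            if (!afterCursor) continue;          // already delivered last time
            if (r.deliverAfter() > now) break;   // not yet deliverable; stream ends
            out.add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        List<DelayedRecord> sorted = List.of(
            new DelayedRecord(3, 100L),
            new DelayedRecord(1, 200L),
            new DelayedRecord(2, 200L),
            new DelayedRecord(0, 900L));
        // Cursor at (100, 3), clock at 500: only offsets 1 and 2 are deliverable.
        for (DelayedRecord r : fetch(sorted, 100L, 3L, 500L)) {
            System.out.println(r.offset());
        }
    }
}
```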

Broker Implementation

In the Kafka broker, for a regular topic we append records to the current log segment file in enqueue order and close the log segment file on either a time or size trigger.  We will maintain this behavior for delayed topic enqueue and add a new DeliverAfterIndex to help us dequeue messages in deliverAfter/offset order.

In addition to the regular TimeIndex file for each log segment, we will add a DeliverAfterIndex file which sorts the messages in the segment by their deliverAfter field.  For messages having the same deliverAfter value, the order will be based on the message's offset.

Each entry in the DeliverAfterIndex is keyed by the (deliverAfter, offset) pair, sorted in that order; the value is the physical position of the record in the log segment file.

   public class DeliverAfterIndexKey {

      public long deliverAfter;

      public long offset;

   }

For the active log segment, the DeliverAfterIndex is kept in memory. When the log segment is closed, the in-memory lookup table is persisted to file. If the broker crashes before the index persistence can finish, the broker will rebuild the DeliverAfterIndex file on restart and persist it to disk.

In the current UnifiedLog.LogSegments, we keep a ConcurrentNavigableMap&lt;Long, LogSegment&gt; segments data structure to quickly find the log segments satisfying a requested offset from a FetchRequest. We will add a similar lookup data structure, ConcurrentNavigableMap&lt;DeliverAfterIndexKey, LogSegment&gt; deliverAfterSegments, which returns the segments at or after the requested deliverAfter/offset pair.  From there, the LogSegment.slice() method will be refactored to return a batch iterator of records.  Note that in the DeliverAfterIndex design above, we store the physical location of the message/message-batch in the log segment file, so the new BatchIterator can quickly retrieve the underlying records from the log segment file.
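A minimal sketch of how the deliverAfterSegments lookup could behave, assuming the index key is made Comparable with the (deliverAfter, offset) ordering described above. Segment names stand in for LogSegment objects; the map key is assumed to be each segment's smallest index key.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class DeliverAfterSegments {
    // Index key ordered by deliverAfter first, then offset, matching the
    // DeliverAfterIndexKey described above (Comparable added for the map).
    record IndexKey(long deliverAfter, long offset) implements Comparable<IndexKey> {
        public int compareTo(IndexKey o) {
            int c = Long.compare(deliverAfter, o.deliverAfter);
            return c != 0 ? c : Long.compare(offset, o.offset);
        }
    }

    public static void main(String[] args) {
        // Hypothetical map from each segment's smallest index key to a
        // segment name (standing in for LogSegment).
        ConcurrentNavigableMap<IndexKey, String> segments = new ConcurrentSkipListMap<>();
        segments.put(new IndexKey(100L, 0L), "segment-A");
        segments.put(new IndexKey(100L, 7L), "segment-B");
        segments.put(new IndexKey(250L, 3L), "segment-C");

        // tailMap returns all segments at or after the requested pair.
        IndexKey request = new IndexKey(100L, 7L);
        System.out.println(segments.tailMap(request, true).values());
    }
}
```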

Merging streams from multiple log segments

For regular topics, messages are naturally inserted/sorted by message offset within one log segment file, and all messages in a later log segment file come after the messages in the current log segment file.  For delayed messages, with the help of the DeliverAfterIndex file, we can read the messages in one log segment file ordered by deliverAfter and message offset.  However, the deliverAfter values of messages in the next log segment are not necessarily after those of messages in the previous log segment file, since the order of the log segment files is based on enqueue order.  For this reason, when the broker returns the stream of records to the consumer, it will need to merge multiple record streams from multiple log segment files; the merge order is based on deliverAfter/messageOffset.
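The multi-segment merge above is a standard k-way merge; a sketch using a priority queue, with per-segment lists standing in for the per-segment record streams:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class SegmentMerge {
    record DelayedRecord(long offset, long deliverAfter) {}

    // Merge per-segment streams (each already sorted by deliverAfter/offset)
    // into one globally ordered stream, as the broker would when a fetch
    // response spans several log segments.
    static List<DelayedRecord> merge(List<List<DelayedRecord>> segments) {
        Comparator<DelayedRecord> order =
            Comparator.comparingLong(DelayedRecord::deliverAfter)
                      .thenComparingLong(DelayedRecord::offset);
        // Heap entry: the next record of a stream plus the rest of that stream.
        record Head(DelayedRecord rec, Iterator<DelayedRecord> rest) {}
        PriorityQueue<Head> heap =
            new PriorityQueue<>(Comparator.comparing(Head::rec, order));
        for (List<DelayedRecord> seg : segments) {
            Iterator<DelayedRecord> it = seg.iterator();
            if (it.hasNext()) heap.add(new Head(it.next(), it));
        }
        List<DelayedRecord> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head h = heap.poll();
            out.add(h.rec());
            if (h.rest().hasNext()) heap.add(new Head(h.rest().next(), h.rest()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<DelayedRecord> merged = merge(List.of(
            List.of(new DelayedRecord(0, 300L), new DelayedRecord(1, 500L)),
            List.of(new DelayedRecord(2, 100L), new DelayedRecord(3, 400L))));
        for (DelayedRecord r : merged) System.out.println(r.offset());
    }
}
```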

Batching behavior

Although the new deliverAfter field is specified at the record/message level, Kafka mostly works with messages in batches for performance reasons.  We therefore also propose a deliverAfter field at the RecordBatch level.  The messages in the same batch will satisfy the deliverAfter constraint at the batch level.  For example, for message 1 with deliverAfter set to 10:42 and message 2 with deliverAfter set to 10:44, the batch-level deliverAfter will be set to 10:44 so both messages in the batch satisfy their constraints.  At message producing time, the producer sets the batch-level deliverAfter either explicitly or implicitly (meaning the ProducerBatch code sets/adjusts the batch-level deliverAfter based on the messages' deliverAfter values inside the batch, and in some cases may decide to split the batch if the message-level deliverAfter values are too far apart).
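A sketch of the implicit batch-level deliverAfter: the batch takes the maximum deliverAfter of its records, so every record's constraint is satisfied when the batch becomes deliverable. The `maxSpreadMs` split threshold is a hypothetical knob, not part of the proposal; it illustrates when the producer might prefer splitting the batch over delaying the earliest record by the full spread.

```java
import java.util.List;

public class BatchDeliverAfter {
    // Batch-level deliverAfter = max of the member records' deliverAfter.
    static long batchDeliverAfter(List<Long> recordDeliverAfter) {
        return recordDeliverAfter.stream().mapToLong(Long::longValue).max().orElse(0L);
    }

    // Hypothetical split heuristic: split when the deliverAfter values in the
    // batch are further apart than maxSpreadMs.
    static boolean shouldSplit(List<Long> recordDeliverAfter, long maxSpreadMs) {
        long min = recordDeliverAfter.stream().mapToLong(Long::longValue).min().orElse(0L);
        return batchDeliverAfter(recordDeliverAfter) - min > maxSpreadMs;
    }

    public static void main(String[] args) {
        // Two records two minutes apart (timestamps in ms from some baseline).
        List<Long> batch = List.of(120_000L, 240_000L);
        System.out.println(batchDeliverAfter(batch)); // batch waits for the later record
        System.out.println(shouldSplit(batch, 60_000L)); // spread exceeds 1 minute
    }
}
```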

Performance

The enqueue of delayed topics will be a bit slower than the regular topic since we need to build the extra DeliverAfterIndex file.

The dequeue of delayed topics will be a bit slower because we need to do a lookup from the DeliverAfterIndex into the log segment file, which is a point read (not a sequential read), and because we need to merge-sort multiple record streams from multiple log segment files.  Although the batch iterator does point retrieval of records, most of those records are already in the page cache, so the overhead is not particularly high for recent segment files.  We also store the message's location in the log segment file as the value in the DeliverAfterIndex to help the retrieval.

Usually, users of delayed topics can tolerate some performance overhead compared to regular topics.

Note that this feature only proposes to support short delays (with delay latency up to 15 minutes, and the majority of delayed messages within one minute), so the lookup window over older log segments is small; only a few log segments should satisfy a fetch request when most messages have a short deliverAfter setting.  For this reason, the merge sort is also performant.

We are also separating out delayed topic away from the regular topic to not affect regular stream performance.

Tiered Storage

Delayed topics can have remote.storage.enable set to true so their segments are uploaded to tiered storage.  Tiered storage already uploads multiple index files together with the log segment file; we will also upload the DeliverAfterIndex files.

With the upper bound of the delayed messages set to 15 minutes, the normal setting of local.retention.ms will be far bigger than 15 minutes.  This means we should be able to resolve the message lookup within the local log segment file and auxiliary indexing structure.

For a catch-up read consumer who needs to read very old logs from tiered storage, we would need to download multiple adjacent log segments together with their DeliverAfterIndex files to establish the dequeue order between the messages in those log segment files.

Queue

Delayed messages naturally support retry handling in Queues.  When a consumer worker fails to process a message and unacks it, the broker can set a deliverAfter field on the message so that redelivery happens some time later.  When the broker gets messages from the Replica class to distribute to consumers, it needs to filter out the messages that are not supposed to be delivered at the current clock.  The refactoring we are going to do in LocalLog.read -> LogSegment.read -> LogSegment.slice will handle that transparently.
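The retry scheduling above could compute the redelivery time with exponential backoff, capped by this proposal's 15-minute upper bound. The helper below is a hypothetical sketch; the 1-second base delay is an assumption, not part of the proposal.

```java
public class RetryBackoff {
    static final long BASE_MS = 1_000L;             // first retry after 1s (assumption)
    static final long MAX_DELAY_MS = 15 * 60_000L;  // 15-minute cap from the proposal

    // deliverAfter for the (attempt+1)-th delivery: now + min(base * 2^attempt, cap).
    static long nextDeliverAfter(long nowMs, int attempt) {
        long backoff = BASE_MS << Math.min(attempt, 20);  // 1s, 2s, 4s, ...
        return nowMs + Math.min(backoff, MAX_DELAY_MS);
    }

    public static void main(String[] args) {
        long now = 0L;
        System.out.println(nextDeliverAfter(now, 0));   // first retry: 1s later
        System.out.println(nextDeliverAfter(now, 4));   // fifth retry: 16s later
        System.out.println(nextDeliverAfter(now, 15));  // capped at 15 minutes
    }
}
```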

Compatibility, Deprecation, and Migration Plan

This feature does not support:

  • Transactions: Delayed topics cannot participate in transactions, because out-of-order message delivery can cross transactional boundaries
  • Log Compaction: Delayed topics will not support cleanup.policy=compact. Log compaction is used primarily to mimic K/V stores and this is not a typical use case with delayed message delivery.
  • Exactly-once: Delayed topics will not participate in exactly-once delivery, since exactly-once delivery relies on transaction support, and consumers of delayed topics cannot use the READ_COMMITTED isolation level, which relies on strict sequential offset ordering

Ordering guarantees:

Delayed topics provide a different ordering guarantee than standard Kafka topics:

  • Standard topics: Messages consumed in offset order per-partition (n before n+1)
  • Delayed topics: Messages consumed in deliverAfter timestamp order, with offset as tie breaker

Backward Compatibility

  • Existing topics are unaffected (delayed.delivery.enable defaults to false)
  • Brokers handle both delayed and standard topics simultaneously
  • Old clients cannot consume from delayed topics (FetchRequest version check)
  • __consumer_offsets state remains the same, encoding only the offset. The broker retrieves the message for that offset and its corresponding deliverAfter.

Migration Strategy

No migration needed. This is a new topic type, not a modification of existing topics.

Deployment order:

  1. Upgrade brokers to support new FetchRequest version
  2. Upgrade consumers needing deliverAfter support to support new FetchRequest version
  3. Create or update topics with delayed.delivery.enable=true

Test Plan

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

  • Kafka Streams with Punctuators: Requires custom application code and additional infrastructure. Users want native enqueue/dequeue API.
  • External Systems (SQS, Redis, RabbitMQ): Forces users outside of Kafka ecosystem, goal is unified platform
  • Application-level filtering: Each consumer must implement delay logic, messages still consume bandwidth and storage during waiting state