Status

Current state: DRAFT

Discussion thread: here 

JIRA: here

Motivation

Kafka Streams maintains a monotonically increasing stream-time, i.e. the maximum observed record timestamp in a task. A single record with a far-future timestamp (e.g., clock misconfiguration producing ts = year 2099) permanently corrupts stream-time, causing:                                                                          

  1. All subsequent normal records dropped: windowCloseTime = streamTime - gracePeriodMs ≈ 2099, so every legitimate record's window appears expired and the record is silently discarded.
  2. RocksDB time segments deleted: the window state store partitions data into time-based segments. A poisoned stream-time marks all existing segments as expired, triggering immediate deletion of all historical window state and causing irreversible data loss.
  3. Stream-time punctuators misfiring: STREAM_TIME punctuators fire based on stream-time advancement; with stream-time at year 2099, every subsequent record triggers the punctuator, causing massive CPU waste and incorrect business logic timing.
                    

 There is currently no mechanism to detect or bound the impact of a single bad-timestamp record. 
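The first failure mode can be reproduced with plain arithmetic. The sketch below is a simplified model, not the Kafka Streams implementation: the class and method names are hypothetical, and the expiry rule (a window is expired once its end is at or before windowCloseTime) is an assumption made for illustration.

```java
// Simplified, hypothetical model of stream-time and window expiry.
// It is NOT the Kafka Streams code; names and the expiry rule are assumptions.
final class PoisonedStreamTimeDemo {

    // Stream-time is the maximum record timestamp observed so far, so it never moves backwards.
    static long advanceStreamTime(final long streamTime, final long recordTimestamp) {
        return Math.max(streamTime, recordTimestamp);
    }

    // windowCloseTime = streamTime - gracePeriodMs, as in the formula above.
    static long windowCloseTime(final long streamTime, final long gracePeriodMs) {
        return streamTime - gracePeriodMs;
    }

    // Assumption: a record is dropped when the end of its window is at or before windowCloseTime.
    static boolean isExpired(final long windowEndMs, final long streamTime, final long gracePeriodMs) {
        return windowEndMs <= windowCloseTime(streamTime, gracePeriodMs);
    }
}
```

Feeding this model one record stamped in 2099 moves stream-time to 2099. Every later record with a present-day timestamp then lands in a window that the model reports as expired, and because stream-time only takes the maximum, no later record can pull it back.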

Public Interfaces

New property:

Name: max.future.record.timestamp.difference.ms
Type: long
Default: -1
Importance: HIGH
Deprecated: n/a
Removed: n/a
Description: The maximum number of milliseconds by which a record's timestamp may exceed the current stream-time. Records beyond this threshold are dropped without updating stream-time. Their offsets are committed, so they are not replayed on restart. Set to -1 to disable (default, preserves existing behaviour).
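As an illustration, an application might enable the check as sketched below. The property name is the one proposed above and is passed as a plain string because no StreamsConfig constant for it exists yet; the class name, application id, bootstrap servers, and the one-hour threshold are illustrative assumptions.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper that builds the properties for a Kafka Streams application.
final class FutureTimestampConfigExample {

    static Properties streamsProperties() {
        final Properties props = new Properties();
        // Standard Kafka Streams settings; the values are placeholders.
        props.put("application.id", "future-timestamp-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // Proposed property: drop records more than one hour ahead of stream-time.
        // Leaving it at -1 (the default) would keep the check disabled.
        props.put("max.future.record.timestamp.difference.ms", Duration.ofHours(1).toMillis());
        return props;
    }
}
```

The right threshold depends on how far ahead of stream-time legitimate records can arrive in a given topology, so the one-hour value here is only a placeholder.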

Proposed Changes

  New config max.future.record.timestamp.difference.ms (default -1, disabled).

When set, records for which timestamp - streamTime > threshold are intercepted in PartitionGroup.nextRecord() before stream-time is updated. Each such record is returned as a CorruptedRecord, the existing mechanism that skips doProcess() but still commits the offset. This ensures that the record is not replayed on restart and that stream-time is not updated.
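The threshold check described above can be sketched as follows. This is a minimal standalone illustration, not the proposed patch: the class, method, and constant names are hypothetical, and skipping the check while stream-time is still uninitialised (modelled here as -1) is an assumption that this KIP does not spell out.

```java
// Hypothetical standalone sketch of the future-timestamp check; not the actual PartitionGroup code.
final class FutureTimestampGuard {

    // Mirrors the config description: -1 disables the check.
    static final long DISABLED = -1L;

    // Assumption: stream-time that has not been initialised yet is represented as -1.
    static final long UNKNOWN_STREAM_TIME = -1L;

    // Returns true when the record should be dropped: timestamp - streamTime > threshold.
    static boolean exceedsThreshold(final long recordTimestamp,
                                    final long streamTime,
                                    final long maxFutureDifferenceMs) {
        if (maxFutureDifferenceMs == DISABLED) {
            return false; // check disabled, existing behaviour preserved
        }
        if (streamTime == UNKNOWN_STREAM_TIME) {
            return false; // assumption: nothing to compare against yet
        }
        return recordTimestamp - streamTime > maxFutureDifferenceMs;
    }

    // Stream-time advances only for records that pass the check.
    static long nextStreamTime(final long streamTime,
                               final long recordTimestamp,
                               final long maxFutureDifferenceMs) {
        if (exceedsThreshold(recordTimestamp, streamTime, maxFutureDifferenceMs)) {
            return streamTime; // dropped record leaves stream-time untouched
        }
        return Math.max(streamTime, recordTimestamp);
    }
}
```

Note that the comparison is strict, so a record exactly threshold milliseconds ahead of stream-time is still accepted in this sketch.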
                                                                                                                                                                       

The config propagates via the existing StreamsConfig → TopologyConfig → TaskConfig → PartitionGroup path, the same route used by maxTaskIdleMs.


Metrics        

Dropped future records increment the existing dropped-records-rate and dropped-records-total task-level sensors. No new metrics are introduced; this is consistent with how negative-timestamp drops are handled today.    


Future Work

KAFKA-20439: Add Dead Letter Queue (DLQ) support for future records, allowing dropped future records to be routed to a configurable DLQ topic instead of being silently discarded. This is intentionally deferred to keep this KIP focused on the core protection mechanism.

Compatibility   
                                                                                                                                                                     

  1. The default is -1 (disabled), which is fully backwards compatible: there is no behaviour change for existing applications.
  2. Existing stream-time semantics are unchanged when the config is not set.
                     

Test Plan

Since this is a fairly small change, unit tests and integration tests should be sufficient.

Rejected Alternatives

While loop in PartitionGroup.nextRecord(): drop the future record and loop to the next record within nextRecord(). Rejected because the record has already been dequeued via queue.poll(); returning without committing its offset means the record would be replayed on restart.


