Current state: Accepted [VOTE] KIP-93: Improve invalid timestamp handling in Kafka Streams
Discussion thread: [DISCUSS] KIP-93: Improve invalid timestamp handling in Kafka Streams
JIRA:
Released: 0.10.2.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, Kafka Streams does not gracefully handle invalid (i.e., negative) timestamps returned from the TimestampExtractor. Instead, it fails with an exception, because negative timestamps cannot be handled in a meaningful way by time-based operators such as window aggregates or joins.
Negative timestamps can occur for several reasons:
-1
if the topic is configured with log.message.timestamp.type=CreateTime
1
(KafkaProducer does check for negative timestamps and raises an exception in this case, preventing invalid timestamps in the first place)
If all records happen to have negative timestamps (cases 1 and 2), this KIP does not improve the situation much. However, if only some records have negative timestamps (which could happen in cases 3 and 4 above), we want to improve the situation and make it easier for the user to process such topics. Right now, it is not possible to process these topics at all, because the Streams application will raise an exception at some point. A global exception handler could be registered to keep the application from dying, but this does not solve the issue: the StreamThread dies, partitions get reassigned, and the next thread hits the same issue, until no threads are left and the application is dead.
Public Interfaces
The signature of TimestampExtractor will be changed to give the user a way to "infer" a timestamp from the current processing progress (i.e., the internally tracked stream-time) if no valid timestamp can be extracted from the record.
// current interface
public interface TimestampExtractor {
    long extract(ConsumerRecord<Object, Object> record);
}

// new interface
public interface TimestampExtractor {
    // previousTimestamp is the latest valid timestamp extracted for the partition the current record belongs to
    long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
}
Furthermore, the default implementation ConsumerRecordTimestampExtractor is replaced, and new classes will be added to provide more predefined timestamp extractors (see details below):
FailOnInvalidTimestamp (new default extractor, replacing ConsumerRecordTimestampExtractor; behavior stays the same)
LogAndSkipOnInvalidTimestamp
UsePreviousTimeOnInvalidTimestamp
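To make the intent of the three class names concrete, here is a minimal sketch of how such extractors could behave. RecordSketch and ExtractorSketch are hypothetical stand-ins for ConsumerRecord and the new TimestampExtractor interface, so the snippet compiles without Kafka on the classpath. The sketch further assumes that a negative return value marks the record for dropping, and IllegalStateException stands in for whatever exception type the real classes raise.

```java
// Hypothetical stand-in for ConsumerRecord (not the Kafka class).
final class RecordSketch {
    final String topic;
    final int partition;
    final long timestamp; // embedded record timestamp; negative means invalid

    RecordSketch(String topic, int partition, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
    }
}

// Mirrors the two-argument signature proposed in this KIP.
interface ExtractorSketch {
    long extract(RecordSketch record, long previousTimestamp);
}

// Fail-fast: an invalid embedded timestamp raises an exception.
final class FailOnInvalidSketch implements ExtractorSketch {
    @Override
    public long extract(RecordSketch record, long previousTimestamp) {
        if (record.timestamp < 0) {
            // Assumption: the real class raises a Kafka-specific exception instead.
            throw new IllegalStateException("Invalid timestamp " + record.timestamp
                    + " in " + record.topic + "-" + record.partition);
        }
        return record.timestamp;
    }
}

// Log-and-skip: report the problem, then return the negative value
// so that the caller can drop the record (assumed contract).
final class LogAndSkipSketch implements ExtractorSketch {
    @Override
    public long extract(RecordSketch record, long previousTimestamp) {
        if (record.timestamp < 0) {
            System.err.println("Skipping record with invalid timestamp " + record.timestamp);
        }
        return record.timestamp;
    }
}

// Use-previous: fall back to the latest valid timestamp of the same partition.
final class UsePreviousTimeSketch implements ExtractorSketch {
    @Override
    public long extract(RecordSketch record, long previousTimestamp) {
        if (record.timestamp >= 0) {
            return record.timestamp;
        }
        if (previousTimestamp < 0) {
            // Nothing to infer from yet; failing here is an assumption of this sketch.
            throw new IllegalStateException("No valid previous timestamp to fall back to");
        }
        return previousTimestamp;
    }
}

final class PredefinedExtractorsDemo {
    public static void main(String[] args) {
        RecordSketch invalid = new RecordSketch("events", 0, -1L);
        System.out.println(new LogAndSkipSketch().extract(invalid, 500L));      // prints -1
        System.out.println(new UsePreviousTimeSketch().extract(invalid, 500L)); // prints 500
    }
}
```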
We want to change Streams to an auto-drop behavior for records with negative timestamps (without any further user notification about any dropped records) to enable users to "step over" those records and keep the app running (instead of running into a runtime exception, which would typically bring down the whole application instance). To guard the user from silently dropping messages by default (and to keep the current fail-fast behavior), we change the default extractor ConsumerRecordTimestampExtractor
to raise an exception if the embedded 0.10 message timestamp is negative, which includes the case where there is no 0.10 timestamp embedded in the message.
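The auto-drop behavior can be pictured as a loop that asks the extractor for a timestamp, silently skips the record when the result is negative, and otherwise remembers the value as the latest valid timestamp of that partition. The following is only a sketch under assumptions: DropLoopSketch and its nested Extractor interface are hypothetical, records are reduced to a partition number and an embedded timestamp, and the initial previous timestamp of -1 is an assumption, not something this KIP specifies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DropLoopSketch {
    // Hypothetical, simplified extractor: receives only the embedded timestamp
    // and the latest valid timestamp seen on the same partition.
    interface Extractor {
        long extract(long embeddedTimestamp, long previousTimestamp);
    }

    // Returns the timestamps of the records that were kept, in input order.
    static List<Long> process(int[] partitions, long[] embedded, Extractor extractor) {
        Map<Integer, Long> lastValid = new HashMap<>();
        List<Long> accepted = new ArrayList<>();
        for (int i = 0; i < embedded.length; i++) {
            // Assumption: -1 is passed until the partition has produced a valid timestamp.
            long previous = lastValid.getOrDefault(partitions[i], -1L);
            long ts = extractor.extract(embedded[i], previous);
            if (ts < 0) {
                continue; // auto-drop: step over the record without failing
            }
            lastValid.put(partitions[i], ts);
            accepted.add(ts);
        }
        return accepted;
    }

    public static void main(String[] args) {
        int[] partitions = {0, 0, 1, 0};
        long[] embedded = {100L, -1L, -1L, 300L};
        // Pass-through extractor: both invalid records are dropped.
        System.out.println(process(partitions, embedded, (ts, prev) -> ts));
        // prints [100, 300]
        // Fallback extractor: partition 0 reuses 100, partition 1 has nothing to reuse.
        System.out.println(process(partitions, embedded, (ts, prev) -> ts >= 0 ? ts : prev));
        // prints [100, 100, 300]
    }
}
```

With the fail-fast default extractor, the loop would never reach the drop branch, because the exception is raised first. That is how the default keeps the current behavior.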
Furthermore, we want to add some reference implementations of timestamp extractors that cover common use cases (incl. case 3 above):
For any other behavior, users can still provide a custom timestamp extractor implementation. As the TimestampExtractor interface will change, users cannot reuse old extractors and are thus made aware of the new behavior, so case 4 is also covered.
Additionally, we want to add a new Streams metric that reports the number of skipped records (as an absolute count, a percentage, or both) to give the user a way to monitor whether messages get skipped.
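As an illustration of what such a metric would have to track, the sketch below counts skipped and processed records and derives both the absolute count and the percentage. SkippedRecordsSketch is a hypothetical helper, not a Kafka metrics class. It assumes single-threaded access and that a negative extracted timestamp means the record was skipped.

```java
final class SkippedRecordsSketch {
    private long processed;
    private long skipped;

    // Call once per record with the timestamp the extractor returned.
    void onRecord(long extractedTimestamp) {
        if (extractedTimestamp < 0) {
            skipped++;
        } else {
            processed++;
        }
    }

    // Absolute number of skipped records.
    long skippedCount() {
        return skipped;
    }

    // Share of skipped records among all records seen, in percent.
    double skippedPercentage() {
        long total = processed + skipped;
        return total == 0 ? 0.0 : 100.0 * skipped / total;
    }

    public static void main(String[] args) {
        SkippedRecordsSketch metric = new SkippedRecordsSketch();
        for (long ts : new long[] {10L, 20L, -1L, 30L}) {
            metric.onRecord(ts);
        }
        System.out.println(metric.skippedCount());      // prints 1
        System.out.println(metric.skippedPercentage()); // prints 25.0
    }
}
```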
Compatibility, Deprecation, and Migration Plan
This is a breaking, incompatible change because the TimestampExtractor interface changes. However, it only affects users who provide a custom timestamp extractor. By default, no code change is required and the overall behavior is the same as before this KIP (with the default timestamp extractor, the user gets an exception in case of a negative timestamp).
StreamsException
) the user cannot recover from the exception anyway (i.e., the user cannot recover with the current behavior, and this will not change)
Required code changes:
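For a custom extractor, the mechanical part of the migration is small: the old one-argument logic can be wrapped so that it satisfies the two-argument signature. The sketch below shows one way to do this. OldExtractor, NewExtractor, and ExtractorMigrationSketch are hypothetical stand-ins (generic over the record type so the snippet compiles without Kafka), and the payload parser is purely illustrative.

```java
// Hypothetical stand-ins for the old and new TimestampExtractor signatures.
interface OldExtractor<R> {
    long extract(R record);
}

interface NewExtractor<R> {
    long extract(R record, long previousTimestamp);
}

final class ExtractorMigrationSketch {
    // Wraps old-style logic; previousTimestamp is ignored, so a negative
    // result is passed through unchanged and is subject to the new behavior.
    static <R> NewExtractor<R> adapt(OldExtractor<R> old) {
        return (record, previousTimestamp) -> old.extract(record);
    }

    // Illustrative old-style extractor: the payload is the timestamp as text.
    static long parseOrInvalid(String payload) {
        try {
            return Long.parseLong(payload.trim());
        } catch (NumberFormatException e) {
            return -1L; // invalid timestamp
        }
    }

    public static void main(String[] args) {
        NewExtractor<String> ported = adapt(ExtractorMigrationSketch::parseOrInvalid);
        System.out.println(ported.extract("1485000000000", 5L)); // prints 1485000000000
        System.out.println(ported.extract("not-a-number", 5L));  // prints -1
    }
}
```

A ported extractor that should never produce negative values could instead return previousTimestamp in the failure branch.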
The feature can be tested via unit tests.
StreamsConfig