Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Streams DSL operators silently drop certain records without giving the application any way to inspect or recover them. Examples include:
- aggregate(), count(), reduce(): drops records with null key or null value
- windowedBy() + aggregate(): drops records whose timestamp falls outside the grace period (late records)
- join() (stream-table): drops records with null key
- join() (foreign-key): drops records with null foreign key extracted from value
- toTable(): drops records with null key
Today these records are:
- Logged at WARN level
- Counted in the dropped-records-total metric
But there is no way to capture the dropped records for analysis or reprocessing. This is an observability gap: records dropped by DSL operators are handled differently from deserialization errors and processing exceptions, both of which can be routed to a Dead Letter Queue (DLQ) via KIP-1034.
This KIP closes that gap by allowing DSL operators to write dropped records to the DLQ using the same infrastructure introduced in KIP-1034.
Public Interfaces
1. New method on ProcessingContext
package org.apache.kafka.streams.processor.api;

public interface ProcessingContext {
    /**
     * Write a record to the Dead Letter Queue topic configured via
     * StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG.
     *
     * If no DLQ topic is configured, this method is a no-op.
     * The exception is used to populate DLQ error headers but must NOT be thrown by the caller.
     *
     * @param record the record being dropped
     * @param exception the reason the record is being dropped; should not be thrown
     */
    void writeToDlq(Record<?, ?> record, Exception exception);
}
2. New DroppedRecordException class
package org.apache.kafka.streams.errors;

public class DroppedRecordException extends RuntimeException {
    public enum Reason {
        NULL_KEY,
        NULL_VALUE,
        EXPIRED_WINDOW_RECORD,
        NULL_FOREIGN_KEY,
        NULL_JOIN_KEY
    }

    private final Reason reason;

    public DroppedRecordException(final Reason reason) {
        super("Record dropped: " + reason);
        this.reason = reason;
    }

    public Reason reason() {
        return reason;
    }
}
Proposed Changes
DSL operator changes
Each drop site is updated to call context.writeToDlq() instead of silently dropping. Example with KStreamAggregate:
Before:
if (record.key() == null || record.value() == null) {
    LOG.warn("Skipping record due to null key or value. "
        + "If an actual null key is required as valid business value, "
        + "please read the discussion in the GitHub PR #7679");
    droppedRecordsSensor.record();
    return;
}
After:
if (record.key() == null || record.value() == null) {
    final DroppedRecordException.Reason reason = record.key() == null
        ? DroppedRecordException.Reason.NULL_KEY
        : DroppedRecordException.Reason.NULL_VALUE;
    LOG.warn("Skipping record due to {}. If an actual null key is required as valid "
        + "business value, please read the discussion in the GitHub PR #7679", reason);
    context.writeToDlq(record, new DroppedRecordException(reason));
    droppedRecordsSensor.record();
    return;
}
The droppedRecordsSensor.record() call is retained so that existing metrics are unaffected.
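To make the call order at a drop site concrete, here is a minimal, self-contained sketch that runs without Kafka on the classpath. Everything in it is a hypothetical stand-in: SimpleRecord replaces Record, DlqContext replaces the proposed ProcessingContext method, the nested DroppedRecordException mirrors the class proposed above, and droppedCount plays the role of droppedRecordsSensor. It is not the actual KStreamAggregate code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the Kafka Streams types. Only the
// order of calls at the drop site is meant to mirror the proposal.
final class DropSiteSketch {

    enum Reason { NULL_KEY, NULL_VALUE }

    // Stand-in for org.apache.kafka.streams.processor.api.Record.
    static final class SimpleRecord {
        final String key;
        final String value;

        SimpleRecord(final String key, final String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for the proposed DroppedRecordException.
    static final class DroppedRecordException extends RuntimeException {
        final Reason reason;

        DroppedRecordException(final Reason reason) {
            super("Record dropped: " + reason);
            this.reason = reason;
        }
    }

    // Stand-in for the proposed ProcessingContext#writeToDlq.
    interface DlqContext {
        void writeToDlq(SimpleRecord record, Exception exception);
    }

    private final DlqContext context;
    private final List<SimpleRecord> forwarded = new ArrayList<>();
    private int droppedCount = 0; // plays the role of droppedRecordsSensor

    DropSiteSketch(final DlqContext context) {
        this.context = context;
    }

    void process(final SimpleRecord record) {
        if (record.key == null || record.value == null) {
            final Reason reason = record.key == null ? Reason.NULL_KEY : Reason.NULL_VALUE;
            // The exception only carries the drop reason; it is never thrown.
            context.writeToDlq(record, new DroppedRecordException(reason));
            droppedCount++; // the metric is still updated, as before
            return;
        }
        forwarded.add(record); // normal processing path
    }

    int droppedCount() {
        return droppedCount;
    }

    List<SimpleRecord> forwarded() {
        return forwarded;
    }
}
```

Passing a lambda such as (record, exception) -> captured.add(exception.getMessage()) as the DlqContext is enough to observe which reason each dropped record was tagged with, which is roughly what the per-operator unit tests would need to check.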
DSL operators covered
- KStreamAggregate: null key or value (NULL_KEY, NULL_VALUE)
- KStreamWindowAggregate: null key or value; expired window record (NULL_KEY, NULL_VALUE, EXPIRED_WINDOW_RECORD)
- KTableSource: null key (NULL_KEY)
- KStreamKTableJoinProcessor: null key (NULL_JOIN_KEY)
- StreamStreamJoinUtil: null key (NULL_JOIN_KEY)
- FK-join processors: null foreign key (NULL_FOREIGN_KEY)
ProcessorContextImpl implementation
writeToDlq() is implemented in ProcessorContextImpl:
- Reads ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG from the existing StreamsConfig
- If no DLQ topic is configured → no-op
- Otherwise, calls ExceptionHandlerUtils.buildDeadLetterQueueRecord() to build a ProducerRecord with standard error headers
- Sends it via the existing RecordCollector
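The control flow of those steps can be sketched as follows. This is an illustration under stated assumptions, not the ProcessorContextImpl code: DlqRecord stands in for ProducerRecord, Collector stands in for RecordCollector, a null dlqTopic models "no DLQ topic configured", and the header values are plain strings whose exact encoding is an assumption (the real values are produced by the KIP-1034 helper).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the writeToDlq() flow; none of these types are Kafka classes.
final class WriteToDlqSketch {

    // Stand-in for the ProducerRecord sent to the DLQ topic.
    static final class DlqRecord {
        final String topic;
        final Object key;
        final Object value;
        final Map<String, String> headers;

        DlqRecord(final String topic, final Object key, final Object value,
                  final Map<String, String> headers) {
            this.topic = topic;
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Stand-in for RecordCollector.
    interface Collector {
        void send(DlqRecord record);
    }

    private final String dlqTopic; // null models "no DLQ topic configured"
    private final Collector collector;

    WriteToDlqSketch(final String dlqTopic, final Collector collector) {
        this.dlqTopic = dlqTopic;
        this.collector = collector;
    }

    void writeToDlq(final Object key, final Object value, final Exception exception) {
        if (dlqTopic == null) {
            return; // no DLQ configured: no-op, the caller still logs and records the metric
        }
        collector.send(buildDlqRecord(key, value, exception));
    }

    // Plays the role of ExceptionHandlerUtils.buildDeadLetterQueueRecord();
    // only two of the headers are modelled, and their encoding is assumed.
    private DlqRecord buildDlqRecord(final Object key, final Object value,
                                     final Exception exception) {
        final Map<String, String> headers = new LinkedHashMap<>();
        headers.put("__streams.errors.exception", exception.getClass().getSimpleName());
        headers.put("__streams.errors.message", exception.getMessage());
        return new DlqRecord(dlqTopic, key, value, headers);
    }
}
```

Because the no-op check happens inside writeToDlq(), the drop sites do not need to know whether a DLQ topic is configured.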
DLQ error headers written
The existing header schema from KIP-1034 is reused unchanged:
- __streams.errors.exception: DroppedRecordException
- __streams.errors.message: e.g., "Record dropped: NULL_KEY"
- __streams.errors.stacktrace: stack trace
- __streams.errors.topic: source topic
- __streams.errors.partition: source partition
- __streams.errors.offset: source offset
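On the consuming side, a DLQ reader could recover the drop reason from the message header. The sketch below is one possible approach and rests on two assumptions: header values have already been decoded from bytes into strings (the Map<String, String> is a simplification of Kafka's Headers), and the message keeps the "Record dropped: " prefix used by the DroppedRecordException constructor in this KIP. The Reason enum is a local copy for illustration.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical consumer-side helper; not part of the proposed public API.
final class DlqHeaderSketch {

    // Local mirror of DroppedRecordException.Reason.
    enum Reason { NULL_KEY, NULL_VALUE, EXPIRED_WINDOW_RECORD, NULL_FOREIGN_KEY, NULL_JOIN_KEY }

    static final String MESSAGE_HEADER = "__streams.errors.message";
    static final String PREFIX = "Record dropped: ";

    // Returns the drop reason if the message header has the expected shape,
    // or an empty Optional for any other kind of DLQ record.
    static Optional<Reason> reasonFrom(final Map<String, String> headers) {
        final String message = headers.get(MESSAGE_HEADER);
        if (message == null || !message.startsWith(PREFIX)) {
            return Optional.empty();
        }
        try {
            return Optional.of(Reason.valueOf(message.substring(PREFIX.length())));
        } catch (final IllegalArgumentException unknownReason) {
            return Optional.empty(); // e.g. a reason added by a newer version
        }
    }
}
```

Returning an empty Optional for unrecognized messages lets the same consumer keep handling DLQ records produced by deserialization or processing exceptions.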
Compatibility, Deprecation, and Migration Plan
- No existing API is changed or removed.
- writeToDlq() is a new method added to ProcessingContext. Existing implementations are unaffected.
- If ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG is not set, behavior is identical to today: records are dropped with a log warning and sensor increment.
- Existing DLQ consumers are unaffected. The DLQ header schema is unchanged; DroppedRecordException simply appears as the exception class in the existing __streams.errors.exception header.
Test Plan
- Unit tests for ProcessorContextImpl.writeToDlq(): verify no-op when DLQ is not configured; verify correct headers are written when DLQ is configured
- Unit tests for each updated DSL operator: verify writeToDlq() is called with the correct DroppedRecordException.Reason
- Integration test extending DeadLetterQueueIntegrationTest: configure DLQ topic, produce records with null key, verify they appear in the DLQ topic with correct error headers
Rejected Alternatives
1. Throw DroppedRecordException and let ProcessorNode catch it
Rejected because of the "splash radius" problem. If a DSL operator like flatMap() emits multiple records downstream and a downstream operator drops one of them by throwing an exception, all records in the
current batch are lost. Only the individual malformed record should be dropped. Using context.writeToDlq() avoids this problem entirely.
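The difference can be illustrated with a toy model in plain Java. It is a deliberately simplified assumption about how forwarding works (a loop over the records emitted by an upstream operator, with a null element standing for a malformed record), not a reproduction of ProcessorNode. In this model, throwing abandons the loop so records still pending behind the bad one are never delivered, while the DLQ-style path skips only the bad record.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the "splash radius" problem; method and class names are hypothetical.
final class SplashRadiusSketch {

    // Rejected alternative: the downstream operator throws. The exception
    // unwinds through the forwarding loop, so pending records are lost too.
    static List<String> forwardWithThrow(final List<String> emitted) {
        final List<String> delivered = new ArrayList<>();
        try {
            for (final String value : emitted) {
                if (value == null) {
                    throw new IllegalStateException("Record dropped: NULL_VALUE");
                }
                delivered.add(value);
            }
        } catch (final IllegalStateException caughtAboveTheLoop) {
            // Models a handler that sits above the emitting operator.
        }
        return delivered;
    }

    // Proposed approach: report the bad record (here, to a list standing in
    // for the DLQ) and keep forwarding the remaining records.
    static List<String> forwardWithDlq(final List<String> emitted, final List<String> dlq) {
        final List<String> delivered = new ArrayList<>();
        for (final String value : emitted) {
            if (value == null) {
                dlq.add("Record dropped: NULL_VALUE");
                continue;
            }
            delivered.add(value);
        }
        return delivered;
    }
}
```

For an upstream emission built with Arrays.asList("a", null, "c"), forwardWithThrow delivers only "a", whereas forwardWithDlq delivers "a" and "c" and adds a single entry to the DLQ list.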
2. Introduce a separate DroppedRecordHandler interface
A new handler interface analogous to ProcessingExceptionHandler could be introduced. However, this adds configuration complexity (a second handler config key, a second interface to implement) while the DLQ
topic is already configurable via ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG. Routing dropped records through the same DLQ is simpler and consistent with existing behavior.
3. Omit the Exception parameter from writeToDlq()
The existing DLQ header schema includes __streams.errors.exception, __streams.errors.message, and __streams.errors.stacktrace. Omitting the exception would break existing DLQ consumers or cause NPEs in
clients that expect these headers. Keeping Exception as a required parameter maintains header consistency.