Status

Current state: Under Discussion 

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

  Kafka Streams DSL operators silently drop certain records without giving the application any way to inspect or recover them. Examples include:                                 

  • aggregate() , count() , reduce() : drops records with null key or null value
  • windowedBy()  + aggregate() : drops records whose timestamp falls outside the grace period (late records)
  • join()  (stream-table): drops records with null key
  • join()  (foreign-key): drops records with null foreign key extracted from value
  • toTable() : drops records with null key

Today these records are:

  1. Logged at WARN level
  2. Counted in the dropped-records-total metric

But there is no way to capture the dropped records for analysis or reprocessing. This is an observability gap: operators behave differently from deserialization errors or processing exceptions, both of

which can be routed to a Dead Letter Queue (DLQ) via KIP-1034. 

This KIP closes that gap by allowing DSL operators to write dropped records to the DLQ using the same infrastructure introduced in KIP-1034.

Public Interfaces

  1. New method on ProcessingContext

package org.apache.kafka.streams.processor.api;                                                                                                                                                              
                                                                                                                                                                                                               
  public interface ProcessingContext {                                                                                                                                                                         
                  
      /**                                                                                                                                                                                                      
       * Write a record to the Dead Letter Queue topic configured via
       * StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG.
       *                                                                                                                                                                                                       
       * If no DLQ topic is configured, this method is a no-op.
       * The exception is used to populate DLQ error headers but must NOT be thrown by the caller.                                                                                                             
       *                                                                                                                                                                                                       
       * @param record    the record being dropped
       * @param exception the reason the record is being dropped; should not be thrown                                                                                                                         
       */                                                                                                                                                                                                      
      void writeToDlq(Record<?, ?> record, Exception exception);                                                                                                                                               
  }              

                                                                                             
                                                                                                                                                                                                               
  2. New DroppedRecordException  class

 package org.apache.kafka.streams.errors;
                                                                                                                                                                                                               
  public class DroppedRecordException extends RuntimeException {
                                                                                                                                                                                                               
      public enum Reason {
          NULL_KEY,
          NULL_VALUE,
          EXPIRED_WINDOW_RECORD,              
          NULL_FOREIGN_KEY,               
          NULL_JOIN_KEY
      }                                                                                                                                                                                                        
                                          
      private final Reason reason;                                                                                                                                                                             
                                                                                                                                                                                                               
      public DroppedRecordException(final Reason reason) {
          super("Record dropped: " + reason);                                                                                                                                                                  
          this.reason = reason;
      }                                       
                                          
      public Reason reason() {
          return reason;                                                                                                                                                                                       
      }
  } 

                                                                                                                                                                                                    
                

Proposed Changes

DSL operator changes

Each drop site is updated to call context.writeToDlq()  instead of silently dropping. Example with KStreamAggregate:

Before: 

 if (record.key() == null || record.value() == null) {
      LOG.warn("Skipping record due to null key or value. "                                                                                                                                                    
          + "If an actual null key is required as valid business value, "
          + "please read the discussion in the GitHub PR #7679");                                                                                                                                              
      droppedRecordsSensor.record();                             
      return;                                 
  } 

     

After:

if (record.key() == null || record.value() == null) {                                                                                                                                                        
      final DroppedRecordException.Reason reason = record.key() == null                                                                                                                                        
          ? DroppedRecordException.Reason.NULL_KEY                                                                                                                                                             
          : DroppedRecordException.Reason.NULL_VALUE;
      LOG.warn("Skipping record due to {}. If an actual null key is required as valid "
          + "business value, please read the discussion in the GitHub PR #7679", reason);
      context.writeToDlq(record, new DroppedRecordException(reason));                                                                                                                                          
      droppedRecordsSensor.record();                                 
      return;                                                                                                                                                                                                  
  }        

                                                                                                                                                                                          
  The droppedRecordsSensor.record()  call is retained so that existing metrics are unaffected.

  DSL operators covered
                                          
  - KStreamAggregate: null key or value (NULL_KEY, NULL_VALUE)
  - KStreamWindowAggregate: null key or value; expired window record (NULL_KEY, NULL_VALUE, EXPIRED_WINDOW_RECORD)
  - KTableSource: null key (NULL_KEY)
  - KStreamKTableJoinProcessor: null key (NULL_JOIN_KEY)
  - StreamStreamJoinUtil: null key (NULL_JOIN_KEY)
  - FK-join processors: null foreign key (NULL_FOREIGN_KEY)

  ProcessorContextImpl implementation

  writeToDlq()  is implemented in ProcessorContextImpl :

  1. Reads ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG  from the existing StreamsConfig
  2. If no DLQ topic is configured → no-op
  3. Otherwise, calls ExceptionHandlerUtils.buildDeadLetterQueueRecord()  to build a ProducerRecord  with standard error headers
  4. Sends it via the existing RecordCollector


  DLQ error headers written

  The existing header schema from KIP-1034 is reused unchanged:
 

  • __streams.errors.exception : DroppedRecordException
  • __streams.errors.message : e.g., "Record dropped: NULL_KEY"
  • __streams.errors.stacktrace : stack trace
  • __streams.errors.topic : source topic
  • __streams.errors.partition : source partition
  • __streams.errors.offset : source offset

Compatibility, Deprecation, and Migration Plan

  • No existing API is changed or removed.
  • writeToDlq() is a new method added to ProcessingContext. Existing implementations are unaffected.
  • If ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG is not set, behavior is identical to today: records are dropped with a log warning and sensor increment.
  • Existing DLQ consumers are unaffected. The DLQ header schema is unchanged; DroppedRecordException simply appears as the exception class in the existing __streams.errors.exception header.

Test Plan

  • Unit tests for ProcessorContextImpl.writeToDlq(): verify no-op when DLQ is not configured; verify correct headers are written when DLQ is configured
  • Unit tests for each updated DSL operator: verify writeToDlq() is called with the correct DroppedRecordException.Reason
  • Integration test extending DeadLetterQueueIntegrationTest: configure DLQ topic, produce records with null key, verify they appear in the DLQ topic with correct error headers

Rejected Alternatives

1. Throw DroppedRecordException and let ProcessorNode catch it                                                                                                                                               
                                                                                                                                                                                                               
  Rejected because of the "splash radius" problem. If a DSL operator like flatMap() emits multiple records downstream and a downstream operator drops one of them by throwing an exception, all records in the 
  current batch are lost. Only the individual malformed record should be dropped. Using context.writeToDlq() avoids this problem entirely.                                                                     
                                                                                                                                                                                                               
2. Introduce a separate DroppedRecordHandler interface
                                                                                                                                                                                                               
  A new handler interface analogous to ProcessingExceptionHandler could be introduced. However, this adds configuration complexity (a second handler config key, a second interface to implement) while the DLQ
   topic is already configurable via ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG. Routing dropped records through the same DLQ is simpler and consistent with existing behavior.                                
                  
3. Omit the Exception parameter from writeToDlq()                                                                                                                                                            
                                          
  The existing DLQ header schema includes __streams.errors.exception, __streams.errors.message, and __streams.errors.stacktrace. Omitting the exception would break existing DLQ consumers or cause NPEs in    
  clients that expect these headers. Keeping Exception as a required parameter maintains header consistency.

  • No labels