Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP is inspired by Michelin's kstreamplify library and is coauthored by Damien Gasparina, Loic Greffier and Sebastien Viale.

Kafka Streams provides multiple exception handlers to handle issues while processing messages. Each handler offers two options: either log the faulty message and continue processing, or fail and stop Kafka Streams (the default).

Neither out-of-the-box implementation is suitable for most use cases: stopping Kafka Streams because of a single faulty message can be problematic, while logging and skipping is easily missed if the user does not actually check the logs.

Most applications tend to rely on the Dead Letter Queue (DLQ) pattern: in case of an issue, the faulty message that cannot be processed is stored in a separate topic. This approach has many advantages:

  1. It is easy to access or replay faulty messages.
  2. It is easy to configure an automated notification if messages are produced in the DLQ topic.
  3. Metadata can be attached to the DLQ record, e.g. the stacktrace and the source topic/partition/offset.
  4. The DLQ topic can have a different retention than the application log.

The DLQ pattern is becoming a standard; it is already available out of the box in Kafka Connect, and many applications I have worked with already implement it in Kafka Streams. Including a DLQ feature directly in Kafka Streams would allow users to configure production-ready error handlers without having to write custom code.

Proposed Changes

To allow users to send a record to the Dead Letter Queue, a new attribute "deadLetterQueueRecord" will be added to each exception handler's response. If this attribute is set, Kafka Streams will send the provided record to Kafka.

A new configuration will be added: errors.deadletterqueue.topic.name. When set, this configuration instructs the default exception handler implementations to build a Dead Letter Queue record during error handling.

In order to build a valid Dead Letter Queue payload, some additional information needs to be captured and forwarded in the processor context: the raw key and value of the source message.

Storing the raw key and raw value allows us to send this raw data to the DLQ topic without having to infer the right serializer. All metadata, e.g. the exception, stacktrace, topic, partition and offset, would be provided in the record headers by default.

Additionally, the context would need to be available in each exception handler. It is currently not available in the ProductionExceptionHandler, so the handle method will need to be overloaded to provide the context, with a default implementation to ensure backward compatibility, as sketched below.
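A minimal sketch of such an overload, for illustration only (the exact final signature may differ):

public interface ProductionExceptionHandler extends Configurable {
    // Existing method, unchanged.
    ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                              final Exception exception);

    // New overload exposing the context; the default implementation delegates to the
    // existing method so that current custom handlers keep compiling and behaving as before.
    default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                      final ProducerRecord<byte[], byte[]> record,
                                                      final Exception exception) {
        return handle(record, exception);
    }
}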

If the default values are not suitable for an application, developers could still reimplement the required exception handlers to build custom DLQ records.

This proposal is to:

  1. Add a new attribute "deadLetterQueueRecord" in the DeserializationHandlerResponse, ProductionExceptionHandlerResponse and ProcessingHandlerResponse (KIP-1033) enums.
  2. Add a new attribute public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name" (see the configuration sketch after this list).
  3. Change the existing exception handlers to produce a Dead Letter Queue record if the errors.deadletterqueue.topic.name parameter is set.
  4. If the Dead Letter Queue record cannot be sent to Apache Kafka, the exception would be sent to the Kafka Streams uncaughtExceptionHandler.
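Enabling the default DLQ behavior would then be a one-line configuration change, e.g. (a sketch; "my-app-dlq" is a placeholder topic name and the topology is assumed to be built elsewhere):

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// With this single setting, the out-of-the-box handlers start producing DLQ records.
props.put(StreamsConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "my-app-dlq");
final KafkaStreams streams = new KafkaStreams(topology, props);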


Default Dead letter queue record

Key

Key of the input message that triggered the sub-topology, null if triggered by punctuate

Value

If available, contains the value of the input message that triggered the sub-topology, null if triggered by punctuate

Headers

Existing context headers are automatically forwarded into the new DLQ record

Header: __streams.errors.exception

Name of the thrown exception

Header: __streams.errors.stacktrace

Stacktrace of the thrown exception 

Header: __streams.errors.message

Thrown exception message

Header: __streams.errors.topic

Source input topic, null if triggered by punctuate

Header: __streams.errors.partition

Source input partition, null if triggered by punctuate

Header: __streams.errors.offset

Source input offset, null if triggered by punctuate
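For illustration, the default handlers could assemble such a record roughly as follows. This is a minimal sketch, not the final implementation: dlqTopicName, context and exception are assumed to be in scope, and the stacktrace header is omitted for brevity.

// Sketch only: carry the raw source payload plus error metadata in headers.
final ProducerRecord<byte[], byte[]> dlqRecord =
        new ProducerRecord<>(dlqTopicName, context.sourceRawKey(), context.sourceRawValue());
dlqRecord.headers().add("__streams.errors.exception",
        exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
dlqRecord.headers().add("__streams.errors.message",
        String.valueOf(exception.getMessage()).getBytes(StandardCharsets.UTF_8));
dlqRecord.headers().add("__streams.errors.topic",
        context.topic().getBytes(StandardCharsets.UTF_8));
dlqRecord.headers().add("__streams.errors.partition",
        String.valueOf(context.partition()).getBytes(StandardCharsets.UTF_8));
dlqRecord.headers().add("__streams.errors.offset",
        String.valueOf(context.offset()).getBytes(StandardCharsets.UTF_8));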

Default Dead letter queue topic

By default, this KIP proposes to have one DLQ topic per Kafka Streams application. This topic would not be automatically created by Kafka Streams.

The DLQ topic name is set through the ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name" configuration. Users can override the default behavior by implementing custom exception handlers with a different DLQ topic strategy if required.
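As the topic is not created automatically, it must be provisioned before starting the application, for instance with the Admin client (a sketch; the topic name, partition count and replication factor are examples):

try (final Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
    // Create the DLQ topic up front; its retention can be tuned independently of the source topics.
    admin.createTopics(List.of(new NewTopic("my-app-dlq", 3, (short) 3))).all().get();
}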

Public Interfaces

StreamsConfig.java

Changes:

  • Adding the errors.deadletterqueue.topic.name configuration. This configuration only modifies the behavior of the out-of-the-box exception handlers and has no effect if a custom exception handler is implemented. For example:
    • if errors.deadletterqueue.topic.name=null (default), then no records are sent to any dead letter queue
    • if errors.deadletterqueue.topic.name is set, exceptions happening during processing, production or deserialization will result in the raw source messages that triggered the topology being sent to the DLQ topic. Processing might or might not continue depending on the processing.exception.handler, default.production.exception.handler and default.deserialization.exception.handler configurations
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name";

.define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
       Type.STRING,
       null, /* default */
       Importance.HIGH,
       ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC)


If the user implements a custom exception handler, it is up to that handler to build the DLQ records to send; in this case, the errors.deadletterqueue.topic.name configuration has no impact. For example:

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
        return ProcessingHandlerResponse.CONTINUE
                .andAddToDeadLetterQueue(Collections.singletonList(
                        new ProducerRecord<>("app-dlq", "Hello".getBytes(StandardCharsets.UTF_8), "World".getBytes(StandardCharsets.UTF_8))
                ));
    }

ProductionExceptionHandler.java

Changes:

  • Adding the deadLetterQueueRecords attribute (an Iterable<ProducerRecord<byte[], byte[]>>) in the ProductionExceptionHandlerResponse
  • As the ErrorHandlerContext does not provide the source key/value in the handle method (to limit the memory impact), the Dead Letter Queue record would only contain metadata by default. handleSerializationException is not impacted. A custom handler can still attach a payload, as sketched after the interface below.
public interface ProductionExceptionHandler extends Configurable {
   ...


   enum ProductionExceptionHandlerResponse {
       . . .

       private Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;

       public Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
           return this.deadLetterQueueRecords;
       }

       public ProductionExceptionHandlerResponse andAddToDeadLetterQueue(final Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
           this.deadLetterQueueRecords = deadLetterQueueRecords;
           return this;
       }
   }
}
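For example, a custom production handler could attach the already-serialized output record as the DLQ payload, since it is passed to handle. A sketch assuming the context-aware overload; the class name and "app-dlq" topic are placeholders:

public class MyDlqProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                     final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // Skip the failed record but keep a copy of its serialized payload in the DLQ.
        return ProductionExceptionHandlerResponse.CONTINUE
                .andAddToDeadLetterQueue(Collections.singletonList(
                        new ProducerRecord<>("app-dlq", record.key(), record.value())));
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}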

DeserializationExceptionHandler.java

Changes:

  • Adding the deadLetterQueueRecords attribute (an Iterable<ProducerRecord<byte[], byte[]>>) in the DeserializationHandlerResponse (an example handler follows the interface below)
public interface DeserializationExceptionHandler extends Configurable {

   ...

   /**
    * Enumeration that describes the response from the exception handler.
    */
   enum DeserializationHandlerResponse {
        . . .

       private Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;

       public DeserializationHandlerResponse andAddToDeadLetterQueue(final Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
           this.deadLetterQueueRecords = deadLetterQueueRecords;
           return this;
       }
   }
}
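For instance, a custom deserialization handler could forward the raw consumer record and continue. A sketch assuming the ErrorHandlerContext-based handle signature from KIP-1033; the class name and "my-app-dlq" topic are placeholders:

public class MyDlqDeserializationExceptionHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // The record could not be deserialized, so its raw bytes are forwarded untouched.
        return DeserializationHandlerResponse.CONTINUE
                .andAddToDeadLetterQueue(Collections.singletonList(
                        new ProducerRecord<>("my-app-dlq", record.key(), record.value())));
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}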

ProcessingExceptionHandler.java

Changes:

  • Adding the deadLetterQueueRecords attribute (an Iterable<ProducerRecord<byte[], byte[]>>) in the ProcessingHandlerResponse
public interface ProcessingExceptionHandler extends Configurable {

   ...

   /**
    * Enumeration that describes the response from the exception handler.
    */
   enum ProcessingHandlerResponse {
        . . .

       private Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;

       public ProcessingHandlerResponse andAddToDeadLetterQueue(final Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
           this.deadLetterQueueRecords = deadLetterQueueRecords;
           return this;
       }
   }
}

ErrorHandlerContext.java

Changes:

  • Adding the public byte[] sourceRawKey() and byte[] sourceRawValue() methods to the ErrorHandlerContext, returning the source record data (an example handler using them follows the interface below)
/**
 * ErrorHandlerContext interface
 */
public interface ErrorHandlerContext {
    . . .
     
    /**
     * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
     *
     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
     * punctuation callback}, or while processing a record that was forwarded by a punctuation
     * callback, it will return null.
     *
     * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be the one sent
     * to the repartition topic.
     *
     * <p> Always returns null if this method is invoked within a
     * {@link ProductionExceptionHandler#handle(ErrorHandlerContext, ProducerRecord, Exception)}
     *
     * @return the raw bytes of the key of the source message
     */
    byte[] sourceRawKey();

    /**
     * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
     *
     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
     * punctuation callback}, or while processing a record that was forwarded by a punctuation
     * callback, it will return null.
     *
     * <p> If this method is invoked in a sub-topology due to a repartition, the returned value would be the one sent
     * to the repartition topic.
     *
     * <p> Always returns null if this method is invoked within a
     * {@link ProductionExceptionHandler#handle(ErrorHandlerContext, ProducerRecord, Exception)}
     *
     * @return the raw bytes of the value of the source message
     */
    byte[] sourceRawValue();
 
    . . .
}
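To illustrate how a custom handler could use these new methods, a minimal sketch of a processing exception handler that forwards the raw source payload; the class name and "my-app-dlq" topic are placeholders:

public class MyDlqProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        // sourceRawKey()/sourceRawValue() may be null when the error was triggered by a punctuation.
        return ProcessingHandlerResponse.CONTINUE
                .andAddToDeadLetterQueue(Collections.singletonList(
                        new ProducerRecord<>("my-app-dlq", context.sourceRawKey(), context.sourceRawValue())));
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}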


Compatibility, Deprecation, and Migration Plan

All changes are backward compatible and should not impact existing applications.

Test Plan

  • Tests to ensure the backward compatibility of the ProductionExceptionHandler class
  • Tests to ensure that the default exception handlers send records to the DLQ topic if the DLQ topic name is set
  • Ensure that failure to send the DLQ record kills the StreamThread
  • Ensure that punctuator-triggered exceptions produce the expected payload

Rejected Alternatives

  • Managing the Dead Letter Queue directly in the DSL by extending the KStream interface.
  • Providing no default implementation to build the Dead letter queue record and delegating this task to the user.
  • Only providing exception and metadata information in the default DLQ implementation.
  • Adding a new interface, that could be overloaded by the user, to build the DLQ records.

