Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP is inspired by Michelin's kstreamplify and co-authored by Damien Gasparina, Loic Greffier and Sebastien Viale.

Kafka Streams provides multiple exception handlers to deal with errors that occur while processing messages. Each handler offers two options: log the faulty message and continue processing, or fail and stop Kafka Streams, which is the default.

Neither out-of-the-box implementation is suitable for most use cases. Stopping Kafka Streams because of a single faulty message can be problematic, and with logging and skipping there is a high risk that the faulty message goes unnoticed if the user does not actively check the logs.

Most applications rely on the Dead Letter Queue (DLQ) pattern: when an issue occurs, the faulty message that cannot be processed is stored in a separate topic. This approach has several advantages:

  1. It is easy to access or replay faulty messages.
  2. It is easy to configure an automated notification if messages are produced in the DLQ topic.
  3. Metadata can be added to the DLQ record, e.g. stack trace, source topic/partition/offset, etc.
  4. The DLQ topic could have a different retention than the application log.

The DLQ pattern is becoming a standard; it is already available out of the box in Kafka Connect. Many applications I have worked with already implement this pattern in Kafka Streams. Including a DLQ feature directly in Kafka Streams would allow users to configure production-ready error handlers without writing custom code.

Proposed Changes

To allow users to send a record to the dead letter queue, a new attribute "deadLetterQueueRecord" will be added to each exception handler's response. If this attribute is set, Kafka Streams will send the provided record to Kafka.

A new configuration will be added: errors.deadletterqueue.topic.name. When set, this configuration instructs the default exception handler implementations to build a dead letter queue record during error handling.

To build a valid dead letter queue payload, some additional information needs to be captured and forwarded in the processor context: the raw key and the raw value of the source message.

Storing the raw key and the raw value allows this raw data to be sent to the DLQ topic without having to infer the right serializer. All metadata, e.g. exception, stack trace, topic, partition and offset, would be provided in the record headers by default.

Additionally, the ProcessingContext would need to be available in each exception handler. It is currently not available in the ProductionExceptionHandler, so its handle method will need to be overloaded to provide the context, and a default implementation must be provided to ensure backward compatibility.

If the default values are not suitable for an application, developers can still implement custom exception handlers to build custom DLQ records.

This proposal is to:

  1. Add a new attribute "deadLetterQueueRecord" to the DeserializationHandlerResponse, ProductionExceptionHandlerResponse and ProcessingExceptionHandlerResponse (KIP-1033) enums.
  2. Add a new configuration constant public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name".
  3. Change the existing exception handlers to produce a dead letter queue record if errors.deadletterqueue.topic.name is set.
  4. If the dead letter queue record cannot be sent to Apache Kafka, forward the exception to the Kafka Streams uncaughtExceptionHandler.
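The handling flow described in the list above can be sketched as follows. This is a minimal, self-contained sketch built on simplified stand-in types: `HandlerResponse`, `DlqRecord`, `DlqProducer` and `handleError` are hypothetical names used for illustration only, not Kafka Streams classes. It covers the three outcomes: no DLQ record, a DLQ record sent successfully, and a failed send being escalated to the uncaught exception handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DlqFlowSketch {

    // Hypothetical stand-in for a DLQ record: target topic plus raw key/value bytes.
    record DlqRecord(String topic, byte[] key, byte[] value) { }

    // Hypothetical stand-in for a handler response carrying an optional DLQ record.
    record HandlerResponse(boolean continueProcessing, DlqRecord deadLetterQueueRecord) { }

    // Hypothetical producer abstraction; send() may throw, e.g. if the broker is unreachable.
    interface DlqProducer {
        void send(DlqRecord record) throws Exception;
    }

    /**
     * Applies a handler response: sends the DLQ record if one is set and, if that send
     * fails, forwards the failure to the uncaught exception handler instead of swallowing it.
     * Returns true if processing should continue.
     */
    static boolean handleError(HandlerResponse response,
                               DlqProducer producer,
                               Consumer<Throwable> uncaughtExceptionHandler) {
        if (response.deadLetterQueueRecord() != null) {
            try {
                producer.send(response.deadLetterQueueRecord());
            } catch (Exception e) {
                uncaughtExceptionHandler.accept(e);
                return false;
            }
        }
        return response.continueProcessing();
    }

    public static void main(String[] args) {
        List<DlqRecord> sent = new ArrayList<>();
        List<Throwable> uncaught = new ArrayList<>();
        DlqRecord dlq = new DlqRecord("my-app-dlq", "k".getBytes(), "v".getBytes());

        // Successful DLQ send: processing continues.
        boolean cont = handleError(new HandlerResponse(true, dlq), sent::add, uncaught::add);
        System.out.println(cont + " " + sent.size() + " " + uncaught.size()); // true 1 0

        // Failing DLQ send: the error reaches the uncaught exception handler.
        boolean cont2 = handleError(new HandlerResponse(true, dlq),
                r -> { throw new Exception("broker unavailable"); }, uncaught::add);
        System.out.println(cont2 + " " + uncaught.size()); // false 1
    }
}
```

Routing a failed DLQ send to the uncaught exception handler, rather than logging it, avoids silently losing a record that was already identified as faulty.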


Note: because it relies on the ErrorHandlerContext class proposed in KIP-1033, this KIP has a hard dependency on KIP-1033.

Default Dead letter queue record

  • Key: key of the input message; null if triggered by a punctuator.
  • Value: value of the input message, if available; null if triggered by a punctuator.
  • Headers: existing context headers are automatically forwarded into the new DLQ record.
  • Header __streams.errors.exception: name of the thrown exception.
  • Header __streams.errors.stacktrace: stack trace of the thrown exception.
  • Header __streams.errors.message: message of the thrown exception.
  • Header __streams.errors.topic: source input topic; null if triggered by a punctuator.
  • Header __streams.errors.partition: source input partition; null if triggered by a punctuator.
  • Header __streams.errors.offset: source input offset; null if triggered by a punctuator.
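A minimal sketch of how a default handler could populate the headers listed above. It assumes headers are modelled as an ordered `Map<String, byte[]>` of UTF-8 values, which is a simplification: real Kafka record headers are an ordered collection of key/byte[] pairs. `DlqHeadersSketch` and `buildHeaders` are hypothetical names. For punctuator-triggered errors there is no source record, so topic, partition and offset are passed as null and the corresponding headers get null values.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class DlqHeadersSketch {

    // Encodes a value as UTF-8 bytes, keeping null as null (e.g. punctuator-triggered errors).
    static byte[] utf8(Object value) {
        return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Builds the default DLQ headers: existing context headers are copied first,
     * then the __streams.errors.* headers are added.
     * topic, partition and offset are null when the error was triggered by a punctuator.
     */
    static Map<String, byte[]> buildHeaders(Map<String, byte[]> contextHeaders,
                                            Throwable exception,
                                            String topic, Integer partition, Long offset) {
        Map<String, byte[]> headers = new LinkedHashMap<>(contextHeaders);
        StringWriter stackTrace = new StringWriter();
        exception.printStackTrace(new PrintWriter(stackTrace));

        headers.put("__streams.errors.exception", utf8(exception.getClass().getName()));
        headers.put("__streams.errors.stacktrace", utf8(stackTrace));
        headers.put("__streams.errors.message", utf8(exception.getMessage()));
        headers.put("__streams.errors.topic", utf8(topic));
        headers.put("__streams.errors.partition", utf8(partition));
        headers.put("__streams.errors.offset", utf8(offset));
        return headers;
    }

    public static void main(String[] args) {
        Map<String, byte[]> h = buildHeaders(Map.of("trace-id", utf8("abc")),
                new IllegalStateException("boom"), "orders", 3, 42L);
        System.out.println(new String(h.get("__streams.errors.exception"), StandardCharsets.UTF_8));
        // prints: java.lang.IllegalStateException
    }
}
```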

Default Dead letter queue topic

By default, this KIP proposes one DLQ topic per Kafka Streams application. This topic would not be automatically created by Kafka Streams.

The DLQ topic name is set through the configuration ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name". Users who need a different DLQ topic strategy can override the default behavior by implementing custom exception handlers.
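For illustration, the snippet below shows how an application could opt into the default DLQ behavior once this KIP is implemented. It uses the proposed property name as a plain string, since this constant does not exist in released versions of StreamsConfig; the application id, bootstrap servers and the `my-app-dlq` topic name are placeholders. Because Kafka Streams would not create the DLQ topic, it has to exist before the application starts.

```java
import java.util.Properties;

public class DlqConfigSketch {

    // Property name proposed by this KIP (not available in released StreamsConfig).
    static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name";

    static Properties streamsProperties(String dlqTopic) {
        Properties props = new Properties();
        props.put("application.id", "my-app");            // placeholder
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        if (dlqTopic != null) {
            // When set, the default exception handlers would build and send DLQ records to this topic.
            // The topic is not created automatically and must already exist.
            props.put(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, dlqTopic);
        }
        // Leaving the property unset keeps the proposed default (null): no DLQ records are produced.
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties("my-app-dlq").getProperty(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG));
        // prints: my-app-dlq
    }
}
```

The null check matters because `Properties` rejects null values; omitting the key is how the "not set" default is expressed.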

Public Interfaces

StreamsConfig.java

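public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name";
private static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC = "If set, the default exception handlers build a dead letter queue record and send it to this topic. The topic is not created automatically.";

.define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
       Type.STRING,
       null,            // optional: no default topic, so no DLQ record is produced unless set
       Importance.HIGH,
       ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC)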

ProductionExceptionHandler.java

Changes:

  • Adding the public ProducerRecord<byte[], byte[]> deadLetterQueueRecord; attribute to the ProductionExceptionHandlerResponse
  • To limit the memory impact, the ErrorHandlerContext passed to the handle method does not provide the source key and value; the dead letter queue record would therefore only contain metadata. handleSerializationException is not impacted.
public interface ProductionExceptionHandler extends Configurable {
   ...


   enum ProductionExceptionHandlerResponse {
       . . . 

       // Record to send to the dead letter queue topic; null means no DLQ record is produced.
       public org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]> deadLetterQueueRecord;

       public ProductionExceptionHandlerResponse withDeadLetterQueueRecord(final org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]> deadLetterQueueRecord) {
           this.deadLetterQueueRecord = deadLetterQueueRecord;
           return this;
       }
   }
}

DeserializationExceptionHandler.java

Changes:

  • Adding the public ProducerRecord<byte[], byte[]> deadLetterQueueRecord; attribute to the DeserializationHandlerResponse
public interface DeserializationExceptionHandler extends Configurable {

   ...

   /**
    * Enumeration that describes the response from the exception handler.
    */
   enum DeserializationHandlerResponse {
        . . .

       // Record to send to the dead letter queue topic; null means no DLQ record is produced.
       public org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]> deadLetterQueueRecord;

       public DeserializationHandlerResponse withDeadLetterQueueRecord(final org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]> deadLetterQueueRecord) {
           this.deadLetterQueueRecord = deadLetterQueueRecord;
           return this;
       }
   }
}
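The sketch below illustrates the deserialization case. Because the raw key and value bytes of the source record are available, the DLQ record can forward them unchanged, with no serializer involved. It mirrors the proposed `withDeadLetterQueueRecord` method on a simplified, hypothetical enum (`SketchResponse`) and record type (`RawRecord`), modelled on a log-and-continue style handler; these are stand-ins, not Kafka classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DeserializationDlqSketch {

    // Hypothetical stand-in for ProducerRecord<byte[], byte[]>.
    record RawRecord(String topic, byte[] key, byte[] value) { }

    // Simplified mirror of the proposed DeserializationHandlerResponse.
    enum SketchResponse {
        CONTINUE, FAIL;

        // As proposed, the DLQ record is stored on the enum constant itself,
        // so every caller returning this constant shares the same field.
        public RawRecord deadLetterQueueRecord;

        public SketchResponse withDeadLetterQueueRecord(RawRecord deadLetterQueueRecord) {
            this.deadLetterQueueRecord = deadLetterQueueRecord;
            return this;
        }
    }

    // Log-and-continue style handling: forwards the untouched source bytes to the DLQ topic, if configured.
    static SketchResponse handle(String dlqTopic, byte[] rawKey, byte[] rawValue) {
        RawRecord dlq = dlqTopic == null ? null : new RawRecord(dlqTopic, rawKey, rawValue);
        return SketchResponse.CONTINUE.withDeadLetterQueueRecord(dlq);
    }

    public static void main(String[] args) {
        byte[] badValue = "{not json".getBytes(StandardCharsets.UTF_8);
        SketchResponse r = handle("my-app-dlq", "k1".getBytes(StandardCharsets.UTF_8), badValue);
        System.out.println(r + " " + Arrays.equals(r.deadLetterQueueRecord.value(), badValue));
        // prints: CONTINUE true
    }
}
```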

ProcessingExceptionHandler

With KIP-1033, a similar behavior would be added to the potential new ProcessingExceptionHandler: adding a public ProducerRecord<byte[], byte[]> deadLetterQueueRecord; attribute in the ProcessingExceptionHandlerResponse.

Regardless of the timeline, the goal is to support DLQ in the ProcessingExceptionHandler if KIP-1033 is approved. The change will be part of KIP-1034, or part of KIP-1033 if this KIP is implemented first.

Compatibility, Deprecation, and Migration Plan

To build a valid record for the dead letter queue, the ProductionExceptionHandler.handle method needs access to the ProcessingContext. To ensure backward compatibility, the previous handle method would be deprecated, and the default implementation of the new overloaded method would invoke the previous one.
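The deprecation approach described above follows a common Java pattern: the new overload taking the context is a `default` method that delegates to the previous one, so existing handlers keep compiling and behaving as before. The interface below is a simplified, hypothetical stand-in (`SketchProductionHandler`, `SketchContext` and the `String` record/response types are placeholders), not the final ProductionExceptionHandler signature.

```java
public class BackwardCompatSketch {

    // Hypothetical stand-in for the context passed to the new overload.
    record SketchContext(String topic, int partition, long offset) { }

    interface SketchProductionHandler {

        /** Previous method: deprecated, but still what existing handlers implement. */
        @Deprecated
        String handle(String record, Exception exception);

        /** New overload receiving the context; by default it delegates to the previous method. */
        @SuppressWarnings("deprecation")
        default String handle(SketchContext context, String record, Exception exception) {
            return handle(record, exception);
        }
    }

    public static void main(String[] args) {
        // A handler written before the change: it only implements the previous method.
        SketchProductionHandler legacy = (record, exception) -> "CONTINUE";

        // The runtime would call the new overload, which still reaches the legacy logic.
        String response = legacy.handle(new SketchContext("orders", 0, 42L), "payload", new RuntimeException("boom"));
        System.out.println(response); // prints: CONTINUE
    }
}
```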

All other changes are backward compatible and should not impact existing applications.

Test Plan

  • Tests to ensure the backward compatibility of the ProductionExceptionHandler class
  • Tests to ensure that default exception handlers send records to the DLQ topic if the DLQ topic name is set
  • Ensure that failure to send the DLQ record kills the StreamThread
  • Ensure that punctuator-triggered exceptions produce the expected payload

Rejected Alternatives

  • Sending multiple DLQ records for a single issue.
  • Managing DeadLetterQueue directly in the DSL by extending the KStreams interface.
  • Providing no default implementation to build the Dead letter queue record and delegating this task to the user.
  • Only providing exception and metadata information in the default DLQ implementation.
  • Adding a new interface, which could be overridden by the user, to build the DLQ record.

