Status

Current state: Accepted (3.9)

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP is inspired by Michelin's Kstreamplify and coauthored by Damien Gasparina, Loïc Greffier and Sebastien Viale.

There are several places where an exception can arise in Kafka Streams. Currently, developers can implement a DeserializationExceptionHandler to handle issues during deserialization, and a ProductionExceptionHandler for issues happening during production.

For issues happening during the processing of a message, it is up to the developer to add the required try/catch blocks when leveraging the DSL or the Processor API. Any uncaught exception will terminate the processing and the StreamThread. This approach is quite tedious and error-prone when developing a large topology.

This proposal aims to add a new exception handling mechanism for exceptions happening during the processing of a message. Similar to the other exception handlers, it would include two out-of-the-box implementations: LogAndFail, the default, to be backward compatible, and LogAndContinue.

This KIP also proposes to unify the method signatures of the DeserializationExceptionHandler and the ProductionExceptionHandler. Currently, the DeserializationExceptionHandler leverages the old ProcessorContext, and the ProductionExceptionHandler does not have access to any context. As the ProcessorContext exposes many methods that should not be accessed from within an exception handler, e.g. forward(), this KIP introduces a new container class exposing only the metadata of the processing context: ErrorHandlerContext.

This KIP also proposes to catch exceptions that can occur while handling exceptions, for all handlers: DeserializationExceptionHandler, ProductionExceptionHandler, and the brand-new ProcessingExceptionHandler proposed by this KIP.

Proposed Changes

Adding New Processing Exception Handler

We propose to add a new processing exception handler that could be set by the user to handle exceptions happening during the processing of a message. A processing exception handler would be instantiated once per StreamTask.

The interface would be:

ProcessingExceptionHandler.java
package org.apache.kafka.streams.errors;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.processor.api.Record;

/**
 * An interface that allows user code to inspect a record that has failed processing.
 */
public interface ProcessingExceptionHandler extends Configurable {
    /**
     * Inspect a record and the exception received during processing.
     *
     * @param context processing context metadata
     * @param record record where the exception occurred
     * @param exception the actual exception
     */
    ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception);

    enum ProcessingHandlerResponse {
        /* continue with processing */
        CONTINUE(1, "CONTINUE"),
        /* fail the processing and stop */
        FAIL(2, "FAIL");
        ...
    }
}

The processing exception handler will be invoked if an exception happens during the invocation of:

  • ProcessorNode#process
  • StreamTask#punctuate

If the processing exception handler response is ProcessingHandlerResponse.FAIL, the stream stops processing records and fails. If the response is ProcessingHandlerResponse.CONTINUE, the stream drops the faulty record and continues processing.
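As an illustration, a minimal custom handler could decide between the two responses based on the exception type. This is only a sketch; MyRecoverableException is a hypothetical application-level exception, not part of this KIP:

MyProcessingExceptionHandler.java
import java.util.Map;

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

public class MyProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        // MyRecoverableException is a hypothetical application exception
        if (exception instanceof MyRecoverableException) {
            return ProcessingHandlerResponse.CONTINUE; // drop the faulty record, keep processing
        }
        return ProcessingHandlerResponse.FAIL; // fail fast on anything unexpected
    }
}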

Adding New Error Handler Context

To avoid exposing sensitive information during handler invocation, this KIP also introduces a new container class exposing only the processing metadata to the handler. A dedicated class is also needed because this metadata may outlive the ProcessorContext, as the ProductionExceptionHandler can be invoked asynchronously from a producer callback:

ErrorHandlerContext.java
/**
 * ErrorHandlerContext interface
 */
public interface ErrorHandlerContext {

    /**
     * Return the topic name of the current input record; could be {@code null} if it is not
     * available.
     *
     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
     * punctuation callback}, or while processing a record that was forwarded by a punctuation
     * callback, the record won't have an associated topic.
     * Another example is
     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
     *
     * @return the topic name
     */
    String topic();

    /**
     * Return the partition id of the current input record; could be {@code -1} if it is not
     * available.
     *
     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
     * punctuation callback}, or while processing a record that was forwarded by a punctuation
     * callback, the record won't have an associated partition id.
     * Another example is
     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
     *
     * @return the partition id
     */
    int partition();

    /**
     * Return the offset of the current input record; could be {@code -1} if it is not
     * available.
     *
     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
     * punctuation callback}, or while processing a record that was forwarded by a punctuation
     * callback, the record won't have an associated offset.
     * Another example is
     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
     *
     * @return the offset
     */
    long offset();

    /**
     * Return the headers of the current source record; could be empty if they are not
     * available.
     *
     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
     * punctuation callback}, or while processing a record that was forwarded by a punctuation
     * callback, the record might not have any associated headers.
     * Another example is
     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
     * (and siblings), that do not always guarantee to provide valid headers, as they might be
     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
     *
     * @return the headers
     */
    Headers headers();
	
    /**
     * Return the current processor node id.
     *
     * @return the processor node id
     */
    String processorNodeId();

    /**
     * Return the task id.
     *
     * @return the task id
     */
    TaskId taskId();

    /**
     * Return the current timestamp.
     *
     * <p> If it is triggered while processing a record streamed from the source processor,
     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
     * Note, that an upstream {@link Processor} might have set a new timestamp by calling
     * {@link ProcessorContext#forward(Object, Object, To) forward(..., To.all().withTimestamp(...))}.
     * In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
     * to guarantee deterministic results.
     *
     * <p> If it is triggered while processing a record generated not from the source processor (for example,
     * if this method is invoked from the punctuate call), timestamp is defined as the current
     * task's stream time, which is defined as the largest timestamp of any record processed by the task.
     *
     * @return the timestamp
     */
    long timestamp(); 
}
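Since several of these accessors can legitimately return null or sentinel values (e.g. for punctuation-triggered records), handler implementations should guard against them. A small illustrative fragment, assuming a log field exists in the surrounding handler class:

// Inside a handler implementation: guard against metadata that may be unavailable,
// e.g. when the failing record originated from a punctuation callback.
final String topic = context.topic() != null ? context.topic() : "<unavailable>";
final String source = context.partition() >= 0 && context.offset() >= 0
        ? context.partition() + "@" + context.offset()
        : "<unavailable>";
log.warn("Error in task {} at node {} (source topic: {}, position: {})",
        context.taskId(), context.processorNodeId(), topic, source);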

Updating Production Exception Handler

To be consistent with other handlers, this KIP proposes to expose the new ErrorHandlerContext as a parameter to the Deserialization and Production exception handlers and to deprecate the previous handle signatures. To be backward compatible with existing handlers, default implementations of the new methods delegate to the previous ones. A default implementation is also provided for each deprecated method, so that users implementing the interface do not have to implement the deprecated methods.


The ProductionExceptionHandler interface will be:

ProductionExceptionHandler.java
/**
 * ...
 */
public interface ProductionExceptionHandler extends Configurable {
    /**
     * ...
     * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead.
     */
    @Deprecated
    default ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                      final Exception exception) {
        throw new UnsupportedOperationException();
    }

    /**
     * Inspect a record that we attempted to produce, and the exception that resulted
     * from attempting to produce it and determine whether or not to continue processing.
     *
     * @param context The error handler context metadata 
     * @param record The record that failed to produce
     * @param exception The exception that occurred during production
     */
    @SuppressWarnings("deprecation")
    default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                      final ProducerRecord<byte[], byte[]> record,
                                                      final Exception exception) {
        return handle(record, exception);
    }

    /**
     * ...
     * @deprecated Use {@link #handleSerializationException(ErrorHandlerContext, ProducerRecord, Exception, SerializationExceptionOrigin)} instead.
     */
    @Deprecated
    default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record,
                                                                            final Exception exception) {
        return ProductionExceptionHandlerResponse.FAIL;
    }

    /**
     * Handle a serialization exception and determine whether processing should continue. The default
     * implementation is to fail the process.
     *
     * @param context       the error handler context metadata
     * @param record        the record that failed to serialize
     * @param exception     the exception that occurred during serialization
     * @param origin        the origin of the serialization exception: key or value
     */
    @SuppressWarnings("deprecation")
    default ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
                                                                            final ProducerRecord record,
                                                                            final Exception exception,
                                                                            final SerializationExceptionOrigin origin) {
        return handleSerializationException(record, exception);
    }
}
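For illustration, a custom handler could override only the new signature, e.g. to drop records rejected by the broker for being too large while failing on everything else. This is a sketch, not one of the provided implementations:

IgnoreRecordTooLargeHandler.java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                     final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // drop records the broker rejected as too large, fail on anything else
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }
}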

The DefaultProductionExceptionHandler class, which implements the ProductionExceptionHandler interface, will deprecate the old handle method and implement the new one.

Adding New Serialization Exception Origin

To identify the origin of a serialization exception, a new SerializationExceptionOrigin enum would be passed to the serialization handler:

SerializationExceptionOrigin.java
package org.apache.kafka.streams.errors;

public enum SerializationExceptionOrigin {
   KEY,
   VALUE
}
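For example, a handler could tolerate a value that fails to serialize while treating a key failure as fatal, since a corrupted key would break partitioning. A sketch of the new method inside a ProductionExceptionHandler implementation:

@Override
public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
                                                                       final ProducerRecord record,
                                                                       final Exception exception,
                                                                       final SerializationExceptionOrigin origin) {
    // a bad value may be droppable, but a bad key would break partitioning guarantees
    return origin == SerializationExceptionOrigin.VALUE
            ? ProductionExceptionHandlerResponse.CONTINUE
            : ProductionExceptionHandlerResponse.FAIL;
}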

Updating Deserialization Exception Handler

The DeserializationExceptionHandler would be:

DeserializationExceptionHandler.java
public interface DeserializationExceptionHandler extends Configurable {
    // ...

    /**
     * ...
     * @deprecated Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead.
     */
    @Deprecated
    default DeserializationHandlerResponse handle(final ProcessorContext context,
                                                  final ConsumerRecord<byte[], byte[]> record,
                                                  final Exception exception) {
        throw new UnsupportedOperationException();
    }

    /**
     * Inspect a record and the exception received.
     *
     * @param context error handler context
     * @param record record that failed deserialization
     * @param exception the actual exception
     */
    default DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                  final ConsumerRecord<byte[], byte[]> record,
                                                  final Exception exception) {
        return handle(((ErrorHandlerContextImpl) context).convertToProcessorContext(), record, exception);
    }
    // ...
}
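With the defaults above, a custom handler only needs to override the new method. A sketch of a handler that skips poison pills and logs their origin:

MyDeserializationExceptionHandler.java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyDeserializationExceptionHandler implements DeserializationExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(MyDeserializationExceptionHandler.class);

    @Override
    public void configure(final Map<String, ?> configs) {
        // ignore
    }

    @Override
    public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // skip the poison pill, but keep a trace of where it came from
        log.warn("Skipping record that failed deserialization, topic: {}, partition: {}, offset: {}",
                context.topic(), context.partition(), context.offset(), exception);
        return DeserializationHandlerResponse.CONTINUE;
    }
}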

Both the LogAndContinueExceptionHandler and LogAndFailExceptionHandler classes, which implement the DeserializationExceptionHandler interface, will deprecate the old handle method and implement the new one.

Managing Handler Crashes

This KIP also proposes to deal with exceptions that can occur during the handling mechanism.

The proposed change is to fail the stream, by throwing an exception, whenever an exception occurs during exception handling. This behavior is unified across all handlers and covers the following five use cases (a sketch of the guard pattern follows the list):

  • ProcessingExceptionHandler#handle, when handling processing exceptions.
  • ProcessingExceptionHandler#handle, when handling punctuation exceptions.
  • ProductionExceptionHandler#handle, when handling production exceptions.
  • ProductionExceptionHandler#handleSerializationException, when handling serialization exceptions.
  • DeserializationExceptionHandler#handle, when handling deserialization exceptions.
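To illustrate the intent, here is a simplified, hypothetical sketch of the guard that would surround a handler invocation; the exact internal exception types and wiring are implementation details, not part of the public API:

// Hypothetical sketch only: how a crash inside the ProcessingExceptionHandler
// would be turned into a fatal error instead of being swallowed.
final ProcessingExceptionHandler.ProcessingHandlerResponse response;
try {
    response = processingExceptionHandler.handle(errorHandlerContext, record, processingException);
} catch (final Exception handlerError) {
    // the handler itself crashed: fail the stream
    throw new StreamsException("Processing exception handler failed while handling an error", handlerError);
}
if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
    throw new StreamsException("Processing exception handler requested to fail", processingException);
}
// response == CONTINUE: drop the record and move on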

Metrics

The following metric will be incremented each time the ProcessingExceptionHandler is invoked and returns CONTINUE.


Metric: dropped-records (rate | total)
Recording level: INFO

LEVEL                           TAGS
Per-Client (level 0)            type=stream-metrics,client-id=[client-id]
Per-Thread (level 1)            type=stream-thread-metrics,thread-id=[threadId]
Per-Task (level 2)              type=stream-task-metrics,thread-id=[threadId],task-id=[taskId]
Per-Processor-Node (level 3)    type=stream-processor-node-metrics,thread-id=[threadId],task-id=[taskId],processor-node-id=[processorNodeId]

Public Interfaces

Users will be able to set an exception handler through the following new config option which points to a class name.

public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler";

Two implementations of the interface will be provided. The default will be the LogAndFailProcessingExceptionHandler, to ensure backward compatibility of the API.

// logs the error and returns CONTINUE
public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler {...}

// logs the error and returns FAIL
public class LogAndFailProcessingExceptionHandler implements ProcessingExceptionHandler {...}

// Then in StreamsConfig.java:
.define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
        Type.CLASS,
        LogAndFailProcessingExceptionHandler.class.getName(),
        Importance.MEDIUM,
        PROCESSING_EXCEPTION_HANDLER_CLASS_DOC)
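For instance, assuming the class names above, the handler would be registered like any other Streams configuration (application id and bootstrap server below are placeholders):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-application");        // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
// override the default LogAndFailProcessingExceptionHandler to skip faulty records instead
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
        LogAndContinueProcessingExceptionHandler.class.getName());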

Compatibility, Deprecation, and Migration Plan

In the current version, exceptions thrown during the processing of a message are not caught; they are forwarded to the uncaught exception handler and terminate the StreamThread. To be backward compatible, the LogAndFailProcessingExceptionHandler will be set as the default.

Examples

LogAndContinueProcessingExceptionHandler.java
public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(LogAndContinueProcessingExceptionHandler.class);

    @Override
    public void configure(Map<String, ?> configs) {
        // ignore
    }

    @Override
    public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
        log.warn("Exception caught during message processing, " +
                        "processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
                context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(),
                exception);

        return ProcessingHandlerResponse.CONTINUE;
    }
}

Test Plan

  • A unit test will be implemented to ensure that exceptions are caught by the ProcessingExceptionHandler and the provided implementations.
  • A unit test will be implemented to ensure backward compatibility.
  • An integration test will be done to ensure that the right exceptions are caught.

Rejected Alternatives

  • Allowing a per-node exception handler: specifying an exception handler per node could be error-prone and would add complexity to the DSL API. If required, it is still possible to rely on a simple try/catch to implement node-specific error handling.
  • Sending to a DeadLetterQueue. A DeadLetterQueue implementation could make sense, but that work would be done in a separate KIP. Sending messages to a dead letter queue from a handler is already possible, but it is up to the user to implement it.