Status
Current state: Accepted (3.9)
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP is inspired by the Michelin Kstreamplify project and was co-authored by Damien Gasparina, Loïc Greffier, and Sebastien Viale.
There are several places where an exception can arise in Kafka Streams. Currently, developers can implement a DeserializationExceptionHandler to handle issues during deserialization, and a ProductionExceptionHandler for issues happening during production.
For issues happening during the processing of a message, it is up to the developer to add the required try/catch blocks when leveraging the DSL or the Processor API. Any uncaught exception is handed to the uncaught exception handler and terminates the StreamThread. This approach is tedious and error-prone when developing a large topology.
This proposal aims to add a new exception handling mechanism to manage exceptions happening during the processing of a message. Similar to the other exception handlers, it would include two out-of-the-box implementations: LogAndFail, the default for backward compatibility, and LogAndContinue.
This KIP also proposes to unify the method signatures of the DeserializationExceptionHandler and the ProductionExceptionHandler. Currently, the DeserializationExceptionHandler leverages the old ProcessorContext, and the ProductionExceptionHandler has no access to any context at all. As the ProcessorContext exposes many methods that should not be accessed from an exception handler, e.g. forward(), this KIP introduces a new container class exposing only the metadata of the processing context: ErrorHandlerContext.
This KIP also proposes to catch exceptions that can occur while handling exceptions, for all handlers: DeserializationExceptionHandler, ProductionExceptionHandler, and the brand-new ProcessingExceptionHandler proposed by this KIP.
Proposed Changes
Adding New Processing Exception Handler
We propose to add a new processing exception handler that can be set by the user to handle exceptions happening during the processing of a message. A processing exception handler would be instantiated once per StreamTask.
The interface would be:
package org.apache.kafka.streams.errors;

/**
 * An interface that allows user code to inspect a record that has failed processing.
 */
public interface ProcessingExceptionHandler extends Configurable {

    /**
     * Inspect a record and the exception received.
     *
     * @param context processing context metadata
     * @param record record where the exception occurred
     * @param exception the actual exception
     */
    ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception);

    public enum ProcessingHandlerResponse {
        /* continue with processing */
        CONTINUE(1, "CONTINUE"),
        /* fail the processing and stop */
        FAIL(2, "FAIL");

        ...
    }
}
The processing exception handler will be invoked if an exception happens during the invocation of:
- ProcessorNode#process
- StreamTask#punctuate
If the processing exception handler returns ProcessingHandlerResponse.FAIL, the stream stops processing records and fails. If it returns ProcessingHandlerResponse.CONTINUE, the stream continues processing records.
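For illustration, a handler can discriminate on the exception type. The following sketch, in which the class and package names are hypothetical, fails the stream on a NullPointerException (likely a bug) and skips the faulty record for anything else:

package com.example.streams;  // hypothetical package

import java.util.Map;

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical handler: fail fast on NullPointerException, skip the record otherwise.
public class SelectiveProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        if (exception instanceof NullPointerException) {
            return ProcessingHandlerResponse.FAIL;
        }
        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed
    }
}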
Adding New Error Handler Context
To avoid exposing sensitive information during handler invocation, this KIP also introduces a new container class that exposes only the processing metadata to the handler. A dedicated class is also needed because this metadata may have to be held longer than the ProcessingContext itself, e.g. for the ProductionExceptionHandler:
/**
 * ErrorHandlerContext interface
 */
public interface ErrorHandlerContext {

    /**
     * Return the topic name of the current input record; could be {@code null} if it is not
     * available.
     *
     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
     * punctuation callback}, or while processing a record that was forwarded by a punctuation
     * callback, the record won't have an associated topic.
     * Another example is
     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
     *
     * @return the topic name
     */
    String topic();

    /**
     * Return the partition id of the current input record; could be {@code -1} if it is not
     * available.
     *
     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
     * punctuation callback}, or while processing a record that was forwarded by a punctuation
     * callback, the record won't have an associated partition id.
     * Another example is
     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
     *
     * @return the partition id
     */
    int partition();

    /**
     * Return the offset of the current input record; could be {@code -1} if it is not
     * available.
     *
     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
     * punctuation callback}, or while processing a record that was forwarded by a punctuation
     * callback, the record won't have an associated offset.
     * Another example is
     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
     *
     * @return the offset
     */
    long offset();

    /**
     * Return the headers of the current source record; could be an empty header if it is not
     * available.
     *
     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
     * punctuation callback}, or while processing a record that was forwarded by a punctuation
     * callback, the record might not have any associated headers.
     * Another example is
     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
     * (and siblings), that do not always guarantee to provide valid headers, as they might be
     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
     *
     * @return the headers
     */
    Headers headers();

    /**
     * Return the current processor node id.
     *
     * @return the processor node id
     */
    String processorNodeId();

    /**
     * Return the task id.
     *
     * @return the task id
     */
    TaskId taskId();

    /**
     * Return the current timestamp.
     *
     * <p> If it is triggered while processing a record streamed from the source processor,
     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
     * Note, that an upstream {@link Processor} might have set a new timestamp by calling
     * {@link ProcessorContext#forward(Object, Object, To) forward(..., To.all().withTimestamp(...))}.
     * In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
     * to guarantee deterministic results.
     *
     * <p> If it is triggered while processing a record generated not from the source processor (for example,
     * if this method is invoked from the punctuate call), timestamp is defined as the current
     * task's stream time, which is defined as the largest timestamp of any record processed by the task.
     *
     * @return the timestamp
     */
    long timestamp();
}
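Because the context carries the source topic, partition, offset, and task metadata, a handler can make decisions based on where a record came from. A minimal sketch, assuming the interface above (the topic name and class name are made up):

import java.util.Map;

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical policy: records from the low-value "clickstream" topic may be
// dropped on error; records from any other topic fail the stream.
public class TopicAwareProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        // context.topic() may be null, e.g. inside a punctuation callback
        if ("clickstream".equals(context.topic())) {
            return ProcessingHandlerResponse.CONTINUE;
        }
        return ProcessingHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed
    }
}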
Updating Production Exception Handler
To be consistent with other handlers, this KIP proposes to expose the new ErrorHandlerContext as a parameter to the Deserialization and Production exception handlers and to deprecate the previous handle signatures. To be backward compatible with existing handlers, a default implementation of each new method would be provided that invokes the previous handler by default. A default implementation of each deprecated method is also provided, so that users implementing the interface are not forced to implement the deprecated method.
The ProductionExceptionHandler interface will be:
/**
 * ...
 */
public interface ProductionExceptionHandler extends Configurable {

    /**
     * ...
     * @deprecated Please use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead
     */
    @Deprecated
    default ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                      final Exception exception) {
        throw new UnsupportedOperationException();
    }

    /**
     * Inspect a record that we attempted to produce, and the exception that resulted
     * from attempting to produce it, and determine whether or not to continue processing.
     *
     * @param context the error handler context metadata
     * @param record the record that failed to produce
     * @param exception the exception that occurred during production
     */
    @SuppressWarnings("deprecation")
    default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                      final ProducerRecord<byte[], byte[]> record,
                                                      final Exception exception) {
        return handle(record, exception);
    }

    /**
     * ...
     * @deprecated Please use {@link #handleSerializationException(ErrorHandlerContext, ProducerRecord, Exception, SerializationExceptionOrigin)} instead
     */
    @Deprecated
    default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record,
                                                                            final Exception exception) {
        return ProductionExceptionHandlerResponse.FAIL;
    }

    /**
     * Handle a serialization exception and determine if the process should continue. The default implementation is to
     * fail the process.
     *
     * @param context the error handler context metadata
     * @param record the record that failed to serialize
     * @param exception the exception that occurred during serialization
     * @param origin the origin of the serialization exception (key or value)
     */
    @SuppressWarnings("deprecation")
    default ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
                                                                            final ProducerRecord record,
                                                                            final Exception exception,
                                                                            final SerializationExceptionOrigin origin) {
        return handleSerializationException(record, exception);
    }
}
The DefaultProductionExceptionHandler class, which implements the ProductionExceptionHandler interface, will deprecate the old handle method and implement the new one.
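Since the new signature has a default that delegates to the deprecated one, a new implementation only needs to override the context-aware method. A sketch of a custom handler under that assumption (the class name is hypothetical; skipping oversized records is a common policy, not part of this KIP):

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Hypothetical handler: drop records that are too large for the broker, fail otherwise.
public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                     final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed
    }
}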
Adding New Serialization Exception Origin
To identify the origin of a serialization exception, a new SerializationExceptionOrigin enum would be passed to the serialization exception handler:
package org.apache.kafka.streams.errors;

enum SerializationExceptionOrigin {
    KEY,
    VALUE
}
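As an illustration of how the origin could be used, the sketch below (class name and policy are hypothetical) skips records whose value fails to serialize but fails the stream on key errors, since a broken key may indicate a more fundamental problem:

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.SerializationExceptionOrigin;

// Hypothetical policy: tolerate value serialization failures, fail on key failures.
public class SkipBadValuesHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
                                                                           final ProducerRecord record,
                                                                           final Exception exception,
                                                                           final SerializationExceptionOrigin origin) {
        return origin == SerializationExceptionOrigin.VALUE
                ? ProductionExceptionHandlerResponse.CONTINUE
                : ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed
    }
}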
Updating Deserialization Exception Handler
The DeserializationExceptionHandler would be:
public interface DeserializationExceptionHandler extends Configurable {
    // ...

    /**
     * ...
     * @deprecated Please use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead
     */
    @Deprecated
    default DeserializationHandlerResponse handle(final ProcessorContext context,
                                                  final ConsumerRecord<byte[], byte[]> record,
                                                  final Exception exception) {
        throw new UnsupportedOperationException();
    }

    /**
     * Inspect a record and the exception received.
     *
     * @param context error handler context
     * @param record record that failed deserialization
     * @param exception the actual exception
     */
    default DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                  final ConsumerRecord<byte[], byte[]> record,
                                                  final Exception exception) {
        return handle(((ErrorHandlerContextImpl) context).convertToProcessorContext(), record, exception);
    }

    // ...
}
Both the LogAndContinueExceptionHandler and LogAndFailExceptionHandler classes, which implement the DeserializationExceptionHandler interface, will deprecate the old handle method and implement the new one.
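To illustrate the backward-compatibility path: a handler written against the old signature, such as the hypothetical class below, keeps working without changes, because the new default method converts the ErrorHandlerContext back to a ProcessorContext and delegates to the deprecated method.

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

// A pre-existing handler that only overrides the deprecated signature;
// it is still invoked via the new default method.
public class LegacyDeserializationHandler implements DeserializationExceptionHandler {

    @SuppressWarnings("deprecation")
    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        return DeserializationHandlerResponse.CONTINUE;  // skip corrupted records
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed
    }
}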
Managing Handler Crashes
This KIP also proposes to deal with exceptions that can occur while the exception handlers themselves are running.
The proposed change is to fail the stream, by throwing an exception, whenever an exception occurs during exception handling. This behavior is unified across all the handlers and covers the five following use cases (a sketch of the guard logic follows the list):
- ProcessingExceptionHandler#handle, when handling processing exceptions.
- ProcessingExceptionHandler#handle, when handling punctuation exceptions.
- ProductionExceptionHandler#handle, when handling production exceptions.
- ProductionExceptionHandler#handleSerializationException, when handling serialization exceptions.
- DeserializationExceptionHandler#handle, when handling deserialization exceptions.
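The following is a minimal sketch of that guard for the processing case; it is illustrative only, not the actual Kafka Streams internals, and all names in it are made up:

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler.ProcessingHandlerResponse;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.api.Record;

// Illustrative helper: an exception thrown by the handler itself fails the
// stream instead of being swallowed.
final class HandlerInvocationSketch {

    static void handleProcessingError(final ProcessingExceptionHandler handler,
                                      final ErrorHandlerContext context,
                                      final Record<?, ?> record,
                                      final Exception processingError) {
        final ProcessingHandlerResponse response;
        try {
            response = handler.handle(context, record, processingError);
        } catch (final Exception handlerError) {
            // The handler crashed: fail the stream with the handler's own error.
            throw new StreamsException("Fatal user code error in processing exception handler", handlerError);
        }
        if (response == ProcessingHandlerResponse.FAIL) {
            throw new StreamsException("Processing exception handler returned FAIL", processingError);
        }
        // CONTINUE: drop the record and move on.
    }
}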
Metrics
The following metric will be incremented each time the ProcessingExceptionHandler is invoked and returns CONTINUE.
| METRIC | LEVEL 0: Per-Client | LEVEL 1: Per-Thread | LEVEL 2: Per-Task | LEVEL 3: Per-Processor-Node |
|---|---|---|---|---|
| TAGS | type=stream-metrics,client-id=[client-id] | type=stream-thread-metrics,thread-id=[threadId] | type=stream-task-metrics,thread-id=[threadId],task-id=[taskId] | type=stream-processor-node-metrics,thread-id=[threadId],task-id=[taskId],processor-node-id=[processorNodeId] |
| dropped-records (rate, total) | | | INFO | |
Public Interfaces
Users will be able to set an exception handler through the following new config option which points to a class name.
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler";
Two implementations of the interface will be provided. The default will be the ProcessingLogAndFailExceptionHandler, to ensure the backward compatibility of the API:
// logs the error and returns CONTINUE
public class ProcessingLogAndContinueExceptionHandler implements ProcessingExceptionHandler {...}

// logs the error and returns FAIL
public class ProcessingLogAndFailExceptionHandler implements ProcessingExceptionHandler {...}

// Then in StreamsConfig.java:
.define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
        Type.CLASS,
        ProcessingLogAndFailExceptionHandler.class.getName(),
        Importance.MEDIUM,
        PROCESSING_EXCEPTION_HANDLER_CLASS_DOC)
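A sketch of how a user would opt in to the continue behavior via this config (the application id, bootstrap server, and the handler's package are assumptions for illustration):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessingLogAndContinueExceptionHandler;  // assumed package

public class HandlerConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical cluster
        // Skip records that fail processing instead of failing the stream:
        props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                  ProcessingLogAndContinueExceptionHandler.class.getName());
        // ... build the topology and start KafkaStreams with these properties.
    }
}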
Compatibility, Deprecation, and Migration Plan
In the current version, exceptions thrown during the processing of a message are not caught, and thus are sent to the uncaughtExceptionHandler and terminate the StreamThread. To be backward compatible, the ProcessingLogAndFailExceptionHandler will be set as the default.
Examples
public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(LogAndContinueProcessingExceptionHandler.class);

    @Override
    public void configure(Map<String, ?> configs) {
        // ignore
    }

    @Override
    public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
        log.warn("Exception caught during message processing, " +
                 "processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
                 context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(),
                 exception);
        return ProcessingHandlerResponse.CONTINUE;
    }
}
Test Plan
- A unit test will be implemented to ensure that exceptions are caught by the ProcessingExceptionHandler and by the provided implementations.
- A unit test will be implemented to ensure backward compatibility.
- An integration test will be done to ensure that the right exceptions are caught.
Rejected Alternatives
- Allow per-node exception handlers: specifying an exception handler per node could be error-prone and would add complexity to the DSL API. If required, it is still possible to rely on a simple try/catch to implement node-specific error handling.
- Sending to a DeadLetterQueue: a DeadLetterQueue implementation could make sense, but that work would be done in a separate KIP. Sending messages to a dead letter queue is already possible, but it is up to the user to implement it.