...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * ErrorHandlerContext interface */ public interface ErrorHandlerContext { /** * Return the topic name of the current input record; could be {@code null} if it is not * available. * * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long) * punctuation callback}, or while processing a record that was forwarded by a punctuation * callback, the record won't have an associated topic. * Another example is * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} * (and siblings), that do not always guarantee to provide a valid topic name, as they might be * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. * * @return the topic name */ String topic(); /** * Return the partition id of the current input record; could be {@code -1} if it is not * available. * * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long) * punctuation callback}, or while processing a record that was forwarded by a punctuation * callback, the record won't have an associated partition id. * Another example is * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} * (and siblings), that do not always guarantee to provide a valid partition id, as they might be * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. * * @return the partition id */ int partition(); /** * Return the offset of the current input record; could be {@code -1} if it is not * available. * * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long) * punctuation callback}, or while processing a record that was forwarded by a punctuation * callback, the record won't have an associated offset. * Another example is * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} * (and siblings), that do not always guarantee to provide a valid offset, as they might be * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. * * @return the offset */ long offset(); /** * Return the headers of the current source record; could be an empty header if it is not * available. * * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long) * punctuation callback}, or while processing a record that was forwarded by a punctuation * callback, the record might not have any associated headers. * Another example is * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} * (and siblings), that do not always guarantee to provide valid headers, as they might be * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. * * @return the headers */ Headers headers(); /** * Return the current processor node id. * * @return the processor node id */ String processorNodeId(); /** * Return the task id. * * @return the task id */ TaskId taskId(); } |
...
Updating Production Exception Handler
To be consistent with other handlers, this KIP proposes to expose the new ErrorHandlerContext as a parameter to the Deserialization and Production exception handlers and deprecate the previous handle signature. To be backward compatible with existing handlers, a default implementation would be provided to ensure that the previous handler is invoked by default. A default implementation for the deprecated method is also implemented to allow user to not implement the deprecated method while implementing the interface.
...