Status

Current state: Under Discussion

Discussion thread: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Connect currently supports basic error tolerance mechanisms (`none`, `all`) and dead letter queues (DLQ) for error reporting. However, there is no flexible way for developers to define how errors should be handled during specific stages of record processing.

Kafka Streams already provides this flexibility via the `ProcessingExceptionHandler` interface, which lets a Streams application decide whether to continue or fail when a processing error occurs (the built-in implementations log the error and then do one or the other). This KIP proposes a similar mechanism for Connect: a pluggable `ErrorHandler` interface for custom error handling logic.

Proposed changes

A new `ErrorHandler` interface will be added to the Kafka Connect API. It will allow the Connect framework to delegate error handling to a plugin-defined handler. 

This interface is inspired by the existing `ProcessingExceptionHandler` interface in Kafka Streams, which gives users control over how to react to processing errors in stream tasks. Kafka Connect’s `ErrorHandler` brings a similar level of flexibility to Connect pipelines.

package: `org.apache.kafka.connect.handler`

class name: `org.apache.kafka.connect.handler.ErrorHandler`

public interface ErrorHandler<T> {

    /**
     * Handle the exception that occurred during a specific stage of record processing.
     *
     * @param context the processing context: stage, executing class, original record, and error.
     * @return the action the Connect runtime should take for the failed record.
     */
    ErrorHandlerResponse handleError(ErrorContext<T> context);

    enum ErrorHandlerResponse {
        DROP,   // Silently skip the record.
        FAIL,   // Fail the task.
        ACK     // Acknowledge and skip; relevant for source connectors.
    }
}

package: `org.apache.kafka.connect.handler`

class name: `org.apache.kafka.connect.handler.ErrorContext`

public class ErrorContext<T> {

	// Stage of record processing in which the error occurred.
	private final String stage;
	// Name of the class that was executing when the error occurred.
	private final String executingClassName;
	// The original record that was being processed.
	private final T original;
	// The error that was thrown.
	private final Throwable error;

	public ErrorContext(String stage, String executingClassName, T original, Throwable error) {
		this.stage = stage;
		this.executingClassName = executingClassName;
		this.original = original;
		this.error = error;
	}

	public String stage() {
		return stage;
	}

	public String executingClassName() {
		return executingClassName;
	}

	public T original() {
		return original;
	}

	public Throwable error() {
		return error;
	}

}
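To illustrate how a plugin might use these two types, here is a minimal sketch of a handler. The interface and context are re-declared in compact form only so the snippet compiles on its own. The class name `SimpleErrorHandler`, the stage string `"VALUE_CONVERTER"`, and the policy itself are assumptions made for illustration; the proposal does not enumerate stage names or prescribe any policy.

```java
// Compact stand-ins for the proposed API, repeated only so this sketch is self-contained.
interface ErrorHandler<T> {
    ErrorHandlerResponse handleError(ErrorContext<T> context);

    enum ErrorHandlerResponse { DROP, FAIL, ACK }
}

final class ErrorContext<T> {
    private final String stage;
    private final String executingClassName;
    private final T original;
    private final Throwable error;

    ErrorContext(String stage, String executingClassName, T original, Throwable error) {
        this.stage = stage;
        this.executingClassName = executingClassName;
        this.original = original;
        this.error = error;
    }

    String stage() { return stage; }
    String executingClassName() { return executingClassName; }
    T original() { return original; }
    Throwable error() { return error; }
}

// Hypothetical handler: drop records that fail during conversion, fail the task otherwise.
final class SimpleErrorHandler implements ErrorHandler<String> {

    // Assumed stage name; the KIP does not list the possible stage values.
    static final String CONVERSION_STAGE = "VALUE_CONVERTER";

    @Override
    public ErrorHandlerResponse handleError(ErrorContext<String> context) {
        if (CONVERSION_STAGE.equals(context.stage())) {
            // A record that cannot be converted is unlikely to succeed on retry, so skip it.
            return ErrorHandlerResponse.DROP;
        }
        // Any other stage is treated as fatal in this sketch.
        return ErrorHandlerResponse.FAIL;
    }
}
```

Because the handler receives the stage, the executing class name, the original record, and the error together, a policy like this can be written without access to runtime-internal classes.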

Additionally, we propose to add the following configuration properties.

Connector Configuration

Extend the existing `errors.tolerance` config to support a new mode:

  • `none` – current behavior.

  • `all` – current behavior.

  • `custom` – use a pluggable `ErrorHandler`.

Introduce the following configuration keys to select and configure the `ErrorHandler`:

- `errors.handler` – alias of the error handler to use.
- `errors.handler.$alias.type` – fully qualified class name for the error handler.
- `errors.handler.$alias.*` – all other keys as defined in `ErrorHandler.config()` are prefixed with this alias.

Example:

errors.handler=example
errors.handler.example.type=com.example.SimpleErrorHandler
errors.handler.example.param=testValue
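The alias scheme above can be sketched in plain Java as follows. This is only an illustration of how the keys relate to each other, assuming the connector config is available as a `Map<String, String>`; the class `ErrorHandlerConfigSketch` and its method names are hypothetical, and the real runtime would presumably reuse Connect's existing config machinery rather than code like this.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper showing how the alias-prefixed keys could be resolved.
final class ErrorHandlerConfigSketch {

    static final String HANDLER_KEY = "errors.handler";

    // Returns the class name configured under errors.handler.<alias>.type.
    static String handlerType(Map<String, String> connectorConfig) {
        String alias = connectorConfig.get(HANDLER_KEY);
        if (alias == null) {
            throw new IllegalArgumentException(HANDLER_KEY + " is not set");
        }
        String typeKey = HANDLER_KEY + "." + alias + ".type";
        String type = connectorConfig.get(typeKey);
        if (type == null) {
            throw new IllegalArgumentException(typeKey + " is not set");
        }
        return type;
    }

    // Returns the handler-specific settings with the errors.handler.<alias>. prefix
    // stripped; the "type" key is excluded because it only selects the class.
    static Map<String, String> handlerSettings(Map<String, String> connectorConfig) {
        Map<String, String> settings = new HashMap<>();
        String alias = connectorConfig.get(HANDLER_KEY);
        if (alias == null) {
            return settings;
        }
        String prefix = HANDLER_KEY + "." + alias + ".";
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix)) {
                String stripped = key.substring(prefix.length());
                if (!stripped.equals("type")) {
                    settings.put(stripped, entry.getValue());
                }
            }
        }
        return settings;
    }
}
```

Applied to the example configuration, `handlerType` would yield `com.example.SimpleErrorHandler` and `handlerSettings` would contain the single entry `param=testValue`, which is what the handler itself would be configured with.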

Integration with RetryWithToleranceOperator

When `errors.tolerance=custom`, the Connect runtime will:

  • Instantiate and configure the specified `ErrorHandler`.

  • Delegate error handling for supported stages to the plugin.

  • Act based on the returned `ErrorHandlerResponse`.

The responses map to the following actions:

  • If `DROP`: record is silently skipped.

  • If `FAIL`: the task is failed.

  • If `ACK`: record is acknowledged as consumed (only for source connectors).
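The delegation described above can be sketched as a simple loop. This is not the actual `RetryWithToleranceOperator` code: `CustomToleranceSketch`, its fields, and the use of `IllegalStateException` to represent a failed task are all assumptions made for illustration, and the handler is modelled as a plain `BiFunction` so the snippet stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of how a runtime could act on the handler's response.
final class CustomToleranceSketch {

    // Stand-in for the proposed ErrorHandler.ErrorHandlerResponse enum.
    enum ErrorHandlerResponse { DROP, FAIL, ACK }

    // Records that were processed successfully.
    final List<String> delivered = new ArrayList<>();
    // Failed records that the handler asked to acknowledge as consumed.
    final List<String> acknowledged = new ArrayList<>();

    void processAll(List<String> records,
                    Function<String, String> operation,
                    BiFunction<String, Throwable, ErrorHandlerResponse> handler) {
        for (String record : records) {
            try {
                delivered.add(operation.apply(record));
            } catch (RuntimeException e) {
                // Delegate the decision to the plugin-defined handler.
                ErrorHandlerResponse response = handler.apply(record, e);
                if (response == null || response == ErrorHandlerResponse.FAIL) {
                    // FAIL (or no decision): stop processing and fail the task.
                    throw new IllegalStateException("Task failed by error handler", e);
                }
                if (response == ErrorHandlerResponse.ACK) {
                    // ACK: skip the record but mark it as consumed.
                    acknowledged.add(record);
                }
                // DROP: skip the record silently and continue with the next one.
            }
        }
    }
}
```

In this model `DROP` and `ACK` both let processing continue with the next record, and differ only in whether the failed record is marked as consumed.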

Small POC: https://github.com/anton-liauchuk/kafka/pull/1/files

Compatibility, Deprecation, and Migration Plan

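There are no backward compatibility concerns: the existing `none` and `all` values of `errors.tolerance` keep their current behavior, and an `ErrorHandler` is used only when `errors.tolerance=custom` is set explicitly.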

Rejected Alternatives

1. Exposing `ErrorReporter` and `ProcessingContext` as public APIs: This would make packages with `runtime` in their names part of the public API, and those package names could not be changed afterwards.
2. Extending the Reporter interface: Considered, but it conflates reporting and control logic. Keeping handling and reporting responsibilities separate promotes clarity and composability.

