Status
Current state: Under Discussion
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, the error handling mechanism in Kafka Connect supports only the predefined error reporter implementations that ship with the runtime module. The `org.apache.kafka.connect.runtime.errors.ErrorReporter` interface is itself located in the runtime module, which prevents users from defining custom error reporting mechanisms. Current error reporter implementations:
- `org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter` - Reports records into a dead letter queue.
- `org.apache.kafka.connect.runtime.errors.LogReporter` - An abstract class for reporting errors via logging. Both sink and source connectors have their own implementations of the log reporter.
In addition to these implementations, custom error reporters can be beneficial for implementing custom logic to report errors to specific storage systems, as well as defining custom metrics and logs based on the error context. This can be particularly useful for source connectors, where the log reporter is the only option to understand the root cause of failures. This proposal aims to introduce the capability to define custom error reporting functionality.
Public Interfaces
We propose the following new public interface:
package: `org.apache.kafka.connect.reporter;`
class name: `org.apache.kafka.connect.reporter.ErrorRecordReporter`
    /**
     * Provides a mechanism for reporting errors
     * using the information contained in an `ErrorContext`.
     *
     * @param <T> the type of the error context
     */
    public interface ErrorRecordReporter<T> extends Configurable, AutoCloseable {

        /**
         * Report an error using the provided error context.
         *
         * @param context the error context (cannot be null)
         */
        void report(ErrorContext<T> context);

        @Override
        default void close() { }
    }
package: `org.apache.kafka.connect.reporter;`
class name: `org.apache.kafka.connect.reporter.ErrorContext`
    public class ErrorContext<T> {

        private final String stage;
        private final String executingClassName;
        private final T original;
        private final Throwable error;

        public ErrorContext(String stage, String executingClassName, T original, Throwable error) {
            this.stage = stage;
            this.executingClassName = executingClassName;
            this.original = original;
            this.error = error;
        }

        public String stage() {
            return stage;
        }

        public String executingClassName() {
            return executingClassName;
        }

        public T original() {
            return original;
        }

        public Throwable error() {
            return error;
        }
    }
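To illustrate how the two types above might be used together, here is a minimal sketch of a custom reporter that counts errors and keeps a one-line description of the most recent one. It is not part of the proposal: `CountingReporterSketch` and `CountingReporter` are hypothetical names, the nested `Configurable`, `ErrorContext`, and `ErrorRecordReporter` types are local stand-ins so the example compiles without Kafka on the classpath, and the stage value `VALUE_CONVERTER` and the `param` key are only illustrative.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class CountingReporterSketch {

    // Stand-in for org.apache.kafka.common.Configurable (assumption: same single method).
    interface Configurable {
        void configure(Map<String, ?> configs);
    }

    // Stand-in mirroring the proposed ErrorContext.
    static final class ErrorContext<T> {
        private final String stage;
        private final String executingClassName;
        private final T original;
        private final Throwable error;

        ErrorContext(String stage, String executingClassName, T original, Throwable error) {
            this.stage = stage;
            this.executingClassName = executingClassName;
            this.original = original;
            this.error = error;
        }

        String stage() { return stage; }
        String executingClassName() { return executingClassName; }
        T original() { return original; }
        Throwable error() { return error; }
    }

    // Stand-in mirroring the proposed ErrorRecordReporter.
    interface ErrorRecordReporter<T> extends Configurable, AutoCloseable {
        void report(ErrorContext<T> context);

        @Override
        default void close() { }
    }

    // Hypothetical reporter: counts reported errors and remembers the latest description.
    static final class CountingReporter<T> implements ErrorRecordReporter<T> {
        private final AtomicLong count = new AtomicLong();
        private volatile String prefix = "";
        private volatile String lastMessage;

        @Override
        public void configure(Map<String, ?> configs) {
            // "param" is an illustrative key, received here without its alias prefix.
            Object value = configs.get("param");
            prefix = value == null ? "" : value + ": ";
        }

        @Override
        public void report(ErrorContext<T> context) {
            count.incrementAndGet();
            lastMessage = prefix + context.stage() + " failed in "
                    + context.executingClassName()
                    + " (" + context.error().getMessage() + ")";
        }

        long count() { return count.get(); }
        String lastMessage() { return lastMessage; }
    }

    public static void main(String[] args) {
        CountingReporter<String> reporter = new CountingReporter<>();
        reporter.configure(Map.of("param", "testValue"));
        reporter.report(new ErrorContext<>("VALUE_CONVERTER", "com.example.MyConverter",
                "raw-record", new IllegalStateException("bad payload")));
        System.out.println(reporter.count() + " -> " + reporter.lastMessage());
        reporter.close();
    }
}
```

A real implementation would import the proposed types from `org.apache.kafka.connect.reporter` instead of declaring stand-ins, and would typically write to an external system or a metrics registry rather than keep state in memory.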
Additionally, we propose to add the following configuration properties.
Connector Configuration
The error record reporter chain will be configured at the connector level. The order of the error record reporters is defined by the `errors.reporters` config, which is a list of aliases. Each alias in `errors.reporters` makes the following additional keys configurable:
- `errors.reporters.$alias.type` – fully qualified class name for the error reporter
- `errors.reporters.$alias.*` – all other keys as defined in `ErrorRecordReporter.config()` are prefixed with this alias.
Example:
    errors.reporters=example
    errors.reporters.example.type=com.example.SimpleErrorRecordReporter
    errors.reporters.example.param=testValue
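The alias scheme above can be sketched as a small amount of map handling: read the ordered alias list, then collect the keys under each alias prefix and strip the prefix. This is only an illustration of the naming convention, not the runtime's actual parsing code. `ReporterConfigSketch`, `aliases`, and `configFor` are hypothetical names, and splitting the list on commas is an assumption about how the list config is written.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReporterConfigSketch {

    static final String REPORTERS = "errors.reporters";

    // Returns the reporter aliases in the order they are declared (assumed comma-separated).
    static List<String> aliases(Map<String, String> connectorConfig) {
        List<String> result = new ArrayList<>();
        String raw = connectorConfig.getOrDefault(REPORTERS, "");
        for (String alias : raw.split(",")) {
            String trimmed = alias.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    // Collects every key under "errors.reporters.<alias>." with that prefix removed.
    static Map<String, String> configFor(String alias, Map<String, String> connectorConfig) {
        String prefix = REPORTERS + "." + alias + ".";
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("errors.reporters", "example");
        config.put("errors.reporters.example.type", "com.example.SimpleErrorRecordReporter");
        config.put("errors.reporters.example.param", "testValue");

        for (String alias : aliases(config)) {
            // prints: example -> {type=com.example.SimpleErrorRecordReporter, param=testValue}
            System.out.println(alias + " -> " + configFor(alias, config));
        }
    }
}
```

Under this reading, the `type` entry selects the class to instantiate and the remaining entries would be handed to that reporter's `configure` method.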
Proposed Changes
We propose a new pluggable interface (`ErrorRecordReporter`) which enables the creation of custom error record reporters. These custom reporters will be executed when `errors.tolerance=all`. The draft pull request: https://github.com/apache/kafka/pull/17493
Compatibility, Deprecation, and Migration Plan
There are no backward compatibility concerns.
Rejected Alternatives
1. Exposing `ErrorReporter` and `ProcessingContext` as public APIs: This design would make packages with `runtime` in their names part of the public API, and those package names cannot be changed.
2. Naming the interface `ErrorReporter`: This name is already used for error reporters in the runtime module. To avoid name conflicts, `ErrorRecordReporter` was chosen.