Current state: Adopted
Discussion thread: Here
Vote thread: Here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, KIP-298 provides error handling in Kafka Connect that includes functionality such as retrying, logging, and sending errant records to a dead letter queue. However, the dead letter queue functionality from KIP-298 only supports error reporting within the context of transform operations and key, value, and header converter operations. After records are sent to the connector for processing, there is no support for dead letter queue/error reporting functionality.
As stated in its rejected alternatives, "Write records that fail in the put() step of a sink connector to the dead letter queue: since sink connectors can choose to batch records in a put() method, it is not clear what errors are caused by what records (they might be because of records that were immediately written to put(), or by some previous records that were processed later). Also, there might be connection issues that are not handled by the connector, and simply bubbled up as IOException (for example). Effectively, errors sent back to the framework from the put() method currently do not have sufficient context to determine the problematic records (if any). Addressing these issues would need a separate KIP."
Thus, this proposal aims to extend KIP-298 and add error reporting functionality even after records are sent to connector tasks, without adding redundant configuration.
This feature will directly change the `SinkTaskContext` class by adding a new method and a new interface. This KIP will add a getter method to the `SinkTaskContext` class that returns an error reporter object; by default, this method will return null. Sink connectors that wish to use an error reporter can call this method within their tasks. The method will look like the following:
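A hedged sketch of what this getter could look like (simplified: the real `SinkTaskContext` in `org.apache.kafka.connect.sink` has many other methods, and `ErrantRecordReporter` — the interface described below — is stubbed here so the snippet stands alone):

```java
// Stub standing in for the ErrantRecordReporter interface described below.
interface ErrantRecordReporter { }

// Simplified sketch of the proposed addition to SinkTaskContext.
interface SinkTaskContext {

    // Returns the reporter the task can use for errant records, or null
    // when the runtime does not provide one. A default implementation
    // keeps existing SinkTaskContext implementations source-compatible.
    default ErrantRecordReporter failedRecordReporter() {
        return null;
    }
}
```

Declaring the getter as a `default` method means context implementations that predate this KIP continue to compile unchanged.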
This KIP will add an `ErrantRecordReporter` interface containing one method, `report(SinkRecord record, Throwable error)`. The interface will look like the following:
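A hedged sketch of the interface (simplified: `SinkRecord` and `RecordMetadata` are minimal stand-ins for the real Connect and clients types so the snippet stands alone):

```java
import java.util.concurrent.Future;

// Minimal stand-ins for org.apache.kafka.connect.sink.SinkRecord and
// org.apache.kafka.clients.producer.RecordMetadata.
class SinkRecord { }
class RecordMetadata { }

// Sketch of the proposed ErrantRecordReporter interface.
interface ErrantRecordReporter {

    // Asynchronously report a problematic record together with the exception
    // raised while processing it; the returned future completes once the
    // record has been written to the dead letter queue.
    Future<RecordMetadata> report(SinkRecord record, Throwable error);
}
```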
The error reporter interface will have one asynchronous reporting method. The method will accept the errant record and the exception thrown while processing it, and return a `Future<RecordMetadata>`. To handle the case where the connector is deployed to an older version of AK, when calling the `failedRecordReporter()` method to get the error reporter, the developer must catch the resulting `NoSuchMethodError` and set the reporter to null. Thus, when processing errant records, the developer should check whether the reporter is null; if it is, the developer should wrap the original exception in a `ConnectException` and throw it. Records passed to the error reporter should be considered processed with respect to handling offsets, for example within the context of processing current offsets in `preCommit(...)`.
The error reporter will use the same configurations as the dead letter queue in KIP-298 to avoid redundant configuration. There will be no additional configurations for the Producer and `AdminClient` under the hood aside from the existing `admin.` configurations present in the worker configuration and the `admin.override.` configurations present in the connector configuration. Serialization of errant records will be done in the same manner as in KIP-298.
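For illustration, a connector would enable the reporter's dead letter queue through the existing KIP-298 error-handling properties in its configuration (the topic name here is hypothetical):

```
errors.tolerance=all
errors.deadletterqueue.topic.name=my-connector-errors
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
```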
The following is an example of how a sink task can use the error reporter while still supporting deployment on earlier versions of the Connect runtime:
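A hedged sketch of such a task (simplified: `SinkRecord`, `ConnectException`, and `ErrantRecordReporter` are minimal stand-ins for the real Connect types, and `process(...)` is a placeholder for the connector's own write logic):

```java
import java.util.Collection;

// Minimal stand-ins for the real Kafka Connect types.
class SinkRecord {
    final Object value;
    SinkRecord(Object value) { this.value = value; }
}

class ConnectException extends RuntimeException {
    ConnectException(String message, Throwable cause) { super(message, cause); }
}

interface ErrantRecordReporter {
    void report(SinkRecord record, Throwable error); // asynchronous in the real API
}

// Sketch of a sink task that uses the error reporter when available and
// falls back to KIP-298 behavior on older runtimes.
class ExampleSinkTask {
    private final ErrantRecordReporter reporter;

    ExampleSinkTask(ErrantRecordReporter reporterOrNull) {
        // In a real task this would happen in start(...):
        //   try { reporter = context.failedRecordReporter(); }
        //   catch (NoSuchMethodError | NoClassDefFoundError e) { reporter = null; }
        this.reporter = reporterOrNull;
    }

    void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                process(record);
            } catch (Exception e) {
                if (reporter != null) {
                    reporter.report(record, e);   // hand off to the DLQ machinery
                } else {
                    // Older runtime without the reporter: rethrow so the
                    // framework's existing error handling takes over.
                    throw new ConnectException("Failed to process record", e);
                }
            }
        }
    }

    // Placeholder for the connector's actual write logic.
    private void process(SinkRecord record) {
        if (record.value == null)
            throw new IllegalArgumentException("record value must not be null");
    }
}
```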
The error reporting functionality is asynchronous, although tasks can use the resulting future to wait for the record and exception to be written to Kafka.
The Connect framework also guarantees that by the time `preCommit(...)` is called on the task, the error reporter will have successfully and fully recorded all reported records with offsets at or before those passed to the `preCommit` method. Sink task implementations that need stricter guarantees can use the futures returned by `report(...)` to wait for confirmation that reported records have been successfully recorded.
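One way a task might implement that stricter behavior — a hypothetical helper, not part of the proposed API; `RecordMetadata` is a stand-in for the real clients type:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Stand-in for org.apache.kafka.clients.producer.RecordMetadata.
class RecordMetadata { }

// Hypothetical helper: keep the futures returned by report(...) and block
// on them before committing offsets.
class ReportTracker {
    private final List<Future<RecordMetadata>> pending = new ArrayList<>();

    // Call with the future returned by each report(...) invocation.
    void track(Future<RecordMetadata> future) {
        pending.add(future);
    }

    // Call from e.g. preCommit(...): wait until every reported record has
    // been durably written to the dead letter queue, then forget them.
    void awaitAll() throws ExecutionException, InterruptedException {
        for (Future<RecordMetadata> future : pending)
            future.get();
        pending.clear();
    }
}
```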
No new metrics will be added for the error reporter. Instead, the metrics detailed in KIP-298 for the dead letter queue will be used for the error reporter.
The Errant Record Reporter will adhere to the existing DLQ error tolerance functionality. For example, if `errors.tolerance` is set to `all`, all errors will be tolerated; if the property is set to `none`, the Errant Record Reporter will throw an exception detailing that the tolerance has been exceeded. Even if the developer chooses to catch and swallow any exceptions thrown during `report(...)`, the task is guaranteed to be killed once `put(...)` completes.
This KIP will update the `FileSinkTask` example to use the new API, demonstrating proper error handling together with the ability to run on older versions of Kafka Connect.
Compatibility, Deprecation, and Migration Plan
This proposal is backward compatible such that existing sink connector implementations will continue to work as before. Developers can optionally modify sink connector implementations to use the new error reporting feature, yet still easily support installing and running those connectors in older Connect runtimes where the feature does not exist.
Moreover, to ensure that new connectors using this new method and interface can still be deployed on older versions of Kafka Connect, the developer should use a try-catch block to catch the `NoClassDefFoundError` thrown by workers running an older version of AK.
- New library or interface: creating a separate library like `connect-reporter` will cause redundancy in configuration, and creating any new interface or API will limit backwards compatibility. Deploying a connector on an old version of Connect will not be a seamless integration with the introduction of a new interface
- `ProcessingContext` as a public API: the main issue with this design is that the API exposes packages with `runtime` in them, and package names cannot be changed
- Labeling as a dead letter queue: this is too specific a label; "error reporter" establishes a more general contract for how it can be used
- Batch error reporting: this would introduce the need for keeping track of order, and the increase in throughput doesn't seem to outweigh the added complication
- Creating a setter method in `SinkTask` that accepts an error reporter object: while this allows for backwards compatibility, it doesn't follow the established pattern of adding these types of methods to `SinkTaskContext` and is not a very aesthetic addition to the interface
- Creating an overloaded `put(...)` method that accepts an error reporter object and deprecating the original `put(...)` method: deprecating the original method can cause confusion about which method to implement, and implementing the wrong one can cause backwards-compatibility issues
- Synchronous-only functionality: this limits developer control, and much of the existing functionality within Kafka Connect is asynchronous
- Callback-based reporting instead of returning a `Future`: because a sink task's callback would be called from the producer thread, a poorly written callback risks killing the reporter's producer without necessarily failing the task