Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Kafka Streams does not support ProcessingExceptionHandler for GlobalKTable processors. When a processing exception occurs during GlobalKTable record processing, the GlobalStreamThread fails and terminates, causing the entire Kafka Streams application to shut down. This behaviour is inconsistent with regular KStream/KTable processing, where ProcessingExceptionHandler allows applications to handle exceptions gracefully and continue processing.
Public Interfaces
New property:
| Name | Type | Default | Importance | Deprecated | Removed | Description |
|---|---|---|---|---|---|---|
| `processing.exception.handler.global.enabled` | Boolean | `false` | HIGH | 4.3 | 5.0 | When false (default), maintains backwards-compatible behaviour where global exceptions terminate or gracefully shut down the application. When true, enables the ProcessingExceptionHandler for GlobalKTable exceptions. This config will be removed in Kafka Streams 5.0, where global exception handling will be enabled by default. |
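As a minimal sketch of opting in, the flag could be set alongside the rest of the Streams configuration. The property name is taken from the table above; the class name, the `application.id` value, and the broker address are placeholders chosen for illustration. The key is written as a string literal because this KIP does not name a `StreamsConfig` constant for it.

```java
import java.util.Properties;

final class GlobalHandlerConfigExample {
    // Property name as proposed in this KIP; a plain string is used because
    // no StreamsConfig constant is specified in the text.
    static final String GLOBAL_HANDLER_ENABLED = "processing.exception.handler.global.enabled";

    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "global-handler-demo");  // placeholder value
        props.put("bootstrap.servers", "localhost:9092");    // placeholder value
        // Opt in: route GlobalKTable processing exceptions to the configured handler.
        props.put(GLOBAL_HANDLER_ENABLED, "true");
        return props;
    }
}
```

Leaving the property unset keeps the default (`false`), so existing applications see no change in behaviour.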
Proposed Changes
This KIP extends the applicability of the existing ProcessingExceptionHandler to GlobalKTable processors. Currently, this exception handler only applies to regular stream processing (KStream/KTable). After this KIP, the same handler will also handle processing exceptions in GlobalKTable. The feature is gated behind a configuration flag to prevent unexpected behaviour in existing handler implementations that may not be designed to handle GlobalKTable exceptions, which could cause crashes or undesired side effects. This provides a safe migration path for users to test and adapt their exception handling logic.
Exception Handling Scope
After this KIP, the ProcessingExceptionHandler configured via `processing.exception.handler` will be invoked for:
- KStream processing exceptions (existing)
- KTable processing exceptions (existing)
- GlobalKTable processing exceptions (new capability added by this KIP)
Current State
Currently, while DeserializationExceptionHandler is passed to both regular stream tasks and global state update tasks, ProcessingExceptionHandler is only passed to regular stream tasks, resulting in unhandled processing exceptions for GlobalKTable processors.
Proposed Extension
The infrastructure for configuration and instantiation of the ProcessingExceptionHandler already exists. However, it is currently only passed to regular stream tasks, not to global state update tasks. This KIP extends the existing pattern to pass ProcessingExceptionHandler to GlobalStateUpdateTask, mirroring how DeserializationExceptionHandler is already passed to both regular and global tasks. This enables exception handling during GlobalKTable processor execution.
Implementation
Add ProcessingExceptionHandler as a constructor parameter to GlobalStateUpdateTask and pass it to processor initialisation, mirroring the existing DeserializationExceptionHandler implementation. When a processing exception occurs, the task invokes the handler's handleError() method (not handle()). This provides automatic backward compatibility through the implementation defined in KIP-1034.
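The control flow described above can be sketched with simplified stand-in types. This is an illustration only, not the Kafka Streams implementation: `GlobalTaskSketch`, its nested `Handler` interface, and the single-argument `handleError` are hypothetical and much smaller than the real GlobalStateUpdateTask and ProcessingExceptionHandler. The sketch assumes the flag is checked at the point where the exception is caught.

```java
import java.util.function.Consumer;

// Simplified stand-ins for illustration; these are NOT the Kafka Streams classes.
final class GlobalTaskSketch {
    enum Result { RESUME, FAIL }

    // Hypothetical reduction of the handler contract to a single argument.
    interface Handler {
        Result handleError(Exception error);
    }

    private final Consumer<String> processor;  // stand-in for the GlobalKTable processor
    private final Handler handler;             // injected through the constructor
    private final boolean handlerEnabled;      // models processing.exception.handler.global.enabled
    private int skipped = 0;

    GlobalTaskSketch(Consumer<String> processor, Handler handler, boolean handlerEnabled) {
        this.processor = processor;
        this.handler = handler;
        this.handlerEnabled = handlerEnabled;
    }

    void update(String value) {
        try {
            processor.accept(value);
        } catch (RuntimeException error) {
            // Flag off: keep the pre-KIP behaviour and let the exception propagate.
            if (!handlerEnabled) {
                throw error;
            }
            // Flag on: the handler decides whether to fail or skip the record.
            if (handler.handleError(error) == Result.FAIL) {
                throw error;
            }
            skipped++;
        }
    }

    int skipped() {
        return skipped;
    }
}
```

With the flag enabled and a handler that returns `RESUME`, a failing record is counted as skipped and processing continues; with the flag disabled, the same record's exception propagates to the caller as it does today.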
Limitation
In this KIP, DLQ records returned by the handler will be logged with full metadata but NOT sent to Kafka for GlobalKTable processors. This is because GlobalKTable processing does not currently have producer infrastructure. We will cover DLQ in a separate KIP.
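To make the limitation concrete, the following sketch shows one way the global path might turn returned DLQ records into log lines instead of producing them. All names here (`DlqLoggingSketch`, `DlqRecord`, `logInsteadOfSend`) and the log wording are hypothetical; the KIP does not specify the log format or which metadata fields are included.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only; not part of Kafka Streams.
final class DlqLoggingSketch {
    // Hypothetical stand-in for a dead letter queue record returned by the handler.
    static final class DlqRecord {
        final String topic;
        final String key;
        final String value;

        DlqRecord(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Builds one log line per record; nothing is handed to a producer.
    static List<String> logInsteadOfSend(List<DlqRecord> records) {
        List<String> lines = new ArrayList<>();
        for (DlqRecord r : records) {
            lines.add("DLQ record not sent (global task has no producer): topic="
                    + r.topic + ", key=" + r.key + ", value=" + r.value);
        }
        return lines;
    }
}
```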
Compatibility, Deprecation, and Migration Plan
- Fully backwards compatible by default - existing behaviour is preserved unless config is explicitly enabled
- New configuration `processing.exception.handler.global.enabled` defaults to `false`
- Config is deprecated immediately upon introduction
- In Kafka Streams 5.0, this config will be removed, and the handler will always be invoked for GlobalKTable exceptions
- Migration path: Users should test with the config enabled before upgrading to 5.0
Test Plan
Since this is a fairly small change, unit tests and integration tests should be sufficient.
Rejected Alternatives
Create a new ProcessingExceptionHandler interface for the global thread, which would require introducing new configuration at the stream config level. This would create unnecessary operational overhead for end users, as it would require them to implement multiple interfaces.