Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/t4h273md03vzcbzk4r3bn86km8rnym66
JIRA: KAFKA-20280
Motivation
This KIP is a follow-up to KIP-1270. Records that fail during global store/KTable processing are currently not sent to the DLQ; only their metadata is logged. This KIP enables DLQ support for those records, giving end users more control over failed records.
Public Interface
No public interface changes.
The existing configuration `processing.exception.handler.global.enabled` (introduced in KIP-1270) will now fully enable DLQ functionality when set to `true`.
Before this KIP:
processing.exception.handler.global.enabled=true # Only logs, drops DLQ records
After this KIP:
processing.exception.handler.global.enabled=true # Fully functional DLQ
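As a rough illustration, the flag could be set programmatically when building the Streams configuration. This is a minimal sketch using plain `java.util.Properties`: the application id, bootstrap address, and DLQ topic name are hypothetical values, and the `errors.dead.letter.queue.topic.name` key is assumed from the earlier DLQ work (KIP-1034) rather than defined by this KIP.

```java
import java.util.Properties;

public class GlobalDlqConfigSketch {

    // Builds a Streams configuration with global exception handling enabled.
    static Properties buildConfig() {
        Properties props = new Properties();
        // Hypothetical application id and broker address, for illustration only.
        props.put("application.id", "global-dlq-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // Flag introduced in KIP-1270; with this KIP it would also enable DLQ sends.
        props.put("processing.exception.handler.global.enabled", "true");
        // Assumption: DLQ topic config key from KIP-1034; the topic name is hypothetical.
        props.put("errors.dead.letter.queue.topic.name", "global-dlq-demo-dlq");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig();
        System.out.println(props.getProperty("processing.exception.handler.global.enabled"));
    }
}
```

The resulting `Properties` object would be passed to the `KafkaStreams` constructor as usual; no other application code needs to change.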
Proposed Changes
Overview
This KIP reuses the existing `RecordCollectorImpl` infrastructure to enable DLQ support for global tasks, maintaining consistency with regular stream tasks.
Implementation Details
Using the existing record collector implementation has several benefits:
- Critical error handling, such as the ProductionExceptionHandler, is automatically invoked if the DLQ send fails.
- Built-in metrics infrastructure.
- Same DLQ code path for regular and global tasks.
Similar to ProcessorContextImpl, GlobalProcessorContextImpl would implement the RecordCollector.Supplier interface.
public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object, Object> implements RecordCollector.Supplier {

    private RecordCollector recordCollector; // NEW: Add field

    // NEW: Add setter method
    public void setRecordCollector(final RecordCollector recordCollector) {
        this.recordCollector = recordCollector;
    }

    @Override
    public RecordCollector recordCollector() {
        return recordCollector; // CHANGED: Return actual collector instead of null
    }
}
Similar to StreamThread, GlobalStreamThread would initialise a StreamsProducer and create a RecordCollectorImpl object, which it would then set on the GlobalProcessorContextImpl.
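The wiring described above can be sketched in isolation. The following is a simplified, self-contained model and not the actual Kafka Streams code: `RecordCollector`, `InMemoryCollector`, `GlobalContext`, and `GlobalThread` are hypothetical stand-ins for the internal classes, and the in-memory collector replaces a real producer so that the flow can be run without a broker.

```java
import java.util.ArrayList;
import java.util.List;

public class GlobalDlqWiringSketch {

    /** Hypothetical stand-in for the internal RecordCollector interface. */
    interface RecordCollector {
        void send(String topic, String key, String value);
    }

    /** Stand-in collector that keeps records in memory instead of producing to Kafka. */
    static final class InMemoryCollector implements RecordCollector {
        final List<String> sent = new ArrayList<>();

        @Override
        public void send(String topic, String key, String value) {
            sent.add(topic + ":" + key + "=" + value);
        }
    }

    /** Stand-in for GlobalProcessorContextImpl: holds the collector injected by the thread. */
    static final class GlobalContext {
        private RecordCollector recordCollector;

        void setRecordCollector(RecordCollector recordCollector) {
            this.recordCollector = recordCollector;
        }

        RecordCollector recordCollector() {
            return recordCollector;
        }
    }

    /** Stand-in for GlobalStreamThread: creates the collector and sets it on the context. */
    static final class GlobalThread {
        final GlobalContext context = new GlobalContext();
        final InMemoryCollector collector = new InMemoryCollector();
        private final String dlqTopic;

        GlobalThread(String dlqTopic) {
            this.dlqTopic = dlqTopic;
            context.setRecordCollector(collector); // injection step proposed by this KIP
        }

        /** Processes one record; a value that is not an integer is routed to the DLQ topic. */
        void process(String key, String value) {
            try {
                Integer.parseInt(value); // placeholder for real global store processing
            } catch (NumberFormatException e) {
                context.recordCollector().send(dlqTopic, key, value);
            }
        }
    }

    public static void main(String[] args) {
        GlobalThread thread = new GlobalThread("global-dlq");
        thread.process("a", "1");    // succeeds, nothing is sent
        thread.process("b", "oops"); // fails, routed to the DLQ topic
        System.out.println(thread.collector.sent); // prints [global-dlq:b=oops]
    }
}
```

The point of the sketch is that the failure path only talks to the context's collector, so global tasks and regular stream tasks can share the same DLQ code path once the collector is no longer null.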
Compatibility, Deprecation, and Migration Plan
- This is an enhancement, not a breaking change: DLQ records will now be sent instead of only being logged.
Test Plan
Since this is a fairly small change, unit tests and integration tests should be sufficient.
Rejected Alternatives
Creating a single producer specifically for the global task instead of using RecordCollector. It would be lighter-weight and sufficient for a DLQ implementation, but it is not future-proof and breaks consistency with regular stream task processing.