Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/t4h273md03vzcbzk4r3bn86km8rnym66

JIRA: KAFKA-20280 - Getting issue details... STATUS

Motivation

It's a follow-up of KIP-1270. We do not support records which failed during global store/KTable processing to be available in DLQ. It currently logs record metadata. With this  KIP, we aim to enable the DLQ support, which would enable the end user to have more control over the failed record. 

Public Interface 

No public interface changes. 

The existing configuration `processing.exception.handler.global.enabled` (introduced in KIP-1270) will now fully enable DLQ functionality when set to `true`.

Before this KIP:
  processing.exception.handler.global.enabled=true  # Only logs, drops DLQ records

After this KIP:
  processing.exception.handler.global.enabled=true  # Fully functional DLQ

Proposed Changes

Overview

This KIP reuses the existing `RecordCollectorImpl` infrastructure to enable DLQ support for global tasks, maintaining consistency with regular stream tasks.

Implementation Details

Using the existing record collector implementation has several benefits

  1. Critical error handline like ProductionExceptionHandler is automatically invoked if the DLQ send fails.
  2. Built-in Metrics Infrastructure.
  3. Same DLQ code path for regular and global tasks.


Similar to ProcessorContextImpl, GlobalProcessorContextImpl would implement the RecordCollector Supplier interface.

GlobalProcessorContextImpl
public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object, Object> implements RecordCollector.Supplier {
 private RecordCollector recordCollector;  // NEW: Add field

      // NEW: Add setter method
      public void setRecordCollector(final RecordCollector recordCollector) {
          this.recordCollector = recordCollector;
      }

      @Override
      public RecordCollector recordCollector() {
          return recordCollector;  // CHANGED: Return actual collector instead of null
      }
}


Similar to StreamThread, GlobalStreamThread would initialise the StreamProducer and create a RecordCollectorImpl object, which would then set it in GlobalProcessorContextImpl.

Compatibility, Deprecation, and Migration Plan

  • This is an enhancement,  not a breaking change, as  DLQ records will now be sent instead of logging.

Test Plan

Since this is a fairly small change, unit tests and integration tests should be sufficient.

Rejected Alternatives

Creating a Single Producer specifically for GlobalTask instead of using RecordCollector. It would be lighter weight and sufficient for DLQ implementation,  but it's not future-proof and also breaks consistency with the stream processing. 








  • No labels