Author: Viquar Khan

Status: Draft

JIRA: [TBD]

Pull Request: [TBD]

Vote Thread: [TBD]

Discussion Thread:

Target Release: Apache Kafka 4.4 (alongside KIP-1191)

Depends On: KIP-1191 (DLQ for Share Groups)

Motivation

KIP-1191 introduces DLQ routing for Share Groups. When a record is written to the DLQ, the broker has authoritative knowledge of why the record was routed there, because it is the entity making the routing decision. However, KIP-1191 does not standardize this information as a record header on the DLQ message.

Without a standardized disposition header, operators face a significant observability gap:

  • Poison pills and systemic failures are indistinguishable: A record that failed because it is malformed (MAX_DELIVERY_ATTEMPTS_REACHED) looks identical in the DLQ to a record the consumer explicitly rejected (a CLIENT_REJECTED NACK) or one that failed because of a transient infrastructure issue.

  • Debugging requires broker log correlation: Operators must cross-reference DLQ record offsets with broker logs to determine the failure cause, which is operationally expensive at scale.

  • Automated remediation is blocked: DLQ replay tooling cannot make intelligent routing decisions without knowing the original failure reason.

Since the broker already knows the disposition reason at routing time, making this a mandatory, broker-written header adds negligible runtime cost (a few small header entries per DLQ record) and provides significant operational value.

Public Interfaces

New Record Header: _dlq.errors.disposition

All records written to a DLQ topic by the Share Group coordinator MUST include the following headers:

Header Key | Value Type | Description
_dlq.errors.disposition | String (UTF-8) | The reason the record was routed to the DLQ. See disposition values below.
_dlq.errors.original.topic | String (UTF-8) | The source topic from which the record originated.
_dlq.errors.original.partition | String (UTF-8, numeric) | The source partition.
_dlq.errors.original.offset | String (UTF-8, numeric) | The source offset.
_dlq.errors.original.consumer.group | String (UTF-8) | The share group ID that routed the record.
_dlq.errors.timestamp | String (UTF-8, ISO-8601) | UTC timestamp of the DLQ routing event.
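To illustrate how the value types above map to bytes on the wire, the following is a minimal sketch assuming a hypothetical helper, DlqHeaderBuilder, that is not part of Kafka. It encodes every value, including the numeric partition and offset, as a UTF-8 string, and renders the timestamp as ISO-8601 UTC via java.time.Instant.toString().

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Kafka API): builds the header set from the table above.
// Every value, including partition and offset, is encoded as a UTF-8 string.
final class DlqHeaderBuilder {
    static final String PREFIX = "_dlq.errors.";

    static Map<String, byte[]> build(String disposition,
                                     String sourceTopic,
                                     int sourcePartition,
                                     long sourceOffset,
                                     String shareGroupId,
                                     Instant routedAt) {
        // LinkedHashMap keeps the headers in table order.
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put(PREFIX + "disposition", utf8(disposition));
        headers.put(PREFIX + "original.topic", utf8(sourceTopic));
        headers.put(PREFIX + "original.partition", utf8(Integer.toString(sourcePartition)));
        headers.put(PREFIX + "original.offset", utf8(Long.toString(sourceOffset)));
        headers.put(PREFIX + "original.consumer.group", utf8(shareGroupId));
        // Instant.toString() renders ISO-8601 in UTC, e.g. 2023-11-14T22:13:20Z.
        headers.put(PREFIX + "timestamp", utf8(routedAt.toString()));
        return headers;
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

The topic name, share group ID, and offsets used with this helper are placeholders; the point is only that a replay tool can decode every header with a single UTF-8 decode, with no per-header binary format to agree on.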

Disposition Values for _dlq.errors.disposition

Value | Trigger Condition
MAX_DELIVERY_ATTEMPTS_REACHED | Record exhausted max.delivery.attempts without a successful acknowledgement.
CLIENT_REJECTED | Consumer sent an explicit NACK with AcknowledgeType=REJECT.
CIRCUIT_BREAKER_ARCHIVED | Record was archived via the DLQ write timeout fall-through (proposed in KIP-XXXX). Indicates DLQ infrastructure was unavailable at routing time.
COORDINATOR_INITIATED | Reserved for future use: coordinator-initiated routing outside the normal delivery flow.
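DLQ replay tooling can turn the header back into a typed value. The sketch below uses a hypothetical DlqDisposition enum that mirrors the table; its parser returns empty for missing or unrecognized values, so tooling written against this KIP keeps working if later KIPs add values. The autoReplayable policy is purely illustrative and is not part of this proposal.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical client-side mirror of the disposition values defined above.
enum DlqDisposition {
    MAX_DELIVERY_ATTEMPTS_REACHED,
    CLIENT_REJECTED,
    CIRCUIT_BREAKER_ARCHIVED,
    COORDINATOR_INITIATED; // reserved for future use

    // Returns empty for a missing header or an unknown value instead of throwing,
    // so older tooling tolerates values introduced later.
    static Optional<DlqDisposition> parse(byte[] headerValue) {
        if (headerValue == null) {
            return Optional.empty();
        }
        String raw = new String(headerValue, StandardCharsets.UTF_8);
        for (DlqDisposition d : values()) {
            if (d.name().equals(raw)) {
                return Optional.of(d);
            }
        }
        return Optional.empty();
    }

    // Illustrative policy only: a record archived because the DLQ infrastructure
    // was unavailable is a candidate for automatic replay, whereas poison pills
    // and explicit rejections are left for human review.
    boolean autoReplayable() {
        return this == CIRCUIT_BREAKER_ARCHIVED;
    }
}
```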

New Broker Configuration

Config Key | Type | Default | Description
share.group.dlq.disposition.header.enable | boolean | true | Controls whether the broker writes _dlq.errors.disposition and its companion headers to DLQ records. Defaults to true (mandatory). Provided for emergency rollback only; disabling is strongly discouraged.

The default of true reflects the design intent: this header is mandatory. The config exists solely as an emergency escape hatch, not as a feature toggle.
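As a sketch of the intended default-on behaviour, the hypothetical DispositionHeaderConfig class below (not broker code) resolves the flag from a plain java.util.Properties: an absent key means the headers are written, only an explicit false disables them, and any other value is rejected rather than silently interpreted.

```java
import java.util.Properties;

// Hypothetical sketch of resolving the escape-hatch flag; not actual broker code.
final class DispositionHeaderConfig {
    static final String KEY = "share.group.dlq.disposition.header.enable";

    static boolean enabled(Properties brokerProps) {
        // Absent key: fall back to the default of true (headers are mandatory).
        String raw = brokerProps.getProperty(KEY, "true").trim();
        if (raw.equalsIgnoreCase("true")) {
            return true;
        }
        if (raw.equalsIgnoreCase("false")) {
            return false; // emergency rollback only
        }
        // Reject typos instead of guessing which way the operator meant.
        throw new IllegalArgumentException(KEY + " must be true or false, got: " + raw);
    }
}
```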

Proposed Changes

Coordinator-Side Header Injection

When the Share Group coordinator writes a record to the DLQ topic, it MUST inject the headers defined above before producing the record. The injection occurs at the coordinator level, not the consumer level, ensuring:

  1. Consistency: All DLQ records have the header regardless of which consumer client library is in use.

  2. Accuracy: The broker has authoritative knowledge of the disposition reason; consumer-side injection would require the consumer to self-report, which is unreliable.

  3. Zero consumer-side changes required: Existing consumer implementations require no modification.

Header Precedence

If a record already carries a _dlq.errors.disposition header when it is routed to the DLQ (e.g., one added by a client via a custom producer interceptor), the broker-injected value takes precedence and overwrites the existing value. The broker's disposition is authoritative.
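Overwriting needs care because Kafka record headers are an ordered collection that permits duplicate keys: in the client API, Headers.add appends, Headers.remove(key) removes every header with that key, and Headers.lastHeader(key) returns the most recent one. A plain append would leave both the pre-existing and the broker values on the record. The sketch below models this with a hypothetical HeaderList stand-in (not a Kafka class) and shows the remove-then-append pattern that leaves exactly one authoritative value.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for record headers: ordered, duplicate keys allowed.
final class HeaderList {
    record Entry(String key, byte[] value) {}

    private final List<Entry> entries = new ArrayList<>();

    // Plain append, analogous to Headers.add: duplicates accumulate.
    void add(String key, String value) {
        entries.add(new Entry(key, value.getBytes(StandardCharsets.UTF_8)));
    }

    // Broker-side overwrite: drop every existing entry for the key, then append
    // the authoritative value so exactly one entry remains.
    void overwrite(String key, String value) {
        entries.removeIf(e -> e.key().equals(key));
        add(key, value);
    }

    List<String> valuesFor(String key) {
        List<String> out = new ArrayList<>();
        for (Entry e : entries) {
            if (e.key().equals(key)) {
                out.add(new String(e.value(), StandardCharsets.UTF_8));
            }
        }
        return out;
    }
}
```

Headers outside the _dlq.errors.* namespace are left untouched, so application headers such as trace IDs survive the routing step.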

Interaction with KIP-1289 (Transactional DLQ Writes)

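When KIP-1289 transactional DLQ writes are in use, the headers are written atomically with the record as part of the same transaction. If the transaction is aborted, neither the record nor its headers are returned to consumers using isolation.level=read_committed. For such consumers, header presence is therefore a reliable signal that the DLQ write committed; consumers using the default read_uncommitted isolation level can still observe records from aborted transactions.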
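For DLQ readers that rely on this guarantee, the relevant setting is the standard consumer config isolation.level. The sketch below builds such a configuration with plain java.util.Properties; the DlqReaderConfig class, the bootstrap address, and the group ID are hypothetical placeholders.

```java
import java.util.Properties;

// Hypothetical helper: consumer settings for a DLQ reader that should only
// observe committed DLQ writes (and therefore only committed headers).
final class DlqReaderConfig {
    static Properties readCommitted(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // The consumer default is read_uncommitted, which also returns records
        // written by aborted transactions.
        props.put("isolation.level", "read_committed");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }
}
```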

Compatibility, Deprecation, and Migration Plan

  • Backward compatibility: Record headers are additive. Existing consumers that do not read these headers are unaffected.

  • DLQ consumers that already parse custom headers: The _dlq.errors.* namespace is reserved by this KIP. Operators using conflicting custom header names should migrate to the standardized names.

  • No deprecations.

  • Wire protocol: No changes to the Kafka wire protocol. Record headers are an existing mechanism.
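For operators auditing existing DLQ consumers before upgrading, the reserved-namespace bullet above reduces to a prefix check. The following is a minimal sketch assuming a hypothetical ReservedNamespaceCheck helper that receives the header keys an application already writes or reads, and reports those that fall inside _dlq.errors.*.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical migration aid: flags application header keys that collide
// with the _dlq.errors.* namespace reserved by this KIP.
final class ReservedNamespaceCheck {
    static final String RESERVED_PREFIX = "_dlq.errors.";

    static List<String> conflicts(List<String> applicationHeaderKeys) {
        return applicationHeaderKeys.stream()
                .filter(k -> k.startsWith(RESERVED_PREFIX))
                .distinct()
                .sorted()
                .collect(Collectors.toList());
    }
}
```

For example, the keys "_dlq.errors.reason", "trace-id", and "_dlq.errors.disposition" would yield the two _dlq.errors.* entries as conflicts to rename.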

Test Plan

  1. Unit tests: Header injection for each disposition value; header precedence; share.group.dlq.disposition.header.enable=false suppresses headers.

  2. Integration tests: End-to-end DLQ write for each disposition type; verify correct header values on consumed DLQ records.

  3. Compatibility tests: Existing consumers that ignore unknown headers continue to function correctly.

  4. Transactional tests: Headers present on records from committed transactions; records and headers from aborted transactions not returned to read_committed consumers.

Rejected Alternatives

  1. Consumer-side header injection: Rejected. Consumer-side injection is unreliable: it requires all consumer implementations to correctly self-report their failure reason, and the broker has more accurate context.

  2. Advisory (opt-in) header: Rejected. The broker always knows the disposition reason at routing time. Making the header optional introduces inconsistency across deployments and undermines its value as a debugging signal.

  3. Separate DLQ metadata topic: Rejected as disproportionate complexity for the information being conveyed. Record headers are the idiomatic Kafka mechanism for per-record metadata.

Open Questions

  1. Should CIRCUIT_BREAKER_ARCHIVED be defined in this KIP (creating a forward dependency on KIP-1298) or deferred to that KIP as an addendum?

  2. Should the _dlq.errors.original.* companion headers be part of this KIP or a separate micro-KIP? (Proposed: include them here, since they are low-cost and high-value for DLQ replay tooling.)

  3. Should the timestamp use Kafka's internal epoch-ms format or ISO-8601 string for human readability? (Proposed: ISO-8601 for operator usability.)
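To make the trade-off in question 3 concrete, the sketch below derives both candidate encodings from the same epoch-millisecond value using java.time.Instant. The TimestampFormats class is hypothetical. At millisecond precision the ISO-8601 form is lossless: tooling can parse it back to epoch-ms with Instant.parse.

```java
import java.time.Instant;

// Hypothetical helper comparing the two candidate encodings for _dlq.errors.timestamp.
final class TimestampFormats {
    // Epoch-ms as a decimal string, e.g. "1700000000123".
    static String epochMs(long millis) {
        return Long.toString(millis);
    }

    // ISO-8601 in UTC. Instant.toString() omits the fraction when it is zero
    // and otherwise prints it in groups of three digits.
    static String iso8601(long millis) {
        return Instant.ofEpochMilli(millis).toString();
    }

    // Recovers epoch-ms from the ISO-8601 form without loss.
    static long parseIso8601(String iso) {
        return Instant.parse(iso).toEpochMilli();
    }
}
```

For example, iso8601(1700000000123L) returns "2023-11-14T22:13:20.123Z" (24 characters), while epochMs returns "1700000000123" (13 characters); the ISO-8601 form costs a few extra bytes per DLQ record in exchange for being readable without conversion.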

