Author: Vaquar Khan

Status: Draft

JIRA: [TBD]

Pull Request: [TBD]

Vote Thread: [TBD]

Discussion Thread:

Target Release: Apache Kafka 4.5 (follow-on to KIP-1191 / AK 4.4)

Depends On: KIP-1191 (DLQ for Share Groups), KIP-1249 (Pause Delivery for Share Groups)

Motivation

KIP-1191 introduces a Dead Letter Queue (DLQ) mechanism for Kafka Share Groups. While this is a significant operational improvement, it introduces two distinct systemic risks at scale that this KIP addresses.

Risk 1: DLQ Flood (Downstream Service Failure)

If a downstream consumer service becomes unavailable or consistently rejects messages, every message in the share group will exhaust its max.delivery.attempts and be routed to the DLQ. In a high-throughput pipeline, this can drain the main topic into the DLQ within minutes, causing:

  • Loss of message ordering: DLQ records are no longer in the original partition order relative to unprocessed records.

  • Re-processing complexity: Operators must triage and replay a large DLQ backlog, often under incident pressure.

  • Masking of root cause: A flood of DLQ writes obscures whether the failure is a poison pill (single bad record) or a systemic outage (downstream service down).

  • Broker resource pressure: Sustained DLQ writes consume broker I/O, network bandwidth, and coordinator CPU.

Risk 2: Head-of-Line Blocking via DLQ Infrastructure Failure

While group.share.partition.max.record.locks correctly prevents broker memory pressure by bounding in-flight records, hitting that lock ceiling has a critical liveness consequence: fetching operations yield no further records from the primary partition. If the DLQ topic suffers a prolonged metadata or storage outage, records awaiting a DLQ write keep their locks. Once enough of them accumulate to exhaust the limit, the primary pipeline stalls entirely.

This re-introduces head-of-line blocking through a secondary infrastructure failure: precisely the problem Share Groups were originally designed to eliminate. A DLQ (a secondary concern) taking down the primary stream is an architectural single point of failure (SPOF). The safer operational posture is to prioritize system liveness over strict DLQ durability during infrastructure degradation.

Concrete example: A share group is processing 50,000 records/sec across 20 partitions. The DLQ topic's leader undergoes an unplanned election lasting 45 seconds. Without share.group.dlq.write.timeout.ms, the coordinator accumulates ARCHIVING-state record locks until group.share.partition.max.record.locks is exhausted. At that point, primary partition fetching halts entirely for the duration of the DLQ outage. A secondary infrastructure failure (the DLQ topic) has cascaded into a full primary pipeline outage, which is precisely the head-of-line blocking scenario Share Groups were designed to prevent.

This KIP proposes:

  1. A configurable rolling-window circuit breaker that automatically pauses a share group when the DLQ write rate exceeds a defined threshold (addressing Risk 1).

  2. A configurable DLQ write timeout that falls through to the ARCHIVED state with a logged error (addressing Risk 2).

Public Interfaces

New Share Group Configuration Properties

||Config Key||Type||Default||Description||
|share.group.dlq.circuit.breaker.enable|boolean|false|Enables the circuit breaker for this share group.|
|share.group.dlq.circuit.breaker.threshold.percent|int|20|Percentage of messages routed to the DLQ within the rolling window that trips the circuit breaker. Range: 1–100.|
|share.group.dlq.circuit.breaker.window.ms|long|60000|Rolling window duration (ms) over which the DLQ rate is evaluated. Default: 60 seconds.|
|share.group.dlq.circuit.breaker.min.messages|int|100|Minimum number of messages processed within the window before the circuit breaker can trip. Prevents false positives on low-volume groups.|
|share.group.dlq.circuit.breaker.auto.resume.enable|boolean|false|If true, the group automatically resumes once share.group.dlq.circuit.breaker.auto.resume.ms has elapsed.|
|share.group.dlq.circuit.breaker.auto.resume.ms|long|300000|Time (ms) to wait before auto-resuming a paused group. Default: 5 minutes.|
|share.group.dlq.write.timeout.ms|long|30000|Maximum time (ms) the coordinator waits for a DLQ write to complete before falling through to the ARCHIVED state with a logged error. Default: 30 seconds.|
|share.group.dlq.strict.isolation|boolean|false|If true, the coordinator rejects group startup if the configured DLQ topic is already registered by another share group.|

New Broker Metrics

||Metric||Description||
|kafka.share.group:type=ShareGroupMetrics,name=DlqCircuitBreakerTripped,group={group-id}|Counter incremented each time the circuit breaker trips for a given share group.|
|kafka.share.group:type=ShareGroupMetrics,name=DlqWriteTimeouts,group={group-id}|Counter incremented each time a DLQ write times out and falls through to ARCHIVED.|
|kafka.share.group:type=ShareGroupMetrics,name=DlqTopicSharedByMultipleGroups,topic={dlq-topic}|Gauge emitted at coordinator startup and on group registration when a DLQ topic is shared by more than one share group. No enforcement; observability only.|

Share Group State Extension

The share group state machine gains a new observable reason for the PAUSED state:

PAUSED (reason=DLQ_CIRCUIT_BREAKER_TRIPPED)

This reason is surfaced in:

  • DescribeGroups API response (new pause-reason field)

  • Coordinator logs at WARN level

  • The DlqCircuitBreakerTripped broker metric above

Proposed Changes

Share Group State Machine Delta

This KIP adds one new observable reason to the existing PAUSED state defined in KIP-1249. No new states are introduced.

All other state transitions remain as defined in KIP-932 and KIP-1249.

Trigger Condition 1: Rolling-Window Rate Threshold (DLQ Flood)

The Share Group Coordinator tracks a sliding window counter per share group:

  1. For each record disposition event, the coordinator increments either a delivered counter or a dlq_written counter within the current window bucket.

  2. At the end of each window bucket, the coordinator sums both counters over all buckets in the current rolling window and evaluates:

    total = delivered + dlq_written
    if total >= min.messages:
        dlq_rate = dlq_written / total
        if dlq_rate >= threshold.percent / 100:
            TRIP circuit breaker → PAUSE group (reason=DLQ_CIRCUIT_BREAKER_TRIPPED)

  3. The coordinator uses the KIP-1249 pause mechanism to halt delivery. No new pause infrastructure is introduced.

  4. The paused state persists until:

    • An operator explicitly resumes the group via admin tooling, OR

    • auto.resume.enable=true and auto.resume.ms has elapsed.
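The rolling-window evaluation and pause/resume rules above can be sketched in Java as follows. This is an illustrative, single-threaded sketch under stated assumptions, not the coordinator implementation: the class and method names are hypothetical, and the number of buckets per window is an assumption because this KIP does not specify it. Time is passed in explicitly (epoch milliseconds) so the logic is deterministic, and counters are cleared on resume, matching the "counter reset on resume" item in the test plan.

{code:language=java}
import java.util.Arrays;

// Hypothetical sketch of the rolling-window DLQ circuit breaker (Trigger Condition 1).
// Single-threaded and in-memory; class name, method names, and bucket count are assumptions.
class DlqCircuitBreakerSketch {
    private final long bucketMs;
    private final long[] bucketIndex;    // which bucketMs-wide interval each slot currently holds
    private final long[] delivered;      // "delivered" counter per slot
    private final long[] dlqWritten;     // "dlq_written" counter per slot
    private final int thresholdPercent;  // share.group.dlq.circuit.breaker.threshold.percent
    private final long minMessages;      // share.group.dlq.circuit.breaker.min.messages
    private final boolean autoResumeEnable;
    private final long autoResumeMs;
    private boolean tripped;
    private long trippedAtMs;

    DlqCircuitBreakerSketch(long windowMs, int buckets, int thresholdPercent, long minMessages,
                            boolean autoResumeEnable, long autoResumeMs) {
        if (buckets <= 0 || windowMs < buckets) throw new IllegalArgumentException("bad window/buckets");
        if (thresholdPercent < 1 || thresholdPercent > 100) throw new IllegalArgumentException("Range: 1-100");
        this.bucketMs = windowMs / buckets;
        this.bucketIndex = new long[buckets];
        this.delivered = new long[buckets];
        this.dlqWritten = new long[buckets];
        this.thresholdPercent = thresholdPercent;
        this.minMessages = minMessages;
        this.autoResumeEnable = autoResumeEnable;
        this.autoResumeMs = autoResumeMs;
    }

    // Returns the slot for nowMs, clearing it first if it still holds an older interval.
    private int slot(long nowMs) {
        long index = nowMs / bucketMs;
        int s = (int) (index % bucketIndex.length);
        if (bucketIndex[s] != index) {
            bucketIndex[s] = index;
            delivered[s] = 0;
            dlqWritten[s] = 0;
        }
        return s;
    }

    void recordDelivered(long nowMs) { delivered[slot(nowMs)]++; }
    void recordDlqWrite(long nowMs) { dlqWritten[slot(nowMs)]++; }

    // Intended to run at each bucket boundary. Sums the current bucket and the (buckets - 1)
    // buckets before it, then applies the trip rule. Returns true only on the call that trips.
    boolean evaluate(long nowMs) {
        if (tripped) return false;
        long current = nowMs / bucketMs;
        long d = 0, w = 0;
        for (int s = 0; s < bucketIndex.length; s++) {
            if (bucketIndex[s] > current - bucketIndex.length && bucketIndex[s] <= current) {
                d += delivered[s];
                w += dlqWritten[s];
            }
        }
        long total = d + w;
        // Integer form of dlq_rate >= threshold.percent / 100, applied only once min.messages is met.
        if (total > 0 && total >= minMessages && w * 100 >= (long) thresholdPercent * total) {
            tripped = true;  // the real coordinator would pause the group via KIP-1249 here
            trippedAtMs = nowMs;
            return true;
        }
        return false;
    }

    // Operator resume: clears the pause and all window counters.
    void resume() {
        tripped = false;
        Arrays.fill(delivered, 0);
        Arrays.fill(dlqWritten, 0);
    }

    // Auto-resume once auto.resume.ms has elapsed since the trip, if enabled.
    boolean maybeAutoResume(long nowMs) {
        if (tripped && autoResumeEnable && nowMs - trippedAtMs >= autoResumeMs) {
            resume();
            return true;
        }
        return false;
    }

    boolean isTripped() { return tripped; }
}
{code}

With a 60-second window split into six 10-second buckets (an assumed layout), 80 deliveries plus 20 DLQ writes give dlq_rate = 20 / 100 = 0.20, which meets both the default 20% threshold and the default 100-message minimum, so evaluate trips the breaker. The trip rule is written as the integer comparison dlq_written × 100 ≥ threshold.percent × total, which is equivalent to the pseudocode but avoids floating-point rounding exactly at the threshold.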

Trigger Condition 2: DLQ Write Timeout (Liveness / HOL-Blocking Prevention)

When the coordinator attempts a DLQ write:

  1. A per-write timer is started, bounded by share.group.dlq.write.timeout.ms.

  2. If the write completes within the timeout: normal flow continues.

  3. If the timeout expires before the write completes:

    • The record state transitions directly to ARCHIVED (not ARCHIVING).

    • A WARN-level log entry is emitted with the record offset, partition, and failure reason.

    • The DlqWriteTimeouts metric is incremented.

    • The record lock is released, allowing the primary partition to continue delivery.
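A minimal Java sketch of the timeout fall-through follows. It assumes, hypothetically, that a pending DLQ write is represented as a CompletableFuture; this KIP does not define the coordinator's internal write API, and the class, enum, and method names are illustrative. The timer itself is the standard CompletableFuture.orTimeout (Java 9+). Two AtomicLong fields stand in for the DlqWriteTimeouts metric and for lock release.

{code:language=java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

// Hypothetical sketch of share.group.dlq.write.timeout.ms (Trigger Condition 2).
// The CompletableFuture-based DLQ write is an assumption, not the broker's internal API.
class DlqWriteTimeoutSketch {
    enum Outcome { DLQ_WRITTEN, TIMED_OUT_ARCHIVED, FAILED }

    private static final Logger LOG = Logger.getLogger(DlqWriteTimeoutSketch.class.getName());

    final AtomicLong dlqWriteTimeouts = new AtomicLong(); // stands in for the DlqWriteTimeouts metric
    final AtomicLong locksReleased = new AtomicLong();    // stands in for releasing record locks
    private final long writeTimeoutMs;

    DlqWriteTimeoutSketch(long writeTimeoutMs) { this.writeTimeoutMs = writeTimeoutMs; }

    // Bounds one DLQ write by the timeout. orTimeout completes dlqWrite itself
    // exceptionally with a TimeoutException if the timer fires before the write completes.
    CompletableFuture<Outcome> archiveViaDlq(String partition, long offset, CompletableFuture<Void> dlqWrite) {
        return dlqWrite
                .orTimeout(writeTimeoutMs, TimeUnit.MILLISECONDS)
                .handle((ignored, error) -> {
                    if (error == null) {
                        locksReleased.incrementAndGet(); // normal flow (assumed): write done, lock released
                        return Outcome.DLQ_WRITTEN;
                    }
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause() : error;
                    if (cause instanceof TimeoutException) {
                        // Fall through to ARCHIVED without a DLQ copy: log, count, release the lock.
                        LOG.warning("DLQ write failed (timed out after " + writeTimeoutMs + " ms); archiving "
                                + partition + " offset " + offset + " without a DLQ copy");
                        dlqWriteTimeouts.incrementAndGet();
                        locksReleased.incrementAndGet();
                        return Outcome.TIMED_OUT_ARCHIVED;
                    }
                    // Non-timeout failures are not specified by this KIP; the sketch keeps the lock.
                    return Outcome.FAILED;
                });
    }
}
{code}

With the default share.group.dlq.write.timeout.ms of 30000, each pending write in the 45-second leader election from the Motivation example would fall through to ARCHIVED 30 seconds after it started, releasing its lock instead of holding it for the full outage.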

DLQ Isolation (Opt-In)

When share.group.dlq.strict.isolation=true:

  • At group startup, the coordinator checks whether the configured DLQ topic is already registered by another active share group.

  • If a conflict is detected, group startup is rejected with a descriptive error.

  • Default is false to preserve the intentional flexibility of the current design.

Regardless of the strict.isolation setting, the DlqTopicSharedByMultipleGroups metric is always emitted when a shared DLQ condition is detected.
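The isolation check and the always-on shared-topic detection can be sketched as an in-memory map from DLQ topic to the active share groups that use it. This is an assumption about the coordinator's bookkeeping, not a description of it. The class and method names are hypothetical, and IllegalStateException stands in for whatever descriptive error the coordinator would actually return when rejecting startup.

{code:language=java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of DLQ isolation checks; not the coordinator's data model.
class DlqTopicRegistrySketch {
    // DLQ topic -> active share groups configured to use it
    private final Map<String, Set<String>> groupsByDlqTopic = new HashMap<>();

    // Called at group startup. With strictIsolation (share.group.dlq.strict.isolation=true),
    // rejects the group if another active group already uses the topic. Returns true when the
    // topic is shared afterwards, i.e. when DlqTopicSharedByMultipleGroups should report it.
    boolean register(String groupId, String dlqTopic, boolean strictIsolation) {
        Set<String> groups = groupsByDlqTopic.computeIfAbsent(dlqTopic, t -> new TreeSet<>());
        boolean usedByAnotherGroup = groups.stream().anyMatch(g -> !g.equals(groupId));
        if (strictIsolation && usedByAnotherGroup) {
            throw new IllegalStateException("Share group " + groupId + " rejected: DLQ topic "
                    + dlqTopic + " is already registered by " + groups);
        }
        groups.add(groupId);
        return groups.size() > 1;
    }

    // Called when a group stops using its DLQ topic.
    void unregister(String groupId, String dlqTopic) {
        Set<String> groups = groupsByDlqTopic.get(dlqTopic);
        if (groups != null && groups.remove(groupId) && groups.isEmpty()) {
            groupsByDlqTopic.remove(dlqTopic);
        }
    }

    // Topics currently shared by more than one group, reported regardless of strict isolation.
    Set<String> sharedTopics() {
        Set<String> shared = new TreeSet<>();
        groupsByDlqTopic.forEach((topic, groups) -> {
            if (groups.size() > 1) shared.add(topic);
        });
        return shared;
    }
}
{code}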

Interaction with Existing Configs

  • max.delivery.attempts (KIP-1191): Unchanged. The circuit breaker fires after DLQ writes occur.

  • KIP-1289 (Transactional acknowledgement): A failed transaction that does not result in a committed DLQ write is not counted toward the rate threshold. A timed-out write is counted as a timeout event, not a DLQ write event.

  • KIP-1249 (Pause delivery): The circuit breaker uses KIP-1249's existing pause mechanism directly.
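The counting rules above, together with the answer proposed for Open Question 4, amount to a mapping from each disposition outcome to the counter it affects. The sketch below makes that mapping explicit; the enum names are hypothetical labels for the cases described in this section, not Kafka types. Only the delivered and dlq_written counters feed dlq_rate.

{code:language=java}
// Hypothetical mapping of record-disposition outcomes to circuit-breaker counters.
class DispositionCountingSketch {
    enum Outcome {
        DELIVERED,            // record acknowledged normally
        DLQ_WRITE_COMMITTED,  // DLQ write completed (committed, if transactional)
        DLQ_TXN_ABORTED,      // KIP-1289: failed transaction, no committed DLQ write
        DLQ_WRITE_TIMED_OUT   // fell through to ARCHIVED after share.group.dlq.write.timeout.ms
    }

    enum Counter { DELIVERED, DLQ_WRITTEN, DLQ_WRITE_TIMEOUTS, NONE }

    static Counter counterFor(Outcome outcome) {
        switch (outcome) {
            case DELIVERED:           return Counter.DELIVERED;
            case DLQ_WRITE_COMMITTED: return Counter.DLQ_WRITTEN;
            case DLQ_WRITE_TIMED_OUT: return Counter.DLQ_WRITE_TIMEOUTS; // DlqWriteTimeouts metric only
            default:                  return Counter.NONE;               // DLQ_TXN_ABORTED: not counted
        }
    }

    // True only for outcomes that enter dlq_rate = dlq_written / (delivered + dlq_written).
    static boolean countsTowardRate(Outcome outcome) {
        Counter c = counterFor(outcome);
        return c == Counter.DELIVERED || c == Counter.DLQ_WRITTEN;
    }
}
{code}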

Admin Tooling

The kafka-share-groups.sh tool is extended with:

{code:language=bash}
# Describe circuit breaker state for a group
kafka-share-groups.sh --bootstrap-server <host> \
  --describe --group <group-id> --circuit-breaker

# Manually resume a circuit-breaker-paused group
kafka-share-groups.sh --bootstrap-server <host> \
  --resume --group <group-id> --reason "incident resolved"

# Show DLQ topology across all share groups
kafka-share-groups.sh --bootstrap-server <host> \
  --describe --dlq-topology
{code}

The --reason string provided to --resume is logged at INFO level on the coordinator at resume time and included in the DescribeGroups response as a last-resume-reason field for audit purposes. It is not persisted to the share group state topic.

Compatibility, Deprecation, and Migration Plan

  • Backward compatibility: All new configs default to off/safe values. Existing share groups are unaffected unless explicitly configured.

  • Protocol changes: The DescribeGroups response is extended with an optional pause-reason field. Older clients ignore it gracefully.

  • No deprecations.

  • KIP-1249 dependency: This KIP requires KIP-1249 to be available on the broker. Attempting to enable share.group.dlq.circuit.breaker.enable=true on a broker without KIP-1249 support will result in a configuration validation error at group startup with a descriptive message indicating the missing dependency.

Test Plan

  1. Unit tests: Circuit breaker threshold calculation, window bucket rotation, counter reset on resume, timeout fall-through logic.

  2. Integration tests: End-to-end trip and auto-resume under simulated DLQ write failures; timeout fall-through with primary pipeline continuity verification.

  3. Soak tests: 72-hour automated soak with a consumer rejecting 25% of messages; verify group pauses, metrics fire, and no broker resource leak.

  4. HOL-blocking regression tests: Simulate sustained DLQ topic outage; verify primary partition delivery continues after timeout fall-through and lock release.

  5. Chaos tests: Coordinator failover mid-window; verify counter state integrity and no false-trip on recovery.

  6. Isolation tests: Multi-group DLQ sharing scenarios with strict.isolation=true and false.

  7. Upgrade tests: Rolling upgrade from AK 4.4 to AK 4.5; verify no disruption.

Rejected Alternatives

  1. Client-side circuit breaker only: Rejected because client-side logic cannot observe the full DLQ write rate across all consumers in a share group. The coordinator has the authoritative view.

  2. Hard-coded 20% threshold: Rejected in favor of a configurable threshold. Different workloads have different tolerances.

  3. Hard coordinator enforcement of DLQ isolation: Rejected in favor of the opt-in strict.isolation config plus an observability metric, preserving the intentional flexibility of the current design while giving regulated environments a safety valve.

  4. Automatic DLQ topic compaction on circuit breaker trip: Out of scope. DLQ remediation tooling is a separate concern.

Open Questions

  1. Should the circuit breaker window counter be persisted in the share group state topic (for coordinator failover resilience), or is an in-memory counter acceptable given that a failover naturally resets the window?

    Proposed: An in-memory counter is acceptable for the initial implementation. A coordinator failover resets the window, which is a safe failure mode: the circuit breaker may take up to one additional window period to re-trip after failover. Persisting the counter to the state topic adds complexity and write amplification that is not justified for a rolling-window metric. This can be revisited in a follow-on KIP if operational experience shows that failover-induced false negatives are a problem.

  2. Should auto.resume.enable default to true for development environments?

    Proposed: No. auto.resume.enable defaults to false in all environments. Differentiating defaults by environment would require broker-side environment detection, which is out of scope. Operators who want auto-resume behavior in development can set it explicitly. Defaulting to manual resume is the safer posture for production and avoids a footgun where a misconfigured production broker auto-resumes into a still-broken downstream service.

  3. Should the pause-reason field be added to the existing DescribeGroups RPC, or introduced as a new DescribeShareGroupState RPC?

  4. Should timed-out DLQ writes (ARCHIVED fall-through) count toward the rate threshold circuit breaker, or be tracked as a separate trigger condition only?

    Proposed: Timed-out writes should not count toward the rate threshold; they are tracked as a separate condition only. Timeout events and rate-threshold events are distinct failure modes with different operational meanings: a timeout indicates DLQ infrastructure unavailability, while a high DLQ rate indicates downstream consumer failure. Conflating them would make the circuit breaker harder to reason about and could cause false trips during DLQ infrastructure recovery. Timeout events are tracked separately via the DlqWriteTimeouts metric and should trigger operator alerting independently.
