Status

Current state: Under discussion

Discussion thread:  here

JIRA: KAFKA-20367

1 Motivation

1.1 The Problem: Scaling Kafka-to-Kafka Pipelines Today

Currently, Kafka Connect sink connectors rely on traditional consumer groups, in which each partition is assigned to exactly one task at a time. This model is a poor fit for unordered message processing and creates three primary bottlenecks for task-queue workloads:

1. Partition-Coupled Scaling: Parallelism is hard-limited by the partition count; tasks beyond the number of partitions sit idle.

2. Head-of-Line Blocking: Because partition ownership is exclusive, a single slow task (often slowed by downstream latency) stalls all subsequent records in its assigned partitions.

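3. Rebalance-Driven Gaps: Adding or removing tasks triggers consumer group rebalances, and frequent membership changes turn into "rebalance storms" that can pause consumption while partitions are reassigned.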

1.2 How Share Groups Solve This

Share Groups (KIP-932) introduce queue semantics for Kafka consumers. Unlike consumer groups, Share Groups do not assign partitions exclusively.

Instead, records from a partition can be acquired by any available consumer in the group. After processing, the consumer acknowledges each record as ACCEPT (processed successfully), RELEASE (return it for redelivery), or REJECT (unprocessable; archive it without redelivery).

This provides:

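- Elastic Scaling: Decouples parallelism from partition count, since several consumers can process records from the same partition concurrently.
- No Head-of-Line Blocking: Supports unordered message processing; if a task slows down, its acquisition locks expire and the records are redelivered to available workers.
- Seamless Scaling: Because partitions are not owned exclusively, tasks can join or leave without a stop-the-world partition reassignment, avoiding "rebalance storms" and keeping consumption running during task membership changes.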

Note: Share groups are only suitable for connectors whose processing is idempotent and order-independent, because records may be redelivered and may be processed out of order.
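The acquire/acknowledge cycle described above can be modelled as a small state machine. The sketch below is a simplified, hypothetical model of how a broker treats one record (SharedRecord and its methods are illustrative names, not Kafka classes). It assumes the record states described in KIP-932 (AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED) and a delivery limit after which a released or timed-out record is archived instead of being made available again.

```java
/** Acknowledgement types a share consumer can send (ACCEPT, RELEASE, REJECT per KIP-932). */
enum AckType { ACCEPT, RELEASE, REJECT }

/** States the broker tracks for each in-flight record of a share-partition (per KIP-932). */
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

/**
 * Hypothetical, simplified model of a single record in a share-partition.
 * It is not the broker implementation; it only illustrates the transitions.
 */
final class SharedRecord {
    private final int deliveryLimit; // assumed: max delivery attempts before archiving
    private RecordState state = RecordState.AVAILABLE;
    private int deliveryCount = 0;
    private long lockExpiresAtMs = -1;

    SharedRecord(int deliveryLimit) {
        this.deliveryLimit = deliveryLimit;
    }

    /** A consumer acquires the record; every acquisition counts as one delivery. */
    boolean tryAcquire(long nowMs, long lockTimeoutMs) {
        if (state != RecordState.AVAILABLE) {
            return false; // held by another consumer, or already finished
        }
        state = RecordState.ACQUIRED;
        deliveryCount++;
        lockExpiresAtMs = nowMs + lockTimeoutMs;
        return true;
    }

    /** The consumer holding the acquisition lock acknowledges the record. */
    void acknowledge(AckType type) {
        if (state != RecordState.ACQUIRED) {
            throw new IllegalStateException("record is not acquired: " + state);
        }
        switch (type) {
            case ACCEPT -> state = RecordState.ACKNOWLEDGED;
            case REJECT -> state = RecordState.ARCHIVED;
            case RELEASE -> releaseOrArchive();
        }
    }

    /** An expired acquisition lock behaves like a RELEASE. */
    void onTimerTick(long nowMs) {
        if (state == RecordState.ACQUIRED && nowMs >= lockExpiresAtMs) {
            releaseOrArchive();
        }
    }

    /** Return the record to AVAILABLE, or ARCHIVED once the delivery limit is reached. */
    private void releaseOrArchive() {
        state = deliveryCount >= deliveryLimit ? RecordState.ARCHIVED : RecordState.AVAILABLE;
    }

    RecordState state() { return state; }

    int deliveryCount() { return deliveryCount; }
}
```

With a delivery limit of 2, for instance, a record whose first lock expires and whose second delivery is released ends up ARCHIVED rather than AVAILABLE.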

2. Scope

2.1 In Scope (What we are building)

  • New Task Type: Introducing WorkerShareSinkTask to handle Share Group logic without changing existing connector code.

  • Flexible Activation: Toggle queue semantics globally via the worker property consumer.group.protocol=share, or per connector via consumer.override.group.protocol=share.

  • Delivery Guarantees:

    • At-least-once: Standard support for all sink types.

    • Exactly-once: Supported for same-cluster Kafka-to-Kafka paths (requires KIP-1289).

  • Observability: New Share Group metrics (acquisition, release, and rejection rates) integrated into the existing sink-task-metrics group.

2.2 Out of Scope (Future/Separate efforts)

  • Source Connectors: Share Group support is currently for Sinks only (Source support and MirrorMaker 2 are excluded).

  • Cross-Cluster EOS: Exactly-once delivery between different Kafka clusters is not supported in this phase.

  • API Changes: No modifications will be made to the public SinkTask Java API or individual connector codebases.

  • Complex Transactions: External 2PC coordinators and cross-cluster transactional protocols are not addressed.


3. Public Interfaces

3.1 New Configuration Properties

Worker-Level (connect-distributed.properties)

  • consumer.group.protocol: Set to share to enable KafkaShareConsumer globally for all sink tasks (Default: consumer).

Connector-Level (Per-connector JSON)

| Property | Default | Description |
|---|---|---|
| consumer.override.group.protocol | Inherited | Set to share to opt a specific connector into queue semantics. |
| share.group.id | connect-<name> | Custom Share Group ID; follows standard naming conventions. |
| share.acknowledgement.mode | explicit | explicit: acknowledge after task.put() returns. implicit: acknowledge on the next poll. |
| share.acquisition.lock.timeout.ms | 30000 | Maximum time a record stays acquired before redelivery. Must exceed task.put() latency. |
| share.delivery.semantics | at-least-once | Toggle between at-least-once and exactly-once (requires KIP-1289). |
| share.max.delivery.count | 5 | Maximum redelivery attempts before the record is sent to a Dead Letter Queue. |
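As an illustration of how the defaults in this table might be applied, the following hypothetical helper (ShareSinkConnectorDefaults is not an existing Connect class) fills every missing property with the default listed above and rejects values outside the documented options. A real implementation would more likely declare these keys in a ConfigDef; plain maps keep the sketch dependency-free.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper (not an existing Connect class) that applies the
 * per-connector defaults from the table and validates the documented options.
 */
final class ShareSinkConnectorDefaults {
    static final String GROUP_ID = "share.group.id";
    static final String ACK_MODE = "share.acknowledgement.mode";
    static final String LOCK_TIMEOUT_MS = "share.acquisition.lock.timeout.ms";
    static final String DELIVERY_SEMANTICS = "share.delivery.semantics";
    static final String MAX_DELIVERY_COUNT = "share.max.delivery.count";

    private ShareSinkConnectorDefaults() {}

    /** Returns a copy of userProps with every missing share.* property set to its default. */
    static Map<String, String> resolve(String connectorName, Map<String, String> userProps) {
        Map<String, String> resolved = new HashMap<>(userProps);
        resolved.putIfAbsent(GROUP_ID, "connect-" + connectorName);
        resolved.putIfAbsent(ACK_MODE, "explicit");
        resolved.putIfAbsent(LOCK_TIMEOUT_MS, "30000");
        resolved.putIfAbsent(DELIVERY_SEMANTICS, "at-least-once");
        resolved.putIfAbsent(MAX_DELIVERY_COUNT, "5");

        requireOneOf(resolved, ACK_MODE, Set.of("explicit", "implicit"));
        requireOneOf(resolved, DELIVERY_SEMANTICS, Set.of("at-least-once", "exactly-once"));
        requirePositive(resolved, LOCK_TIMEOUT_MS);
        requirePositive(resolved, MAX_DELIVERY_COUNT);
        return resolved;
    }

    private static void requireOneOf(Map<String, String> props, String key, Set<String> allowed) {
        String value = props.get(key);
        if (!allowed.contains(value)) {
            throw new IllegalArgumentException(key + " must be one of " + allowed + " but was " + value);
        }
    }

    private static void requirePositive(Map<String, String> props, String key) {
        long value;
        try {
            value = Long.parseLong(props.get(key));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(key + " must be a number", e);
        }
        if (value <= 0) {
            throw new IllegalArgumentException(key + " must be positive but was " + value);
        }
    }
}
```

A static check like this can only enforce that the lock timeout is positive; whether it actually exceeds task.put() latency depends on the downstream system.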

3.2 New / Modified Java Interfaces


3.2.1 `WorkerShareSinkTask` (new class)


A new internal class, parallel to WorkerSinkTask, that drives SinkTask using a KafkaShareConsumer.

  • No API Changes: The public SinkTask interface and put() contract remain identical; existing connectors require no code modifications.

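```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.connect.runtime.WorkerTask;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// New class: parallel to WorkerSinkTask but backed by a ShareConsumer
class WorkerShareSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkRecord> {
    private final ShareConsumer<byte[], byte[]> shareConsumer; // a KafkaShareConsumer instance
    private final SinkTask task;                               // the unmodified connector task
    // ... constructor and the remaining WorkerTask overrides
}
```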

Note: The existing `SinkTask` interface is not modified. Connectors do not need code changes. The `put(Collection<SinkRecord>)` contract remains the same.

The difference is entirely in the worker runtime:

| Aspect | WorkerSinkTask (Traditional) | WorkerShareSinkTask (Proposed) |
|---|---|---|
| Consumer | KafkaConsumer | KafkaShareConsumer |
| Tracking | Consumer offsets + commitSync() | Per-record acknowledge(ACCEPT) |
| Rebalance | Rebalance listener triggers open()/close() | None; task.open() is called once at startup |
| Failures | Pause consumer and retry batch | acknowledge(RELEASE) for broker redelivery |

3.2.2 `Worker.baseConsumerConfigs()` (modified)

Updated to detect group.protocol=share. It dynamically constructs ShareConsumerConfig properties (like share.group.id) instead of traditional consumer configs.
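A minimal sketch of the branch this adds, assuming that the share consumer reads its group from group.id (as KafkaShareConsumer does) and that a few consumer-group-only keys should not be passed to it. ConsumerConfigBuilder is a hypothetical helper, and the exact list of dropped keys is an assumption for illustration only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of the share-mode branch in Worker.baseConsumerConfigs().
 * Not the actual Worker code.
 */
final class ConsumerConfigBuilder {
    // Assumed, illustrative list of consumer-group-only keys not passed to a share consumer.
    private static final Set<String> CONSUMER_GROUP_ONLY_KEYS = Set.of(
            "group.protocol", "enable.auto.commit", "auto.offset.reset", "partition.assignment.strategy");

    private ConsumerConfigBuilder() {}

    static boolean isShareMode(Map<String, String> effective) {
        return "share".equals(effective.get("group.protocol"));
    }

    /** Builds the client config from the already-merged worker and connector properties. */
    static Map<String, String> build(String connectorName, Map<String, String> effective) {
        Map<String, String> out = new HashMap<>(effective);
        if (!isShareMode(effective)) {
            // Classic path: unchanged; sink connectors default to group.id connect-<name>.
            out.putIfAbsent("group.id", "connect-" + connectorName);
            return out;
        }
        // Share path: share.group.id (default connect-<name>) becomes the client's group.id.
        String shareGroupId = effective.getOrDefault("share.group.id", "connect-" + connectorName);
        out.keySet().removeAll(CONSUMER_GROUP_ONLY_KEYS);
        out.remove("share.group.id");
        out.put("group.id", shareGroupId);
        return out;
    }
}
```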

Metrics

New sensors are registered only in Share Group mode, so dashboards for traditional connectors stay unchanged and the presence of these metrics shows which mode a connector is running in. All metrics belong to the existing sink-task-metrics group.

New Share-Specific Metrics:

  • sink-record-acquire: Rate/Total of records pulled from the group.

  • sink-record-acknowledge: Rate/Total of successful ACCEPT acks.

  • sink-record-release/reject: Rate/Total of records released for retry or rejected to DLQ.

  • acknowledge-time: Time between poll() and acknowledge().

  • sink-record-redelivery: Total records with delivery count > 1.

Exclusions: The following traditional sensors are not registered in share mode as they are not applicable: partition-count, offset-seq-number, and offset-commit-completion.
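The mapping from runtime events to the new metrics can be sketched as follows. ShareSinkMetrics is a hypothetical class: the real worker would register Kafka Sensors with rate and total stats in sink-task-metrics, whereas this sketch keeps plain counters so it runs on its own. It assumes the worker learns each record's delivery count from the broker along with the record.

```java
/**
 * Hypothetical, dependency-free stand-in for the proposed share-mode sensors.
 * Keeps totals and an average only (no rates) to show which event feeds which metric.
 */
final class ShareSinkMetrics {
    enum Ack { ACCEPT, RELEASE, REJECT }

    private long acquireTotal;
    private long acknowledgeTotal;
    private long releaseTotal;
    private long rejectTotal;
    private long redeliveryTotal;
    private long ackTimeSumMs;
    private long ackTimeSamples;

    /** sink-record-acquire and sink-record-redelivery: one call per record returned by poll(). */
    synchronized void recordAcquired(int deliveryCount) {
        acquireTotal++;
        if (deliveryCount > 1) {
            redeliveryTotal++;
        }
    }

    /** sink-record-acknowledge/release/reject and acknowledge-time: one call per acknowledged record. */
    synchronized void recordAcknowledged(Ack type, long polledAtMs, long ackedAtMs) {
        switch (type) {
            case ACCEPT -> acknowledgeTotal++;
            case RELEASE -> releaseTotal++;
            case REJECT -> rejectTotal++;
        }
        ackTimeSumMs += ackedAtMs - polledAtMs;
        ackTimeSamples++;
    }

    synchronized long acquireTotal() { return acquireTotal; }
    synchronized long acknowledgeTotal() { return acknowledgeTotal; }
    synchronized long releaseTotal() { return releaseTotal; }
    synchronized long rejectTotal() { return rejectTotal; }
    synchronized long redeliveryTotal() { return redeliveryTotal; }

    /** Average acknowledge-time in milliseconds, or 0 if nothing has been acknowledged yet. */
    synchronized double acknowledgeTimeAvgMs() {
        return ackTimeSamples == 0 ? 0.0 : (double) ackTimeSumMs / ackTimeSamples;
    }
}
```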

Proposed Changes

Figure: At-least-once flow (Share Group → SinkTask → External Sink)

Figure: Exactly-once flow (same-cluster Kafka-to-Kafka, KIP-1289)


`WorkerShareSinkTask` Lifecycle

Initialization

The setup phase is simplified because Share Groups eliminate partition-level management.

  1. Consumer Creation: Instantiates KafkaShareConsumer.

  2. Subscription: Subscribes to topics directly (no RebalanceListener needed).

  3. Task Startup: Calls task.initialize() and task.start(). Unlike traditional sinks, task.open() is called once at startup for all topics since there are no rebalances.

Main Loop (Iteration)

The worker drives the at-least-once delivery guarantee by following this execution flow:

  1. Poll: Pulls records from the share group.

  2. Convert: Transforms messages into SinkRecords.

  3. Deliver: Passes records to the connector via task.put().

  4. Acknowledge:

    • Success: Marks all records as ACCEPT.

    • Retriable Error: Marks records as RELEASE for immediate broker re-delivery.

    • Fatal Error: Marks records as REJECT (routes to Dead Letter Queue if configured).

  5. Commit: Calls shareConsumer.commitSync() to durably persist the acknowledgments on the broker.
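The five steps above can be sketched as a single loop iteration in plain Java. ShareSource, Sink, and RetriableSinkException are hypothetical stand-ins for KafkaShareConsumer, SinkTask.put(), and Connect's RetriableException, chosen so the sketch runs without a broker; it also assumes, as the list describes, that one outcome is applied to the whole batch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Acknowledgement outcomes, mirroring ACCEPT / RELEASE / REJECT. */
enum Ack { ACCEPT, RELEASE, REJECT }

/** Hypothetical minimal view of a share consumer: poll, per-record acknowledge, commitSync. */
interface ShareSource<R> {
    List<R> poll();
    void acknowledge(R record, Ack type);
    void commitSync();
}

/** Hypothetical stand-in for SinkTask.put(). */
interface Sink<S> {
    void put(List<S> records);
}

/** Hypothetical stand-in for Connect's RetriableException. */
class RetriableSinkException extends RuntimeException {
    RetriableSinkException(String message) { super(message); }
}

final class ShareSinkLoop {
    private ShareSinkLoop() {}

    /** One iteration: poll, convert, put, acknowledge every record with one outcome, commit. */
    static <R, S> void iteration(ShareSource<R> source, Function<R, S> convert, Sink<S> sink) {
        List<R> polled = source.poll();                      // 1. Poll
        if (polled.isEmpty()) {
            return;
        }
        List<S> converted = new ArrayList<>(polled.size()); // 2. Convert
        for (R record : polled) {
            converted.add(convert.apply(record));
        }
        Ack outcome;
        try {
            sink.put(converted);                             // 3. Deliver
            outcome = Ack.ACCEPT;
        } catch (RetriableSinkException e) {
            outcome = Ack.RELEASE;                           // broker makes records available again
        } catch (RuntimeException e) {
            outcome = Ack.REJECT;                            // archived (DLQ if configured)
        }
        for (R record : polled) {                            // 4. Acknowledge
            source.acknowledge(record, outcome);
        }
        source.commitSync();                                 // 5. Commit
    }
}

/** In-memory source for trying the loop without a broker. */
final class InMemoryShareSource implements ShareSource<String> {
    final Map<String, Ack> acks = new HashMap<>();
    int commits = 0;
    private List<String> pending;

    InMemoryShareSource(List<String> records) {
        this.pending = new ArrayList<>(records);
    }

    @Override public List<String> poll() {
        List<String> batch = pending;
        pending = new ArrayList<>();
        return batch;
    }

    @Override public void acknowledge(String record, Ack type) { acks.put(record, type); }

    @Override public void commitSync() { commits++; }
}
```

Applying one outcome per batch mirrors the put() contract, which reports success or failure for the batch as a whole rather than per record.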

Ensuring No Data Loss (At-Least-Once)

The system guarantees that no data is lost by ensuring a record is acknowledged (ACCEPT) only after task.put() returns successfully.

  • Failure Recovery: If a task crashes before acknowledging, the record remains in an ACQUIRED state until the share.acquisition.lock.timeout.ms expires.

  • The broker then makes the record AVAILABLE for another task.

  • Durable Commits: The default explicit mode uses commitSync() to ensure acknowledgments are persistent. If a commit fails, the record is re-delivered.

  • Duplicates: Occasional duplicates may occur if a crash happens after task.put() but before acknowledgment. This is standard for at-least-once delivery and is best handled by idempotent sinks (e.g., upserts).
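The duplicate window, and why idempotent sinks absorb it, can be shown with a toy example. KeyedRecord, AppendSink, UpsertSink, and RedeliveryDemo are hypothetical; the demo simply delivers the same batch twice, which is what happens when a task crashes after put() but before its ACCEPT is committed and the broker redelivers the records once the acquisition lock expires.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** A record with a business key (hypothetical, for illustration only). */
record KeyedRecord(String key, String value) {}

/** Non-idempotent sink: every delivery appends a row, so redelivery creates duplicates. */
final class AppendSink {
    final List<KeyedRecord> rows = new ArrayList<>();

    void put(List<KeyedRecord> batch) { rows.addAll(batch); }
}

/** Idempotent sink: an upsert by key, so redelivering the same record is harmless. */
final class UpsertSink {
    final Map<String, String> rows = new LinkedHashMap<>();

    void put(List<KeyedRecord> batch) {
        for (KeyedRecord r : batch) {
            rows.put(r.key(), r.value());
        }
    }
}

final class RedeliveryDemo {
    private RedeliveryDemo() {}

    /** Delivery 1 writes, then the task "crashes" before ACCEPT; delivery 2 writes again and is acknowledged. */
    static void deliverTwice(List<KeyedRecord> batch, AppendSink append, UpsertSink upsert) {
        for (int delivery = 1; delivery <= 2; delivery++) {
            append.put(batch);
            upsert.put(batch);
        }
    }
}
```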

Exactly-Once Semantics (Future Phase, requires KIP-1289)

For Kafka-to-Kafka pipelines, exactly-once delivery is achieved by committing the producer's output records and the consumer's acknowledgments atomically, within a single transaction, via KIP-1289.

| Step | Action |
|---|---|
| 1. Begin | Start a producer transaction. |
| 2. Produce | Send transformed records to output topics. |
| 3. Bind | Call producer.sendShareAcksToTransaction() to link share acks to the transaction. |
| 4. Commit | Atomically commit both output records and source acknowledgments. |
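The atomicity property in this table can be illustrated with an in-memory model. ShareTransactionModel is hypothetical and calls no Kafka API; bindAcks() merely stands in for the producer.sendShareAcksToTransaction() call proposed by KIP-1289, whose final signature is not assumed here.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory model of the KIP-1289 flow: output records and share
 * acknowledgements are staged in one transaction and become visible together.
 */
final class ShareTransactionModel {
    final List<String> committedOutput = new ArrayList<>();
    final List<String> committedAcks = new ArrayList<>();

    private final List<String> stagedOutput = new ArrayList<>();
    private final List<String> stagedAcks = new ArrayList<>();
    private boolean inTransaction = false;

    /** 1. Begin */
    void begin() {
        if (inTransaction) throw new IllegalStateException("transaction already open");
        inTransaction = true;
    }

    /** 2. Produce: the output record is staged, not yet visible. */
    void produce(String outputRecord) {
        requireOpen();
        stagedOutput.add(outputRecord);
    }

    /** 3. Bind: stand-in for attaching the consumed records' ACCEPT acks to the transaction. */
    void bindAcks(List<String> consumedRecordIds) {
        requireOpen();
        stagedAcks.addAll(consumedRecordIds);
    }

    /** 4. Commit: output records and acks become visible together. */
    void commit() {
        requireOpen();
        committedOutput.addAll(stagedOutput);
        committedAcks.addAll(stagedAcks);
        clear();
    }

    /** Abort: nothing is written and the consumed records stay unacknowledged, so they are redelivered. */
    void abort() {
        requireOpen();
        clear();
    }

    private void clear() {
        stagedOutput.clear();
        stagedAcks.clear();
        inTransaction = false;
    }

    private void requireOpen() {
        if (!inTransaction) throw new IllegalStateException("no open transaction");
    }
}
```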

Configuration Resolution Order

The proposal reuses the existing consumer.override.* mechanism in Kafka Connect for a seamless transition.

Resolution Order:

  1. Worker Config: consumer.group.protocol=share (Global default).

  2. Connector Config: consumer.override.group.protocol=share (Per-connector override).

Important Note on Group IDs: Share groups persist their state in a dedicated internal topic (__share_group_state), but their group IDs live in the same namespace as consumer group IDs.

To avoid membership conflicts, users must ensure that a Share Group ID does not match an existing Consumer Group ID. A validation check will be implemented to prevent this collision.
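Both rules can be sketched in a few lines. GroupProtocolResolver is a hypothetical helper, not an existing Connect class; how the validation would obtain the set of existing consumer group IDs (for example, from a group listing through the Admin client) is left open.

```java
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper for the resolution order above and the proposed
 * share-group-ID collision check.
 */
final class GroupProtocolResolver {
    static final String WORKER_KEY = "consumer.group.protocol";
    static final String CONNECTOR_KEY = "consumer.override.group.protocol";

    private GroupProtocolResolver() {}

    /** The connector-level override, if present, wins over the worker-level default. */
    static boolean useShareGroup(Map<String, String> workerProps, Map<String, String> connectorProps) {
        String protocol = connectorProps.containsKey(CONNECTOR_KEY)
                ? connectorProps.get(CONNECTOR_KEY)
                : workerProps.get(WORKER_KEY);
        return "share".equals(protocol);
    }

    /** Rejects a share group ID that is already used by a consumer group. */
    static void validateShareGroupId(String shareGroupId, Set<String> existingConsumerGroupIds) {
        if (existingConsumerGroupIds.contains(shareGroupId)) {
            throw new IllegalArgumentException("Share group ID '" + shareGroupId
                    + "' is already used by a consumer group; choose a different share.group.id");
        }
    }
}
```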

4. Compatibility, Deprecation, and Migration Plan

  • Impact on Users: KIP-1302 is opt-in only and purely additive. Standard consumer groups remain the default, and zero code changes are required for existing connectors.

  • Migration Path:

    1. Ensure brokers are version 4.0+.

    2. Set consumer.group.protocol=share in the worker configuration (all connectors), or consumer.override.group.protocol=share in an individual connector's JSON.

    3. Tune share.acquisition.lock.timeout.ms to exceed your highest expected task.put() latency.

  • Rollback: Removing the share config reverts the connector to standard consumer groups.

    • Note: Consumer groups and Share Groups track progress separately; a rollback may result in processing records that were already handled by the Share Group.

5. Test Plan

| Test Level | Key Objectives |
|---|---|
| Unit | Verify the poll-put-acknowledge loop (ACCEPT/RELEASE/REJECT) and config resolution. |
| Integration | Test elastic scaling (adding/removing tasks), task failure recovery, and interoperability between share and traditional connectors. |
| System | Performance benchmarking against traditional consumer groups and chaos testing to verify zero data loss. |


6. Rejected Alternatives

| Alternative | Reason for Rejection |
|---|---|
| Modify SinkTask API | Adding explicit acknowledge() methods would break backward compatibility for all existing connectors. |
| Limit to MirrorMaker 2 | Generic sinks (S3, JDBC, etc.) benefit just as much from elastic scaling as Kafka-to-Kafka pipelines. |
| Require EOS Initially | At-least-once is sufficient for most use cases, and exactly-once is blocked by the pending KIP-1289/KIP-1310. |

