Status
Current state: Under discussion
Discussion thread: here
JIRA: KAFKA-20367
1. Motivation
1.1 The Problem: Scaling Kafka-to-Kafka Pipelines Today
Currently, Kafka Connect sink connectors rely on traditional consumer groups, in which each partition is assigned exclusively to one task at a time. This model cannot take advantage of workloads where ordering does not matter, and it creates three primary bottlenecks for task-queue workloads:
1. Partition-Coupled Scaling: Parallelism is hard-limited by the partition count; tasks beyond the number of partitions sit idle.
2. Head-of-Line Blocking: Because partition ownership is exclusive, a single slow task (often caused by downstream latency) stalls all subsequent records in its assigned partitions.
3. Rebalance-Driven Gaps: Adding or removing tasks triggers consumer group rebalances, and frequent membership changes can cause "rebalance storms" that pause consumption.
1.2 How Share Groups Solve This
Share Groups (KIP-932) introduce queue semantics for Kafka consumers. Unlike consumer groups, Share Groups do not assign partitions exclusively.
Instead, records from a partition can be acquired by any available consumer in the group. After processing, the consumer acknowledges each record as ACCEPT, RELEASE, or REJECT.
This provides:
- Elastic Scaling: Decouples parallelism from partition count.
- No Head-of-Line Blocking: Supports unordered message processing; if a task slows down, other consumers keep acquiring the remaining records, and the slow task's records are redelivered to available workers once their acquisition locks time out.
- Seamless Scaling: Eliminates "rebalance storms" by removing exclusive partition assignment, so task membership changes do not pause consumption.
Note: Share groups are only suitable for connectors whose processing is idempotent and order-independent.
2. Scope
2.1 In Scope (What we are building)
- New Task Type: Introducing `WorkerShareSinkTask` to handle Share Group logic without changing existing connector code.
- Flexible Activation: Toggle queue semantics globally with the worker setting `consumer.group.protocol=share`, or per connector with `consumer.override.group.protocol=share`.
- Delivery Guarantees:
  - At-least-once: Standard support for all sink types.
  - Exactly-once: Supported for same-cluster Kafka-to-Kafka paths (requires KIP-1289).
- Observability: New Share Group metrics (acquisition, release, and rejection rates) integrated into the existing `sink-task-metrics` group.
2.2 Out of Scope (Future/Separate efforts)
- Source Connectors: Share Group support is currently for sinks only (source support and MirrorMaker 2 are excluded).
- Cross-Cluster EOS: Exactly-once delivery between different Kafka clusters is not supported in this phase.
- API Changes: No modifications will be made to the public `SinkTask` Java API or to individual connector codebases.
- Complex Transactions: External 2PC coordinators and cross-cluster transactional protocols are not addressed.
3. Public Interfaces
3.1 New Configuration Properties
Worker-Level (connect-distributed.properties)
consumer.group.protocol: Set to `share` to enable `KafkaShareConsumer` globally for all sink tasks (Default: `consumer`).
Connector-Level (Per-connector JSON)
| Property | Default | Description |
| --- | --- | --- |
| consumer.override.group.protocol | Inherited | Set to share to opt a specific connector into queue semantics. |
| share.group.id | connect-<name> | Custom Share Group ID; follows standard naming conventions. |
| share.acknowledgement.mode | explicit | explicit: Acknowledge after task.put(). implicit: Acknowledge on the next poll. |
| share.acquisition.lock.timeout.ms | 30000 | Max time a record stays acquired before re-delivery. Must exceed task.put() latency. |
| share.delivery.semantics | at-least-once | Toggle between at-least-once and exactly-once (requires KIP-1289). |
| share.max.delivery.count | 5 | Max re-delivery attempts before sending to a Dead Letter Queue. |
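The table can be read as a set of per-connector defaults plus a few validation rules. The following Java sketch applies those defaults to a connector's properties and rejects invalid values. The class `ShareSinkConfigDefaults` is hypothetical, and the numeric rules (a positive lock timeout, at least one delivery) are assumptions for illustration rather than part of the proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: fills in the defaults from the configuration table for one
// connector and applies simple sanity checks. Not an actual Kafka Connect class.
final class ShareSinkConfigDefaults {
    static final String GROUP_ID = "share.group.id";
    static final String ACK_MODE = "share.acknowledgement.mode";
    static final String LOCK_TIMEOUT_MS = "share.acquisition.lock.timeout.ms";
    static final String SEMANTICS = "share.delivery.semantics";
    static final String MAX_DELIVERY_COUNT = "share.max.delivery.count";

    private ShareSinkConfigDefaults() {}

    static Map<String, String> apply(String connectorName, Map<String, String> userProps) {
        Map<String, String> resolved = new HashMap<>(userProps);
        // Default share group ID follows the connect-<name> convention.
        resolved.putIfAbsent(GROUP_ID, "connect-" + connectorName);
        resolved.putIfAbsent(ACK_MODE, "explicit");
        resolved.putIfAbsent(LOCK_TIMEOUT_MS, "30000");
        resolved.putIfAbsent(SEMANTICS, "at-least-once");
        resolved.putIfAbsent(MAX_DELIVERY_COUNT, "5");

        requireOneOf(resolved, ACK_MODE, Set.of("explicit", "implicit"));
        requireOneOf(resolved, SEMANTICS, Set.of("at-least-once", "exactly-once"));
        // Assumed rules: the lock timeout must be positive and at least one delivery allowed.
        requireAtLeast(resolved, LOCK_TIMEOUT_MS, 1);
        requireAtLeast(resolved, MAX_DELIVERY_COUNT, 1);
        return resolved;
    }

    private static void requireOneOf(Map<String, String> props, String key, Set<String> allowed) {
        if (!allowed.contains(props.get(key))) {
            throw new IllegalArgumentException(
                    key + " must be one of " + allowed + ", got " + props.get(key));
        }
    }

    private static void requireAtLeast(Map<String, String> props, String key, long min) {
        long value;
        try {
            value = Long.parseLong(props.get(key));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(key + " must be an integer, got " + props.get(key), e);
        }
        if (value < min) {
            throw new IllegalArgumentException(key + " must be >= " + min + ", got " + value);
        }
    }
}
```

For a connector named `orders` with no share settings, this sketch yields `share.group.id=connect-orders` and the remaining defaults from the table.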
3.2 New / Modified Java Interfaces
3.2.1 `WorkerShareSinkTask` (new class)
A new internal class, parallel to `WorkerSinkTask`, that drives `SinkTask` using a `KafkaShareConsumer`.
No API Changes: The public `SinkTask` interface and `put()` contract remain identical; existing connectors require no code modifications.
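```java
// Assumed to live in the same package as WorkerSinkTask, since WorkerTask is internal.
package org.apache.kafka.connect.runtime;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// New class: parallel to WorkerSinkTask but backed by a ShareConsumer
class WorkerShareSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkRecord> {
    private final ShareConsumer<byte[], byte[]> shareConsumer; // e.g. a KafkaShareConsumer
    private final SinkTask task;                               // unmodified connector task
    // ...
}
```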
Note: The existing `SinkTask` interface is not modified. Connectors do not need code changes. The `put(Collection<SinkRecord>)` contract remains the same.
The difference is entirely in the worker runtime:
| Aspect | WorkerSinkTask (Traditional) | WorkerShareSinkTask (Proposed) |
| --- | --- | --- |
| Consumer | KafkaConsumer | KafkaShareConsumer |
| Tracking | Consumer Offsets + commitSync() | Per-record acknowledge(ACCEPT) |
| Rebalance | Rebalance listener triggers open/close | None. task.open() called once at startup. |
| Failures | Pause consumer and retry batch | acknowledge(RELEASE) for broker re-delivery |
3.2.2 `Worker.baseConsumerConfigs()` (modified)
Updated to detect group.protocol=share. It dynamically constructs ShareConsumerConfig properties (like share.group.id) instead of traditional consumer configs.
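One way to picture this change is as a translation step from the resolved consumer properties to share consumer properties. In the sketch below, `ShareConsumerPropsTranslator` is hypothetical. Passing the share group ID through the standard `group.id` key reflects how `KafkaShareConsumer` identifies its share group; the exact list of classic-only keys that are removed is an assumption, not taken from the proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the share-mode branch of consumer config construction.
final class ShareConsumerPropsTranslator {
    // Assumed list of keys that only apply to classic/consumer-protocol groups and are
    // therefore dropped before a share consumer is created.
    static final Set<String> CLASSIC_ONLY_KEYS = Set.of(
            "group.protocol",
            "enable.auto.commit",
            "auto.offset.reset",
            "partition.assignment.strategy",
            "group.instance.id",
            "isolation.level");

    private ShareConsumerPropsTranslator() {}

    /** True when the resolved consumer properties request the share protocol. */
    static boolean isShareMode(Map<String, Object> consumerProps) {
        return "share".equalsIgnoreCase(String.valueOf(consumerProps.get("group.protocol")));
    }

    /** Copies the base properties, strips classic-only keys, and sets the share group ID. */
    static Map<String, Object> toShareConsumerProps(Map<String, Object> consumerProps,
                                                    String shareGroupId) {
        Map<String, Object> shareProps = new HashMap<>(consumerProps);
        shareProps.keySet().removeAll(CLASSIC_ONLY_KEYS);
        // The share consumer reads its share group ID from the standard group.id key.
        shareProps.put("group.id", shareGroupId);
        return shareProps;
    }
}
```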
Metrics
The new sensors are registered only in Share Group mode, which keeps dashboards for traditional sink connectors unchanged and makes it visible which mode a connector is running in. All metrics belong to the existing `sink-task-metrics` group.
New Share-Specific Metrics:
- sink-record-acquire: Rate/Total of records pulled from the group.
- sink-record-acknowledge: Rate/Total of successful `ACCEPT` acks.
- sink-record-release/reject: Rate/Total of records released for retry or rejected to the DLQ.
- acknowledge-time: Time between `poll()` and `acknowledge()`.
- sink-record-redelivery: Total records with delivery count > 1.
Exclusions: The following traditional sensors are not registered in share mode as they are not applicable: partition-count, offset-seq-number, and offset-commit-completion.
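The share-specific sensors can be thought of as counters updated from the main loop. So that it runs standalone, this sketch keeps totals in `LongAdder`s instead of Kafka's `Metrics`/`Sensor` API, and it omits rates and `acknowledge-time`. The class `ShareSinkMetricsSketch` and its method names are hypothetical; the delivery count passed in is the per-record value that the Java client exposes for share consumers through `ConsumerRecord.deliveryCount()`.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the share-mode sensors; real code would register
// Sensors in the sink-task-metrics group instead of using bare counters.
final class ShareSinkMetricsSketch {
    final LongAdder acquired = new LongAdder();     // sink-record-acquire (total)
    final LongAdder acknowledged = new LongAdder(); // sink-record-acknowledge (total)
    final LongAdder released = new LongAdder();     // sink-record-release (total)
    final LongAdder rejected = new LongAdder();     // sink-record-reject (total)
    final LongAdder redelivered = new LongAdder();  // sink-record-redelivery (total)

    /** Called once per record returned by poll(); the first delivery has count 1. */
    void recordAcquired(int deliveryCount) {
        acquired.increment();
        if (deliveryCount > 1) {
            redelivered.increment();
        }
    }

    void recordAccepted() {
        acknowledged.increment();
    }

    void recordReleased() {
        released.increment();
    }

    void recordRejected() {
        rejected.increment();
    }
}
```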
Proposed Changes
Diagram: At-Least-Once (Share Group → SinkTask → External Sink)
Diagram: Exactly-Once (Same-Cluster Kafka-to-Kafka, KIP-1289)
`WorkerShareSinkTask` Lifecycle
Initialization
The setup phase is simplified because Share Groups eliminate partition-level management.
- Consumer Creation: Instantiates `KafkaShareConsumer`.
- Subscription: Subscribes to topics directly (no `ConsumerRebalanceListener` needed).
- Task Startup: Calls `task.initialize()` and `task.start()`. Unlike traditional sinks, `task.open()` is called once at startup for all topics, since there are no rebalances.
Main Loop (Iteration)
The worker drives the at-least-once delivery guarantee by following this execution flow:
1. Poll: Pulls records from the share group.
2. Convert: Transforms messages into `SinkRecord`s.
3. Deliver: Passes records to the connector via `task.put()`.
4. Acknowledge:
   - Success: Marks all records as `ACCEPT`.
   - Retriable Error: Marks records as `RELEASE` for immediate broker re-delivery.
   - Fatal Error: Marks records as `REJECT` (routes to Dead Letter Queue if configured).
5. Commit: Calls `shareConsumer.commitSync()` to durably persist the acknowledgments on the broker.
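The five steps can be modelled without a broker. In this sketch, `ShareSource`, `RecordConverter`, `RecordSink` and `RetriableSinkException` are hypothetical stand-ins for `KafkaShareConsumer`, the Connect converters, `SinkTask.put()` and Connect's `RetriableException`; `Ack` mirrors the `ACCEPT`/`RELEASE`/`REJECT` acknowledgement types. Acknowledging the whole batch with a single outcome follows the list above; treating every non-retriable exception as fatal is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Mirrors the acknowledgement types used in this proposal.
enum Ack { ACCEPT, RELEASE, REJECT }

// Hypothetical stand-in for Connect's RetriableException.
class RetriableSinkException extends RuntimeException {
    RetriableSinkException(String message) {
        super(message);
    }
}

// Hypothetical subset of share consumer operations used by the loop.
interface ShareSource<R> {
    List<R> poll();
    void acknowledge(R record, Ack type);
    void commitSync();
}

// Hypothetical stand-ins for record conversion and SinkTask.put().
interface RecordConverter<R, S> {
    S convert(R record);
}

interface RecordSink<S> {
    void put(List<S> records);
}

final class ShareSinkLoop<R, S> {
    private final ShareSource<R> source;
    private final RecordConverter<R, S> converter;
    private final RecordSink<S> sink;

    ShareSinkLoop(ShareSource<R> source, RecordConverter<R, S> converter, RecordSink<S> sink) {
        this.source = source;
        this.converter = converter;
        this.sink = sink;
    }

    /** One poll -> convert -> put -> acknowledge -> commit pass; empty if nothing was polled. */
    Optional<Ack> iterate() {
        List<R> batch = source.poll();                  // 1. Poll
        if (batch.isEmpty()) {
            return Optional.empty();
        }
        // 2. Convert. A conversion failure propagates from here, so the batch stays
        // acquired until its locks expire (error tolerance is not modelled).
        List<S> converted = new ArrayList<>(batch.size());
        for (R record : batch) {
            converted.add(converter.convert(record));
        }
        Ack outcome;
        try {
            sink.put(converted);                         // 3. Deliver
            outcome = Ack.ACCEPT;                        // success
        } catch (RetriableSinkException e) {
            outcome = Ack.RELEASE;                       // make available for re-delivery
        } catch (RuntimeException e) {
            outcome = Ack.REJECT;                        // fatal: do not retry
        }
        for (R record : batch) {
            source.acknowledge(record, outcome);         // 4. Acknowledge the whole batch
        }
        source.commitSync();                             // 5. Persist acks on the broker
        return Optional.of(outcome);
    }
}
```

With the real client in explicit acknowledgement mode, step 4 would call `ShareConsumer.acknowledge(record, AcknowledgeType)` for each record before `commitSync()`.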
Ensuring No Data Loss (At-Least-Once)
The system guarantees that no data is lost by acknowledging a record (`ACCEPT`) only after `task.put()` returns successfully.
- Failure Recovery: If a task crashes before acknowledging, the record remains in the `ACQUIRED` state until `share.acquisition.lock.timeout.ms` expires. The broker then makes the record `AVAILABLE` for another task.
- Durable Commits: The default explicit mode uses `commitSync()` to ensure acknowledgments are persisted. If a commit fails, the record is re-delivered.
- Duplicates: Occasional duplicates may occur if a crash happens after `task.put()` but before acknowledgment. This is standard for at-least-once delivery and is best handled by idempotent sinks (e.g., upserts).
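To make the timeout and duplicate scenarios concrete, the following sketch models the life of a single record on the broker. The class `SharedRecordState` is hypothetical and simplifies the broker to one record with a caller-supplied clock. It follows the KIP-932 record states (AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED) and assumes that a record whose lock expires on its last permitted delivery is archived rather than made available again; Dead Letter Queue routing is not modelled.

```java
// Hypothetical single-record model of broker-side share group record states.
// Times are milliseconds supplied by the caller.
final class SharedRecordState {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    private final long lockTimeoutMs;
    private final int maxDeliveryCount;
    private State state = State.AVAILABLE;
    private int deliveryCount = 0;
    private long lockDeadlineMs = -1;

    SharedRecordState(long lockTimeoutMs, int maxDeliveryCount) {
        this.lockTimeoutMs = lockTimeoutMs;
        this.maxDeliveryCount = maxDeliveryCount;
    }

    /** A consumer tries to acquire the record; each success counts as one delivery. */
    boolean tryAcquire(long nowMs) {
        expireLockIfNeeded(nowMs);
        if (state != State.AVAILABLE) {
            return false;
        }
        state = State.ACQUIRED;
        deliveryCount++;
        lockDeadlineMs = nowMs + lockTimeoutMs;
        return true;
    }

    /** ACCEPT from the lock holder; fails if the lock has already expired. */
    boolean accept(long nowMs) {
        expireLockIfNeeded(nowMs);
        if (state != State.ACQUIRED) {
            return false;
        }
        state = State.ACKNOWLEDGED;
        return true;
    }

    State state(long nowMs) {
        expireLockIfNeeded(nowMs);
        return state;
    }

    int deliveryCount() {
        return deliveryCount;
    }

    // An expired lock makes the record available again, unless its deliveries are used up.
    private void expireLockIfNeeded(long nowMs) {
        if (state == State.ACQUIRED && nowMs >= lockDeadlineMs) {
            state = deliveryCount >= maxDeliveryCount ? State.ARCHIVED : State.AVAILABLE;
        }
    }
}
```

In this model, a task that crashes after `put()` but before `accept()` leaves the record locked until the timeout; a second task then acquires it with a delivery count of 2, which is exactly the duplicate case described above.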
Exactly-Once Semantics (Future Phase, requires KIP-1289)
For Kafka-to-Kafka pipelines, exactly-once delivery is achieved by committing the producer's output records and the consumer's acknowledgments atomically within a single transaction, via KIP-1289.
| Step | Action |
| --- | --- |
| 1. Begin | Start producer transaction. |
| 2. Produce | Send transformed records to output topics. |
| 3. Bind | Call producer.sendShareAcksToTransaction() to link share acks to the transaction. |
| 4. Commit | Atomically commit both output records and source acknowledgments. |
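The four steps can be sketched as a single method. Everything here is a stand-in: `TxnProducer` is a hypothetical interface whose `beginTransaction()`, `send()`, `commitTransaction()` and `abortTransaction()` loosely mirror `KafkaProducer`, while `sendShareAcksToTransaction()` is the call named in the table; its parameterless signature is an assumption, since KIP-1289 defines the real one. Error handling is simplified: with a real `KafkaProducer`, some failures (for example `ProducerFencedException`) require closing the producer rather than aborting the transaction.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a transactional producer. sendShareAcksToTransaction()
// is the KIP-1289 call named in the proposal; its signature here is assumed.
interface TxnProducer {
    void beginTransaction();
    void send(String outputRecord);
    void sendShareAcksToTransaction();
    void commitTransaction();
    void abortTransaction();
}

final class ExactlyOnceStep {
    private ExactlyOnceStep() {}

    /** Produces the transformed batch and binds the share acks in one transaction. */
    static boolean processBatch(TxnProducer producer, List<String> batch,
                                Function<String, String> transform) {
        producer.beginTransaction();                     // 1. Begin
        try {
            for (String record : batch) {
                producer.send(transform.apply(record));  // 2. Produce
            }
            producer.sendShareAcksToTransaction();       // 3. Bind acks to the transaction
            producer.commitTransaction();                // 4. Commit outputs and acks together
            return true;
        } catch (RuntimeException e) {
            // Assumed: neither outputs nor acks become visible, and the acquired
            // records are delivered again after release or lock expiry.
            producer.abortTransaction();
            return false;
        }
    }
}
```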
Configuration Resolution Order
The proposal reuses the existing consumer.override.* mechanism in Kafka Connect for a seamless transition.
Resolution Order (the connector-level setting takes precedence over the worker-level default):
- Worker Config: `consumer.group.protocol=share` (global default).
- Connector Config: `consumer.override.group.protocol=share` (per-connector override).
Important Note on Group IDs: Share groups use a specific state topic (`__share_group_state`). To avoid membership conflicts, users must ensure that a Share Group ID does not match an existing Consumer Group ID. A validation check will be implemented to prevent this collision.
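The resolution order and the collision check can be sketched as two small functions. `GroupProtocolResolver` is hypothetical, and the set of existing consumer group IDs is simply supplied by the caller; how the real validation would discover them is not specified here. Note that per-connector overrides are also subject to the worker's `connector.client.config.override.policy`, which this sketch ignores.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical resolution of the group protocol for one connector, plus the
// share/consumer group ID collision check.
final class GroupProtocolResolver {
    private GroupProtocolResolver() {}

    /** The connector override wins; otherwise the worker-level setting applies. */
    static boolean useShareGroup(Map<String, String> workerProps, Map<String, String> connectorProps) {
        String override = connectorProps.get("consumer.override.group.protocol");
        String protocol = override != null ? override : workerProps.get("consumer.group.protocol");
        return "share".equalsIgnoreCase(protocol); // false when neither level sets it
    }

    /** Fails if the chosen share group ID is already used by a consumer group. */
    static void validateShareGroupId(String shareGroupId, Set<String> existingConsumerGroupIds) {
        if (existingConsumerGroupIds.contains(shareGroupId)) {
            throw new IllegalArgumentException("Share group ID '" + shareGroupId
                    + "' collides with an existing consumer group");
        }
    }
}
```

For example, a worker with `consumer.group.protocol=share` and a connector with `consumer.override.group.protocol=consumer` resolves to a traditional consumer group for that connector only.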
4. Compatibility, Deprecation, and Migration Plan
Impact on Users: KIP-1302 is opt-in only and purely additive. Standard consumer groups remain the default, and zero code changes are required for existing connectors.
Migration Path:
1. Ensure brokers are version 4.0+.
2. Set `consumer.group.protocol=share` at the worker level (all connectors), or `consumer.override.group.protocol=share` in the per-connector JSON.
3. Tune `share.acquisition.lock.timeout.ms` to exceed your highest expected `task.put()` latency.
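A simple pre-flight check for the last step is to compare the configured lock timeout against measured latencies. Because the acquisition lock starts when `poll()` returns the records, this sketch compares against poll-to-acknowledge time (conversion plus the whole batch's `put()`), not a single record's processing time. The class `LockTimeoutCheck` and the headroom multiplier are illustrative assumptions, not recommendations from the proposal.

```java
import java.util.Arrays;

// Hypothetical pre-flight check for share.acquisition.lock.timeout.ms.
final class LockTimeoutCheck {
    private LockTimeoutCheck() {}

    /**
     * Returns true if the lock timeout exceeds the slowest observed poll-to-acknowledge
     * time multiplied by a safety headroom (e.g. 1.5 means 50% extra).
     */
    static boolean exceedsWorstCase(long lockTimeoutMs, long[] pollToAckLatenciesMs, double headroom) {
        if (pollToAckLatenciesMs.length == 0) {
            throw new IllegalArgumentException("at least one latency sample is required");
        }
        if (headroom < 1.0) {
            throw new IllegalArgumentException("headroom must be >= 1.0");
        }
        long worst = Arrays.stream(pollToAckLatenciesMs).max().getAsLong();
        return lockTimeoutMs > worst * headroom;
    }
}
```

With the default timeout of 30000 ms, a worst observed batch of 15000 ms passes at a headroom of 1.5 (22500 ms), while a worst case of 25000 ms does not.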
Rollback: Removing the `share` config reverts the connector to standard consumer groups.
Note: Consumer groups and Share Groups track progress separately; a rollback may result in reprocessing records that were already handled by the Share Group.
5. Test Plan
| Test Level | Key Objectives |
| --- | --- |
| Unit | Verify the poll-put-acknowledge loop (ACCEPT/RELEASE/REJECT) and config resolution. |
| Integration | Test elastic scaling (adding/removing tasks), task failure recovery, and interoperability between share and traditional connectors. |
| System | Performance benchmarking against traditional consumer groups and chaos testing to verify zero data loss. |
6. Rejected Alternatives
| Alternative | Reason for Rejection |
| --- | --- |
| Modify SinkTask API | Adding explicit acknowledge() methods would break backward compatibility for all existing connectors. |
| Limit to MirrorMaker 2 | Generic sinks (S3, JDBC, etc.) benefit just as much from elastic scaling as Kafka-to-Kafka pipelines. |
| Require EOS Initially | At-least-once is sufficient for most use cases, and exactly-once is blocked by the pending KIP-1289/KIP-1310. |

