1 Motivation
Vision: make the Flink Kafka connector complete, so that one platform covers both streaming and queue-like event processing.
Today, the Flink Kafka connector is strong for classic ordered stream consumption, but it is still not the best fit for queue-like event processing patterns.
By adding Kafka Share Group support, we close this gap and keep users on Kafka for both streaming and operational dispatch workloads.
1.1 Three target use cases
• Background job processing. Tasks like email delivery, report generation, thumbnail creation, and async workflow steps often push teams toward RabbitMQ-like systems.
• Burst-heavy operational events. Order events, webhook handling, payment callbacks, and alert processing need elastic consumer scaling and queue semantics.
• Microservice command/event dispatch. When many producers send work to one service, teams often move to Pub/Sub or RabbitMQ for simple single-consumer delivery.
A side benefit is lower checkpoint overhead in the connector itself: no per-partition offset tracking, no offset-to-commit maps, and minimal state serialization.
This simplifies the connector code and reduces state backend I/O, CPU, and memory usage on the Flink side.
2 Proposed Changes
2.1 At-Least-Once Guarantee: How It Works
TL;DR
1. User sets scan.share-group.id (SQL) or calls setShareGroupId() (DataStream) -- that one config switches from consumer-group mode to share-group mode.
2. Flink wraps KafkaShareConsumer in EXPLICIT acknowledgment mode -- the fetcher thread calls acknowledgeAll(RENEW) on every poll cycle to keep acquisition locks alive.
3. On checkpoint barrier, the fetcher pauses -- no new records acquired, freezing the in-flight set.
4. On checkpoint complete, the fetcher calls acknowledgeAll(ACCEPT) + commitSync() -- records become ACKNOWLEDGED (terminal) on the broker.
5. On failure, un-ACKed records auto-RELEASE after lock timeout, broker re-delivers them -- at-least-once guaranteed.
2.1.1 The Invariant
A record becomes ACKNOWLEDGED (terminal, cannot be re-delivered) only after its processing state has been successfully checkpointed in Flink's state backend.
Corollary: any record that has been processed but not yet checkpointed remains in ACQUIRED state on the broker. If Flink fails before the checkpoint completes, the acquisition lock expires, the broker transitions the record back to AVAILABLE, and another consumer (or the same consumer after restart) receives it again.
The converse does not hold: a record can be checkpointed and still be re-delivered if Flink fails before the ACCEPT is committed. That produces a duplicate, which at-least-once permits.
2.1.2 Steady-state sequence diagram
Why RENEW exists: without it, the lock timer counts down from the moment the record was acquired.
If checkpoint intervals are longer than the lock timeout, the record would be auto-RELEASED before Flink can ACCEPT it. RENEW resets the clock.
Why it is cheap: no state transition, no persistence to __share_group_state, just a timer reset. The broker does not write anything to disk for RENEW.
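To make the RENEW argument concrete, here is a minimal Java sketch of a single acquisition lock. It assumes the lock is a simple deadline that RENEW pushes out by a full timeout; AcquisitionLock and RenewDemo are hypothetical names, not Kafka broker classes, and the millisecond values used with it are illustrative only.

```java
// Hypothetical model of one record's acquisition lock; not Kafka broker code.
final class AcquisitionLock {
    private final long timeoutMs;
    private long deadlineMs;

    AcquisitionLock(long acquiredAtMs, long timeoutMs) {
        this.timeoutMs = timeoutMs;
        this.deadlineMs = acquiredAtMs + timeoutMs;
    }

    // RENEW: restart the countdown from "now"; the record stays ACQUIRED.
    void renew(long nowMs) {
        deadlineMs = nowMs + timeoutMs;
    }

    // Once the deadline passes, the broker would release the record.
    boolean expired(long nowMs) {
        return nowMs >= deadlineMs;
    }
}

final class RenewDemo {
    // True if the lock is still held when the checkpoint completes at
    // checkpointAtMs. pollIntervalMs <= 0 models "RENEW is never sent".
    static boolean heldAtCheckpoint(long lockTimeoutMs, long checkpointAtMs, long pollIntervalMs) {
        AcquisitionLock lock = new AcquisitionLock(0L, lockTimeoutMs);
        if (pollIntervalMs > 0) {
            for (long t = pollIntervalMs; t < checkpointAtMs; t += pollIntervalMs) {
                if (lock.expired(t)) {
                    return false; // lock lapsed between two poll cycles
                }
                lock.renew(t); // acknowledgeAll(RENEW) on this poll cycle
            }
        }
        return !lock.expired(checkpointAtMs);
    }
}
```

With a 30 s lock and a checkpoint completing at 60 s, heldAtCheckpoint returns false without RENEW and true with a 500 ms poll cycle. It also returns false if the poll cycle itself is longer than the lock timeout, so RENEW only helps when fetch() loops faster than the lock expires.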
2.1.3 Checkpoint sequence diagram
Why PAUSE?
Without pausing, the fetcher thread would continue calling consumer.poll(), acquiring new records from the broker, and putting them in elementsQueue.
These records would be acquired AFTER the barrier, yet they would still be in flight on the broker when acknowledgeAll(ACCEPT) fires later,
so they would be ACCEPTed too -- records whose processing state is NOT in the checkpoint. That breaks at-least-once.
By pausing:
• The set of in-flight records on the broker is FROZEN at this point
• Everything in-flight = records emitted before the barrier = covered by this checkpoint
• acknowledgeAll(ACCEPT) later will ACCEPT exactly the right set
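A toy simulation of the argument above, assuming (hypothetically) that the fetcher acquires exactly one record per poll cycle and that every record acquired before the barrier has already been emitted when snapshotState() runs. PauseDemo is an illustrative name; the method returns the records that acknowledgeAll(ACCEPT) would accept even though the checkpoint does not cover them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation: the fetcher acquires one record per poll cycle.
final class PauseDemo {
    // Returns ids of records ACCEPTed at checkpoint completion whose
    // processing is NOT covered by the checkpoint. Empty = at-least-once holds.
    static List<Integer> uncoveredAccepts(int cyclesBeforeBarrier, int cyclesUntilComplete, boolean pauseOnBarrier) {
        List<Integer> inFlight = new ArrayList<>();
        int nextId = 0;
        for (int i = 0; i < cyclesBeforeBarrier; i++) {
            inFlight.add(nextId++); // acquired and emitted before the barrier
        }
        int firstUncoveredId = nextId; // ids below this are in the checkpoint
        for (int i = 0; i < cyclesUntilComplete; i++) {
            if (!pauseOnBarrier) {
                inFlight.add(nextId++); // acquired after the barrier
            }
        }
        // acknowledgeAll(ACCEPT) accepts everything still in flight.
        List<Integer> uncovered = new ArrayList<>();
        for (int id : inFlight) {
            if (id >= firstUncoveredId) {
                uncovered.add(id);
            }
        }
        return uncovered;
    }
}
```

Without pausing, the two records acquired between snapshotState() and notifyCheckpointComplete() come back from uncoveredAccepts(3, 2, false) as ACCEPTed but uncovered; with pausing, the result is empty.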
Why is the state so small?
Compare with traditional KafkaSource:
• KafkaSourceReader.snapshotState() captures Map<TopicPartition, OffsetAndMetadata> -- the exact offset per partition. This is the "seek target" for recovery.
For share group source:
• There are no offsets to store. The broker manages record delivery state.
• There are no pending acknowledgments to store. The client library tracks in-flight records.
• Recovery does not need offsets because there is no seek(). The broker re-delivers un-ACKed records automatically.
The checkpoint state is purely "what am I subscribed to" so that on recovery, Flink can re-subscribe to the same topics with the same share group ID.
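To show how little the split state holds, here is a hypothetical ShareGroupSplitSketch that serializes only splitId, shareGroupId, and topics with java.io.DataOutputStream. In the connector this logic would presumably sit behind Flink's SimpleVersionedSerializer (as ShareGroupSplitSerializer); the class name and byte layout here are assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical split state: the subscription only, no offsets.
final class ShareGroupSplitSketch {
    final String splitId;
    final String shareGroupId;
    final List<String> topics;

    ShareGroupSplitSketch(String splitId, String shareGroupId, List<String> topics) {
        this.splitId = Objects.requireNonNull(splitId);
        this.shareGroupId = Objects.requireNonNull(shareGroupId);
        this.topics = List.copyOf(topics);
    }

    // Layout (assumed): splitId, shareGroupId, topic count, then each topic.
    byte[] serialize() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(splitId);
            out.writeUTF(shareGroupId);
            out.writeInt(topics.size());
            for (String topic : topics) {
                out.writeUTF(topic);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for in-memory streams
        }
        return bytes.toByteArray();
    }

    static ShareGroupSplitSketch deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String splitId = in.readUTF();
            String shareGroupId = in.readUTF();
            int topicCount = in.readInt();
            List<String> topics = new ArrayList<>(topicCount);
            for (int i = 0; i < topicCount; i++) {
                topics.add(in.readUTF());
            }
            return new ShareGroupSplitSketch(splitId, shareGroupId, topics);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // truncated or malformed input
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ShareGroupSplitSketch)) {
            return false;
        }
        ShareGroupSplitSketch other = (ShareGroupSplitSketch) o;
        return splitId.equals(other.splitId)
                && shareGroupId.equals(other.shareGroupId)
                && topics.equals(other.topics);
    }

    @Override
    public int hashCode() {
        return Objects.hash(splitId, shareGroupId, topics);
    }
}
```

The serialized size depends only on the string lengths, not on how many partitions the topics have. KafkaSource, by contrast, checkpoints an offset for each partition.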
What if this step fails?
If any task fails to acknowledge the checkpoint to the CheckpointCoordinator (e.g., a TaskManager crashes during snapshotState), the coordinator aborts the checkpoint.
notifyCheckpointComplete() never fires, so no ACCEPT is sent. The records remain ACQUIRED on the broker until their locks expire, and are then re-delivered. At-least-once is maintained.
2.1.4 Failure recovery sequence diagram
When acquisition.lock.timeout.ms elapses, the broker's SharePartition.releaseAcquisitionLockOnTimeout() fires and performs the InFlightState transition:
• ACQUIRED -> AVAILABLE
• deliveryCount incremented: 1 -> 2
• Acquisition lock timer removed
The steps: no ACCEPT sent -> lock expires -> AVAILABLE -> re-delivered. The broker's lock timeout is the safety net; Flink's checkpoint restore plus broker re-delivery gives at-least-once.
This holds for a crash at any step of the record lifecycle: because ACCEPT is only sent after a checkpoint completes, no record is lost, provided its deliveryCount stays below share.group.delivery.count.limit (see 2.1.5).
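The claim that every crash point is safe can be checked by enumeration. This sketch assumes a simplified, hypothetical lifecycle of five ordered steps and encodes the only rule the design relies on: a record becomes terminal only once ACCEPT has been committed. CrashMatrix and its enums are illustrative names.

```java
// Hypothetical enumeration of crash points in one record's lifecycle.
final class CrashMatrix {
    // Ordered steps of the lifecycle described in 2.1 (simplified).
    enum Step { ACQUIRED, EMITTED, SNAPSHOT_TAKEN, CHECKPOINT_COMPLETED, ACCEPT_COMMITTED }

    enum Outcome { REDELIVERED_AFTER_LOCK_TIMEOUT, ACKNOWLEDGED }

    // Broker-side fate of the record if the TaskManager dies right after lastStep.
    static Outcome afterCrash(Step lastStep) {
        // Only a committed ACCEPT is terminal; before that the record stays
        // ACQUIRED until its lock expires and it becomes AVAILABLE again.
        return lastStep == Step.ACCEPT_COMMITTED
                ? Outcome.ACKNOWLEDGED
                : Outcome.REDELIVERED_AFTER_LOCK_TIMEOUT;
    }

    // At-least-once invariant: ACKNOWLEDGED implies the checkpoint completed.
    static boolean invariantHolds() {
        for (Step step : Step.values()) {
            boolean checkpointed = step.compareTo(Step.CHECKPOINT_COMPLETED) >= 0;
            if (afterCrash(step) == Outcome.ACKNOWLEDGED && !checkpointed) {
                return false;
            }
        }
        return true;
    }
}
```

Note the CHECKPOINT_COMPLETED row: a crash after the checkpoint completes but before the ACCEPT is committed re-delivers a record whose effects are already in the checkpoint. That is a duplicate, not a loss.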
2.1.5 Broker Record State Machine
share.group.delivery.count.limit on the broker must be configured high enough to tolerate Flink checkpoint failures without data loss (recommended >= 20).
Every failed checkpoint causes the affected records to be re-delivered, and each re-delivery increments the record's deliveryCount. Once deliveryCount reaches the limit, the broker marks the record as ARCHIVED and never re-delivers it.
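A minimal model of the per-record state machine, useful for reasoning about how many consecutive checkpoint failures a given limit tolerates. It assumes (as a simplification, not the broker's exact code) that deliveryCount is incremented on each acquisition and that a timed-out record whose count has reached the limit is ARCHIVED instead of returning to AVAILABLE; under that model the second delivery carries deliveryCount 2, matching the timeline in 2.1.4. InFlightRecord is an illustrative name, not the broker's InFlightState.

```java
// Hypothetical model of per-record delivery state in a share group.
final class InFlightRecord {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    private final int deliveryCountLimit;
    private State state = State.AVAILABLE;
    private int deliveryCount = 0;

    InFlightRecord(int deliveryCountLimit) {
        this.deliveryCountLimit = deliveryCountLimit;
    }

    // AVAILABLE -> ACQUIRED; assumption: each acquisition counts as one delivery.
    boolean acquire() {
        if (state != State.AVAILABLE) {
            return false;
        }
        state = State.ACQUIRED;
        deliveryCount++;
        return true;
    }

    // ACQUIRED -> ACKNOWLEDGED (terminal), sent only after a completed checkpoint.
    void accept() {
        if (state == State.ACQUIRED) {
            state = State.ACKNOWLEDGED;
        }
    }

    // Lock timeout: ACQUIRED -> AVAILABLE, or -> ARCHIVED once the limit is reached.
    void releaseOnLockTimeout() {
        if (state == State.ACQUIRED) {
            state = deliveryCount >= deliveryCountLimit ? State.ARCHIVED : State.AVAILABLE;
        }
    }

    State state() {
        return state;
    }

    int deliveryCount() {
        return deliveryCount;
    }

    // Fate of a record whose first failedCheckpoints deliveries all end without
    // ACCEPT, followed by one delivery whose checkpoint succeeds.
    static State afterFailedCheckpoints(int deliveryCountLimit, int failedCheckpoints) {
        InFlightRecord record = new InFlightRecord(deliveryCountLimit);
        for (int i = 0; i < failedCheckpoints; i++) {
            if (!record.acquire()) {
                return record.state(); // already ARCHIVED, never re-delivered
            }
            record.releaseOnLockTimeout();
        }
        if (record.acquire()) {
            record.accept();
        }
        return record.state();
    }
}
```

Under these assumptions, a limit of N tolerates N - 1 consecutive failed checkpoints for a record; the N-th failure archives it.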
2.1.6 Record lifecycle timeline
2.1.7 Thread model
• fetch() loop -- calls acknowledgeAll(RENEW) then consumer.poll(). This is where Kafka network I/O happens. The poll() call blocks waiting for the broker's ShareFetch response.
Records received are put into elementsQueue for the mailbox thread.
• handle tasks -- executes PAUSE (set paused=true, so fetch() returns empty), ACCEPT+COMMIT (acknowledgeAll(ACCEPT) + commitSync()), and RESUME (set paused=false).
• Both fetch() and task handling run on the same fetcher thread, because KafkaShareConsumer is not thread-safe. Both interact with currentFetch (the Kafka client's internal in-flight record tracker, shown in blue because it is a Kafka client library object).
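The thread model can be sketched as a loop in which the mailbox thread only enqueues tasks and the fetcher thread runs them between fetch() calls, so the share consumer is touched by one thread only. FakeShareConsumer is a hypothetical stand-in whose methods mirror acknowledgeAll(RENEW), poll(), and acknowledgeAll(ACCEPT) + commitSync() from the proposal; it is not the KafkaShareConsumer API, and elementsQueue is simplified to a plain list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the single fetcher thread. FakeShareConsumer stands in
// for KafkaShareConsumer; its method names are illustrative, not Kafka's API.
final class FetcherLoopSketch {
    interface FakeShareConsumer {
        void renewAll();            // acknowledgeAll(RENEW)
        List<String> poll();        // consumer.poll()
        void acceptAllAndCommit();  // acknowledgeAll(ACCEPT) + commitSync()
    }

    private final FakeShareConsumer consumer;
    // Written by the mailbox thread, drained only by the fetcher thread.
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    // Simplified stand-in for elementsQueue (single-threaded in this sketch).
    private final List<String> elementsQueue = new ArrayList<>();
    private boolean paused; // read and written only on the fetcher thread

    FetcherLoopSketch(FakeShareConsumer consumer) {
        this.consumer = consumer;
    }

    // Mailbox thread, from snapshotState().
    void requestPause() {
        tasks.add(() -> paused = true);
    }

    // Mailbox thread, from notifyCheckpointComplete().
    void requestAcceptAndResume() {
        tasks.add(consumer::acceptAllAndCommit);
        tasks.add(() -> paused = false);
    }

    // One iteration of the fetcher thread: pending tasks first, then fetch().
    void runOnce() {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
        if (paused) {
            return; // fetch() returns empty while paused
        }
        consumer.renewAll();
        elementsQueue.addAll(consumer.poll());
    }

    List<String> elements() {
        return elementsQueue;
    }
}
```

In the real connector the loop would be driven by Flink's SplitFetcher thread; here runOnce() is called by hand so the ordering is deterministic.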
2.1.8 Component interaction overview
2.2 Exactly Once Guarantee: Not in Scope
Exactly-once delivery depends on transaction support for share groups, which is still pending work in Kafka.
3. Public Interfaces
3.1. KafkaShareGroupSource<T> -- the Source implementation
Users create this via the builder. It plugs into env.fromSource() exactly like KafkaSource.
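import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.kafka.common.serialization.StringDeserializer;
// KafkaShareGroupSource is new in this proposal (sharegroup package); env is a StreamExecutionEnvironment.
KafkaShareGroupSource<String> source = KafkaShareGroupSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("orders", "events")
    .setShareGroupId("order-processor") // switches from consumer-group to share-group mode
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
    .build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "orders-share-group");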
3.2. SQL/Table option: scan.share-group.id
CREATE TABLE orders (
id BIGINT, amount DOUBLE
) WITH (
'connector' = 'kafka', -- same connector
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.share-group.id' = 'order-processor', -- this one option switches the mode
'format' = 'json'
);
New Internal Classes (not user-facing, listed for connector developers and reviewers)
sharegroup/
  KafkaShareGroupSource             Source implementation
  KafkaShareGroupSourceBuilder      Builder
  reader/
    ShareGroupSourceReader          snapshotState: PAUSE, notifyCkptComplete: ACCEPT+RESUME
    ShareGroupSplitReader           wraps KafkaShareConsumer (EXPLICIT mode)
    ShareGroupFetcherManager        pause/resume + accept task execution
    ShareGroupRecordEmitter         emitRecord, no tracking
  split/
    ShareGroupSplit                 splitId + topics + shareGroupId (no offsets)
    ShareGroupSplitState            trivial wrapper (no mutable offset state)
    ShareGroupSplitSerializer       serialization
    ShareGroupSplitRecords          RecordsWithSplitIds implementation
  enumerator/
    ShareGroupEnumerator            validates topics, assigns splits (no partition discovery)
    ShareGroupEnumState             assigned subtasks
    ShareGroupEnumStateSerializer   serialization
  ShareGroupSourceOptions           option constants
Modified Existing Classes (3 files, minimal changes)
KafkaConnectorOptions.java +2 options: SHARE_GROUP_ID, SHARE_GROUP_LOCK_TIMEOUT
KafkaDynamicTableFactory.java +1 branch: if scan.share-group.id is present, create a KafkaShareGroupSource
KafkaDynamicSource.java +1 method: createShareGroupSource()
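The single new branch in the table factory reduces to a presence check on the table options, which is also what keeps existing tables on the KafkaSource path. A hypothetical sketch in plain Java, with SourceModeSketch and Mode as illustrative names; the real factory would read options through Flink's ReadableConfig and ConfigOption types rather than a Map.

```java
import java.util.Map;

// Hypothetical sketch of the one new branch in KafkaDynamicTableFactory.
final class SourceModeSketch {
    enum Mode { CONSUMER_GROUP, SHARE_GROUP }

    static final String SHARE_GROUP_ID_KEY = "scan.share-group.id";

    // Presence of scan.share-group.id selects the share-group path;
    // without it, the existing KafkaSource path is taken unchanged.
    static Mode modeFor(Map<String, String> tableOptions) {
        return tableOptions.containsKey(SHARE_GROUP_ID_KEY) ? Mode.SHARE_GROUP : Mode.CONSUMER_GROUP;
    }
}
```

Keeping the decision to a single key check is what makes the change backward compatible: no existing option changes meaning.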
4. Compatibility, Deprecation, and Migration Plan
Compatibility
Backward compatible. Zero breaking changes.
• All existing KafkaSource, KafkaSink, and Table/SQL configurations continue to work without modification.
• The existing connector = 'kafka' identifier is reused. The presence of scan.share-group.id activates the share group path. Absence of this option preserves the existing behavior exactly.
Kafka version requirement:
• Share groups require Kafka 4.0+ brokers (KIP-932).
Flink version requirement:
• No changes to Flink core. Uses existing Source / SplitEnumerator / SourceReader framework.
Deprecation
Nothing is deprecated. Share groups and consumer groups solve different problems. KafkaSource remains the recommended source for stream processing with offset-based consumption and exactly-once support.
Migration Plan
No migration path exists or is needed. Users do not migrate from consumer groups to share groups. They are different consumption models for different use cases:
| Use Case | Source | Why |
|---|---|---|
| Stream processing, exactly-once | KafkaSource (existing) | Requires offset control and transactions |
| Stream processing, at-least-once | KafkaSource (existing) | Requires offset control and seek for replay |
| Task queue, competing consumers, at-least-once | KafkaShareGroupSource (new) | Simplifies horizontal scaling; the broker tracks per-record delivery and acknowledgement |
5. Test Plan
| Test Level | Key Objectives |
|---|---|
| Unit | Verify the poll-process-put loop, including ACCEPT/RELEASE/REJECT acknowledgements, and the Flink checkpoint snapshot mechanism. |
| Integration | Test elastic scaling (adding/removing tasks) and task failure recovery. |
| System | Benchmark performance against traditional consumer groups, and run chaos tests to verify zero data loss. |
6. Rejected Alternatives