Status
Current state: Draft
Discussion thread
JIRA: KAFKA-20458
1 Motivation
This KIP introduces a new IN_PROGRESS state that the client transitions records into via an explicit markInProgress API.
The state carries a progress timestamp (and optional progress hints), which gives the broker an authoritative signal that real work is happening.
The broker surfaces this signal through metrics and admin RPCs and uses it for staleness-based detection of stuck workers.
1.1 Why this matters now
These problems are blocking adoption of share groups for the workloads where they would matter most:
heterogeneous-latency stream processing, ML inference pipelines, external API enrichment, batch-flushing sinks.
These workloads take seconds to minutes to process each record. At those latencies, the difference between
"the broker knows work is happening" and "the broker is silently holding the lock" determines whether the system can be operated with confidence.
2 Public Interfaces
2.1 New client API
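```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

public interface ShareConsumer<K, V> {
    // Proposed addition; the existing ShareConsumer methods are omitted here.
    // Moves an ACQUIRED record to IN_PROGRESS, or refreshes its progress timestamp.
    void markInProgress(ConsumerRecord<K, V> record);
}
```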
Why: an explicit, payload-carrying signal that real work is happening (unlike RENEW, which only resets the timer and carries no payload).
2.2 New RecordState value
RecordState.IN_PROGRESS ((byte) 5)
Why: a state, not a flag. It is visible in metrics, admin RPCs, and the state machine, and it enables operator queries such as "show me the records that are currently being processed (IN_PROGRESS)."
Allowed transitions (additions only)
ACQUIRED → IN_PROGRESS (markInProgress)
IN_PROGRESS → ACKNOWLEDGED (ACCEPT)
IN_PROGRESS → AVAILABLE (RELEASE / staleness / max-extension)
IN_PROGRESS → ARCHIVED (REJECT / max-delivery-count)
Why: preserves the invariant "client must hold the lock before claiming work." All terminal/release paths from ACQUIRED are mirrored.
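The added transitions can be sketched as a lookup table. This is a minimal standalone sketch, not SharePartition code: the RecordState enum is a hypothetical stand-in, the IN_PROGRESS id (5) comes from this KIP, and the other ids are assumed to match the existing RecordState values. REJECT / max-delivery-count targets the existing ARCHIVED state, and only transitions involving IN_PROGRESS are modelled.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for SharePartition.RecordState with the proposed IN_PROGRESS value.
enum RecordState {
    AVAILABLE((byte) 0),
    ACQUIRED((byte) 1),
    ACKNOWLEDGED((byte) 2),
    ARCHIVED((byte) 4),
    IN_PROGRESS((byte) 5); // new in this KIP

    final byte id;

    RecordState(byte id) {
        this.id = id;
    }
}

// Only the transitions added by this KIP; existing transitions are left out.
final class InProgressTransitions {
    private static final Map<RecordState, Set<RecordState>> ADDED = new EnumMap<>(RecordState.class);

    static {
        ADDED.put(RecordState.ACQUIRED, EnumSet.of(RecordState.IN_PROGRESS)); // markInProgress
        ADDED.put(RecordState.IN_PROGRESS, EnumSet.of(
                RecordState.IN_PROGRESS,   // repeated markInProgress refreshes lastProgressTimeMs
                RecordState.ACKNOWLEDGED,  // ACCEPT
                RecordState.AVAILABLE,     // RELEASE / staleness / max-extension
                RecordState.ARCHIVED));    // REJECT / max-delivery-count
    }

    private InProgressTransitions() {}

    static boolean isAllowed(RecordState from, RecordState to) {
        return ADDED.getOrDefault(from, EnumSet.noneOf(RecordState.class)).contains(to);
    }
}
```

Because AVAILABLE → IN_PROGRESS is absent from the table, a client cannot claim work on a record it does not hold.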
2.3 New configs
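| Config | Default | Purpose |
|---|---|---|
| group.share.in.progress.staleness.threshold.ms | 90000 (3× lock duration) | Release an IN_PROGRESS record if no progress signal arrives within this window; detects stuck workers |
| group.share.in.progress.max.lock.extension.ms | 1800000 (30 min) | Hard upper bound on total IN_PROGRESS time; detects runaway workers |
| group.share.in.progress.rebalance.grace.ms | 5000 | How long IN_PROGRESS records with fresh progress are held after their owner leaves before being released |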
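The defaults and the sweeper cadence from section 3.3 can be checked with a few lines of arithmetic. This sketch assumes group.share.record.lock.duration.ms is at its 30000 ms default. The worst-case hold time is derived here rather than stated by the KIP: a record that goes stale just after a sweep is only caught by the next sweep, so it can stay IN_PROGRESS for up to threshold + threshold / 4.

```java
// Worked check of the proposed defaults (all values in milliseconds).
final class InProgressDefaults {
    // Assumption: broker default for group.share.record.lock.duration.ms.
    static final long LOCK_DURATION_MS = 30_000L;

    // group.share.in.progress.staleness.threshold.ms = 3 x lock duration.
    static final long STALENESS_THRESHOLD_MS = 3 * LOCK_DURATION_MS;

    // group.share.in.progress.max.lock.extension.ms = 30 minutes.
    static final long MAX_LOCK_EXTENSION_MS = 30L * 60L * 1_000L;

    // Sweeper period from section 3.3: threshold / 4.
    static final long SWEEP_INTERVAL_MS = STALENESS_THRESHOLD_MS / 4;

    // Derived: a record that turns stale right after a sweep is caught one period later.
    static final long WORST_CASE_STALE_HOLD_MS = STALENESS_THRESHOLD_MS + SWEEP_INTERVAL_MS;

    private InProgressDefaults() {}
}
```

With these defaults, a stuck worker's records return to AVAILABLE between roughly 90 and 112.5 seconds after its last progress signal.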
2.4 New wire RPC
MarkInProgressRequest/Response (apiKey 81). Schema mirrors ShareAcknowledgeRequest with ProgressBatches carrying (firstOffset, lastOffset).
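A possible in-memory shape for the request body, as a sketch only. Apart from ProgressBatches and its (firstOffset, lastOffset) fields, every name here is a hypothetical assumption modelled on the ShareAcknowledgeRequest layout; java.util.UUID stands in for Kafka's own Uuid type. The "sorted and non-overlapping" rule is likewise an assumed validation, not part of the proposal.

```java
import java.util.List;
import java.util.UUID;

// One contiguous range of offsets the member is reporting progress on.
record ProgressBatch(long firstOffset, long lastOffset) {
    ProgressBatch {
        if (firstOffset < 0 || lastOffset < firstOffset) {
            throw new IllegalArgumentException("invalid range " + firstOffset + ".." + lastOffset);
        }
    }
}

// Hypothetical per-partition entry.
record MarkInProgressPartition(int partitionIndex, List<ProgressBatch> progressBatches) {
    // Assumed validation: batches must be sorted by offset and must not overlap.
    boolean isWellFormed() {
        for (int i = 1; i < progressBatches.size(); i++) {
            if (progressBatches.get(i).firstOffset() <= progressBatches.get(i - 1).lastOffset()) {
                return false;
            }
        }
        return true;
    }
}

// Hypothetical per-topic entry.
record MarkInProgressTopic(UUID topicId, List<MarkInProgressPartition> partitions) {}

// Hypothetical top-level request body.
record MarkInProgressRequestData(String groupId, String memberId, int shareSessionEpoch,
                                 List<MarkInProgressTopic> topics) {}
```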
Why a separate RPC, not a new AcknowledgeType: progress is conceptually distinct from acknowledgement.
2.5 New admin API
admin.describeShareGroupInFlight(groupId, options)
.heldLongerThanMs(...)
.progressStalenessMs(...)
.onlyState(IN_PROGRESS)
Backed by new DescribeShareGroupInFlightRequest/Response (apiKey 82).
Why: operators need to query "show me stuck records" without scraping logs.
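The three filters could combine as a simple predicate over in-flight records. This is a sketch under stated assumptions: InFlightRecordView, DescribedState and InFlightFilter are hypothetical names, "held" is measured from an assumed acquiredTimeMs, and records with no progress signal (lastProgressTimeMs of -1) are assumed never to match a progress-staleness filter. The same predicate could plausibly back the CLI's --progress-stale-longer-than-ms flag.

```java
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;

// Hypothetical states exposed by the describe call.
enum DescribedState { ACQUIRED, IN_PROGRESS }

// Hypothetical per-record row; lastProgressTimeMs is -1 if markInProgress was never called.
record InFlightRecordView(long offset, DescribedState state, long acquiredTimeMs, long lastProgressTimeMs) {}

// Hypothetical option set mirroring heldLongerThanMs / progressStalenessMs / onlyState.
record InFlightFilter(OptionalLong heldLongerThanMs,
                      OptionalLong progressStalenessMs,
                      Optional<DescribedState> onlyState) {

    boolean matches(InFlightRecordView r, long nowMs) {
        if (onlyState.isPresent() && r.state() != onlyState.get()) {
            return false;
        }
        if (heldLongerThanMs.isPresent() && nowMs - r.acquiredTimeMs() <= heldLongerThanMs.getAsLong()) {
            return false;
        }
        if (progressStalenessMs.isPresent()) {
            // Assumption: a record with no progress signal is not reported as progress-stale.
            if (r.lastProgressTimeMs() < 0) {
                return false;
            }
            return nowMs - r.lastProgressTimeMs() > progressStalenessMs.getAsLong();
        }
        return true;
    }

    List<InFlightRecordView> select(List<InFlightRecordView> records, long nowMs) {
        return records.stream().filter(r -> matches(r, nowMs)).collect(Collectors.toList());
    }
}
```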
kafka-share-groups.sh additions
--in-flight --state IN_PROGRESS --progress-stale-longer-than-ms 30000
Why: same operator visibility on the CLI surface, including a STALE annotation that flags problem records.
3 Proposed Changes
3.1 InFlightState extension
Add the following fields to SharePartition.InFlightState:
• lastProgressTimeMs: time of the most recent markInProgress (-1 if the record was never marked in progress).
• inProgressStartTimeMs: time at which the record transitioned to IN_PROGRESS.
3.2 markInProgress handler
In SharePartitionManager.handleMarkInProgressRequest:
1. Validate ownership (memberId matches) and the current state (ACQUIRED or IN_PROGRESS).
2. Validate that the cumulative IN_PROGRESS time does not exceed max.lock.extension.ms.
3. Transition ACQUIRED → IN_PROGRESS, or refresh lastProgressTimeMs if the record is already IN_PROGRESS.
4. Cancel and reschedule the AcquisitionLockTimerTask.
5. Asynchronously write a delta record to __share_group_state.
Why: minimal addition; reuses the existing writeLock + timer + persistence machinery from acknowledge().
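The five handler steps can be sketched against a single in-flight record. This is an illustration, not SharePartitionManager code: InFlightRecord, ProgressDelta, MarkResult and MarkInProgressHandler are hypothetical names, the acquisition lock timer is modelled as a deadline field instead of a real AcquisitionLockTimerTask, and the persister function stands in for the write to __share_group_state. Completing the response only after the write succeeds is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

enum State { AVAILABLE, ACQUIRED, IN_PROGRESS, ACKNOWLEDGED, ARCHIVED }

enum MarkResult { OK, NOT_OWNER, INVALID_STATE, MAX_EXTENSION_EXCEEDED }

// Hypothetical slice of SharePartition.InFlightState with the two proposed fields.
final class InFlightRecord {
    State state = State.ACQUIRED;
    final String memberId;
    long lastProgressTimeMs = -1;    // -1 until the first markInProgress
    long inProgressStartTimeMs = -1; // set on ACQUIRED -> IN_PROGRESS
    long lockExpiryMs;               // stand-in for the AcquisitionLockTimerTask deadline

    InFlightRecord(String memberId, long lockExpiryMs) {
        this.memberId = memberId;
        this.lockExpiryMs = lockExpiryMs;
    }
}

// Immutable snapshot handed to the persister (stand-in for the delta record).
record ProgressDelta(State state, long lastProgressTimeMs, long inProgressStartTimeMs) {}

final class MarkInProgressHandler {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final long lockDurationMs;
    private final long maxLockExtensionMs;
    private final Function<ProgressDelta, CompletableFuture<Void>> persister;

    MarkInProgressHandler(long lockDurationMs, long maxLockExtensionMs,
                          Function<ProgressDelta, CompletableFuture<Void>> persister) {
        this.lockDurationMs = lockDurationMs;
        this.maxLockExtensionMs = maxLockExtensionMs;
        this.persister = persister;
    }

    CompletableFuture<MarkResult> handle(InFlightRecord r, String memberId, long nowMs) {
        ProgressDelta delta;
        lock.writeLock().lock();
        try {
            // 1. Ownership and state checks.
            if (!memberId.equals(r.memberId)) {
                return CompletableFuture.completedFuture(MarkResult.NOT_OWNER);
            }
            if (r.state != State.ACQUIRED && r.state != State.IN_PROGRESS) {
                return CompletableFuture.completedFuture(MarkResult.INVALID_STATE);
            }
            // 2. Cumulative IN_PROGRESS time must stay within max.lock.extension.ms.
            if (r.state == State.IN_PROGRESS && nowMs - r.inProgressStartTimeMs > maxLockExtensionMs) {
                return CompletableFuture.completedFuture(MarkResult.MAX_EXTENSION_EXCEEDED);
            }
            // 3. Transition on the first mark, refresh on later ones.
            if (r.state == State.ACQUIRED) {
                r.state = State.IN_PROGRESS;
                r.inProgressStartTimeMs = nowMs;
            }
            r.lastProgressTimeMs = nowMs;
            // 4. "Cancel and reschedule" the lock timer by moving its deadline.
            r.lockExpiryMs = nowMs + lockDurationMs;
            delta = new ProgressDelta(r.state, r.lastProgressTimeMs, r.inProgressStartTimeMs);
        } finally {
            lock.writeLock().unlock();
        }
        // 5. Persist outside the lock; the result completes once the write does.
        return persister.apply(delta).thenApply(ignored -> MarkResult.OK);
    }
}
```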
3.3 Staleness sweeper
A background task per SharePartition runs every (staleness.threshold.ms / 4):
• If lastProgressTimeMs is older than threshold → release to AVAILABLE, increment deliveryCount.
• If the time elapsed since inProgressStartTimeMs exceeds max.lock.extension.ms → release regardless.
Why: the broker is the only authoritative detector of "no signal in N seconds", because the clients that most need detection are exactly the ones that have crashed.
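One sweeper pass could look like the following sketch. TrackedRecord, SweepState and StalenessSweeper are hypothetical names; synchronized stands in for the partition's write lock, and the max-delivery-count check that would archive a record instead of releasing it is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

enum SweepState { AVAILABLE, IN_PROGRESS }

// Hypothetical in-flight record carrying the two proposed timestamps.
final class TrackedRecord {
    final long offset;
    SweepState state = SweepState.IN_PROGRESS;
    long inProgressStartTimeMs;
    long lastProgressTimeMs;
    int deliveryCount = 1; // already delivered once when it was acquired

    TrackedRecord(long offset, long inProgressStartTimeMs, long lastProgressTimeMs) {
        this.offset = offset;
        this.inProgressStartTimeMs = inProgressStartTimeMs;
        this.lastProgressTimeMs = lastProgressTimeMs;
    }
}

final class StalenessSweeper {
    private final long stalenessThresholdMs;
    private final long maxLockExtensionMs;
    private final List<TrackedRecord> records;

    StalenessSweeper(long stalenessThresholdMs, long maxLockExtensionMs, List<TrackedRecord> records) {
        this.stalenessThresholdMs = stalenessThresholdMs;
        this.maxLockExtensionMs = maxLockExtensionMs;
        this.records = records;
    }

    // One sweep; returns the offsets released back to AVAILABLE.
    synchronized List<Long> sweepOnce(long nowMs) {
        List<Long> released = new ArrayList<>();
        for (TrackedRecord r : records) {
            if (r.state != SweepState.IN_PROGRESS) {
                continue;
            }
            boolean stale = nowMs - r.lastProgressTimeMs > stalenessThresholdMs;
            boolean overMax = nowMs - r.inProgressStartTimeMs > maxLockExtensionMs;
            if (stale || overMax) {
                r.state = SweepState.AVAILABLE;
                r.deliveryCount++;
                released.add(r.offset);
            }
        }
        return released;
    }

    // Runs sweepOnce every stalenessThresholdMs / 4 on a dedicated thread.
    ScheduledExecutorService start(LongSupplier clockMs) {
        long periodMs = Math.max(1L, stalenessThresholdMs / 4);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> sweepOnce(clockMs.getAsLong()), periodMs, periodMs, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

Calling start(System::currentTimeMillis) would run the sweep on the cadence described above; the returned executor would need to be shut down when the partition is unloaded.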
Rebalance flow
In SharePartitionManager.handleMemberLeaving, distinguish:
• ACQUIRED records → release immediately (existing behaviour).
• IN_PROGRESS records with fresh progress → hold for group.share.in.progress.rebalance.grace.ms (default 5000) before releasing.
• IN_PROGRESS records with stale progress → release immediately (worker was already stuck).
Why: preserves work for healthy in-progress records across brief member churn.
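The three cases reduce to a small decision function. This is a sketch with hypothetical names (HeldState, LeavingDecision, MemberLeavingPolicy); it assumes "fresh progress" means a progress signal within group.share.in.progress.staleness.threshold.ms, which the text does not pin down.

```java
// Hypothetical view of a record's state when its owner leaves.
enum HeldState { ACQUIRED, IN_PROGRESS }

// When releaseNow is true, releaseAtMs equals the current time.
record LeavingDecision(boolean releaseNow, long releaseAtMs) {}

final class MemberLeavingPolicy {
    private final long stalenessThresholdMs; // group.share.in.progress.staleness.threshold.ms
    private final long rebalanceGraceMs;     // group.share.in.progress.rebalance.grace.ms

    MemberLeavingPolicy(long stalenessThresholdMs, long rebalanceGraceMs) {
        this.stalenessThresholdMs = stalenessThresholdMs;
        this.rebalanceGraceMs = rebalanceGraceMs;
    }

    LeavingDecision decide(HeldState state, long lastProgressTimeMs, long nowMs) {
        if (state == HeldState.ACQUIRED) {
            // Existing behaviour: plain acquisitions are released at once.
            return new LeavingDecision(true, nowMs);
        }
        // Assumption: "fresh" means a progress signal within the staleness threshold.
        boolean fresh = lastProgressTimeMs >= 0 && nowMs - lastProgressTimeMs <= stalenessThresholdMs;
        if (!fresh) {
            // The worker was already stuck, so holding the record buys nothing.
            return new LeavingDecision(true, nowMs);
        }
        return new LeavingDecision(false, nowMs + rebalanceGraceMs);
    }
}
```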
DelayedShareFetch
acquirablePartitions already excludes ACQUIRED records; extend it to also exclude IN_PROGRESS records, so that a record under active processing is never handed to another consumer.
Client side
ShareConsumerImpl.markInProgress enqueues a pending progress batch, which is flushed in a separate MarkInProgress RPC at the next poll() boundary,
or earlier if the pending count exceeds max.in-flight.progress.batches (default 100).
Why: amortises RPC cost; mirrors how acknowledge() is piggybacked today.
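Client-side buffering can be sketched as a per-partition set of offsets that is coalesced into (firstOffset, lastOffset) ranges on flush. Assumptions: PendingProgress and OffsetRange are hypothetical names, partitions are keyed by plain strings such as "orders-0" instead of Kafka's TopicIdPartition, each distinct offset passed to markInProgress counts as one pending entry against max.in-flight.progress.batches, and the sender callback stands in for the MarkInProgress RPC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Consumer;

// One (firstOffset, lastOffset) progress batch.
record OffsetRange(long firstOffset, long lastOffset) {}

final class PendingProgress {
    private final int maxPending; // max.in-flight.progress.batches
    private final Consumer<Map<String, List<OffsetRange>>> sender; // stand-in for the RPC
    private final Map<String, TreeSet<Long>> pending = new TreeMap<>();
    private int pendingCount;

    PendingProgress(int maxPending, Consumer<Map<String, List<OffsetRange>>> sender) {
        this.maxPending = maxPending;
        this.sender = sender;
    }

    // Enqueues one entry; repeated marks of the same offset are deduplicated.
    void markInProgress(String partition, long offset) {
        if (pending.computeIfAbsent(partition, p -> new TreeSet<>()).add(offset)) {
            pendingCount++;
        }
        if (pendingCount > maxPending) {
            flush();
        }
    }

    // Called at the poll() boundary.
    void onPollBoundary() {
        flush();
    }

    private void flush() {
        if (pendingCount == 0) {
            return;
        }
        Map<String, List<OffsetRange>> request = new TreeMap<>();
        pending.forEach((partition, offsets) -> request.put(partition, coalesce(offsets)));
        pending.clear();
        pendingCount = 0;
        sender.accept(request);
    }

    // Collapses sorted offsets into contiguous ranges, e.g. {3, 4, 9} -> [3..4], [9..9].
    static List<OffsetRange> coalesce(TreeSet<Long> offsets) {
        List<OffsetRange> ranges = new ArrayList<>();
        long first = -1;
        long last = -1;
        for (long offset : offsets) {
            if (first < 0) {
                first = offset;
            } else if (offset != last + 1) {
                ranges.add(new OffsetRange(first, last));
                first = offset;
            }
            last = offset;
        }
        if (first >= 0) {
            ranges.add(new OffsetRange(first, last));
        }
        return ranges;
    }
}
```

Coalescing at flush time keeps repeated marks on a long-running record cheap: they collapse into the same range and never inflate the pending count.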