Status

Current state: Draft

Discussion thread 

JIRA: KAFKA-20458

1 Motivation

This KIP introduces a new IN_PROGRESS state that the client transitions records into via an explicit markInProgress API.

The state carries a progress timestamp (and optional progress hints), which gives the broker an authoritative signal that real work is happening. The broker surfaces this signal through metrics and admin RPCs and uses it for staleness-based detection of stuck workers.

1.1 Why this matters now

These problems are blocking adoption of share groups for the workloads where they would matter most: heterogeneous-latency stream processing, ML inference pipelines, external API enrichment, and batch-flushing sinks.

These workloads have processing times measured in seconds to minutes per record, so the difference between "broker knows work is happening" and "broker silently holds the lock" determines whether the system can be operated with confidence.

2 Public Interfaces


2.1 New client API

public interface ShareConsumer<K, V> {
    void markInProgress(ConsumerRecord<K, V> record);
}


Why: explicit, payload-carrying signal that real work is happening (vs RENEW's payload-less timer reset).
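As a rough illustration of how a caller might use the proposed API, the sketch below signals progress after each completed step of a long-running record. It is only a sketch: `ProgressSignalUsageSketch`, `processWithProgress`, and the use of `java.util.function.Consumer` as a stand-in for `ShareConsumer.markInProgress` are assumptions made for illustration, not part of this KIP.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: shows the intended call pattern only.
final class ProgressSignalUsageSketch {

    /**
     * Runs each processing step for one record and signals progress after
     * every completed step. The markInProgress callback stands in for the
     * proposed ShareConsumer.markInProgress(record) method.
     */
    static <R> void processWithProgress(R record,
                                        List<Runnable> steps,
                                        Consumer<R> markInProgress) {
        for (Runnable step : steps) {
            step.run();                      // do one unit of real work
            markInProgress.accept(record);   // tell the broker work is advancing
        }
    }
}
```

With a real share consumer the callback would presumably be passed as `consumer::markInProgress`, and the record would still be acknowledged separately once all steps finish.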


2.2 New RecordState value

RecordState.IN_PROGRESS  ((byte) 5)


Why: a state, not a flag. Visible in metrics, admin RPCs, and the state machine. Enables operator queries like "show me records that have been processed or are in the IN_PROGRESS state."

Allowed transitions (additions only)

ACQUIRED    → IN_PROGRESS         (markInProgress)
IN_PROGRESS → ACKNOWLEDGED        (ACCEPT)
IN_PROGRESS → AVAILABLE           (RELEASE / staleness / max-extension)
IN_PROGRESS → ARCHIVING           (REJECT / max-delivery-count)


Why: preserves the invariant "client must hold the lock before claiming work." All terminal/release paths from ACQUIRED are mirrored.
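The added transitions can be expressed as a small lookup table. The following is a minimal sketch, assuming a stand-in enum (`SketchRecordState`) and helper class (`InProgressTransitions`) that are hypothetical names and not the broker's actual RecordState class. It models only the transitions this KIP adds, not the existing ones.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Stand-in for the broker's record states; names follow the table above.
enum SketchRecordState {
    AVAILABLE, ACQUIRED, IN_PROGRESS, ACKNOWLEDGED, ARCHIVING
}

// Hypothetical helper that encodes only the transitions added by this KIP.
final class InProgressTransitions {
    private static final Map<SketchRecordState, Set<SketchRecordState>> ADDED =
        new EnumMap<>(SketchRecordState.class);

    static {
        // ACQUIRED -> IN_PROGRESS (markInProgress)
        ADDED.put(SketchRecordState.ACQUIRED,
            EnumSet.of(SketchRecordState.IN_PROGRESS));
        // IN_PROGRESS -> ACKNOWLEDGED / AVAILABLE / ARCHIVING
        ADDED.put(SketchRecordState.IN_PROGRESS,
            EnumSet.of(SketchRecordState.ACKNOWLEDGED,
                       SketchRecordState.AVAILABLE,
                       SketchRecordState.ARCHIVING));
    }

    /** Returns true if from -> to is one of the transitions added above. */
    static boolean isAddedTransition(SketchRecordState from, SketchRecordState to) {
        return ADDED.getOrDefault(from, EnumSet.noneOf(SketchRecordState.class))
                    .contains(to);
    }
}
```

Note that AVAILABLE → IN_PROGRESS is deliberately absent, which is how the "client must hold the lock before claiming work" invariant shows up in the table.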

2.3 New configs

| Config | Default | Purpose |
|---|---|---|
| group.share.in.progress.staleness.threshold.ms | 90000 (3× lock duration) | Release an IN_PROGRESS record if no progress signal arrives within this window; detects stuck workers |
| group.share.in.progress.max.lock.extension.ms | 1800000 (30 min) | Hard upper bound on total IN_PROGRESS time; detects runaway workers |

2.4 New wire RPC

MarkInProgressRequest/Response (apiKey 81). Schema mirrors ShareAcknowledgeRequest with ProgressBatches carrying (firstOffset, lastOffset).

Why a separate RPC, not a new AcknowledgeType: progress is conceptually distinct from acknowledgement.

2.5 New admin API


admin.describeShareGroupInFlight(groupId, options)
   .heldLongerThanMs(...)
   .progressStalenessMs(...)
   .onlyState(IN_PROGRESS)


Backed by new DescribeShareGroupInFlightRequest/Response (apiKey 82).
Why: operators need to query "show me stuck records" without scraping logs.

kafka-share-groups.sh additions

--in-flight   --state IN_PROGRESS   --progress-stale-longer-than-ms 30000


Why: same operator visibility on the CLI surface, including a STALE annotation that flags problem records.


3 Proposed Changes


3.1 InFlightState extension


Add the following fields to SharePartition.InFlightState:

lastProgressTimeMs       (-1 if never markInProgress'd)
inProgressStartTimeMs    (when transitioned to IN_PROGRESS)

3.2 markInProgress handler


In SharePartitionManager.handleMarkInProgressRequest:

 1.  Validate ownership (memberId matches) and current state (ACQUIRED or IN_PROGRESS).
 2.  Validate cumulative IN_PROGRESS time ≤ max.lock.extension.ms.
 3.  Transition ACQUIRED → IN_PROGRESS (or refresh lastProgressTimeMs if already in IN_PROGRESS).
 4.  Cancel + reschedule AcquisitionLockTimerTask.
 5.  Async-write delta record to __share_group_state.


Why: minimal addition; reuses the existing writeLock + timer + persistence machinery from acknowledge().
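Steps 1 to 3 of the handler can be sketched as a pure function over the in-flight state. This is an illustrative sketch only: the class `MarkInProgressSketch`, its nested types, and the result codes are hypothetical names, and steps 4 and 5 (timer reschedule and async persistence) are left out because they depend on broker internals.

```java
// Hypothetical, simplified model of the validation and transition steps.
final class MarkInProgressSketch {
    enum State { ACQUIRED, IN_PROGRESS, OTHER }

    // Result codes are illustrative; the KIP does not define error names.
    enum Result { OK, NOT_OWNER, INVALID_STATE, MAX_EXTENSION_EXCEEDED }

    static final class InFlight {
        final String memberId;
        State state;
        long lastProgressTimeMs = -1L;      // -1 if never markInProgress'd
        long inProgressStartTimeMs = -1L;   // set on transition to IN_PROGRESS

        InFlight(String memberId, State state) {
            this.memberId = memberId;
            this.state = state;
        }
    }

    static Result markInProgress(InFlight rec, String memberId,
                                 long nowMs, long maxLockExtensionMs) {
        // Step 1: validate ownership and current state.
        if (!memberId.equals(rec.memberId)) return Result.NOT_OWNER;
        if (rec.state != State.ACQUIRED && rec.state != State.IN_PROGRESS) {
            return Result.INVALID_STATE;
        }
        // Step 2: cumulative IN_PROGRESS time must stay within the bound.
        if (rec.state == State.IN_PROGRESS
                && nowMs - rec.inProgressStartTimeMs > maxLockExtensionMs) {
            return Result.MAX_EXTENSION_EXCEEDED;
        }
        // Step 3: transition, or refresh the progress timestamp.
        if (rec.state == State.ACQUIRED) {
            rec.state = State.IN_PROGRESS;
            rec.inProgressStartTimeMs = nowMs;
        }
        rec.lastProgressTimeMs = nowMs;
        return Result.OK;
    }
}
```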

3.3 Staleness sweeper


A background task per SharePartition runs every (staleness.threshold.ms / 4):

•  If lastProgressTimeMs is older than staleness.threshold.ms → release to AVAILABLE, increment deliveryCount.
•  If the time elapsed since inProgressStartTimeMs exceeds max.lock.extension.ms → release regardless of recent progress.


Why: broker is the only authoritative detector of "no signal in N seconds" — clients that need detection are the ones that have crashed.
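The per-record check the sweeper performs might look like the sketch below. The class `StalenessSweepSketch`, the `Action` enum, and the method names are hypothetical. The sketch evaluates the hard bound first, which is one possible ordering; both conditions lead to a release either way.

```java
// Hypothetical sketch of the sweeper's per-record decision.
final class StalenessSweepSketch {
    enum Action { KEEP, RELEASE_STALE, RELEASE_MAX_EXTENSION }

    /** Sweep period as described above: a quarter of the staleness threshold. */
    static long sweepIntervalMs(long stalenessThresholdMs) {
        return stalenessThresholdMs / 4;
    }

    static Action evaluate(long nowMs,
                           long lastProgressTimeMs,
                           long inProgressStartTimeMs,
                           long stalenessThresholdMs,
                           long maxLockExtensionMs) {
        // Hard bound on total IN_PROGRESS time: release regardless of progress.
        if (nowMs - inProgressStartTimeMs > maxLockExtensionMs) {
            return Action.RELEASE_MAX_EXTENSION;
        }
        // No progress signal within the staleness window: worker looks stuck.
        if (nowMs - lastProgressTimeMs > stalenessThresholdMs) {
            return Action.RELEASE_STALE;
        }
        return Action.KEEP;
    }
}
```

With the default threshold of 90000 ms the sweep would run every 22500 ms, so a stuck record is released at most roughly 1.25× the threshold after its last progress signal.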

Rebalance flow


In SharePartitionManager.handleMemberLeaving, distinguish:

•  ACQUIRED records → release immediately (existing behaviour).
•  IN_PROGRESS records with fresh progress → hold for group.share.in.progress.rebalance.grace.ms (default 5000) before releasing.
•  IN_PROGRESS records with stale progress → release immediately (worker was already stuck).


Why: preserves work for healthy in-progress records across brief member churn.
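The three cases above reduce to a single decision: how long to wait before releasing a departing member's record. The sketch below is illustrative only. `MemberLeavingSketch` and `releaseDelayMs` are hypothetical names, and it assumes "fresh" means the last progress signal falls within the staleness threshold, which the text above does not define explicitly.

```java
// Hypothetical sketch of the release decision when a member leaves.
final class MemberLeavingSketch {
    enum State { ACQUIRED, IN_PROGRESS }

    /**
     * Returns the delay in milliseconds before the record is released;
     * 0 means release immediately.
     */
    static long releaseDelayMs(State state,
                               long nowMs,
                               long lastProgressTimeMs,
                               long stalenessThresholdMs,
                               long rebalanceGraceMs) {
        if (state == State.ACQUIRED) {
            return 0L; // existing behaviour: release immediately
        }
        // Assumption: "fresh" = last progress within the staleness threshold.
        boolean fresh = nowMs - lastProgressTimeMs <= stalenessThresholdMs;
        return fresh ? rebalanceGraceMs : 0L;
    }
}
```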


DelayedShareFetch


acquirablePartitions already excludes ACQUIRED; extend to also exclude IN_PROGRESS. 


Client side


ShareConsumerImpl.markInProgress enqueues a pending progress batch, which is flushed in a separate MarkInProgress RPC on the next poll() boundary or when the pending count exceeds max.in-flight.progress.batches (default 100).

Why: amortises RPC cost; mirrors how acknowledge() is piggybacked today.
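One way the client-side buffering could work is to coalesce contiguous offsets into (firstOffset, lastOffset) ranges, matching the ProgressBatches shape in the RPC. The sketch below is a simplified, single-partition model under that assumption. `PendingProgressSketch`, `Range`, `add`, and `drain` are hypothetical names, and it interprets "pending count" as the number of coalesced batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical single-partition buffer of pending progress signals.
final class PendingProgressSketch {
    static final class Range {
        final long firstOffset;
        final long lastOffset;

        Range(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    private final TreeSet<Long> pendingOffsets = new TreeSet<>();
    private final int maxPendingBatches;

    PendingProgressSketch(int maxPendingBatches) {
        this.maxPendingBatches = maxPendingBatches;
    }

    /** Records an offset; returns true if the caller should flush now. */
    boolean add(long offset) {
        pendingOffsets.add(offset);
        return toRanges().size() > maxPendingBatches;
    }

    /** Returns the coalesced batches and clears the buffer (e.g. at poll()). */
    List<Range> drain() {
        List<Range> ranges = toRanges();
        pendingOffsets.clear();
        return ranges;
    }

    // Merges sorted offsets into contiguous (firstOffset, lastOffset) ranges.
    private List<Range> toRanges() {
        List<Range> ranges = new ArrayList<>();
        boolean open = false;
        long first = 0L;
        long last = 0L;
        for (long offset : pendingOffsets) {
            if (open && offset == last + 1) {
                last = offset;           // extend the current range
                continue;
            }
            if (open) ranges.add(new Range(first, last));
            first = offset;
            last = offset;
            open = true;
        }
        if (open) ranges.add(new Range(first, last));
        return ranges;
    }
}
```

Recomputing the ranges on every add keeps the sketch short; a real implementation would more likely maintain the ranges incrementally.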


