
...

Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/9wdxthfsbm5xf01y4xvq6qtlg0gq96lq

...

For each share-partition, the share group adds some state management for the records being consumed. The starting offset of records which are eligible for consumption is known as the share-partition start offset (SPSO), and the last offset of records which are eligible for consumption is known as the share-partition end offset (SPEO). The records starting at the SPSO and up to the SPEO are known as the in-flight records. So, a share-partition is essentially managing the consumption of the in-flight records.

The SPEO is not necessarily at the end of the topic-partition; it simply advances as records are fetched beyond this point. The segment of the topic-partition between the SPSO and the SPEO is a sliding window that moves as records are consumed. The share-partition leader limits the distance between the SPSO and the SPEO. The upper bound is controlled by the broker configuration share.record.lock.partition.limit. Unlike existing queuing systems, there’s no “maximum queue depth”, but there is a limit to the number of in-flight records at any point in time.
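The window limit described above can be illustrated with a minimal sketch. The class and method names are hypothetical and not part of the proposal. The sketch assumes the SPEO is treated as the next offset to be fetched, and that the limit is applied to the distance between the two offsets, using the default of 200 for share.record.lock.partition.limit.

```python
from dataclasses import dataclass


@dataclass
class ShareWindow:
    """Hypothetical model of one share-partition's in-flight window."""
    spso: int          # share-partition start offset
    speo: int          # share-partition end offset (assumed: next offset to fetch)
    limit: int = 200   # default of share.record.lock.partition.limit

    def in_flight(self) -> int:
        # Distance between the SPSO and the SPEO.
        return self.speo - self.spso

    def fetch(self, requested: int) -> range:
        """Acquire up to `requested` new records without exceeding the limit."""
        count = max(0, min(requested, self.limit - self.in_flight()))
        acquired = range(self.speo, self.speo + count)
        self.speo += count  # the SPEO advances as records are fetched
        return acquired


window = ShareWindow(spso=100, speo=100)
first = window.fetch(150)    # offsets 100..249
second = window.fetch(150)   # only 50 more fit in the window: 250..299
third = window.fetch(10)     # window is full, nothing is acquired
```

In this sketch, further fetches return nothing until the SPSO moves forward and the window slides.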

...

State

Description

Available

The record is available for a consumer

Acquired

The record has been acquired for a specific consumer, with a time-limited acquisition lock

Acknowledged

The record has been processed and acknowledged by a consumer

Archived

The record is not available for a consumer

All records before the SPSO are in Archived state. All records after the SPEO are in Available state, but not yet being delivered to consumers.

...

Code Block
+--------------+
|  Available   |<------------------+
+--------------+                   |
       |                           |
       | acquired                  | - if (delivery count < share.delivery.attempt.limit)
       | for consumer              |     - released by consumer
       | (delivery count++)        |     - acquisition lock elapsed
       V                           |
+--------------+                   |
|   Acquired   |-------------------+
+--------------+                   |
       |                           |
       | accepted                  | - if (delivery count == share.delivery.attempt.limit)
       | by consumer               |     - released by consumer
       |                           |     - acquisition lock elapsed
       V                           | OR
+--------------+                   | - rejected by consumer as unprocessable
| Acknowledged |                   |
+--------------+                   |
       |                           |
       | SPSO moves                |
       | past record               |
       |                           |
       V                           |
+--------------+                   |
|   Archived   |<------------------+
+--------------+

When records are fetched for a consumer, the share-partition leader starts at the SPSO and finds Available records. For each record it finds, it moves it into Acquired state, bumps its delivery count and adds it to a batch of acquired records to return to the consumer. The consumer then processes the records and acknowledges their consumption. The delivery attempt completes successfully and the records move into Acknowledged state.

Alternatively, if the consumer cannot process a record or its acquisition lock elapses, the delivery attempt completes unsuccessfully and the record’s next state depends on the delivery count. If the delivery count has reached the cluster’s share delivery attempt limit (5 by default), the record moves into Archived state and is not eligible for additional delivery attempts. If the delivery count has not reached the limit, the record moves back into Available state and can be delivered again.

This means that the delivery behavior is at-least-once.
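The transitions in the diagram and the two paragraphs above can be sketched as a small state machine. This is an illustrative model only: the class and method names are hypothetical, and the limit uses the default of 5 quoted in the text.

```python
from enum import Enum


class State(Enum):
    AVAILABLE = "Available"
    ACQUIRED = "Acquired"
    ACKNOWLEDGED = "Acknowledged"
    ARCHIVED = "Archived"


DELIVERY_LIMIT = 5  # default delivery attempt limit quoted in the text


class Record:
    """Hypothetical in-flight record with its state and delivery count."""

    def __init__(self) -> None:
        self.state = State.AVAILABLE
        self.delivery_count = 0

    def _require(self, expected: State) -> None:
        if self.state is not expected:
            raise ValueError(f"expected {expected.value}, was {self.state.value}")

    def acquire(self) -> None:
        # Available -> Acquired, bumping the delivery count.
        self._require(State.AVAILABLE)
        self.state = State.ACQUIRED
        self.delivery_count += 1

    def accept(self) -> None:
        # Successful delivery attempt: Acquired -> Acknowledged.
        self._require(State.ACQUIRED)
        self.state = State.ACKNOWLEDGED

    def release(self) -> None:
        # Released by the consumer, or the acquisition lock elapsed.
        self._require(State.ACQUIRED)
        if self.delivery_count < DELIVERY_LIMIT:
            self.state = State.AVAILABLE   # eligible for redelivery
        else:
            self.state = State.ARCHIVED    # limit reached, no more attempts

    def reject(self) -> None:
        # Rejected by the consumer as unprocessable: Acquired -> Archived.
        self._require(State.ACQUIRED)
        self.state = State.ARCHIVED


# A record that is released after every delivery attempt is eventually archived.
record = Record()
for _ in range(DELIVERY_LIMIT):
    record.acquire()
    record.release()
```

Because a released record returns to Available and can be acquired again, the same record may be delivered more than once, which is the at-least-once behavior noted above.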

Ordering

Share groups focus primarily on sharing to allow consumers to be scaled independently of partitions. The records in a share-partition can be delivered out of order to a consumer, in particular when redeliveries occur.

...

The records returned in a batch for a particular share-partition are guaranteed to be in order of increasing offset. There are no guarantees about the ordering of offsets between different batches.

Managing the SPSO and SPEO

The consumer group concepts of seeking and position do not apply to share groups. The SPSO for each share-partition can be initialized for an empty share group and the SPEO naturally moves forwards as records are consumed.

When a topic subscription is added to a share group for the first time, the SPSO is initialized for each share-partition. By default, the SPSO for each share-partition is initialized to the latest offset for the corresponding topic-partitions.

Alternatively, there is an administrative action available using either AdminClient.alterShareGroupOffsets or the kafka-share-groups.sh tool to reset the SPSO for an empty share group with no active members. This can be used to “reset” a share group to the start of a topic, a particular timestamp or the end of a topic. It can also be used to initialize the share group to the start of a topic. Resetting the SPSO discards all of the in-flight record state and delivery counts.

...

If the number of partitions is increased for a topic with a subscription in a share group, the SPSO for the newly created share-partitions is initialized to 0 (which is of course both the earliest and latest offset for an empty topic-partition). This means there is no doubt about what happens when the number of partitions is increased.

If the SPSO is reset to an offset that has been tiered to remote storage (KIP-405: Kafka Tiered Storage), there will be a performance impact just as for existing consumers fetching records from remote storage.

In-flight records example

...

Code Block
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|   0   |   1   |   2   |   3   |   4   |   5   |   6   |   7   |   8   |   9   |  ...  | <- offset
| Archv | Archv | Acqrd | Avail | Acqrd | Acked | Archv | Avail | Avail | Avail | Avail | <- state
|       |       |   1   |   2   |   1   |       |       |       |       |       |       | <- delivery count
+-------+-------+---^---+-------+-------+-------+-------+-------+-------+---^---+-------+
                    |                                                       |
                    +-- Share-partition start offset (SPSO)                 +-- Share-partition end offset (SPEO)

The share group is currently managing the consumption of the in-flight records, which have offsets 2 to 8 inclusive.

...

The cluster records this information durably. In this example, the durable state contains the SPSO position, the non-zero delivery count for offset 3, the Acknowledged state of offset 5, and the Archived state of offset 6.

...

The share-partition leader is responsible for recording the durable state for the share-partitions it leads. For each share-partition, we need to be able to recover:

  • The Share-Partition Start Offset (SPSO)

  • The state of the in-flight records

  • The delivery counts of records whose delivery failed

...

Operation

State changes

Cumulative state

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100

In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards

Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)

Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110

With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records

Fetch records 110-119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)

Release 110

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)

Acknowledge 119

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged

Fetch records 110, 120

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)

Lock timeout elapsed 111, 112

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)

Acknowledge 113-118

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Fetch records 111, 112

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Acknowledge 110

SPSO=111

SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Acknowledge 111, 112

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)
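The sequence in the table can be replayed with a small in-memory model to check how the SPSO and SPEO move. This is only a sketch under stated assumptions: the SharePartition class and its methods are hypothetical, durable storage is not modeled, the delivery attempt limit is ignored because the example never reaches it, and a lock timeout is treated the same as a release.

```python
class SharePartition:
    """Hypothetical in-memory model of one share-partition's in-flight state."""

    def __init__(self, start_offset: int) -> None:
        self.spso = start_offset
        self.speo = start_offset
        self.records = {}  # offset -> [state, delivery count], in-flight only

    def fetch(self, offsets) -> None:
        for offset in offsets:
            # Fetching beyond the SPEO extends the window.
            while self.speo <= offset:
                self.records[self.speo] = ["available", 0]
                self.speo += 1
            record = self.records[offset]
            if record[0] != "available":
                raise ValueError(f"offset {offset} is {record[0]}")
            record[0] = "acquired"
            record[1] += 1

    def release(self, offsets) -> None:
        # Also used for an elapsed acquisition lock.
        for offset in offsets:
            self.records[offset][0] = "available"

    def acknowledge(self, offsets) -> None:
        for offset in offsets:
            self.records[offset][0] = "acknowledged"
        # The SPSO moves past any leading records that are finished with.
        while self.spso < self.speo and self.records[self.spso][0] == "acknowledged":
            del self.records[self.spso]
            self.spso += 1


sp = SharePartition(100)
sp.fetch(range(100, 110))         # SPEO=110
sp.acknowledge(range(100, 110))   # SPSO=110
sp.fetch(range(110, 120))         # SPEO=120
sp.release([110])                 # 110 available, delivery count 1
sp.acknowledge([119])             # SPSO stays at 110
sp.fetch([110, 120])              # SPEO=121, 110 now at delivery count 2
sp.release([111, 112])            # lock timeout elapsed
sp.acknowledge(range(113, 119))
sp.fetch([111, 112])              # both now at delivery count 2
sp.acknowledge([110])             # SPSO=111
sp.acknowledge([111, 112])        # SPSO=120
```

At the end of the replay the model holds SPSO=120, SPEO=121 and a single in-flight record at offset 120, matching the last row of the table.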

...

Configuration | Description | Values
share.group.enable | Whether to enable share groups on the broker. | Default false while the feature is being developed. Will become true in a future release.
share.delivery.count.limit | The maximum number of delivery attempts for a record delivered to a share group. | Default 5, minimum 2, maximum 10
share.record.lock.duration.ms | Share-group record acquisition lock duration in milliseconds. | Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)
share.record.lock.duration.max.ms | Share-group record acquisition lock maximum duration in milliseconds. | Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour)
share.record.lock.partition.limit | Share-group record lock limit per share-partition. | Default 200, minimum 100, maximum 10000

...

Configuration | Description | Values
group.type | Type of the group: "consumer" or "share". | Default "consumer"
record.lock.duration.ms | Record acquisition lock duration in milliseconds. | Default null, which uses the cluster configuration share.record.lock.duration.ms; minimum 1000; maximum limited by the cluster configuration share.record.lock.duration.max.ms
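As an illustration of how the group-level record.lock.duration.ms relates to the cluster-level settings, the following sketch resolves the effective lock duration. The function name and constants are hypothetical. Rejecting an out-of-range value, rather than clamping it, is an assumption made here for simplicity and is not stated in the tables.

```python
from typing import Optional

# Cluster-level defaults taken from the broker configuration table.
CLUSTER_LOCK_DURATION_MS = 30000       # share.record.lock.duration.ms
CLUSTER_LOCK_DURATION_MAX_MS = 60000   # share.record.lock.duration.max.ms
MIN_LOCK_DURATION_MS = 1000            # minimum for record.lock.duration.ms


def effective_lock_duration_ms(
    group_value: Optional[int],
    cluster_value: int = CLUSTER_LOCK_DURATION_MS,
    cluster_max: int = CLUSTER_LOCK_DURATION_MAX_MS,
) -> int:
    """Resolve the acquisition lock duration for a share group (hypothetical helper)."""
    if group_value is None:
        # null at group level means "use the cluster configuration".
        return cluster_value
    if not MIN_LOCK_DURATION_MS <= group_value <= cluster_max:
        # Assumption: out-of-range values are rejected rather than clamped.
        raise ValueError(
            f"record.lock.duration.ms must be between "
            f"{MIN_LOCK_DURATION_MS} and {cluster_max}, got {group_value}"
        )
    return group_value


default_duration = effective_lock_duration_ms(None)     # falls back to the cluster value
custom_duration = effective_lock_duration_ms(45000)     # within the allowed range
```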

Kafka protocol changes

Further details to follow as the design progresses.

...

A “browsing” consumer which does not modify the share group state or take acquisition locks could be supported, requiring a lesser permission (DESCRIBE) on the group than a proper consumer (READ). This is a little more complicated because it needs to have a position independent of the SPSO so that it can traverse along the queue.

...