...
Operation | State changes | Cumulative state | WriteShareGroupState request | ||
---|---|---|---|---|---|
Starting state of topic-partition with latest offset 100 | SPSO=100, SPEO=100 | SPSO=100, SPEO=100 |
| ||
In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards | |||||
Fetch records 100-109 | SPEO=110, records 100-109 (acquired, delivery count 1) | SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1) | |||
Acknowledge 100-109 | SPSO=110 | SPSO=110, SPEO=110 |
| ||
With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records | |||||
Fetch records 110-119 (consumer 1 gets 110-112, consumer 2 gets 113-118, consumer 3 gets 119) | SPEO=120, records 110-119 (acquired, delivery count 1) | SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1) | |||
Release 110 (consumer 1) | record 110 (available, delivery count 1) | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1) |
Note that the SPEO in the control records is 111 at this point. All records after this are in their first delivery attempt so this is an acceptable situation. | ||
Acknowledge 119 (consumer 3) | record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged |
| ||
Fetch records 110, 120 (consumer 1) | SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) | |||
Lock timeout elapsed 111, 112 (consumer 1's records) | records 111-112 (available, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) |
| ||
Acknowledge 113-118 (consumer 2) | records 113-118 acknowledged | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1) |
| ||
Fetch records 111, 112 (consumer 3) | records 111-112 (acquired, delivery count 2) | SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) | |||
Acknowledge 110 (consumer 1) | SPSO=111 | SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) |
| ||
Acknowledge 111, 112 (consumer 3) | SPSO=120 | SPSO=120, SPEO=121, record 120 (acquired, delivery count 1) |
|
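The sequence in the table can be replayed with a small in-memory model. The following is a minimal sketch only, not the broker's implementation: the class `SharePartition`, its methods, and the helper `replay_example` are hypothetical names introduced for illustration. It assumes that SPSO and SPEO are the share-partition start and end offsets, that new records are always fetched contiguously from the SPEO, and that a lock timeout behaves like a release.

```python
class SharePartition:
    """Toy model of the in-flight state of one share-partition (illustrative only)."""

    def __init__(self, start_offset):
        self.spso = start_offset   # share-partition start offset
        self.speo = start_offset   # share-partition end offset
        self.records = {}          # offset -> (state, delivery_count)

    def fetch(self, offsets):
        """Acquire records: new ones extend the SPEO, available ones are redelivered."""
        for offset in offsets:
            if offset >= self.speo:
                # Assumption: new records arrive contiguously, so offset == SPEO here.
                self.records[offset] = ("acquired", 1)
                self.speo = offset + 1
            else:
                state, count = self.records[offset]
                if state != "available":
                    raise ValueError(f"record {offset} is {state}, not available")
                self.records[offset] = ("acquired", count + 1)

    def release(self, offsets):
        """Make records available again; also used here to model a lock timeout."""
        for offset in offsets:
            _, count = self.records[offset]
            self.records[offset] = ("available", count)

    def acknowledge(self, offsets):
        """Mark records acknowledged, then move the SPSO past any acknowledged prefix."""
        for offset in offsets:
            _, count = self.records[offset]
            self.records[offset] = ("acknowledged", count)
        while self.spso < self.speo and self.records[self.spso][0] == "acknowledged":
            del self.records[self.spso]
            self.spso += 1


def replay_example():
    """Replay the operations from the table, in order."""
    sp = SharePartition(100)
    sp.fetch(range(100, 110))
    sp.acknowledge(range(100, 110))
    sp.fetch(range(110, 120))
    sp.release([110])                  # consumer 1 releases 110
    sp.acknowledge([119])              # consumer 3
    sp.fetch([110, 120])               # consumer 1
    sp.release([111, 112])             # lock timeout on consumer 1's records
    sp.acknowledge(range(113, 119))    # consumer 2
    sp.fetch([111, 112])               # consumer 3
    sp.acknowledge([110])              # consumer 1
    sp.acknowledge([111, 112])         # consumer 3
    return sp
```

Calling `replay_example()` ends with `spso == 120`, `speo == 121` and a single in-flight record at offset 120, which matches the last row of the table.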
...
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "InitializeShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "about": "The group identifier." }, { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch for this share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset, or -1 if the start offset is not being initialized." } ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "InitializeShareGroupStateResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - FENCED_STATE_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." } ] } |
...
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "ReadShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "about":"The group identifier." }, { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." } ] } |
...
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "ReadShareGroupStateResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch for this share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset." }, { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields":[ { "name": "BaseOffset", "type": "int64", "versions": "0+", "about": "The base offset of this state batch." }, { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this state batch." }, { "name": "State", "type": "int8", "versions": "0+", "about": "The state - 0:Available,2:Acked,4:Archived." }, { "name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count." } ]} ] } |
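As an illustration of how a caller might interpret the `StateBatches` field, the sketch below expands each batch into per-offset state. It is a hedged example, not the broker's code: the response is represented as a plain Python dict keyed by the schema's field names, `expand_state_batches` and `example_response` are hypothetical, the sample values are invented for illustration, and `LastOffset` is assumed to be inclusive.

```python
# State codes as listed in the "about" text of the State field.
STATE_NAMES = {0: "Available", 2: "Acked", 4: "Archived"}


def expand_state_batches(response):
    """Map each offset covered by a StateBatch to (state name, delivery count)."""
    per_offset = {}
    for batch in response["StateBatches"]:
        # Assumption: BaseOffset and LastOffset are both inclusive.
        for offset in range(batch["BaseOffset"], batch["LastOffset"] + 1):
            per_offset[offset] = (STATE_NAMES[batch["State"]], batch["DeliveryCount"])
    return per_offset


# Hypothetical response contents, chosen only to exercise the function.
example_response = {
    "ErrorCode": 0,
    "StateEpoch": 1,
    "StartOffset": 110,
    "StateBatches": [
        {"BaseOffset": 111, "LastOffset": 112, "State": 0, "DeliveryCount": 1},
        {"BaseOffset": 113, "LastOffset": 119, "State": 2, "DeliveryCount": 1},
    ],
}

state_by_offset = expand_state_batches(example_response)
```

Offsets that no batch covers, such as 110 in this sample, are simply absent from the result; how such offsets should be treated is not specified in this section.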
WriteShareGroupState API
...
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "WriteShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "about": "The group identifier." }, { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch for this share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset, or -1 if the start offset is not being written." }, { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [ { "name": "BaseOffset", "type": "int64", "versions": "0+", "about": "The base offset of this state batch." }, { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this state batch." }, { "name": "State", "type": "int8", "versions": "0+", "about": "The state - 0:Available,2:Acked,4:Archived" }, { "name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count." } ]} ] } |
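Because each `StateBatch` covers an offset range, adjacent records with the same state and delivery count can share one batch. The sketch below shows one possible way to build the request body. It is an assumption-laden illustration rather than the actual client code: `to_state_batches` and `build_write_request` are hypothetical helpers, the request is modelled as a plain dict keyed by the schema's field names, and the coalescing rule is an illustrative choice that this section does not mandate.

```python
def to_state_batches(per_offset):
    """Coalesce {offset: (state, delivery_count)} into contiguous StateBatch dicts."""
    batches = []
    for offset in sorted(per_offset):
        state, count = per_offset[offset]
        last = batches[-1] if batches else None
        if (last is not None
                and last["LastOffset"] + 1 == offset
                and last["State"] == state
                and last["DeliveryCount"] == count):
            # Same state and delivery count, directly adjacent: extend the batch.
            last["LastOffset"] = offset
        else:
            batches.append({"BaseOffset": offset, "LastOffset": offset,
                            "State": state, "DeliveryCount": count})
    return batches


def build_write_request(group_id, topic_id, partition, state_epoch,
                        per_offset, start_offset=-1):
    """Assemble a dict shaped like WriteShareGroupStateRequest.

    start_offset defaults to -1, which the schema defines as
    "the start offset is not being written".
    """
    return {
        "GroupId": group_id,
        "TopicId": topic_id,
        "Partition": partition,
        "StateEpoch": state_epoch,
        "StartOffset": start_offset,
        "StateBatches": to_state_batches(per_offset),
    }
```

For example, passing offsets 111-112 with state 0 and offsets 113-119 with state 2, all with delivery count 1, yields two batches rather than nine single-record entries.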
...
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "WriteShareGroupStateResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - FENCED_STATE_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." } ] } |
...
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "DeleteShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ {"name": "GroupId", "type": "string", "versions": "0+", "about": "The group identifier." }, {"name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, {"name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." } ] } |
...
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "DeleteShareGroupStateResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - FENCED_STATE_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." } ] } |
...
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "ReadShareGroupOffsetsStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "about":"The group identifier." }, { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." } ] } |
...
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "ReadShareGroupOffsetsStateResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch for this share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset." } ] } |
Records
This section describes the new record types.
...