Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Corrections in RPCs and formatting

...

Operation

State changes

Cumulative state

WriteShareGroupState request

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 100,
  "StateBatches": []
}


In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards


Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)


Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 110,
  "StateBatches": []
}


With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records


Fetch records 110-119

Consumer 1 get 110-112, consumer 2 gets 113-118, consumer 3 gets 119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)


Release 110 (consumer 1)

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available), 
      "DeliveryCount": 1
    }
  ]
}


Note that the SPEO in the control records is 111 at this point. All records after this are in their first delivery attempt so this is an acceptable situation.


Acknowledge 119 (consumer 3)

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 111,
      "LastOffset": 118,
      "State": 0 (Available),
            "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged), 
      "DeliveryCount": 1
    }
  ]
}


Fetch records 110, 120 (consumer 1)

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Lock timeout elapsed 111, 112 (consumer 1's records)

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available), 
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 113-118 (consumer 2)

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 2 (Acknowledged), 
      "DeliveryCount": 1
    }
  ]
}


Fetch records 111, 112 (consumer 3)

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, record 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Acknowledge 110 (consumer 1)

SPSO=111

SPSO=111, SPEO=121, record 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 2 (Acknowledged), 
      "DeliveryCount": 2
    }
  ]
}


Acknowledge 111, 112 (consumer 3)

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 120,
  "StateBatches": []
}


...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "InitializeShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The topic identifier." },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch for this share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0+",
      "about": "The share-partition start offset, or -1 if the start offset is not being initialized." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "InitializeShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The topic identifier." },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index." }
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - INVALID_REQUEST (version 0+)
   "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch for this share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0+",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields":[
      { "name": "BaseOffset", "type": "int64", "versions": "0+",
        "about": "The base offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0+",
        "about": "The last offset of this state batch." },
      { "name": "State", "type": "int8", "versions": "0+",
        "about": "The state - 0:Available,2:Acked,4:Archived." },
      { "name": "DeliveryCount", "type": "int16", "versions": "0+",
        "about": "The delivery count." }
    ]}
  ]
}

WriteShareGroupState API

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The topic identifier." },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch for this share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0+",
      "about": "The share-partition start offset, or -1 if the start offset is not being written." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
      { "name": "BaseOffset", "type": "int64", "versions": "0+",
        "about": "The base offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0+",
        "about": "The last offset of this state batch." },
      { "name": "State", "type": "int8", "versions": "0+",
        "about": "The state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0+",
        "about": "The delivery count." }
    ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "WriteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    {"name": "GroupId", "type": "string", "versions": "0+",
     "about": "The group identifier." },
    {"name": "TopicId", "type": "uuid", "versions": "0+",
     "about": "The topic identifier." },
    {"name": "Partition", "type": "int32", "versions": "0+",
     "about": "The partition index." }
   ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupOffsetsStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The topic identifier." },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index." }
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupOffsetsStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch for this share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0+",
      "about": "The share-partition start offset." }
   ]}
 ]
}

Records

This section describes the new record types.

...