...

The KRaft leader will not perform writes from the state machine (active controller) or client until it has written to the log an AddVoterRecord for every replica id in the controller.quorum.voters configuration.

If the cluster metadata topic partition contains AddVoterRecords for replica ids that are not enumerated in controller.quorum.voters, then the replica will fail to start and will shut down.
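The two bootstrapping rules above can be sketched as a small check. This is an illustrative model only, not KRaft code: the function name `check_voter_bootstrap` and the use of plain collections of replica ids are assumptions made for the example.

```python
def check_voter_bootstrap(configured_voters, logged_voters):
    """Compare controller.quorum.voters with the AddVoterRecords in the log.

    Returns the replica ids that still need an AddVoterRecord. Raises
    ValueError when the log names voters that are missing from the static
    configuration, mirroring the "fail to start" rule.
    """
    configured = set(configured_voters)
    logged = set(logged_voters)

    unexpected = logged - configured
    if unexpected:
        raise ValueError(
            f"voters in log but not in configuration: {sorted(unexpected)}"
        )

    # The leader must write an AddVoterRecord for each of these replica ids
    # before it accepts writes from the state machine or clients.
    return sorted(configured - logged)
```

An empty result means every configured voter already has an AddVoterRecord, so in this model the leader may start accepting writes.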

controller.quorum.bootstrap.servers

This configuration describes the set of hosts and ports that can be queried to discover the cluster metadata partition leader. Observers and to-be-added voters will send Fetch requests to this list of servers until the leader is discovered.
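As a rough sketch of how a replica might use this configuration, the example below parses a server list and asks each server in turn for the leader. The comma-separated `host:port` format is an assumption, as are the helper names; `probe` is a hypothetical stand-in for sending a Fetch request and reading the leader from the response.

```python
def parse_bootstrap_servers(value):
    """Parse a comma-separated "host:port" list into (host, port) tuples."""
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Split on the last colon so the port is always the final component.
        host, _, port = entry.rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"invalid bootstrap server: {entry!r}")
        servers.append((host, int(port)))
    return servers


def discover_leader(servers, probe):
    """Query each server once and return the first leader that is reported.

    `probe` is a hypothetical callable standing in for a Fetch request. It
    returns the leader's (host, port), or None when the leader is unknown.
    """
    for server in servers:
        leader = probe(server)
        if leader is not None:
            return leader
    return None
```

This sketch makes a single pass over the list. A real replica would keep retrying until the leader is discovered, as the paragraph above describes.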

...

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "response",
  "name": "AddVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1", "entityType" : "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
            "about": "The latest known leader epoch." },
          { "name": "EndPoint", "type": "Endpoint", "versions": "0+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Host", "type": "string", "versions": "0+", "mapKey": true,
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "0+",
              "about": "The port." }
          ]}
        ]}
      ]}
    ]}
  ]
}

Handling

When the leader receives an AddVoter request it will do the following:

  1. Wait for the fetch offset of the replica (ID, UUID) to catch up to the log end offset of the leader.
  2. Wait until there are no uncommitted add or remove voter records.
  3. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
  4. Append the AddVoterRecord to the log.
  5. The KRaft internal listener will read this record from the log and add the voter to the voter set.
  6. Wait for the AddVoterRecord to commit using the majority of the new configuration.
  7. Send the AddVoter response to the client.
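The ordering of these steps can be illustrated with a small in-memory model. It is a sketch under stated assumptions, not the KRaft implementation: `LeaderState`, `add_voter_blocker` and `append_add_voter` are hypothetical names, and the model treats a record as committed when its offset is below the high watermark.

```python
from dataclasses import dataclass, field


@dataclass
class LeaderState:
    log_end_offset: int
    high_watermark: int            # assumption: offsets below this are committed
    leader_change_offset: int      # offset of this epoch's LeaderChangeMessage
    voters: set = field(default_factory=set)
    uncommitted_voter_changes: int = 0


def add_voter_blocker(state, replica_fetch_offset):
    """Return the first unmet precondition (steps 1 to 3), or None."""
    if replica_fetch_offset < state.log_end_offset:
        return "replica has not caught up"
    if state.uncommitted_voter_changes > 0:
        return "uncommitted voter change pending"
    if state.leader_change_offset >= state.high_watermark:
        return "LeaderChangeMessage not committed"
    return None


def append_add_voter(state, replica_id):
    """Model steps 4 to 6 and return the acknowledgements needed to commit."""
    state.log_end_offset += 1             # step 4: append the AddVoterRecord
    state.uncommitted_voter_changes += 1
    state.voters.add(replica_id)          # step 5: listener updates the voter set
    return len(state.voters) // 2 + 1     # step 6: majority of the new configuration
```

Counting the majority against the voter set that already includes the new replica is what "using the majority of the new configuration" means in this model: growing from three voters to four raises the required acknowledgements from two to three.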

...

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "response",
  "name": "RemoveVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1", "entityType" : "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
            "about": "The latest known leader epoch." },
          { "name": "EndPoint", "type": "Endpoint", "versions": "0+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Host", "type": "string", "versions": "0+", "mapKey": true,
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "0+",
              "about": "The port." }
          ]}
        ]}
      ]}
    ]}
  ]
}

Handling

...

  1. Wait until there are no uncommitted add or remove voter records.
  2. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
  3. Append the RemoveVoterRecord to the log.
  4. The KRaft internal listener will read this record from the log and remove the voter from the voter set.
  5. Wait for the RemoveVoterRecord to commit using the majority of the new configuration.
  6. Send the RemoveVoter response to the client.
  7. Resign by sending EndQuorumEpoch RPCs if the removed replica is the leader.
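To make the last three steps concrete, the sketch below computes the voter set after a removal, the number of acknowledgements needed under that new configuration, and whether the leader must resign. The function name `plan_remove_voter` and its return shape are assumptions made for illustration.

```python
def plan_remove_voter(voters, removed_id, leader_id):
    """Return (new_voters, acks_needed, leader_must_resign).

    The commit quorum is computed from the voter set that no longer contains
    the removed replica, i.e. the majority of the new configuration.
    """
    if removed_id not in voters:
        raise ValueError(f"replica {removed_id} is not a voter")

    new_voters = set(voters) - {removed_id}
    acks_needed = len(new_voters) // 2 + 1

    # The leader resigns (step 7) only when it removed itself.
    return new_voters, acks_needed, removed_id == leader_id
```

For example, removing the leader from a three-voter quorum leaves two voters, both of which must acknowledge the RemoveVoterRecord before the leader responds and then resigns.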

...

Code Block
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-14",
  "flexibleVersions": "12+",
  "fields": [
    ...
    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "ignorable": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        ...
        { "name": "CurrentLeader", "type": "CurrentLeader",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1", "entityType": "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
            "about": "The latest known leader epoch" },
          { "name": "EndPoint", "type": "Endpoint", "versions": "14+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Host", "type": "string", "versions": "14+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "14+",
              "about": "The port." }
          ]}
        ]},
        ...
      ]}
    ]}
  ]
}

...

Code Block
languagejs
{
  "apiKey": 59,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch fetched",
          "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "CurrentLeader", "type": "CurrentLeader",
          "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch" },
          { "name": "EndPoint", "type": "Endpoint", "versions": "1+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Host", "type": "string", "versions": "1+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "1+",
              "about": "The port." }
          ]}
        ]},
        { "name": "Size", "type": "int64", "versions": "0+",
          "about": "The total size of the snapshot." },
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The starting byte position within the snapshot included in the Bytes field."   },
        { "name": "UnalignedRecords", "type": "records", "versions": "0+",
          "about": "Snapshot data in records format which may not be aligned on an offset boundary" }
      ]}
    ]}
  ]
}

...

Code Block
languagejs
{
  "apiKey": 52,
  "type": "response",
  "name": "VoteResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "VoteGranted", "type": "bool", "versions": "0+",
          "about": "True if the vote was granted and false otherwise"},
        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
          "about": "The replica generated uuid for the replica casting a vote." },
        { "name": "LeaderEndPoint", "type": "Endpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
          "about": "The endpoint that can be used to communicate with the leader", "fields": [
          { "name": "Host", "type": "string", "versions": "1+",
            "about": "The hostname." },
          { "name": "Port", "type": "uint16", "versions": "1+",
            "about": "The port." }
        ]}
      ]}
    ]}
  ]
}

Handling

...

Code Block
languagejs
{
  "apiKey": 53,
  "type": "response",
  "name": "BeginQuorumEpochResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch" },
        { "name": "LeaderEndPoint", "type": "Endpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
          "about": "The endpoint that can be used to communicate with the leader", "fields": [
          { "name": "Host", "type": "string", "versions": "1+",
            "about": "The hostname." },
          { "name": "Port", "type": "uint16", "versions": "1+",
            "about": "The port." }
        ]}
      ]}
    ]}
  ]
}

Handling

...

Code Block
languagejs
{
  "apiKey": 54,
  "type": "response",
  "name": "EndQuorumEpochResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch" },
        { "name": "LeaderEndPoint", "type": "Endpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
          "about": "The endpoint that can be used to communicate with the leader", "fields": [
          { "name": "Host", "type": "string", "versions": "1+",
            "about": "The hostname." },
          { "name": "Port", "type": "uint16", "versions": "1+",
            "about": "The port." }
        ]}
      ]}
    ]}
  ]
}

Handling

...

The features in this KIP will be supported if the ApiVersions of all of the voters and observers are at least the versions described here. If the leader has a replica UUID for all of the voters then this KIP is supported by all of the voters.
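The second sentence above amounts to a simple predicate over what the leader knows about each voter. A minimal sketch, assuming the leader tracks a mapping from replica id to replica UUID with `None` for voters whose UUID is unknown (the mapping and the function name are hypothetical):

```python
def kip_supported_by_voters(voter_uuids):
    """Return True when the leader knows a replica UUID for every voter.

    `voter_uuids` maps replica id -> replica UUID, or None when the leader
    has not learned that voter's UUID yet.
    """
    return all(uuid is not None for uuid in voter_uuids.values())
```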

Upgrading to controller.quorum.bootstrap.servers

TODO: figure out a way to do this. The requirement is that the AddVoterRecords for all of the voters in the static configuration are committed. How do we guarantee this? Is seeing a committed AddVoterRecord enough?

Test Plan

This KIP will be tested using unit tests, integration tests, system tests, simulation tests and a TLA+ specification.

...

  1. Ongaro, Diego, and John Ousterhout. "In search of an understandable consensus algorithm." 2014 USENIX Annual Technical Conference (USENIX ATC 14). 2014.
  2. Ongaro, Diego. Consensus: Bridging theory and practice. Diss. Stanford University, 2014.

  3. Bug in single-server membership changes
  4. KIP-595: A Raft Protocol for the Metadata Quorum
  5. KIP-630: Kafka Raft Snapshot
  6. KIP-631: The Quorum-based Kafka Controller