Versions Compared

Comment: Minor edits for review comments

...

  • The assignor hashes the member IDs and maps partitions to members based on the hash. This gives an approximately even balance.

  • If any partitions were not assigned any members, members are assigned round-robin until each partition has at least one member assigned to it.
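The two steps above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the assignor's actual implementation: the class name `HashAssignSketch`, the use of `String.hashCode()` as the hash, the choice to hash each member onto a single partition, and the sorted member order used for the round-robin pass are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch; the names and the hash function are assumptions.
final class HashAssignSketch {

    // Returns a map from partition number to the members assigned to it.
    static Map<Integer, List<String>> assign(List<String> memberIds, int numPartitions) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            byPartition.put(p, new ArrayList<>());
        }
        if (memberIds.isEmpty()) {
            return byPartition;
        }

        // Deduplicate and sort the member IDs so the result is deterministic.
        List<String> sorted = new ArrayList<>(new TreeSet<>(memberIds));

        // Step 1: hash each member ID onto a partition.
        for (String memberId : sorted) {
            int p = Math.floorMod(memberId.hashCode(), numPartitions);
            byPartition.get(p).add(memberId);
        }

        // Step 2: give every still-empty partition a member, round-robin.
        int next = 0;
        for (List<String> assigned : byPartition.values()) {
            if (assigned.isEmpty()) {
                assigned.add(sorted.get(next % sorted.size()));
                next++;
            }
        }
        return byPartition;
    }
}
```

Because more than one member may share a partition in a share group, the second pass only needs to guarantee that no partition is left without a member; it does not need to rebalance the first pass.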

...

In order to retain similarity with KafkaConsumer and make it easy for applications to move between the two interfaces, KafkaShareConsumer follows the same threading rules as KafkaConsumer. It is not thread-safe, and only one thread at a time may call the methods of KafkaShareConsumer. Unsynchronized access will result in ConcurrentModificationException. The only exception to this rule is KafkaShareConsumer.wakeup(), which may be called from any thread.
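One common way to enforce a rule like this is an ownership check rather than a lock: each guarded method records the calling thread and fails fast if a different thread is already inside. The sketch below illustrates that technique only; it is not KafkaShareConsumer source code, and `SingleThreadGuardSketch`, `guarded` and `wakeupRequested` are hypothetical names.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a single-thread ownership guard.
final class SingleThreadGuardSketch {
    private static final long NO_OWNER = -1L;

    private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    private int depth = 0; // re-entrancy count, only touched by the owning thread

    // Stands in for any guarded method, such as poll().
    void guarded(Runnable body) {
        acquire();
        try {
            body.run();
        } finally {
            release();
        }
    }

    // Stands in for wakeup(): it skips the guard, so any thread may call it.
    void wakeup() {
        wakeupRequested.set(true);
    }

    boolean wakeupRequested() {
        return wakeupRequested.get();
    }

    private void acquire() {
        long me = Thread.currentThread().getId();
        // Succeeds if this thread already owns the guard or can claim it atomically.
        if (ownerThreadId.get() != me && !ownerThreadId.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        depth++;
    }

    private void release() {
        if (--depth == 0) {
            ownerThreadId.set(NO_OWNER);
        }
    }
}
```

An application that needs to interrupt a blocked poll from another thread would therefore use only the unguarded wakeup path, and keep all other calls on the thread that owns the consumer.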

...

The records also include a leader epoch. Whenever the share-partition leader calls the share coordinator, it provides the leader epoch of the partition in the request. The share coordinator uses this to fence zombie share-partition leaders. When a new leader is elected for a share-partition, the leader epoch of the partition is incremented. This means that the new leader will use a higher leader epoch in its requests to the share coordinator, and any trailing requests from earlier share-partition leaders can be rejected with FENCED_LEADER_EPOCH. The leader epoch is persisted by the share coordinator in its ShareSnapshot record. When a new leader epoch is received, it causes the share coordinator to write a new ShareSnapshot record and ShareUpdate records. When a new state epoch is used, the leader epoch is initialized to -1, and it is properly initialized when a share-partition leader makes a request.
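The fencing rule described above amounts to a comparison against the last epoch the coordinator has seen. The following is a minimal sketch under assumptions: `LeaderEpochFenceSketch`, `onRequest` and `onNewStateEpoch` are hypothetical names, and persistence is reduced to updating an in-memory field.

```java
// Hypothetical sketch of leader-epoch fencing for one share-partition.
final class LeaderEpochFenceSketch {

    enum Result { OK, FENCED_LEADER_EPOCH }

    // -1 means no share-partition leader has made a request in this state epoch yet.
    private int leaderEpoch = -1;

    // Called for each request from a share-partition leader.
    Result onRequest(int requestLeaderEpoch) {
        if (requestLeaderEpoch < leaderEpoch) {
            // A trailing request from an earlier (zombie) leader.
            return Result.FENCED_LEADER_EPOCH;
        }
        if (requestLeaderEpoch > leaderEpoch) {
            // A newer leader: remember its epoch (a real coordinator would persist it).
            leaderEpoch = requestLeaderEpoch;
        }
        return Result.OK;
    }

    // Called when a new state epoch is used.
    void onNewStateEpoch() {
        leaderEpoch = -1;
    }

    int leaderEpoch() {
        return leaderEpoch;
    }
}
```

Resetting to -1 on a new state epoch means the first request from any leader is accepted and establishes the epoch that later requests are compared against.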

...

Code Block
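package org.apache.kafka.clients.admin;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
import java.util.Set;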

/**
 * A detailed description of a single share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupDescription {
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator);
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator, Set<AclOperation> authorizedOperations);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * A list of the members of the share group.
   */
  public Collection<MemberDescription> members();

  /**
   * The share group state, or UNKNOWN if the state cannot be parsed.
   */
  public ShareGroupState state();

  /**
   * The share group coordinator, or null if the coordinator is not known.
   */
  public Node coordinator();

  /**
   * The authorized operations for this group, or null if that information is not known.
   */
  public Set<AclOperation> authorizedOperations();
}

...

Code Block
{
  "type": "data",
  "name": "ShareGroupMetadataKey",
  "validVersions": "11",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "11",
      "about": "The group id." }
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo",
      "about": "The topics with initialized share-group state." },
    { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo",
      "about": "The topics whose share-group state is being deleted." }
  ],
  "commonStructs": [
    { "name": "TopicPartitionsInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." }
    ]}
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareUpdateValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
      "about": "The leader epoch of the share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset, or -1 if the start offset is not being updated." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}
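The DeliveryState codes in the schema above could be decoded along these lines. This is a sketch only: the enum name `PersistedDeliveryStateSketch` and its `fromCode` helper are hypothetical, and it covers just the three codes listed in the field description.

```java
// Hypothetical sketch mapping the persisted DeliveryState codes to names.
enum PersistedDeliveryStateSketch {
    AVAILABLE((byte) 0),
    ACKED((byte) 2),
    ARCHIVED((byte) 4);

    private final byte code;

    PersistedDeliveryStateSketch(byte code) {
        this.code = code;
    }

    byte code() {
        return code;
    }

    // Rejects any code that the field description does not list.
    static PersistedDeliveryStateSketch fromCode(byte code) {
        for (PersistedDeliveryStateSketch s : values()) {
            if (s.code == code) {
                return s;
            }
        }
        throw new IllegalArgumentException("Unknown delivery state code: " + code);
    }
}
```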

...