Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Align with KIP-1043

...

Method signatureDescription
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets)Alter offset information for a share group.
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) Alter offset information for a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics)Delete offset information for a set of topics in a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options) Delete offset information for a set of topics in a share group.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds)Delete share groups from the cluster.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupOptions options) Delete share groups from the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds)Describe some share groups in the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions options) Describe some share groups in the cluster.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs)List the share group offsets available in the cluster for the specified share groups.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) List the share group offsets available in the cluster for the specified share groups.
ListShareGroupsResult listShareGroups()List the share groups available in the cluster.
ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) List the share groups available in the cluster.

The equivalence between the consumer group and share group interfaces is clear. There are some differences:

...

Code Block
languagejava
    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This is a convenience method for {@link #alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition.
     * @return The AlterShareGroupOffsetsResult.
     */
    default AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
        return alterShareGroupOffsets(groupId, offsets, new AlterShareGroupOffsetsOptions());
    }

    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This operation is not transactional so it may succeed for some partitions while fail for others.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition. Partitions not specified in the map are ignored.
     * @param options The options to use when altering the offsets.
     * @return The AlterShareGroupOffsetsResult.
     */
    AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options);

    /**
     * Delete offsets for a set of topics in a share group with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to delete offsets.
     * @param topics The topics.
     * @return The DeleteShareGroupOffsetsResult.
     */
    default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics) {
        return deleteShareGroupOffsets(groupId, topics, new DeleteShareGroupOffsetsOptions());
    }

    /**
     * Delete offsets for a set of topics in a share group.
     *
     * @param groupId The group for which to delete offsets.
     * @param topics The topics.
     * @param options The options to use when deleting offsets in a share group.
     * @return The DeleteShareGroupOffsetsResult.
     */
    DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
        Set<String> topics,
        DeleteShareGroupOffsetsOptions options);

    /**
     * Delete share groups from the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupIds The IDs of the groups to delete.
     * @return The DeleteShareGroupsResult.
     */
    default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
        return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
    }

    /**
     * Delete share groups from the cluster.
     *
     * @param groupIds The IDs of the groups to delete.
     * @param options The options to use when deleting a share group.
     * @return The DeleteShareGroupsResult.
     */
    DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);

    /**
     * Describe some share groups in the cluster, with the default options.
     *
     * <p>This is a convenience method for {@link #describeShareGroups(Collection, DescribeShareGroupsOptions)}
     * with default options. See the overload for more details.
     *
     * @param groupIds The IDs of the groups to describe.
     * @return The DescribeShareGroupsResult.
     */
    default DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds) {
        return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
    }

    /**
     * Describe some share groups in the cluster.
     *
     * @param groupIds The IDs of the groups to describe.
     * @param options  The options to use when describing the groups.
     * @return The DescribeShareGroupsResult.
     */
    DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
                                                 DescribeShareGroupsOptions options);

    /**
     * List the share group offsets available in the cluster for the specified share groups with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}
     * to list offsets of all partitions for the specified share groups with default options.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @return The ListShareGroupOffsetsResult
     */
    default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) {
        return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions());
    }

    /**
     * List the share group offsets available in the cluster for the specified share groups.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @param options The options to use when listing the share group offsets.
     * @return The ListShareGroupOffsetsResult
     */
    ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options);

    

AlterShareGroupOffsetsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * The result  * Listof the share groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroups(ListShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListShareGroupsResult{@link Admin#alterShareGroupOffsets(String groupId, Map<TopicPartition, Long>), AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResult {
    /**
     * Return a future which succeeds if all the alter offsets succeed.
     */
    defaultpublic ListShareGroupsResultKafkaFuture<Void> listShareGroupsall() {
        return listShareGroups(new ListShareGroupsOptions());
    }

     /**
     * List the share groups available in the cluster.
     *Return a future which can be used to check the result for a given partition.
     */
 @param options The optionspublic toKafkaFuture<Void> usepartitionResult(final whenTopicPartition listing the share groups.partition) {
     * @return The ListShareGroupsResult.
     */
    ListShareGroupsResult listShareGroups(ListShareGroupsOptions options);

...

}
}

AlterShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#alterShareGroupOffsets(String groupId, Map<TopicPartition, Long>), AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsOptions extends AbstractOptions<AlterShareGroupOffsetsOptions> {
}

DeleteShareGroupOffsetsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#alterShareGroupOffsetsAdmin#deleteShareGroupOffsets(String groupId, Map<TopicPartitionSet<String>, Long>), AlterShareGroupOffsetsOptionsDeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResultDeleteShareGroupOffsetsResult {
    /**
     * Return a future which succeeds only if all the alterdeletions offsets succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given partitiontopic.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartitionString partitiontopic) {
    }
}

...

DeleteShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#alterShareGroupOffsetsAdmin#deleteShareGroupOffsets(String groupId, Map<TopicPartitionSet<String>, Long>), AlterShareGroupOffsetsOptionsDeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsOptionsDeleteShareGroupOffsetsOptions extends AbstractOptions<AlterShareGroupOffsetsOptions>AbstractOptions<DeleteShareGroupOffsetsOptions> {
}

...

DeleteShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#deleteShareGroupOffsetsAdmin#deleteShareGroups(StringCollection<String>, Set<String>, DeleteShareGroupOffsetsOptionsDeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsResultDeleteShareGroupsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future map from group id to futures which can be used to check the resultstatus forof aindividual given topicdeletions.
     */
    public Map<String, KafkaFuture<Void>KafkaFuture<Void>> partitionResultdeletedGroups(final String topic) {
    }
}

...

DeleteShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#deleteShareGroupOffsetsAdmin#deleteShareGroups(StringCollection<String>, Set<String>, DeleteShareGroupOffsetsOptionsDeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsOptionsDeleteShareGroupsOptions extends AbstractOptions<DeleteShareGroupOffsetsOptions>AbstractOptions<DeleteShareGroupsOptions> {
}

...

DescribeShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#deleteShareGroupsAdmin#describeShareGroups(Collection<String>, DeleteShareGroupsOptionsDescribeShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsResultDescribeShareGroupsResult {
    /**
     * Return a future which yields succeeds onlyall ShareGroupDescription objects, if all the deletionsdescribes succeed.
     */
    public KafkaFuture<Map<String, KafkaFuture<Void>ShareGroupDescription>> all() {
    }

    /**
     * Return a map from group id to futures which yield cangroup bedescriptions.
 used to check the status*/
 of individual deletions.
     */
    public Map<String, KafkaFuture<Void>>KafkaFuture<ShareGroupDescription>> deletedGroupsdescribedGroups() {
    }
}

...

ShareGroupDescription

This class does indeed reuse the MemberDescription  class intended for consumer groups. It is sufficiently general to work for share groups also.

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsOptions extends AbstractOptions<DeleteShareGroupsOptions> {
}

DescribeShareGroupsResult

Code Block
languagejava
package
import org.apache.kafka.clientscommon.adminNode;
 
/**
 * The result of the {@link Admin#describeShareGroups(Collection<String>, DescribeShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsResult {
    /**
     * Return a future which yields all ShareGroupDescription objects, if all the describes succeed.
     */
    public KafkaFuture<Map<String, ShareGroupDescription>> all() {
    }

    /**
     * Return a map from group id to futures which yield group descriptions.
     */
    public Map<String, KafkaFuture<ShareGroupDescription>> describedGroups() {
    }
}

ShareGroupDescription

This class does indeed reuse the MemberDescription  class intended for consumer groups. It is sufficiently general to work for share groups also.

Code Block
languagejava
package org.apache.kafka.client.admin;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.acl.AclOperation;

/**
 * A detailed description of a single share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupDescription {
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator);
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator, Set<AclOperation> authorizedOperations);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * A list of the members of the share group.
   */
  public Collection<MemberDescription> members();

  /**
   * The share group state, or UNKNOWN if the state cannot be parsed.
   */
  public ShareGroupState state();

  /**
   * The group coordinator, or null if the coordinator is not known.
   */
  public Node coordinator();

  /**
   * The authorized operations for this group, or null if that information is not known.
   */
  public Set<AclOperation> authorizedOperations();
}

DescribeShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for {@link Admin#describeShareGroups(Collection<String>, DescribeShareGroupsOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsOptions extends AbstractOptions<DescribeShareGroupsOptions> {
    public DescribeShareGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations);

    public boolean includeAuthorizedOperations();
}

ListShareGroupOffsetsResult

The offset returned for each topic-partition is the share-partition start offset (SPSO).

Code Block
languagejava
packageimport org.apache.kafka.common.GroupState;
import org.apache.kafka.clientscommon.acl.adminAclOperation;
 
/**
 * A Thedetailed resultdescription of a thesingle {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)} callshare group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResultShareGroupDescription {
  public  /**
     * Return a future which yields all Map<String, Map<TopicPartition, Long> objects, if requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
    }

    ShareGroupDescription(String groupId, Collection<MemberDescription> members, GroupState groupState, Node coordinator);
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, GroupState groupState, Node coordinator, Set<AclOperation> authorizedOperations);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * A list of the members of the share group.
   */
  public Collection<MemberDescription> members();

  /**
   * The *group Returnstate, aor futureUNKNOWN whichif yieldsthe astate mapcannot ofbe topicparsed.
 partitions to offsets*/
 for thepublic specified group.GroupState groupState();

  /**
   */
 The group  public KafkaFuture<Map<TopicPartitioncoordinator, Long>>or partitionsToOffset(String groupId) {
    }
}

ListShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsOptions extends AbstractOptions<ListShareGroupOffsetsOptions> {
}

...

null if the coordinator is not known.
   */
  public Node coordinator();

  /**
   * The authorized operations for this group, or null if that information is not known.
   */
  public Set<AclOperation> authorizedOperations();
}

DescribeShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Specification of share group offsets to list using Options for {@link Admin#listShareGroupOffsetsAdmin#describeShareGroups(Map<StringCollection<String>, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptionsDescribeShareGroupsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsSpecDescribeShareGroupsOptions {
extends  public ListShareGroupOffsetsSpec();

  /**AbstractOptions<DescribeShareGroupsOptions> {
   * Setpublic the topic partitions whose offsets are to be listed for a share group.
   */
  ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitionsDescribeShareGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations);

   /**
 public   * Returns the topic partitions whose offsets are to be listed for a share group.
   */
  Collection<TopicPartition> topicPartitionsboolean includeAuthorizedOperations();
}

...

ListShareGroupOffsetsResult

The offset returned for each topic-partition is the share-partition start offset (SPSO).

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#listShareGroups(ListShareGroupsOptions Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsResultListShareGroupOffsetsResult {
    /**
     * ReturnsReturn a future thatwhich yields eitherall anMap<String, exceptionMap<TopicPartition, or the full set of share group listingsLong> objects, if requests for all the groups succeed.
     */
    public KafkaFuture<Collection<ShareGroupListing>>KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
    }

      /**
     * ReturnsReturn a future which yields justa themap validof listings.
topic partitions to offsets  */
    public KafkaFuture<Collection<ShareGroupListing>> valid() {
    }
 
    /**
     * Returns a future which yields just the errors which occurredfor the specified group.
     */
    public KafkaFuture<Map<TopicPartition, KafkaFuture<Collection<Throwable>>Long>> errorspartitionsToOffset(String groupId) {
    }
}

...

ListShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * A listing of a share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupListing {
  public ShareGroupListing(String groupId);
  public ShareGroupListing(String groupId, Optional<ShareGroupState> state);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * The share group state.
   */
  public Optional<ShareGroupState> state();
}

...

Options for {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsOptions extends AbstractOptions<ListShareGroupOffsetsOptions> {
}

ListShareGroupOffsetsSpec

Code Block
languagejava
package org.apache.kafka.client.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * Options for Specification of share group offsets to list using {@link Admin#listShareGroups(ListShareGroupsOptionsAdmin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsOptions extends AbstractOptions<ListShareGroupsOptions>ListShareGroupOffsetsSpec {
  public ListShareGroupOffsetsSpec();

  /**
     * IfSet statesthe istopic set,partitions onlywhose groupsoffsets in these states willare to be returned.listed Otherwise,for alla groupsshare are returnedgroup.
     */
    public ListShareGroupsOptions inStates(Set<ShareGroupState> statesListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions);

    /**
     * ReturnReturns the listtopic ofpartitions Stateswhose thatoffsets are requestedto orbe emptylisted iffor noa states have been specifiedshare group.
     */
    public Set<ShareGroupState> statesCollection<TopicPartition> topicPartitions();
}

GroupType

Another case is added to the org.apache.kafka.common.GroupType  enum:

Enum constantDescription
SHARE("share") Share group

ShareGroupState

A new enum org.apache.kafka.common.ShareGroupState  is added:

...

DEAD 

...

EMPTY 

...

STABLE 

...

UNKNOWN 

...

Exceptions

The following new exceptions are added to the org.apache.kafka.common.errors  package corresponding to the new error codes in the Kafka protocol.

...