...

Code Block
languagejava
public abstract class AdminClient implements AutoCloseable {

    public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options);

    public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
        return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
    }

    public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);

    public ListConsumerGroupsResult listConsumerGroups() {
        return listConsumerGroups(new ListConsumerGroupsOptions());
    }

    public abstract ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);

    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
        return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
    }
}
 
public class DescribeConsumerGroupsOptions extends AbstractOptions<DescribeConsumerGroupsOptions> {
}

public class DescribeConsumerGroupsResult {
    private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures;
...
}
 
public class ConsumerGroupDescription {
    private final String groupId;
    private final boolean isSimpleConsumerGroup;
    private final List<MemberDescription> members;
    private final String partitionAssignor;
}
 
public class MemberDescription {
    private final String consumerId;
    private final String clientId;
    private final String host;
    private final MemberAssignment assignment;
}
 
public class MemberAssignment {
    private final List<TopicPartition> topicPartitions;
}

public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
}

public class ListConsumerGroupsResult {
    final KafkaFuture<Map<String, ConsumerGroupListing>> future;

    //...
}
 
public class ConsumerGroupListing {
    private final boolean isSimpleConsumerGroup;
    private final String protocolType;
}
 
public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {
}

public class ListConsumerGroupOffsetsResult {
    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
 
    //...
}
 

Each of these methods returns a result object that holds a future. The future's value becomes available within RequestTimeoutMs, which is configured when the user constructs the AdminClient.
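As a rough illustration of how a caller might consume such a result, the sketch below waits on a per-group future with an upper bound on the wait. It is not the KIP's implementation: `DescribeResultSketch` and `awaitOrNull` are hypothetical names, the description is reduced to a plain `String`, and `java.util.concurrent.CompletableFuture` stands in for `KafkaFuture` so that the example compiles without the Kafka jars.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a result type such as DescribeConsumerGroupsResult.
// CompletableFuture replaces KafkaFuture (assumption made for self-containment).
final class DescribeResultSketch {
    // One future per requested group id, mirroring the "futures" map in the KIP.
    private final Map<String, CompletableFuture<String>> futures;

    DescribeResultSketch(Map<String, CompletableFuture<String>> futures) {
        this.futures = futures;
    }

    // Hypothetical helper: blocks for at most timeoutMs milliseconds.
    // Returns the description, or null if it did not arrive in time.
    String awaitOrNull(String groupId, long timeoutMs) {
        CompletableFuture<String> future = futures.get(groupId);
        if (future == null) {
            throw new IllegalArgumentException("Group was not requested: " + groupId);
        }
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // the result was not available within the bound
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

A caller that prefers not to block could instead attach a callback to the future. The blocking form is shown only because it makes the timeout explicit.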

Proposed Changes

...

  1. Add `#describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options)` and `#describeConsumerGroups(Collection<String> groupIds)` to `AdminClient` API
  2. Add `#listConsumerGroups(ListConsumerGroupsOptions options)` and `#listConsumerGroups()` to `AdminClient` API
  3. Add `#listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)` and `#listConsumerGroupOffsets(String groupId)` to `AdminClient` API
  4. Implement them on `KafkaAdminClient`.
  5. Solve KAFKA-6058 (ASF JIRA).
  6. Add `#deleteConsumerGroups(Collection<String> groupIds, ConsumerGroupsOptions options)` and `#deleteConsumerGroups(Collection<String> groupIds)` to `AdminClient` API once KIP-229: DeleteGroups API is merged.
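The method pairs in items 1 to 3 share one shape: an abstract method that takes an options object, plus a convenience overload that delegates to it with default options. The sketch below shows that shape in isolation. All type names (`ListOptionsSketch`, `AdminSketch`, `InMemoryAdminSketch`) are hypothetical, the group listing is reduced to a list of ids, and the fluent `timeoutMs` setter is only loosely modeled on `AbstractOptions`; none of this is the actual `KafkaAdminClient` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical options type with a fluent setter.
class ListOptionsSketch {
    private Integer timeoutMs; // null means "use the client's default timeout"

    ListOptionsSketch timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }

    Integer timeoutMs() {
        return timeoutMs;
    }
}

// Hypothetical abstract client: one abstract method plus a default-options overload.
abstract class AdminSketch {
    abstract List<String> listConsumerGroups(ListOptionsSketch options);

    // Convenience overload: delegates with freshly constructed default options.
    List<String> listConsumerGroups() {
        return listConsumerGroups(new ListOptionsSketch());
    }
}

// In-memory implementation for demonstration; it plays the role a concrete client would.
class InMemoryAdminSketch extends AdminSketch {
    private final List<String> groups;
    ListOptionsSketch lastOptions; // records the options seen by the last call

    InMemoryAdminSketch(List<String> groups) {
        this.groups = new ArrayList<>(groups);
    }

    @Override
    List<String> listConsumerGroups(ListOptionsSketch options) {
        lastOptions = options;
        return new ArrayList<>(groups); // defensive copy
    }
}
```

Keeping only the options-taking method abstract means a concrete client implements each operation once, and new options can later be added without changing method signatures.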

Compatibility, Deprecation, and Migration Plan

  • There won't be any impact on existing users.
  • There won't be any change of current behavior.
  • No migration tool required.

Rejected Alternatives

  • We are moving one more piece of functionality from `core`'s `AdminClient` API to `clients`'s `AdminClient` API. The aim is to remove dependencies on the `core` module.
  • Aggregating both Consumer Groups and Connect Groups in the same API would increase complexity, because it would require a generic MemberAssignment that fits both kinds of group. This KIP covers only Consumer Groups and leaves Connect Groups to be handled by another KIP. See https://github.com/apache/kafka/pull/4454#issuecomment-360553277

Future Work

  • The Streams Resetter Tool will remove its dependency on the `core` module by using `KafkaAdminClient`.

References:

Pull Request: https://github.com/apache/kafka/pull/4454