Child pages
  • KIP-222 - Add Consumer Group operations to Admin API

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public abstract class AdminClient implements AutoCloseable {
 	
	public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options);
 
	public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
    	return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
	}
 
    public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);

    public ListConsumerGroupsResult listConsumerGroups() {
        return listConsumerGroups(new ListConsumerGroupsOptions());
    }

    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupsOptions options);
 
    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
        return listConsumerGroupOffsets(new ListConsumerGroupOffsetsOptions());
    }
 
    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);
 
    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
        return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
    }
}
 
public class DescribeConsumerGroupOptions extends AbstractOptions<DescribeConsumerGroupOptions> {
}
 
public class DescribeConsumerGroupResult {
    private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures;
...
}
 
public class ConsumerGroupDescription {
    private final String groupId;
    private final boolean isSimpleConsumerGroup;
    private final List<MemberDescription> members;
    private final String partitionAssignor;
}
 
public class MemberDescription {
    private final String consumerId;
    private final String clientId;
    private final String host;
    private final MemberAssignment assignment;
}
 
public class MemberAssignment {
    private final List<TopicPartition> topicPartitions;
}

public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
}

public class ListConsumerGroupsResult {
    final KafkaFuture<Map<String, GroupListing>> future;

    //...
}
 
public class ConsumerGroupListing {
    private final boolean isSimpleConsumerGroup;
}
 
public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
}
 
public class ListConsumerGroupOffsetsResult {
    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
 
    //...
}
 
public class DeleteConsumerGroupOptions extends AbstractOptions<DeleteConsumerGroupOptions> {
}
 
public class DeleteConsumerGroupsResult {
    final Map<String, KafkaFuture<Void>> future;
}
 

This API returns a future object whose result will be available within RequestTimeoutMs, which is configured when user constructs the AdminClient.

...

  1. Add `#describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options)` and `#describeConsumerGroups(Collection<String> groupIds)` to `AdminClient` API
  2. Add `#listConsumerGroups(ListConsumerGroupsOptions options)` and `#listConsumerGroups()` to `AdminClient` API 
  3. Add `#listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)` and `#listConsumerGroupOffsets(String groupId)` to `AdminClient` API and
  4. Implement it on `KafkaAdminClient`.
  5. Solve 
    Jira
    serverASF JIRA
    serverId5aa69414-a9e9-3523-82ec-879b028fb15b
    keyKAFKA-6058
  6. Add `#deleteConsumerGroups(Collection<String> groupIds, ConsumerGroupsOptions options)` and `#deleteConsumerGroups(Collection<String> groupIds)` to `AdminClient` API once based on KIP-229: DeleteGroups API is merged.

Compatibility, Deprecation, and Migration Plan

...