Child pages
  • KIP-222 - Add Consumer Group operations to Admin API

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

 

Status

Current state"Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP is aim to add support for describing consumer groups and list consumer groups to `KafkaAdminClient` class. This functionality is required on the Streams Resetter Tool as describe here: 

JIRA
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-5965

Public Interfaces

API


Add the following API in AdminClient:

Code Block
languagejava
public abstract class AdminClient implements AutoCloseable {
 	
	public abstract DescribeConsumerGroupsResultDescribeGroupsResult describeConsumerGroupsdescribeGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions DescribeGroupsOptions options);
 
	public DescribeConsumerGroupsResultDescribeGroupsResult describeConsumerGroupsdescribeGroups(Collection<String> groupIds) {
    	return describeConsumerGroupsdescribeGroups(groupIds, new DescribeConsumerGroupsOptionsDescribeGroupsOptions());
	}
 
    public abstract ListConsumerGroupsResultListGroupsResult listConsumerGroupslistGroups(ListConsumerGroupsOptionsListGroupsOptions options);

    public ListConsumerGroupsResultListGroupsResult listConsumerGroupslistGroups() {
        return listConsumerGroupslistGroups(new ListConsumerGroupsOptionsListGroupsOptions());
    }

    public ListGroupsResult listGroups(ListGroupsOptions options) {
        //filtering groups by ConsumerProtocol.PROTOCOL_TYPE
    }

    public class DescribeConsumerGroupOptionsListGroupsResult listGroups() {
        return listGroups(new ListGroupsOptions());
    }
}
 
public class DescribeGroupOptions extends AbstractOptions<DescribeConsumerGroupOptions>AbstractOptions<DescribeGroupOptions> {
}
 
public class DescribeConsumerGroupResultDescribeGroupResult {
    private final Map<String, KafkaFuture<ConsumerGroupDescription>>KafkaFuture<GroupDescription>> futures;
    public DescribeConsumerGroupsResultDescribeGroupsResult(Map<String, KafkaFuture<ConsumerGroupDescription>>KafkaFuture<GroupDescription>> futures) {
        this.futures = futures;
    }

    public Map<String, KafkaFuture<ConsumerGroupDescription>>KafkaFuture<GroupDescription>> values() {
        return futures;
    }

    public KafkaFuture<Map<String, ConsumerGroupDescription>>GroupDescription>> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
            thenApply(new KafkaFuture.Function<Void, Map<String, ConsumerGroupDescription>>GroupDescription>>() {
                @Override
                public Map<String, ConsumerGroupDescription>GroupDescription> apply(Void v) {
                    Map<String, ConsumerGroupDescription>GroupDescription> descriptions = new HashMap<>(futures.size());
                    for (Map.Entry<String, KafkaFuture<ConsumerGroupDescription>>KafkaFuture<GroupDescription>> entry : futures.entrySet()) {
                        try {
                            descriptions.put(entry.getKey(), entry.getValue().get());
                        } catch (InterruptedException | ExecutionException e) {
                            // This should be unreachable, because allOf ensured that all the futures
                            // completed successfully.
                            throw new RuntimeException(e);
                        }
                    }
                    return descriptions;
                }
            });
    }
}
 
public class ConsumerGroupDescriptionGroupDescription {
    private final String groupId;
    private final String protocolType;
    private final List<ConsumerDescription> consumers;
}
 
public class ConsumerDescriptionMemberDescription {
    private final String consumerId;
    private final String clientId;
    private final String host;
    private final List<TopicPartition> assignment;
}
 
public class ListConsumerGroupsOptionsListGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions>AbstractOptions<ListGroupsOptions> {
}

public class ListConsumerGroupsResultListGroupsResult {
    final KafkaFuture<Map<String, ConsumerGroupListing>>GroupListing>> future;

    ListConsumerGroupsResultListGroupsResult(KafkaFuture<Map<String, ConsumerGroupListing>>GroupListing>> future) {
        this.future = future;
    }

    /**
    * Return a future which yields a map of topic names to TopicListing objects.
    */
    public KafkaFuture<Map<String, ConsumerGroupListing>>GroupListing>> namesToListings() {
        return future;
    }

    public /**
     * Return a future which yields a collection of TopicListing objects.
     */
    public KafkaFuture<Collection<ConsumerGroupListing>> KafkaFuture<Collection<GroupListing>> listings() {
        return future.thenApply(new KafkaFuture.Function<Map<String, ConsumerGroupListing>GroupListing>, Collection<ConsumerGroupListing>>Collection<GroupListing>>() {
            @Override
            public Collection<ConsumerGroupListing>Collection<GroupListing> apply(Map<String, ConsumerGroupListing>GroupListing> namesToDescriptions) {
                return namesToDescriptions.values();
            }
        });
    }

    /**
    * Return a future which yields a collection of topic names.
    */
    public KafkaFuture<Set<String>> names() {
        return future.thenApply(new KafkaFuture.Function<Map<String, ConsumerGroupListing>GroupListing>, Set<String>>() {
            @Override
            public Set<String> apply(Map<String, ConsumerGroupListing>GroupListing> namesToListings) {
                return namesToListings.keySet();
            }
        });
    }
}
 
public class ConsumerGroupListingGroupListing {
    private final String name;
    private final String protocolType;
}
 

This API returns a future object whose result will be available within RequestTimeoutMs, which is configured when user constructs the AdminClient.

Proposed Changes

  1. Add `#describeConsumerGroups`#describeGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions DescribeGroupsOptions options)` and `#describeConsumerGroups`#describeGroups(Collection<String> groupIds)` to `AdminClient` API 
  2. Add `#listConsumerGroups`#listGroups(ListConsumerGroupsOptions ListGroupsOptions options)` and `#listConsumerGroups`#listGroups()` to `AdminClient` API and 
  3. Implement it on `KafkaAdminClient`.
  4. Solve 
    JIRA
    serverASF JIRA
    serverId5aa69414-a9e9-3523-82ec-879b028fb15b
    keyKAFKA-6058

Compatibility, Deprecation, and Migration Plan

  • There won't be any impact on existing users.
  • There won't be any change of current behavior.
  • No migration tool required.

Rejected Alternatives

We are moving one more functionality from `core`'s `AdminClient` API to `clients`'s `AdminClient` API. This is aim to remove dependencies to `core` module.

Future Work

  • Streams Resetter Tool will be remove it's dependency to `core` module for `KafkaAdminClient`.

...