Status

Current state: Under Discussion

Discussion thread: here 

JIRA: here 

Motivation

KIP-966 introduced pagination for DescribeTopicPartitionsRequest, where the caller can specify the maximum number of partitions to include in the response. If there are more partitions than the limit, the remaining partitions (and their topics) are not sent back. Instead, the response's NextCursor field is populated, and the caller can pass it as the Cursor of the next request to continue where the previous response stopped.
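To illustrate that request/response cycle, the client side reduces to a loop: send a request without a cursor, collect the page, and keep sending the returned NextCursor until the broker returns null. This is a minimal sketch, not Kafka's client code; `Page` and `CursorPagination` are hypothetical names, and `fetchPage` stands in for whatever actually sends the request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical page type (not a Kafka class): the items in one response plus the
// cursor for the next request, or null when nothing is left (a null NextCursor).
record Page<T, C>(List<T> items, C nextCursor) {}

final class CursorPagination {
    private CursorPagination() {}

    // Sends the first request without a cursor, then keeps passing the previous
    // response's next cursor back until the server reports there is nothing left.
    static <T, C> List<T> fetchAll(BiFunction<C, Integer, Page<T, C>> fetchPage, int limit) {
        List<T> all = new ArrayList<>();
        C cursor = null;
        do {
            Page<T, C> page = fetchPage.apply(cursor, limit);
            all.addAll(page.items());
            cursor = page.nextCursor();
        } while (cursor != null);
        return all;
    }
}
```

Since there is no consistency guarantee between requests, the result of such a loop is a best-effort view rather than a snapshot.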

Following the same approach, this KIP proposes adding pagination to more admin-related requests so that the pattern is used more widely, especially on large clusters.

Requests that can benefit from such a limit on the number of partitions include:

  • OffsetFetchRequest: Since version 8 added support for fetching offsets for multiple groups in a single request, OffsetFetchResponse risks timing out when the requested groups carry too much per-partition metadata.
  • DescribeLogDirsRequest: Describes log directories for multiple topics. The admin client uses it to describe all topics on a broker, and for such an operation DescribeLogDirsResponse can time out depending on the number of partitions on the broker.
  • ListPartitionReassignmentsRequest: Similar to DescribeTopicPartitionsRequest, this request can benefit from pagination based on topic partitions.
  • ListOffsetsRequest: Fetches offsets for a given set of topic partitions. Through the Admin API, it can be called with a large number of topic partitions, causing the request to time out.
  • MetadataRequest: Used by the Admin API to list topics. On large clusters, listing all topics can take the operator a long time.

There are also other requests that could benefit from setting a maximum on other fields, such as:

  • DescribeProducersRequest: Describes active producer states on a set of topic partitions. This request can become overwhelming if there are too many short-lived idempotent producers on these partitions. Therefore, adding pagination based on producerId will be beneficial when performing such admin operations. Note that we can limit based on partitions, but the factor that usually increases the size of the response is the number of short-lived PIDs.
  • ListTransactionsRequest: Lists active transactions for a given set of producer IDs. This can become unmanageable if a producer has too many ongoing transactions. Similarly, adding pagination based on transactional ID might make this operation less chaotic when dealing with misbehaving producers.
  • DescribeTransactionsRequest: Similar to ListTransactionsRequest. Even though the caller controls the list of transactional IDs, the request can still time out if the list is large enough or if a transaction includes a large number of partitions.
  • ListGroupsRequest: Lists all consumer groups on a cluster that match a given state or group type. This can become unmanageable on large clusters, especially since Kafka doesn't limit the maximum number of registered consumer groups on a cluster. Similarly, adding pagination based on group ID might make this operation less chaotic when dealing with large clusters.
  • DescribeGroupsRequest and ConsumerGroupDescribeRequest: Both fetch details of given consumer groups. While this can be controlled by the number of consumer groups, the number of members in each consumer group can get out of hand. So controlling this will ease the load of describing groups on large clusters.
  • DescribeAclsRequest: Fetches ACL details for a given filter. Depending on the filter, the number of resources in response can get quite large.
  • DescribeConfigsRequest: Similar to DescribeAclsRequest, this request can time out if a large number of configs is defined for the resources in the request.

Note:

  • Similar to DescribeTopicPartitionsRequest in KIP-966, which uses the server-side configuration max.request.partition.size.limit to control the maximum number of partitions to return, this KIP introduces max.request.pagination.size.limit as a more generic configuration. It will be used exactly as in KIP-966, as a hard limit. This configuration will also be added to the Admin Client configuration.
  • Producer states and transactions in the responses will be ordered by ID in descending order (assuming the highest PID is the newest). Transactional IDs belonging to one PID will be ordered alphabetically.
  • Member IDs belonging to one group ID will be ordered alphabetically.
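The ordering rules in these notes can be written as Java comparators. This is a hedged sketch: `TxnEntry` and `PaginationOrdering` are hypothetical, the descending-PID rule relies on the stated assumption that a higher PID means a newer producer, and "alphabetically" is approximated by `String`'s natural ordering, which the KIP does not pin down.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical view of a listed transaction: the owning producer ID and its transactional ID.
record TxnEntry(long producerId, String transactionalId) {}

final class PaginationOrdering {
    private PaginationOrdering() {}

    // Highest (assumed newest) PID first; transactional IDs of the same PID in String order.
    static final Comparator<TxnEntry> TXN_ORDER =
        Comparator.comparingLong(TxnEntry::producerId).reversed()
                  .thenComparing(TxnEntry::transactionalId);

    // Member IDs within one group, in String order.
    static List<String> orderMembers(List<String> memberIds) {
        return memberIds.stream().sorted().toList();
    }
}
```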

Public Interfaces

  • Each request will add two new optional fields: ResponsePaginationLimit, which caps the number of items in the response, and Cursor, which sets the position from which the response starts.
  • The broker will have max.request.pagination.size.limit to control the maximum number of pagination items to return. Similar to KIP-966, this value serves as a hard limit to prevent misconfigured requests.
    • max.request.partition.size.limit will default to the value of max.request.pagination.size.limit unless it is set explicitly, and it will need to be deprecated in favour of max.request.pagination.size.limit at some point.
    • The new version of the Admin Client will also have a max.request.pagination.size.limit configuration; however, its value cannot exceed the broker-side max.request.pagination.size.limit.
  • Each response will add a new optional field, NextCursor, which holds the details for the cursor's position for the next request.
  • There is no consistency guarantee between requests, similar to KIP-966.
  • Returned resources in any response respect the authorisation level for the client.
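One way the limits above could combine on the broker is sketched below. `PaginationLimits` and its methods are hypothetical, and the 2000 fallback is an assumption borrowed from the ResponsePaginationLimit default in the schemas; the KIP does not state a broker default for max.request.pagination.size.limit.

```java
import java.util.Map;

// Hypothetical sketch of how the limits could combine; not Kafka's implementation.
final class PaginationLimits {
    private PaginationLimits() {}

    static final String PAGINATION_LIMIT = "max.request.pagination.size.limit";
    static final String PARTITION_LIMIT = "max.request.partition.size.limit";
    static final int ASSUMED_DEFAULT = 2000; // assumption: matches the ResponsePaginationLimit default

    // Broker-wide hard limit for every paginated request.
    static int paginationLimit(Map<String, String> brokerConfig) {
        return Integer.parseInt(brokerConfig.getOrDefault(PAGINATION_LIMIT, String.valueOf(ASSUMED_DEFAULT)));
    }

    // The KIP-966 setting falls back to the generic limit unless it is set explicitly.
    static int partitionLimit(Map<String, String> brokerConfig) {
        String explicit = brokerConfig.get(PARTITION_LIMIT);
        return explicit != null ? Integer.parseInt(explicit) : paginationLimit(brokerConfig);
    }

    // A request may ask for fewer items than the hard limit, never more.
    static int effectiveLimit(int requestedLimit, int brokerHardLimit) {
        return Math.min(requestedLimit, brokerHardLimit);
    }
}
```

Clamping with `Math.min` serves an oversized request with a smaller page instead of rejecting it; that choice is an assumption about how the hard limit is enforced.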

OffsetFetchRequest

{  
  "apiKey": 9,  
  "type": "request",  
  "listeners": ["zkBroker", "broker"],  
  "name": "OffsetFetchRequest",  
  "validVersions": "0-<next-version>",  
  "deprecatedVersions": "0",  
  "flexibleVersions": "6+",  
  "fields": [  
    { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId", "about": "The group to fetch offsets for." },  
    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7", "fields": [ ... ]},  
    { "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+", "about": "Each group we would like to fetch offsets for", "fields": [ ... ]},  
    { "name": "RequireStable", "type": "bool", "versions": "7+", "default": "false"},  
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000", "about": "The maximum number of partitions included in the response." },  
+   { "name": "Cursor", "ignorable": true, "type": "OffsetFetchCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null", "about": "The first group ID, topic and partition index to fetch details for.", "fields": [
+     { "name": "GroupId", "type": "string", "versions": "<next-version>+", "about": "The group ID to start with"},
+     { "name": "TopicName", "type": "string", "versions": "<next-version>+",
+       "about": "The name of the first topic to process for the given group ID", "entityType": "topicName"},
+     { "name": "PartitionIndex", "type": "int32", "versions": "<next-version>+", "about": "The partition index to start with"}
+   ]}
  ]  
}

OffsetFetchResponse

{  
  "apiKey": 9,  
  "type": "response",  
  "name": "OffsetFetchResponse",  
  "validVersions": "0-<next-version>",  
  "flexibleVersions": "6+",  
  "fields": [  
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true},  
    { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0-7", "about": "The responses per topic.", "fields": [ ... ]},  
    { "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true},  
    { "name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+", "about": "The responses per group id.", "fields": [ ... ]},  
+   { "name": "NextCursor", "type": "OffsetFetchCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",
+     "about": "The next group ID, topic and partition index to fetch details for.", "fields": [
+       { "name": "GroupId", "type": "string", "versions": "<next-version>+", "about": "The group ID to start with"},
+       { "name": "TopicName", "type": "string", "versions": "<next-version>+",
+         "about": "The name for the first topic to process", "entityType": "topicName"},
+       { "name": "PartitionIndex", "type": "int32", "versions": "<next-version>+", "about": "The partition index to start with"}
+   ]}
  ]
}
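A broker could serve OffsetFetchCursor pages by walking (GroupId, TopicName, PartitionIndex) keys in a fixed order, starting at the cursor and stopping at the limit; the first key left out becomes NextCursor. The KIP does not define the order for this request, so sorting by group ID, then topic name, then partition index is an assumption, and the record and class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Mirrors the proposed OffsetFetchCursor fields: GroupId, TopicName, PartitionIndex.
record OffsetFetchCursor(String groupId, String topicName, int partitionIndex) {}

// Hypothetical page holder: the keys served in one response plus the next cursor.
record OffsetFetchPage(List<OffsetFetchCursor> keys, OffsetFetchCursor nextCursor) {}

final class OffsetFetchPaging {
    private OffsetFetchPaging() {}

    // Assumed order: group ID, then topic name, then partition index.
    static final Comparator<OffsetFetchCursor> ORDER =
        Comparator.comparing(OffsetFetchCursor::groupId)
                  .thenComparing(OffsetFetchCursor::topicName)
                  .thenComparingInt(OffsetFetchCursor::partitionIndex);

    // Serves up to `limit` keys starting at `cursor` (inclusive, since Cursor names the first
    // entry to fetch); NextCursor is the first key left out, or null when everything fit.
    static OffsetFetchPage page(List<OffsetFetchCursor> all, OffsetFetchCursor cursor, int limit) {
        List<OffsetFetchCursor> out = new ArrayList<>();
        for (OffsetFetchCursor key : all.stream().sorted(ORDER).toList()) {
            if (cursor != null && ORDER.compare(key, cursor) < 0) continue; // before the cursor
            if (out.size() == limit) return new OffsetFetchPage(out, key);
            out.add(key);
        }
        return new OffsetFetchPage(out, null);
    }
}
```

Comparing against the cursor, rather than looking it up by equality, lets paging resume even if the cursor's partition disappeared between requests.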

DescribeLogDirsRequest

{  
  "apiKey": 35,  
  "type": "request",  
  "listeners": ["zkBroker", "broker"],  
  "name": "DescribeLogDirsRequest",  
  "validVersions": "0-<next-version>",  
  "deprecatedVersions": "0",  
  "flexibleVersions": "2+",  
  "fields": [  
    { "name": "Topics", "type": "[]DescribableLogDirTopic", "versions": "0+", "nullableVersions": "0+", "about": "Each topic that we want to describe log directories for, or null for all topics.", "fields": [ ... ]},  
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000",  
+     "about": "The maximum number of partitions included in the response." },  
+   { "name": "Cursor", "ignorable": true, "type": "Cursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",  
+     "about": "The first topic, partition index and log directory to fetch details for.", "fields": [
+     { "name": "TopicName", "type": "string", "versions": "<next-version>+",
+       "about": "The name for the first topic to process", "entityType": "topicName"},
+     { "name": "PartitionIndex", "type": "int32", "versions": "<next-version>+", "about": "The partition index to start with"},
+     { "name": "LogDir", "type": "string", "versions": "<next-version>+", "about": "The absolute log directory path for the topic partition to start with."}
+  ]}
  ]  
}

DescribeLogDirsResponse

{  
  "apiKey": 35,  
  "type": "response",  
  "name": "DescribeLogDirsResponse",  
  "validVersions": "0-<next-version>",  
  "flexibleVersions": "2+",  
  "fields": [  
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+"},  
    { "name": "ErrorCode", "type": "int16", "versions": "3+", "ignorable": true, "about": "The error code, or 0 if there was no error." },  
    { "name": "Results", "type": "[]DescribeLogDirsResult", "versions": "0+", "about": "The log directories.", "fields": [
      ...
      { "name": "TotalBytes", "type": "int64", "versions": "4+", "ignorable": true, "default": "-1" },
      { "name": "UsableBytes", "type": "int64", "versions": "4+", "ignorable": true, "default": "-1" }
    ]},
+   { "name": "NextCursor", "type": "Cursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",
+     "about": "The next topic, partition index and log directory to fetch details for.", "fields": [
+       { "name": "TopicName", "type": "string", "versions": "<next-version>+",
+         "about": "The name for the first topic to process", "entityType": "topicName"},
+       { "name": "PartitionIndex", "type": "int32", "versions": "<next-version>+", "about": "The partition index to start with"},
+       { "name": "LogDir", "type": "string", "versions": "<next-version>+", "about": "The absolute log directory path for the topic partition to start with."}
+   ]}
  ]  
}
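Unlike the other cursors, this one also carries LogDir. A plausible reason (an interpretation, not stated in the KIP) is that a partition being moved between directories on the same broker appears in two log directories at once, as a current and a future replica, so TopicName and PartitionIndex alone do not pin down a position. The sketch below, with hypothetical names, flattens a per-directory view, which is how DescribeLogDirsResponse groups its results, into that cursor order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical replica reference as listed under one log directory.
record TopicPartitionRef(String topicName, int partitionIndex) {}

// Mirrors the proposed cursor fields: TopicName, PartitionIndex, LogDir.
record LogDirCursor(String topicName, int partitionIndex, String logDir) {}

final class LogDirPaging {
    private LogDirPaging() {}

    static final Comparator<LogDirCursor> ORDER =
        Comparator.comparing(LogDirCursor::topicName)
                  .thenComparingInt(LogDirCursor::partitionIndex)
                  .thenComparing(LogDirCursor::logDir);

    // Turns a logDir -> replicas view into cursor order, so a partition present in two
    // directories yields two distinct positions that a page boundary can fall between.
    static List<LogDirCursor> flatten(Map<String, List<TopicPartitionRef>> replicasByLogDir) {
        List<LogDirCursor> keys = new ArrayList<>();
        replicasByLogDir.forEach((dir, replicas) ->
            replicas.forEach(tp -> keys.add(new LogDirCursor(tp.topicName(), tp.partitionIndex(), dir))));
        keys.sort(ORDER);
        return keys;
    }
}
```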

ListPartitionReassignmentsRequest

{
  "apiKey": 46,
  "type": "request",
  "listeners": ["broker", "controller", "zkBroker"],
  "name": "ListPartitionReassignmentsRequest",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000", "about": "The time in ms to wait for the request to complete." },
    { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The topics to list partition reassignments for, or null to list everything.", "fields": [ ... ]},
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000",  
+     "about": "The maximum number of partitions included in the response." },  
+   { "name": "Cursor", "ignorable": true, "type": "Cursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",  
+     "about": "The first topic and partition index to fetch details for.", "fields": [  
+     { "name": "TopicName", "type": "string", "versions": "<next-version>+",  "about": "The name for the first topic to process", "entityType": "topicName"},  
+     { "name": "PartitionIndex", "type": "int32", "versions": "<next-version>+", "about": "The partition index to start with"}  
+  ]}  
  ]
}

ListPartitionReassignmentsResponse

{
  "apiKey": 46,
  "type": "response",
  "name": "ListPartitionReassignmentsResponse",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+"},
    { "name": "Topics", "type": "[]OngoingTopicReassignment", "versions": "0+", "about": "The ongoing reassignments for each topic.", "fields": [ ... ]},
+   { "name": "NextCursor", "type": "Cursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",  
+       "about": "The next topic and partition index to fetch details for.", "fields": [  
+      { "name": "TopicName", "type": "string", "versions": "<next-version>+", "about": "The name for the first topic to process", "entityType": "topicName"},  
+      { "name": "PartitionIndex", "type": "int32", "versions": "<next-version>+", "about": "The partition index to start with"}
+   ]}
  ]
}
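Because the limit counts partitions, a topic with many ongoing reassignments can be split across two responses, so a client should merge pages per topic instead of letting a later page overwrite an earlier one. A minimal sketch follows; `TopicPartitions` is a hypothetical flattened stand-in for an OngoingTopicReassignment entry, and `ReassignmentPages` is likewise illustrative.

```java
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical flattened stand-in for one OngoingTopicReassignment entry in a response page.
record TopicPartitions(String topicName, List<Integer> partitionIndexes) {}

final class ReassignmentPages {
    private ReassignmentPages() {}

    // Merges pages per topic; a topic split across a page boundary keeps all of its partitions.
    static Map<String, SortedSet<Integer>> merge(List<List<TopicPartitions>> pages) {
        Map<String, SortedSet<Integer>> merged = new TreeMap<>();
        for (List<TopicPartitions> page : pages) {
            for (TopicPartitions entry : page) {
                merged.computeIfAbsent(entry.topicName(), t -> new TreeSet<>()).addAll(entry.partitionIndexes());
            }
        }
        return merged;
    }
}
```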

ListOffsetsRequest

{
  "apiKey": 2,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListOffsetsRequest",
  "validVersions": "0-<next-version>",
  "deprecatedVersions": "0",
  "flexibleVersions": "6+",
  "latestVersionUnstable": true,
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
    { "name": "IsolationLevel", "type": "int8", "versions": "2+" },
    { "name": "Topics", "type": "[]ListOffsetsTopic", "versions": "0+", "about": "Each topic in the request.", "fields": [ ... ]},
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000",  
+     "about": "The maximum number of partitions included in the response." },  
+   { "name": "Cursor", "ignorable": true, "type": "Cursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",  
+     "about": "The first topic and partition index to fetch details for.", "fields": [  
+     { "name": "TopicName", "type": "string", "versions": "<next-version>+",  "about": "The name for the first topic to process", "entityType": "topicName"},  
+     { "name": "PartitionIndex", "type": "int32", "versions": "<next-version>+", "about": "The partition index to start with"}  
+  ]}  
  ]
}

ListOffsetsResponse

{
  "apiKey": 2,
  "type": "response",
  "name": "ListOffsetsResponse",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true },
    { "name": "Topics", "type": "[]ListOffsetsTopicResponse", "versions": "0+", "about": "Each topic in the response.", "fields": [...]},
+   { "name": "NextCursor", "type": "Cursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",
+     "about": "The next topic and partition index to fetch details for.", "fields": [
+     { "name": "TopicName", "type": "string", "versions": "<next-version>+", "about": "The name for the first topic to process", "entityType": "topicName"},
+     { "name": "PartitionIndex", "type": "int32", "versions": "<next-version>+", "about": "The partition index to start with"}
+  ]}
  ]
}

MetadataRequest

{
  "apiKey": 3,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "MetadataRequest",
  "validVersions": "0-<next-version>",
  "deprecatedVersions": "0-3",
  "flexibleVersions": "9+",
  "fields": [
    { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+",
      "about": "The topics to fetch metadata for.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." },
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "nullableVersions": "10+",
        "about": "The topic name." }
    ]},
    ...
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000", "about": "The maximum number of partitions included in the response." },
+   { "name": "Cursor", "ignorable": true, "type": "Cursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null", "about": "The first topic and partition index to fetch details for.", "fields": [ 
+     { "name": "TopicName", "type": "string", "versions": "<next-version>+", "about": "The name for the first topic to process", "entityType": "topicName"},  
+     { "name": "PartitionIndex", "type": "int32", "versions": "<next-version>+", "about": "The partition index to start with"}  
+  ]}  
  ]
}

MetadataResponse

{
  "apiKey": 3,
  "type": "response",
  "name": "MetadataResponse",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "9+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true},
    { "name": "Brokers", "type": "[]MetadataResponseBroker", "versions": "0+", "about": "A list of brokers present in the cluster.", "fields": [
      ...
    ]},
    { "name": "ClusterId", "type": "string", "nullableVersions": "2+", "versions": "2+", "ignorable": true, "default": "null" },
    { "name": "ControllerId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, "entityType": "brokerId"},
    { "name": "Topics", "type": "[]MetadataResponseTopic", "versions": "0+", "about": "Each topic in the response.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+"},
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "12+"},
      { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true},
      { "name": "IsInternal", "type": "bool", "versions": "1+", "default": "false", "ignorable": true},
      { "name": "Partitions", "type": "[]MetadataResponsePartition", "versions": "0+", "about": "Each partition in the topic.", "fields": [ ... ]},
      { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648"}
    ]},
    { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8-10", "default": "-2147483648",
      "about": "32-bit bitfield to represent authorized operations for this cluster." },
+   { "name": "NextCursor", "type": "Cursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",
+     "about": "The next topic and partition index to fetch details for.", "fields": [
+     { "name": "TopicName", "type": "string", "versions": "<next-version>+", "about": "The name for the first topic to process", "entityType": "topicName"},
+     { "name": "PartitionIndex", "type": "int32", "versions": "<next-version>+", "about": "The partition index to start with"}
+   ]}
  ]
}

DescribeProducersRequest

{  
  "apiKey": 61,  
  "type": "request",  
  "listeners": ["zkBroker", "broker"],  
  "name": "DescribeProducersRequest",  
  "validVersions": "0-<next-version>",  
  "flexibleVersions": "0+",  
  "fields": [  
    { "name": "Topics", "type": "[]TopicRequest", "versions": "0+", "fields": [ ... ]},  
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000",  
+     "about": "The maximum number of producer IDs included in the response." },
+   { "name": "Cursor", "ignorable": true, "type": "ProducerStateCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",  
+     "about": "The first PID, topic and partition index to fetch details for.", "fields": [  
+     { "name": "TopicName", "type": "string", "versions": "<next-version>+", "about": "The name for the first topic to process", "entityType": "topicName"},  
+     { "name": "PartitionIndex", "type": "int32", "versions": "<next-version>+", "about": "The partition index to start with"},  
+     { "name": "ProducerId", "type": "int64", "versions": "<next-version>+", "about": "The producer id to start with"}
+   ]}
  ]  
}

DescribeProducersResponse

{  
  "apiKey": 61,  
  "type": "response",  
  "name": "DescribeProducersResponse",  
  "validVersions": "0-<next-version>",  
  "flexibleVersions": "0+",  
  "fields": [  
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },  
    { "name": "Topics", "type": "[]TopicResponse", "versions": "0+", "about": "Each topic in the response.", "fields": [ ... ]},  
+   { "name": "NextCursor", "type": "ProducerStateCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",
+     "about": "The next producer ID, topic and partition index to fetch details for.", "fields": [
+     { "name": "ProducerId", "type": "int64", "versions": "<next-version>+", "about": "The producer id to start with"},
+     { "name": "TopicName", "type": "string", "versions": "<next-version>+", "about": "The name for the first topic to process", "entityType": "topicName"},
+     { "name": "PartitionIndex", "type": "int32", "versions": "<next-version>+", "about": "The partition index to start with"}
+   ]}
  ]  
}
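For DescribeProducersRequest the order mixes directions: partitions ascend, while producer IDs within a partition descend so that the (assumed) newest producers come first. The sketch below encodes that as one comparator and finds NextCursor by reading one key beyond the limit. Names are hypothetical, and ordering partitions by topic name and index is an assumption.

```java
import java.util.Comparator;
import java.util.List;

// Mirrors the fields of the proposed ProducerStateCursor.
record ProducerStateKey(String topicName, int partitionIndex, long producerId) {}

// Hypothetical page holder: one page of keys and the cursor for the next request.
record ProducerPage(List<ProducerStateKey> keys, ProducerStateKey nextCursor) {}

final class ProducerStatePaging {
    private ProducerStatePaging() {}

    // Partitions by topic name and index; within a partition, highest (assumed newest) PID first.
    static final Comparator<ProducerStateKey> ORDER =
        Comparator.comparing(ProducerStateKey::topicName)
                  .thenComparingInt(ProducerStateKey::partitionIndex)
                  .thenComparing(Comparator.comparingLong(ProducerStateKey::producerId).reversed());

    static ProducerPage page(List<ProducerStateKey> all, ProducerStateKey cursor, int limit) {
        // Read one key past the limit: if it exists, it becomes the next cursor.
        List<ProducerStateKey> window = all.stream()
            .sorted(ORDER)
            .filter(k -> cursor == null || ORDER.compare(k, cursor) >= 0)
            .limit(limit + 1L)
            .toList();
        if (window.size() <= limit) return new ProducerPage(window, null);
        return new ProducerPage(window.subList(0, limit), window.get(limit));
    }
}
```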

ListTransactionsRequest

{  
  "apiKey": 66,  
  "type": "request",  
  "listeners": ["zkBroker", "broker"],  
  "name": "ListTransactionsRequest",  
  "validVersions": "0-<next-version>",  
  "flexibleVersions": "0+",  
  "fields": [  
    { "name": "StateFilters", "type": "[]string", "versions": "0+" },  
    { "name": "ProducerIdFilters", "type": "[]int64", "versions": "0+", "entityType": "producerId"},  
    { "name": "DurationFilter", "type": "int64", "versions": "1+", "default": -1},  
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000",  
+     "about": "The maximum number of transactions included in the response." },
+   { "name": "Cursor", "ignorable": true, "type": "TransactionalCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",  
+     "about": "The first transaction id and producer id to fetch details for.", "fields": [  
+     { "name": "TransactionalId", "type": "string", "versions": "<next-version>+", "about": "The id for the first transaction to process"},  
+     { "name": "ProducerId", "type": "int64", "versions": "<next-version>+", "about": "The producer id of the first transaction to process"}
+  ]}
  ]
}

ListTransactionsResponse

{
  "apiKey": 66,
  "type": "response",
  "name": "ListTransactionsResponse",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "0+",  
  "fields": [  
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+"},  
      { "name": "ErrorCode", "type": "int16", "versions": "0+" },  
      { "name": "UnknownStateFilters", "type": "[]string", "versions": "0+",  
        "about": "Set of state filters provided in the request which were unknown to the transaction coordinator" },  
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [ ... ]},   
+     { "name": "NextCursor", "type": "TransactionalCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null", "about": "The next transaction id and producer id to fetch details for.", "fields": [  
+        { "name": "TransactionalId", "type": "string", "versions": "<next-version>+", "about": "The id for transaction to start with"},  
+        { "name": "ProducerId", "type": "int64", "versions": "<next-version>+", "about": "The producer id for the next transaction to start with"}
+     ]}  
  ]  
}

DescribeTransactionsRequest

{
  "apiKey": 65,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "DescribeTransactionsRequest",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TransactionalIds", "entityType": "transactionalId", "type": "[]string", "versions": "0+" },
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000",  
+     "about": "The maximum number of transactions included in the response." },
+   { "name": "Cursor", "ignorable": true, "type": "TransactionalCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",  
+     "about": "The first transaction id and producer id to fetch details for.", "fields": [  
+     { "name": "TransactionalId", "type": "string", "versions": "<next-version>+", "about": "The id for the first transaction to process"},  
+     { "name": "ProducerId", "type": "int64", "versions": "<next-version>+", "about": "The producer id of the first transaction to process"}
+  ]}   
  ]
}

DescribeTransactionsResponse

{
  "apiKey": 65,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+"},
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        ...
        { "name": "Topics", "type": "[]TopicData", "versions": "0+",
          "about": "The set of partitions included in the current transaction (if active). When a transaction is preparing to commit or abort, this will include only partitions which do not have markers.",
          "fields": [
            { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true },
            { "name": "Partitions", "type": "[]int32", "versions": "0+" }
          ]
        }
      ]},
+     { "name": "NextCursor", "type": "TransactionalCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null", "about": "The next transaction id and producer id to fetch details for.", "fields": [  
+        { "name": "TransactionalId", "type": "string", "versions": "<next-version>+", "about": "The id for transaction to start with"},  
+        { "name": "ProducerId", "type": "int64", "versions": "<next-version>+", "about": "The producer id for the next transaction to start with"}
+     ]}  
  ]
}

ListGroupsRequest

{
  "apiKey": 16,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListGroupsRequest",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "4+",
      "about": "The states of the groups we want to list. If empty, all groups are returned with their state." },
    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
      "about": "The types of the groups we want to list. If empty, all groups are returned with their type." },
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000",  
+     "about": "The maximum number of group IDs included in the response." },
+   { "name": "Cursor", "ignorable": true, "type": "GroupCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",
+      "about": "The first group ID to fetch details for.", "fields": [
+      { "name": "GroupId", "type": "string", "versions": "<next-version>+", "about": "The group id for the first group to process"}
+    ]}
  ]
}

ListGroupsResponse

{
  "apiKey": 16,
  "type": "response",
  "name": "ListGroupsResponse",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." },
    { "name": "Groups", "type": "[]ListedGroup", "versions": "0+", "about": "Each group in the response.", "fields": [ ... ]},
+   { "name": "NextCursor", "type": "GroupCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null", "about": "The next group id to fetch details for.", "fields": [
+     { "name": "GroupId", "type": "string", "versions": "<next-version>+", "about": "The group id for the first group to process"}
+   ]}
  ]
}
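Since there is no consistency guarantee between requests, the group named by the cursor may be deleted before the next request arrives. A sorted set handles this naturally: `NavigableSet.tailSet(cursor, true)` starts at the first group ID greater than or equal to the cursor, whether or not the cursor itself still exists. This is a hypothetical sketch (`GroupPage` and `ListGroupsPaging` are illustrative names), assuming group IDs are paged in `String` natural order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

// Hypothetical page holder for the proposed GroupCursor (a single GroupId field).
record GroupPage(List<String> groupIds, String nextCursor) {}

final class ListGroupsPaging {
    private ListGroupsPaging() {}

    static GroupPage page(NavigableSet<String> groupIds, String cursor, int limit) {
        // tailSet(cursor, true) starts at the first group ID >= cursor, so paging still
        // resumes correctly if the cursor's group was deleted after the previous response.
        NavigableSet<String> remaining = cursor == null ? groupIds : groupIds.tailSet(cursor, true);
        List<String> out = new ArrayList<>();
        for (String groupId : remaining) {
            if (out.size() == limit) return new GroupPage(out, groupId);
            out.add(groupId);
        }
        return new GroupPage(out, null);
    }
}
```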

DescribeGroupsRequest

{
  "apiKey": 15,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "DescribeGroupsRequest",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "Groups", "type": "[]string", "versions": "0+", "entityType": "groupId", "about": "The names of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "3+", "about": "Whether to include authorized operations." },
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000",  
+     "about": "The maximum number of group members included in the response." },
+   { "name": "Cursor", "ignorable": true, "type": "GroupMemberCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",
+      "about": "The first group ID and member ID to fetch details for.", "fields": [
+      { "name": "GroupId", "type": "string", "versions": "<next-version>+", "about": "The group id for the first group to process"},
+      { "name": "MemberId", "type": "string", "versions": "<next-version>+", "about": "The member id to start with within that group"}
+   ]}
  ]
}

DescribeGroupsResponse 

{
  "apiKey": 15,
  "type": "response",
  "name": "DescribeGroupsResponse",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+", "about": "Each described group.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+"},
      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group ID string." },
      ...
      { "name": "Members", "type": "[]DescribedGroupMember", "versions": "0+", "about": "The group members.", "fields": [...]},
      { "name": "AuthorizedOperations", "type": "int32", "versions": "3+",  "default": "-2147483648" }
    ]},
+   { "name": "NextCursor", "type": "GroupMemberCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null", "about": "The next group ID and member ID to fetch details for.", "fields": [
+     { "name": "GroupId", "type": "string", "versions": "<next-version>+", "about": "The group id for the first group to process"},
+     { "name": "MemberId", "type": "string", "versions": "<next-version>+", "about": "The member id to start with within that group"}
+   ]}
  ]
}
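
To make the (GroupId, MemberId) cursor concrete, the following Python sketch shows one way a coordinator could fill a single DescribeGroupsResponse page. It rests on assumptions this KIP does not spell out: groups are served in request order, members within a group are ordered by member ID, ResponsePaginationLimit counts members, and NextCursor names the first member that did not fit. `Cursor` and `describe_page` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Cursor:
    """Hypothetical Python stand-in for GroupMemberCursor."""
    group_id: str
    member_id: str


def describe_page(
    group_ids: List[str],
    members_by_group: Dict[str, List[str]],
    limit: int,
    cursor: Optional[Cursor] = None,
) -> Tuple[List[Tuple[str, List[str]]], Optional[Cursor]]:
    """Return (groups, next_cursor) for one page of at most `limit` members."""
    groups: List[Tuple[str, List[str]]] = []
    emitted = 0
    # Resume at the cursor's group; earlier groups were served by previous pages.
    start = group_ids.index(cursor.group_id) if cursor else 0
    for group_id in group_ids[start:]:
        # ASSUMPTION: members are ordered by member ID so the cursor is resumable.
        members = sorted(members_by_group.get(group_id, []))
        if cursor and group_id == cursor.group_id:
            # Skip members of the cursor's group that an earlier page returned.
            members = [m for m in members if m >= cursor.member_id]
        page_members: List[str] = []
        for member_id in members:
            if emitted == limit:
                # The page is full: this member becomes the next cursor.
                if page_members:
                    groups.append((group_id, page_members))
                return groups, Cursor(group_id, member_id)
            page_members.append(member_id)
            emitted += 1
        groups.append((group_id, page_members))
    # Everything fitted, so NextCursor is null.
    return groups, None
```

Ordering members by ID is what makes a (GroupId, MemberId) pair a usable resume point: without a deterministic order, the coordinator could not tell which members an earlier page already returned. A member that joins between pages with an ID below the cursor would be missed, which is the same class of inconsistency described under Limitations.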

ConsumerGroupDescribeRequest

{
  "apiKey": 69,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupDescribeRequest",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId", "about": "The ids of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+", "about": "Whether to include authorized operations." },
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000",
+     "about": "The maximum number of group members to include in the response." },
+   { "name": "Cursor", "type": "GroupMemberCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",
+      "about": "The group ID and member ID from which to start fetching details.", "fields": [
+      { "name": "GroupId", "type": "string", "versions": "<next-version>+", "about": "The ID of the first group to process."},
+      { "name": "MemberId", "type": "string", "versions": "<next-version>+", "about": "The ID of the first member to process."}
+   ]}
  ]
}

ConsumerGroupDescribeResponse

{
  "apiKey": 69,
  "type": "response",
  "name": "ConsumerGroupDescribeResponse",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",  "about": "Each described group.", "fields": [
        { "name": "Members", "type": "[]Member", "versions": "0+", "about": "The members.",  "fields": [ ... ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648"}
      ]
    },
+   { "name": "NextCursor", "type": "GroupMemberCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null", "about": "The group ID and member ID from which to start the next page, or null if there are no more results.", "fields": [
+     { "name": "GroupId", "type": "string", "versions": "<next-version>+", "about": "The ID of the first group to process in the next request."},
+     { "name": "MemberId", "type": "string", "versions": "<next-version>+", "about": "The ID of the first member to process in the next request."}
+   ]}
  ],
  "commonStructs": [ ...  ]
}
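
Because the Admin API is expected to iterate over all pages on the user's behalf, the client-side loop reduces to following NextCursor until the broker returns null. A minimal Python sketch, where `send` is a hypothetical callable that issues one ConsumerGroupDescribeRequest (or any other paginated request) with the given cursor and returns the page's items together with NextCursor:

```python
from typing import Callable, Iterator, List, Optional, Tuple, TypeVar

C = TypeVar("C")  # cursor type, e.g. a (GroupId, MemberId) pair
T = TypeVar("T")  # item type, e.g. a described group


def fetch_all(send: Callable[[Optional[C]], Tuple[List[T], Optional[C]]]) -> Iterator[T]:
    """Yield every item by following NextCursor until the broker returns null."""
    cursor: Optional[C] = None  # the first request carries a null Cursor
    while True:
        items, next_cursor = send(cursor)
        yield from items
        if next_cursor is None:
            return  # no more pages
        if next_cursor == cursor:
            # Defensive check (not part of the KIP): avoid looping forever.
            raise RuntimeError("broker returned the same cursor twice")
        cursor = next_cursor
```

The repeated-cursor check is a defensive choice of this sketch, not something the KIP specifies; it turns a misbehaving broker into an error instead of an endless loop.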

DescribeAclsRequest

{
  "apiKey": 29,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "DescribeAclsRequest",
  "validVersions": "0-<next-version>",
  "deprecatedVersions": "0",
  "flexibleVersions": "2+",
  "fields": [
    ...
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000",
+     "about": "The maximum number of resources to include in the response." },
+   { "name": "Cursor", "type": "ResourceCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",
+      "about": "The resource from which to start fetching ACLs.", "fields": [
+      { "name": "ResourceName", "type": "string", "versions": "<next-version>+", "about": "The name of the first resource to process."},
+      { "name": "ResourceType", "type": "int8", "versions": "<next-version>+", "about": "The type of the first resource to process."}
+   ]}
  ]
}


DescribeAclsResponse

{
  "apiKey": 29,
  "type": "response",
  "name": "DescribeAclsResponse",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+"},
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The error message, or null if there was no error." },
    { "name": "Resources", "type": "[]DescribeAclsResource", "versions": "0+", "about": "Each Resource that is referenced in an ACL.", "fields": [
      { "name": "ResourceType", "type": "int8", "versions": "0+", "about": "The resource type." },
      { "name": "ResourceName", "type": "string", "versions": "0+", "about": "The resource name." },
      { "name": "PatternType", "type": "int8", "versions": "1+", "default": "3", "ignorable": false, "about": "The resource pattern type." },
      { "name": "Acls", "type": "[]AclDescription", "versions": "0+", "about": "The ACLs.", "fields": [
        { "name": "Principal", "type": "string", "versions": "0+", "about": "The ACL principal." },
        { "name": "Host", "type": "string", "versions": "0+", "about": "The ACL host." },
        { "name": "Operation", "type": "int8", "versions": "0+", "about": "The ACL operation." },
        { "name": "PermissionType", "type": "int8", "versions": "0+", "about": "The ACL permission type." }
      ]}
    ]},
+   { "name": "NextCursor", "type": "ResourceCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null", "about": "The resource from which to start the next page, or null if there are no more results.", "fields": [
+      { "name": "ResourceName", "type": "string", "versions": "<next-version>+", "about": "The name of the first resource to process in the next request."},
+      { "name": "ResourceType", "type": "int8", "versions": "<next-version>+", "about": "The type of the first resource to process in the next request."}
+   ]}
  ]
}
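
For DescribeAclsRequest, the cursor is a resource rather than a position, so a broker needs a stable ordering of resources to resume from. The Python sketch below assumes (hypothetically) that resources are sorted by (ResourceType code, ResourceName), that ResponsePaginationLimit counts resources, and that PatternType is ignored; `acl_page` is an illustrative name.

```python
import bisect
from typing import Dict, List, Optional, Tuple

# Hypothetical sort key for an ACL resource: (ResourceType int8 code, ResourceName).
ResourceKey = Tuple[int, str]


def acl_page(
    acls_by_resource: Dict[ResourceKey, List[str]],
    limit: int,
    cursor: Optional[ResourceKey] = None,
) -> Tuple[List[Tuple[ResourceKey, List[str]]], Optional[ResourceKey]]:
    """Return (resources with their ACLs, next_cursor) for one page."""
    keys = sorted(acls_by_resource)
    # Resume at the first resource >= cursor; earlier ones were already returned.
    start = bisect.bisect_left(keys, cursor) if cursor is not None else 0
    page_keys = keys[start:start + limit]
    # The first resource that did not fit becomes NextCursor; null when done.
    next_cursor = keys[start + limit] if start + limit < len(keys) else None
    return [(k, acls_by_resource[k]) for k in page_keys], next_cursor
```

Resuming with `bisect_left` means the cursor resource does not need to still exist when the next request arrives: if it was deleted between pages, the page simply starts at the next resource in order.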

DescribeConfigsRequest

{
  "apiKey": 32,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "DescribeConfigsRequest",
  "validVersions": "0-<next-version>",
  "deprecatedVersions": "0",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "Resources", "type": "[]DescribeConfigsResource", "versions": "0+", "fields": [
      { "name": "ResourceType", "type": "int8", "versions": "0+", "about": "The resource type." },
      { "name": "ResourceName", "type": "string", "versions": "0+", "about": "The resource name." },
      { "name": "ConfigurationKeys", "type": "[]string", "versions": "0+", "nullableVersions": "0+"}
    ]},
    { "name": "IncludeSynonyms", "type": "bool", "versions": "1+", "default": "false", "ignorable": false},
    { "name": "IncludeDocumentation", "type": "bool", "versions": "3+", "default": "false", "ignorable": false},
+   { "name": "ResponsePaginationLimit", "ignorable": true, "type": "int32", "versions": "<next-version>+", "default": "2000",
+     "about": "The maximum number of resources to include in the response." },
+   { "name": "Cursor", "type": "ResourceCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null",
+      "about": "The resource from which to start fetching configurations.", "fields": [
+      { "name": "ResourceName", "type": "string", "versions": "<next-version>+", "about": "The name of the first resource to process."},
+      { "name": "ResourceType", "type": "int8", "versions": "<next-version>+", "about": "The type of the first resource to process."}
+   ]}
  ]
}

DescribeConfigsResponse

{
  "apiKey": 32,
  "type": "response",
  "name": "DescribeConfigsResponse",
  "validVersions": "0-<next-version>",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+"},
    { "name": "Results", "type": "[]DescribeConfigsResult", "versions": "0+", "about": "The results for each resource.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+" },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+"},
      { "name": "ResourceType", "type": "int8", "versions": "0+", "about": "The resource type." },
      { "name": "ResourceName", "type": "string", "versions": "0+", "about": "The resource name." },
      { "name": "Configs", "type": "[]DescribeConfigsResourceResult", "versions": "0+", "about": "Each listed configuration.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "about": "The configuration name." },
        { "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The configuration value." },
        { "name": "ReadOnly", "type": "bool", "versions": "0+", "about": "True if the configuration is read-only." },
        { "name": "IsDefault", "type": "bool", "versions": "0", "about": "True if the configuration is not set." },
        { "name": "ConfigSource", "type": "int8", "versions": "1+", "default": "-1", "ignorable": true, "about": "The configuration source." },
        { "name": "IsSensitive", "type": "bool", "versions": "0+", "about": "True if this configuration is sensitive." },
        ...
        ]},
        { "name": "ConfigType", "type": "int8", "versions": "3+", "default": "0", "ignorable": true},
        { "name": "Documentation", "type": "string", "versions": "3+", "nullableVersions": "0+", "ignorable": true }
      ]}
    ]},
+   { "name": "NextCursor", "type": "ResourceCursor", "versions": "<next-version>+", "nullableVersions": "<next-version>+", "default": "null", "about": "The resource from which to start the next page, or null if there are no more results.", "fields": [
+      { "name": "ResourceName", "type": "string", "versions": "<next-version>+", "about": "The name of the first resource to process in the next request."},
+      { "name": "ResourceType", "type": "int8", "versions": "<next-version>+", "about": "The type of the first resource to process in the next request."}
+   ]}
  ]
}

Proposed Changes

Broker config

Name | Type | Default | Description
max.request.partition.size.limit | int | Same as max.request.pagination.size.limit | The maximum number of partitions that can be served in one request.
max.request.pagination.size.limit | int | 2000 | The maximum number of pagination items, as defined by the cursor, that can be served in one request.

Admin Client Config

Name | Type | Default | Description
max.request.pagination.size.limit | int | 2000 | The maximum number of pagination items, as defined by the cursor, that can be served in one request. The value cannot exceed the broker's max.request.pagination.size.limit.
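
The two configuration tables can be read as the following resolution rules, sketched in Python. The fallback of max.request.partition.size.limit to max.request.pagination.size.limit and the 2000 default come from the broker table; how a broker enforces the rule that the client value cannot exceed the broker value is not specified, so `effective_page_size` assumes clamping rather than rejecting the request. Both function names are hypothetical.

```python
from typing import Dict, Tuple

PAGINATION_KEY = "max.request.pagination.size.limit"
PARTITION_KEY = "max.request.partition.size.limit"
DEFAULT_PAGINATION_LIMIT = 2000


def resolve_broker_limits(configs: Dict[str, int]) -> Tuple[int, int]:
    """Return (pagination_limit, partition_limit) from broker configs."""
    pagination_limit = configs.get(PAGINATION_KEY, DEFAULT_PAGINATION_LIMIT)
    # The partition limit falls back to the pagination limit when unset.
    partition_limit = configs.get(PARTITION_KEY, pagination_limit)
    return pagination_limit, partition_limit


def effective_page_size(client_limit: int, broker_limit: int) -> int:
    """Page size a broker would serve for a client-requested limit."""
    if client_limit <= 0:
        raise ValueError("pagination limit must be positive")
    # ASSUMPTION: the broker clamps an oversized client limit instead of
    # rejecting the request; the KIP only says it cannot exceed the broker value.
    return min(client_limit, broker_limit)
```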

Limitations:

Similar to DescribeTopicPartitionsRequest in KIP-966, the pagination approach here can lead to inconsistencies, especially for MetadataRequest, because the request for the next page may be sent to a different broker whose metadata lags behind. This KIP does not address this limitation.

Compatibility, Deprecation, and Migration Plan

  • The new fields are optional, so any request that does not set a limit falls back to the old behaviour.
  • Older Admin clients keep the old behaviour without pagination; only new Admin clients support this feature.
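
The compatibility plan relies on how Kafka's message generator treats a field that the negotiated version does not support: as documented for the schema JSON, an `ignorable` field with a non-default value is silently dropped, while a non-ignorable one makes serialization fail with UnsupportedVersionException. The Python sketch below is a simplified model of that rule, not the generated code; `FieldRule`, `fields_to_write`, and `NEXT_VERSION = 6` (a placeholder for <next-version>) are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, List

NEXT_VERSION = 6  # hypothetical placeholder for <next-version>


@dataclass(frozen=True)
class FieldRule:
    """Simplified model of one schema field (hypothetical, not Kafka's FieldSpec)."""
    name: str
    min_version: int
    default: Any
    ignorable: bool


class UnsupportedVersionError(Exception):
    """Stands in for Kafka's UnsupportedVersionException."""


def fields_to_write(rules: List[FieldRule], values: Dict[str, Any], version: int) -> Dict[str, Any]:
    """Return the fields that would be serialized at `version`."""
    out: Dict[str, Any] = {}
    for rule in rules:
        value = values.get(rule.name, rule.default)
        if version >= rule.min_version:
            out[rule.name] = value
        elif value != rule.default and not rule.ignorable:
            # Non-ignorable field with a non-default value: fail loudly.
            raise UnsupportedVersionError(f"non-default {rule.name} at version {version}")
        # Otherwise the field is dropped (default value, or ignorable).
    return out


DESCRIBE_GROUPS_RULES = [
    FieldRule("GroupIds", 0, [], False),
    FieldRule("ResponsePaginationLimit", NEXT_VERSION, 2000, True),
    FieldRule("Cursor", NEXT_VERSION, None, False),
]
```

The model shows that a new client talking to an older broker silently loses ResponsePaginationLimit and receives the old, unpaginated response, while Cursor, which is not marked ignorable, must only be set when the negotiated version supports it. In practice that holds naturally, because a client only obtains a non-null cursor from a NextCursor returned by a broker that supports the new version.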

Rejected Alternatives

  1. Rely on the existing batching in these requests. Most of them already support some form of batching, so in theory the scope of each batch could be kept small. However, some factors cannot be predicted by the caller, especially when calling through the Admin API, for example:
    • describing offsets for a consumer group that consumes from many topics or has many instances, especially since a consumer group can legitimately be shared by applications that consume from different topics;
    • describing producer states on a cluster with many short-lived PIDs;
    • listing transactions for a PID that has many hanging transactions;
    • requests such as DescribeLogDirs, which can easily time out on a JBOD cluster with a large number of partitions.
  2. Make pagination more generic by extending FieldSpec with a pagination flag that indicates whether a field is used for pagination. This would simplify the code, but it is awkward for responses such as OffsetFetchResponse or DescribeLogDirsResponse, where a topic partition is usually identified by two fields, the topic name and the partition index, nested inside either `Group` or `LogDir`. The cursor would therefore span multiple fields rather than one, and marking all of them as pagination fields would be confusing. Also, while such a flag could be useful for responses, which contain these fields naturally, it is of no use for requests, which usually do not define these fields, so a separate Cursor would still be needed in the request anyway.
  3. Paginate only by topic partition. For the majority of requests, the number of topic partitions is what controls the size of the response. However, for requests such as DescribeProducers or ListTransactions, the response can get out of hand even with a small number of partitions, because the main factor is usually misbehaving clients.
  4. Make pagination more generic by using an INDEX instead of a verbose cursor. The argument for this is that users could later page manually instead of relying on the Admin API to iterate over all pages.
    • This would unify pagination across all RPCs that need it. However, it raises concerns when the AdminClient fetches metadata from multiple brokers or coordinators instead of a single broker:
        • For an RPC sent to a single broker to fetch a cluster-wide view of some metadata, the INDEX would be the position, across the whole cluster, of the metadata item to start from. This definition does not hold for RPCs such as OffsetFetchRequest, DescribeGroupsRequest, and ConsumerGroupDescribeRequest, which fetch metadata from several brokers or coordinators, each of which holds only a subset. There, the INDEX would be the position of the next page within one coordinator rather than across the cluster, so the meaning of the index would differ between RPCs.
        • For OffsetFetchRequest, DescribeGroupsRequest, and ConsumerGroupDescribeRequest, which are sent to a specific group coordinator, the Admin client would not know which coordinator to target for the next page unless it kept extra state about the next target. This would only be possible if the Admin client defined and handled pagination differently for these RPCs. While that is acceptable, it pushes part of the problem of deciding where to resume onto the client instead of solving it.
        • On a coordinator, reads are scheduled across the N metadata partitions it has loaded. Although the data is ultimately read from the cache, the coordinator would still queue and wait on read events for partitions it does not need, because it cannot tell where an INDEX starts. This removes the opportunity to reduce the number of scheduled read events by skipping partitions.
    • With a verbose cursor, on the other hand, for RPCs such as OffsetFetchRequest, DescribeGroupsRequest, and ConsumerGroupDescribeRequest:
      • We know the next GROUP_ID to target, so we can use the groupId to avoid targeting any coordinator that does not host the metadata for that group.
      • We do not need to schedule extra read events on a coordinator when we fetch multiple groupIds from it.
      • The cursor makes paging transparent: the client does not need to hold extra data to identify which node might hold the cursor, because this can be discovered easily.

  5. Add a RemainingPages field to indicate how many pages are left.
    • As with INDEX, for RPCs that fetch data from a single broker this field would mean the number of pages remaining across the cluster, while in other cases it would mean the number remaining on a single broker.
    • AdminClient users would need to aggregate the remaining pages when a request is split across brokers. To fulfil the meaning of this field, we would need to know how many pages every other broker has, even if those brokers are not needed for the current page.
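
The verbose-cursor argument in alternative 4 relies on a group ID alone identifying its coordinator. Kafka maps a group to a partition of the internal __consumer_offsets topic as abs(groupId.hashCode()) modulo offsets.topic.num.partitions (default 50), and the leader of that partition acts as the coordinator. The Python sketch below reproduces that mapping, including Java's 32-bit String.hashCode and Kafka's Utils.abs, which maps Integer.MIN_VALUE to 0, to show that NextCursor.GroupId is enough to route the next page. In practice the Admin client discovers the coordinator with a FindCoordinator request rather than computing it; the function names here are hypothetical.

```python
INT_MIN = -(1 << 31)


def java_string_hash(s: str) -> int:
    """Java's String.hashCode(): 31-based hash over UTF-16 code units, as a signed int32."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF  # emulate 32-bit overflow
    return h - (1 << 32) if h >= (1 << 31) else h


def kafka_abs(n: int) -> int:
    """Like Kafka's Utils.abs: Integer.MIN_VALUE maps to 0 instead of staying negative."""
    return 0 if n == INT_MIN else abs(n)


def coordinator_partition(group_id: str, num_partitions: int = 50) -> int:
    """__consumer_offsets partition whose leader coordinates `group_id`."""
    return kafka_abs(java_string_hash(group_id)) % num_partitions
```

Since the mapping depends only on the group ID and the partition count, a cursor that carries GroupId tells the client which coordinator serves the next page without any extra client-side state.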