Status

Current state: "Accepted"

Discussion thread: here 

Vote thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The current ListTransactions API supports three filters: a transaction state filter, a producer ID filter, and a duration filter. However, there is a growing need to also filter transactions by a transactional ID pattern.

On large clusters, the number of transactional IDs can be huge. If a client wants to inspect only the transactions whose transactional IDs follow a certain naming pattern, it should not have to fetch all transactions first and then apply the transactional ID filter on the client side.

Proposed changes

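We propose adding a new filter, TransactionalIdPattern, to the ListTransactions API. The filter is combined with the other transaction filters using AND semantics, so a transaction is returned only if it satisfies every filter that is set. The pattern is a regular expression evaluated with RE2J.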
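The AND semantics can be illustrated with a minimal, self-contained sketch. It is not the broker implementation: the Txn class and the filter method are hypothetical, the duration filter is omitted, and java.util.regex stands in for RE2J so that the example runs without extra dependencies. It also assumes that the whole transactional ID must match the pattern and that an empty state or producer ID filter means "no restriction".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public class TransactionFilterSketch {

    // Hypothetical, simplified view of one transaction listing.
    static final class Txn {
        final String transactionalId;
        final long producerId;
        final String state;

        Txn(String transactionalId, long producerId, String state) {
            this.transactionalId = transactionalId;
            this.producerId = producerId;
            this.state = state;
        }
    }

    // Returns the transactional IDs of the transactions that pass every filter.
    // A null or empty pattern means "no restriction", as in the request schema.
    static List<String> filter(List<Txn> all, Set<String> states,
                               Set<Long> producerIds, String idPattern) {
        // Compile once per request. A malformed pattern is rejected up front,
        // which is where a broker could report INVALID_REGULAR_EXPRESSION.
        Pattern compiled = null;
        if (idPattern != null && !idPattern.isEmpty()) {
            try {
                compiled = Pattern.compile(idPattern);
            } catch (PatternSyntaxException e) {
                throw new IllegalArgumentException("Invalid regular expression: " + idPattern, e);
            }
        }
        List<String> result = new ArrayList<>();
        for (Txn txn : all) {
            boolean stateOk = states.isEmpty() || states.contains(txn.state);
            boolean producerOk = producerIds.isEmpty() || producerIds.contains(txn.producerId);
            // Assumption: full match of the transactional ID, not a substring search.
            boolean idOk = compiled == null || compiled.matcher(txn.transactionalId).matches();
            if (stateOk && producerOk && idOk) {
                result.add(txn.transactionalId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Txn> all = List.of(
            new Txn("payments-1", 1L, "Ongoing"),
            new Txn("payments-2", 2L, "CompleteCommit"),
            new Txn("orders-1", 3L, "Ongoing"));
        // Only payments-1 is both Ongoing and matched by the pattern.
        System.out.println(filter(all, Set.of("Ongoing"), Set.of(), "payments-.*")); // prints [payments-1]
    }
}
```

Compiling the pattern once per request, before iterating, keeps the per-transaction cost to a single match and lets an invalid pattern fail fast without scanning any transactions.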

Public Interfaces

ListTransactionsRequest

{
 "apiKey": 66,
 "type": "request",
 "listeners": ["broker"],
 "name": "ListTransactionsRequest",
 "validVersions": "0-2",
 "flexibleVersions": "0+",
 "fields": [
....

    { "name": "TransactionalIdPattern", "type": "string", "versions": "2+", "nullableVersions": "2+", "default": "null",
      "about": "The transactional ID regular expression pattern to filter by: if it is empty or null, all transactions are returned; otherwise, only the transactions matching the given regular expression are returned."
    }

     ]
}

ListTransactionsResponse

{
 "apiKey": 66,
 "type": "response",
 "name": "ListTransactionsResponse",
 "validVersions": "0-2",
 "flexibleVersions": "0+",
 "fields": [
// The response can return an INVALID_REGULAR_EXPRESSION error.

     ]
}

ListTransactionsOptions

Add a filteredTransactionalIdPattern field, with a setter and a getter, to the ListTransactionsOptions class, which AdminClient uses to pass filters to the Kafka broker.

@InterfaceStability.Evolving
public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
...
    // Filter the transactions by their transactional ID pattern.
    String filteredTransactionalIdPattern;

    public ListTransactionsOptions filterOnTransactionalIdPattern(String pattern) {
        this.filteredTransactionalIdPattern = pattern;
        return this;
    }

    public String filteredTransactionalIdPattern() {
        return this.filteredTransactionalIdPattern;
    }
...
}


kafka-transactions.sh

The TransactionsCommand tool (kafka-transactions.sh) will also support the new filter.

The kafka-transactions.sh --list command will have a new option, --transactional-id-pattern, which returns only the transactions whose transactional ID matches the given pattern.


Compatibility, Deprecation, and Migration Plan

  • The ListTransactions API version will be bumped from 1 to 2.
  • When the cluster does not support the new version and the new filter is used, the admin client will fail to build the ListTransactions request with version 2, and an UnsupportedVersionException will be thrown.
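The version check described above can be sketched as follows. This is an illustrative model only, not the client code: the class, the method names, and the stand-in exception are hypothetical, only the pattern filter is modelled, and it is assumed that a null or empty pattern counts as "filter not used".

```java
public class ListTransactionsVersionSketch {

    // Hypothetical stand-in for Kafka's UnsupportedVersionException,
    // defined locally so the sketch has no external dependencies.
    static class UnsupportedVersionSketchException extends RuntimeException {
        UnsupportedVersionSketchException(String message) {
            super(message);
        }
    }

    // Latest ListTransactions version known to this (hypothetical) client.
    static final short LATEST_CLIENT_VERSION = 2;

    // Lowest request version able to carry the pattern filter.
    // Assumption: a null or empty pattern means the filter is not used.
    static short minimumVersion(String transactionalIdPattern) {
        boolean patternSet = transactionalIdPattern != null && !transactionalIdPattern.isEmpty();
        return (short) (patternSet ? 2 : 0);
    }

    // Picks the request version, or fails if the broker is too old for the filter.
    static short chooseVersion(String transactionalIdPattern, short brokerMaxVersion) {
        short required = minimumVersion(transactionalIdPattern);
        if (brokerMaxVersion < required) {
            throw new UnsupportedVersionSketchException(
                "Broker supports ListTransactions up to version " + brokerMaxVersion
                    + ", but the transactional ID pattern filter requires version " + required);
        }
        return (short) Math.min(brokerMaxVersion, LATEST_CLIENT_VERSION);
    }

    public static void main(String[] args) {
        // No pattern set: an old broker is still usable.
        System.out.println(chooseVersion(null, (short) 1)); // prints 1
        // Pattern set against an old broker: the request cannot be built.
        try {
            chooseVersion("payments-.*", (short) 1);
        } catch (UnsupportedVersionSketchException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing on the client side, rather than silently dropping the filter, prevents a caller from receiving an unfiltered listing and mistaking it for a filtered one.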

Test Plan

The change will be covered by unit tests and integration tests.

Rejected Alternatives

N/A
