Current state: Under Discussion
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
The CommitOffset API allows users to include a metadata string when committing offsets. This metadata can be useful for tracking information about the commit, such as when and where it came from.
Unfortunately, the KafkaConsumer interface does not provide consistent access to set this metadata. The only way for a user to provide metadata for a commit is to manually construct the list of offsets to commit. The argument-less commit methods and the auto-commit functionality do not expose any hooks to provide metadata and default to an empty string. Because metadata cannot be set through these methods, it is challenging to provide or enforce consistent metadata across multiple applications within an organization.
In addition, it would be useful for users to be able to manipulate or filter the actual offsets to be committed. An example would be filtering out commits that are identical to the most recent successful commit.
This KIP proposes an additional method in the ConsumerInterceptor that would allow the offsets and metadata to be modified before they are committed to the cluster.
This KIP proposes adding a method to the ConsumerInterceptor interface that can be used to manipulate both the offsets and the metadata to be committed to a cluster.
The new method will have similar behavior and caveats to the onConsume method. Specifically, an interceptor can freely modify the contents of the Map containing the offsets to commit, multiple interceptors can be chained together, and exceptions thrown by an interceptor are suppressed.
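As a rough illustration of the proposed method, the sketch below shows one possible shape for it, together with an interceptor that stamps every commit with an origin tag. The `TopicPartition` and `OffsetAndMetadata` records are simplified stand-ins for the Kafka classes of the same name so that the sketch compiles on its own. `CommitInterceptorSketch` and `OriginTaggingInterceptor` are hypothetical names, and the signature is an assumption rather than the final API.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for the Kafka classes of the same name (assumption:
// only the fields needed for this sketch are modelled).
record TopicPartition(String topic, int partition) {}
record OffsetAndMetadata(long offset, String metadata) {}

// Hypothetical shape of the proposed addition. The real ConsumerInterceptor
// is generic and also declares onConsume, onCommit, and close.
interface CommitInterceptorSketch {
    // Default implementation returns the input unchanged, so interceptors
    // written before this change keep their current behavior.
    default Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> offsets) {
        return offsets;
    }
}

// Hypothetical interceptor that sets the same origin tag on every commit.
class OriginTaggingInterceptor implements CommitInterceptorSketch {
    private final String origin;

    OriginTaggingInterceptor(String origin) {
        this.origin = origin;
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Build a new map rather than mutating the caller's map.
        Map<TopicPartition, OffsetAndMetadata> tagged = new HashMap<>();
        offsets.forEach((tp, om) ->
                tagged.put(tp, new OffsetAndMetadata(om.offset(), origin)));
        return tagged;
    }
}
```

With an interceptor like this configured, the argument-less commit methods and auto-commit would carry the same metadata as manually constructed commits.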
An additional "preCommit" method will also be added to the internal ConsumerInterceptors object to handle the chaining of multiple interceptors. This method is basically identical to the existing onConsume method.
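The chaining behavior described above could look roughly like the following. This is a sketch modelled on how onConsume is described in this KIP, not the actual ConsumerInterceptors code: each interceptor receives the output of the previous one, and an interceptor that throws is skipped so that the chain continues with the last good result. The record types are simplified stand-ins, and `CommitInterceptorSketch` and `InterceptorChainSketch` are hypothetical names.

```java
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the Kafka classes of the same name.
record TopicPartition(String topic, int partition) {}
record OffsetAndMetadata(long offset, String metadata) {}

// Hypothetical single-method view of the proposed interceptor hook.
interface CommitInterceptorSketch {
    Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> offsets);
}

// Hypothetical stand-in for the internal ConsumerInterceptors object.
class InterceptorChainSketch {
    private final List<CommitInterceptorSketch> interceptors;

    InterceptorChainSketch(List<CommitInterceptorSketch> interceptors) {
        this.interceptors = interceptors;
    }

    Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> offsets) {
        Map<TopicPartition, OffsetAndMetadata> current = offsets;
        for (CommitInterceptorSketch interceptor : interceptors) {
            try {
                // Feed each interceptor the output of the previous one.
                current = interceptor.preCommit(current);
            } catch (Exception e) {
                // Suppress the failure and carry on with the last good result.
                System.err.println("Error executing interceptor preCommit callback: " + e);
            }
        }
        return current;
    }
}
```

One consequence of suppressing exceptions is that a failing interceptor silently contributes nothing, so later interceptors in the chain cannot assume that earlier ones ran successfully.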
ConsumerInterceptors#preCommit() will be inserted as the first call in ConsumerCoordinator#sendOffsetCommitRequest(). It will be inserted prior to the check for an empty set of commits to allow Interceptors to remove all committed offsets from a request if they see fit.
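To illustrate why the ordering matters, the sketch below runs the interception step before the empty check, using the motivating example of dropping commits that are identical to the most recent successful commit. It is a heavily simplified model: `CommitPathSketch` is a hypothetical class, the boolean return value stands in for "a request was sent", and the commit is assumed to succeed immediately.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for the Kafka classes of the same name.
record TopicPartition(String topic, int partition) {}
record OffsetAndMetadata(long offset, String metadata) {}

// Hypothetical, simplified model of the commit path in the coordinator.
class CommitPathSketch {
    private final Map<TopicPartition, OffsetAndMetadata> lastCommitted = new HashMap<>();

    // Interceptor logic: keep only entries that differ from the last
    // successful commit for the same partition.
    Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> offsets) {
        Map<TopicPartition, OffsetAndMetadata> changed = new HashMap<>();
        offsets.forEach((tp, om) -> {
            if (!om.equals(lastCommitted.get(tp))) {
                changed.put(tp, om);
            }
        });
        return changed;
    }

    // Returns true if a commit request would be sent.
    boolean sendOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Interception is the first step...
        Map<TopicPartition, OffsetAndMetadata> toCommit = preCommit(offsets);
        // ...so the empty check sees the filtered map and can skip the request.
        if (toCommit.isEmpty()) {
            return false;
        }
        // Assumption: the commit succeeds, so remember what was committed.
        lastCommitted.putAll(toCommit);
        return true;
    }
}
```

If the empty check ran before interception instead, a request whose offsets were all filtered out would still be sent with an empty payload.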
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- No impact
- The new method will be added with a default implementation that simply returns the input offsets
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Adding a new interface specifically to manipulate commits was considered, but it would require significantly more code changes. The additional burden on developers of discovering and learning a new interface is similarly undesirable.
The Kafka Protocol currently uses a single String for the commit metadata. This presents a potential challenge for developers using the preCommit method, as they need to append their metadata to the end of the existing metadata string. It also presents a challenge for anyone who wants to process the metadata, as the string may contain a mix of unrelated formats that the processor would need to handle. Adding Headers to the commit protocol and deprecating the current metadata string could reduce these challenges, but is outside the scope of this KIP.
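A minimal sketch of the append step an interceptor would need under the single-string model is shown below. The `MetadataAppendSketch` class and the `;` separator are assumptions made for illustration only. The protocol defines no separator convention, which is exactly the difficulty described above.

```java
// Hypothetical helper; neither the class nor the separator is part of Kafka.
final class MetadataAppendSketch {
    // Assumed convention: entries are joined with a semicolon.
    static final String SEPARATOR = ";";

    private MetadataAppendSketch() {}

    // Appends an entry to the existing metadata without discarding it.
    static String append(String existing, String addition) {
        if (existing == null || existing.isEmpty()) {
            return addition;
        }
        return existing + SEPARATOR + addition;
    }
}
```

Any consumer of the metadata would need to agree on the same separator and on how to escape it, and every interceptor in a chain would have to follow the convention for the result to stay parseable.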
The Producer also has the ability to commit offsets as part of the Transactional interface. However, it currently cannot be configured with ConsumerInterceptors. As such, it has never had access to the onCommit method of the ConsumerInterceptor and will not have access to the new preCommit method. Giving the producer access to these interceptors would bring further consistency to the commit interfaces, but would likely require a more complicated change than simply adding ConsumerInterceptors. Adding ConsumerInterceptors to the producer would likely cause confusion, while more logical solutions, such as splitting the commit methods into a separate CommitInterceptor interface, would require a much more in-depth discussion. As such, these changes are outside the scope of this KIP.