Status

Current state: Under Discussion

Discussion thread: here 

JIRA: KAFKA-17348 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

A custom produce request parser would make it possible to intercept all incoming messages before they enter the broker and to apply broker-wide logic to them. Examples include tracing, filtering, transformation (such as lineage), enforcing required headers on all messages, compression, encryption, or any other message manipulation performed before a message enters the broker.

Public Interfaces

  • org.apache.kafka.common.requests.ProduceRequest#parse

Proposed Changes

Introduce a new interface org.apache.kafka.common.requests.ProduceRequestParser with a single method: ProduceRequest parse(ByteBuffer buffer, short version).

Move existing parse logic into its own separate class: org.apache.kafka.common.requests.DefaultProduceRequestParser implementing ProduceRequestParser.

Introduce a new private static member variable, ProduceRequestParser produceRequestParser, into ProduceRequest.

Change ProduceRequest class initialization to set produceRequestParser to a parser explicitly specified via Java system properties or environment variables, falling back to the default parser when none is specified.

Change the ProduceRequest.parse() method to delegate parsing to produceRequestParser.parse().
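
The steps above could be wired together roughly as follows. This is a minimal sketch under stated assumptions, not the proposed implementation: ProduceRequest is reduced to a stand-in that records only the version and payload size, the property key kafka.produce.request.parser and the environment variable KAFKA_PRODUCE_REQUEST_PARSER are hypothetical names (the proposal does not specify them), and CountingProduceRequestParser is an invented example of a custom parser.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for the real ProduceRequest; only the parser wiring is shown.
final class ProduceRequest {
    // Hypothetical configuration keys; the proposal does not name them.
    static final String PARSER_PROPERTY = "kafka.produce.request.parser";
    static final String PARSER_ENV = "KAFKA_PRODUCE_REQUEST_PARSER";

    // Resolved once, when the class is initialized.
    private static final ProduceRequestParser produceRequestParser = loadParser();

    final short version;
    final int sizeInBytes;

    ProduceRequest(short version, int sizeInBytes) {
        this.version = version;
        this.sizeInBytes = sizeInBytes;
    }

    // The entry point keeps its signature and delegates to the configured parser.
    static ProduceRequest parse(ByteBuffer buffer, short version) {
        return produceRequestParser.parse(buffer, version);
    }

    // System property first, then environment variable, then the default parser.
    static ProduceRequestParser loadParser() {
        String className = System.getProperty(PARSER_PROPERTY);
        if (className == null) {
            className = System.getenv(PARSER_ENV);
        }
        return instantiate(className);
    }

    static ProduceRequestParser instantiate(String className) {
        if (className == null || className.isEmpty()) {
            return new DefaultProduceRequestParser();
        }
        try {
            return Class.forName(className)
                    .asSubclass(ProduceRequestParser.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Cannot load produce request parser " + className, e);
        }
    }
}

interface ProduceRequestParser {
    ProduceRequest parse(ByteBuffer buffer, short version);
}

final class DefaultProduceRequestParser implements ProduceRequestParser {
    @Override
    public ProduceRequest parse(ByteBuffer buffer, short version) {
        // The real class would hold the parsing logic moved out of ProduceRequest.parse.
        return new ProduceRequest(version, buffer.remaining());
    }
}

// Hypothetical custom parser: counts requests, then defers to the default parsing.
final class CountingProduceRequestParser implements ProduceRequestParser {
    static final AtomicLong parsed = new AtomicLong();
    private final ProduceRequestParser delegate = new DefaultProduceRequestParser();

    @Override
    public ProduceRequest parse(ByteBuffer buffer, short version) {
        parsed.incrementAndGet();
        return delegate.parse(buffer, version);
    }
}
```

In this sketch, starting the JVM with -Dkafka.produce.request.parser=CountingProduceRequestParser would select the counting parser, while leaving both the property and the environment variable unset keeps the default behavior. A parser class that cannot be loaded fails fast at class initialization; whether the broker should instead fall back to the default parser in that case is a design choice the proposal leaves open.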

Compatibility, Deprecation, and Migration Plan

  • None. This should be a fully backward-compatible change that requires no additional work on the part of existing users.

Test Plan

All current produce request tests should continue to pass and no additional testing is required.

Rejected Alternatives

None
