Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KafkaProducer is designed to be thread-safe, and we encourage users to share a single producer instance across multiple threads[link]. These threads often need to send records to different topics. However, two other important configurations, acks and compression.type, can currently be set at the producer level only.
As a result, users who need different settings for each topic must create additional producer instances instead of reusing the existing one, and this approach requires at least three times as many producer instances.
This is harmful for edge devices (resource-constrained devices), since users need a producer pool to handle the different acks and compression settings. Additionally, this approach leaves many producers idle whenever a sensor with a specific setting has no data for a while.
In this KIP, we propose adding topic-level acks and compression, enabling users to reuse a single KafkaProducer even when different topics require different settings. This change will reduce the need for multiple producers and minimize resource usage on edge devices.
Public Interfaces
We add two new configurations, acks.topic and compression.type.topic, to ProducerConfig (for acks, via the constants TOPIC_ACKS_CONFIG and TOPIC_ACKS_DOC) and define their formats as follows. Entries for different topics are separated by a colon (:).
- Format: acks.topic => acks.<Topic-A>=<acks-1>:acks.<Topic-B>=<acks-2>
- Example: config.put("acks.topic", "acks.money=-1:acks.temperature=1")
- Format: compression.type.topic => compression.type.<Topic-A>=<compression-1>:compression.type.<Topic-B>=<compression-2>
- Example: config.put("compression.type.topic", "compression.type.money=lz4:compression.type.temperature=gzip")
Name | Type | Importance | Default | Description |
---|---|---|---|---|
acks.topic | String | LOW | null | This configuration item will set acks for specific topics. |
compression.type.topic | String | HIGH | none | This configuration item will set compression for specific topics. |
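To make the proposed format concrete, the following is a minimal sketch of how such a value could be parsed into a per-topic map. It is not taken from the KIP or from Kafka: the class TopicOverrideParser and its methods are hypothetical, and the fallback to the producer-level value for topics without an entry is an assumption rather than something the proposal states.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of Kafka: parses the per-topic format proposed above. */
public final class TopicOverrideParser {

    private TopicOverrideParser() { }

    /**
     * Parses a value such as "acks.money=-1:acks.temperature=1" with prefix "acks."
     * into {money=-1, temperature=1}. A null or blank value yields an empty map.
     */
    public static Map<String, String> parse(String prefix, String value) {
        Map<String, String> overrides = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return overrides;
        }
        // Entries are separated by ':'; each entry is <prefix><topic>=<setting>.
        for (String entry : value.split(":")) {
            String trimmed = entry.trim();
            int eq = trimmed.indexOf('=');
            boolean malformed = !trimmed.startsWith(prefix)
                    || eq <= prefix.length()          // missing '=' or empty topic name
                    || eq == trimmed.length() - 1;    // empty setting
            if (malformed) {
                throw new IllegalArgumentException("Malformed entry: " + entry);
            }
            overrides.put(trimmed.substring(prefix.length(), eq), trimmed.substring(eq + 1));
        }
        return overrides;
    }

    /** Assumption: a topic without an entry falls back to the producer-level setting. */
    public static String resolve(Map<String, String> overrides, String topic, String producerLevel) {
        return overrides.getOrDefault(topic, producerLevel);
    }
}
```

For instance, TopicOverrideParser.parse("compression.type.", "compression.type.money=lz4:compression.type.temperature=gzip") maps money to lz4 and temperature to gzip, while resolve would return the producer-level value for any other topic.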
Proposed Changes
- Add two new configurations to ProducerConfig: acks.topic and compression.type.topic. By setting these configuration items, users can customize acks and compression for specific topics.
- Behavior change:
- Attach topic-level acks and compression settings to RecordAccumulator#TopicInfo.
- Return Map<Acks, List<ProducerBatch>> when RecordAccumulator#drainBatchesForOneNode is called.
- Finally, in Sender#sendProduceRequests, we read the acks information, group the batches for a node that share the same acks into a List<ProducerBatch>, and then send one request per group.
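The grouping step described above can be sketched as follows. This is an illustration only, not the proposed implementation: Acks and Batch are hypothetical stand-ins for the Acks key type and ProducerBatch, reduced to the fields the sketch needs.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; Acks and Batch are hypothetical stand-ins, not Kafka classes. */
public final class AcksGrouping {

    /** Hypothetical enum for the three acks levels: 0, 1, and all (-1). */
    public enum Acks { NONE, LEADER, ALL }

    /** Hypothetical stand-in for ProducerBatch, carrying only the topic and its acks. */
    public static final class Batch {
        public final String topic;
        public final Acks acks;

        public Batch(String topic, Acks acks) {
            this.topic = topic;
            this.acks = acks;
        }
    }

    private AcksGrouping() { }

    /**
     * Groups the batches drained for one node by their acks level, so that each
     * group can be sent in a single request carrying a single acks value.
     */
    public static Map<Acks, List<Batch>> groupByAcks(List<Batch> drained) {
        Map<Acks, List<Batch>> grouped = new EnumMap<>(Acks.class);
        for (Batch batch : drained) {
            grouped.computeIfAbsent(batch.acks, k -> new ArrayList<>()).add(batch);
        }
        return grouped;
    }
}
```

With this shape, a node that has drained batches for topics using two different acks levels would receive two requests instead of one, and batches that share an acks level still travel together.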
Compatibility, Deprecation, and Migration Plan
- The public interface is new, so there are no compatibility issues.
- The ProduceRequest already contains an acks field, so no additional change is needed at the RPC level [link].
Performance issue
- No performance issue is expected, since batches with the same acks that are destined for the same broker are grouped together.
- A produce request is an asynchronous operation, so the producer does not need to wait for the response.
Test Plan
- Relevant unit tests and integration tests will be added to demonstrate the functionality.
Rejected Alternatives
- N/A