Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KafkaProducer is designed to be thread-safe, and we encourage users to share a single producer instance across multiple threads [link]. These threads often need to send records to different topics. However, two other important configurations, acks and compression, can currently be set only at the producer level.

As a result, users who need different settings per topic must create additional producer instances instead of reusing the existing one, and this approach requires at least three times as many producer instances.

This is harmful for edge devices (resource-constrained devices), since users need a producer pool to handle the different acks and compression settings. It also leaves many producers idle whenever a sensor with a specific setting has no data for a while.

In this KIP, we propose adding topic-level acks and compression, enabling users to reuse a single KafkaProducer even when different topics require different settings. This change will reduce the need for multiple producers and minimize resource usage on edge devices.

Public Interfaces

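We add two new configs, acks.topic and compression.type.topic, to ProducerConfig, together with their name and documentation constants (for example TOPIC_ACKS_CONFIG and TOPIC_ACKS_DOC), and define their value format as follows.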

  • Format  : acks.topic => acks.<Topic-A>=<acks-1>:acks.<Topic-B>=<acks-2>
    • example: config.put("acks.topic", "acks.money=-1:acks.temperature=1")
  • Format  : compression.type.topic => compression.type.<Topic-A>=<compression-1>:compression.type.<Topic-B>=<compression-2>
    • example: config.put("compression.type.topic", "compression.type.money=lz4:compression.type.temperature=gzip")
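
The format above can be read as a colon-separated list of <config>.<topic>=<value> entries. The following is a minimal sketch of how such a value might be parsed; the KIP does not specify the parsing code, so the class name TopicOverrideParser and its parse method are hypothetical and not part of Kafka. It relies on the fact that legal topic names contain neither ':' nor '='.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Kafka or of this KIP's implementation.
final class TopicOverrideParser {

    private TopicOverrideParser() { }

    /**
     * Parses a value such as "acks.money=-1:acks.temperature=1" into a topic -> value map.
     *
     * @param baseName producer-level config name used as the entry prefix, e.g. "acks"
     * @param value    raw value of the topic-level config; null or blank means no overrides
     */
    static Map<String, String> parse(String baseName, String value) {
        Map<String, String> overrides = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return overrides;
        }
        String prefix = baseName + ".";
        for (String entry : value.split(":")) {
            String trimmed = entry.trim();
            int eq = trimmed.indexOf('=');
            // Reject entries with a wrong prefix, an empty topic, or an empty value.
            if (!trimmed.startsWith(prefix) || eq <= prefix.length() || eq == trimmed.length() - 1) {
                throw new IllegalArgumentException(
                    "Malformed entry '" + entry + "', expected " + prefix + "<topic>=<value>");
            }
            overrides.put(trimmed.substring(prefix.length(), eq), trimmed.substring(eq + 1));
        }
        return overrides;
    }

    public static void main(String[] args) {
        // prints {money=-1, temperature=1}
        System.out.println(parse("acks", "acks.money=-1:acks.temperature=1"));
        // prints {money=lz4, temperature=gzip}
        System.out.println(parse("compression.type",
            "compression.type.money=lz4:compression.type.temperature=gzip"));
    }
}
```

A topic that does not appear in the parsed map would presumably fall back to the producer-level acks or compression.type value.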

Name                   | Type   | Importance | Default | Description
acks.topic             | String | LOW        | null    | This configuration item will set acks for specific topics.
compression.type.topic | String | HIGH       | none    | This configuration item will set compression for specific topics.

Proposed Changes

  • Add two new configurations to ProducerConfig: acks.topic and compression.type.topic. By setting these configuration items, users can customize acks and compression for specific topics.
  • Behavior change:
    1. Attach topic-level acks and compression settings to RecordAccumulator#TopicInfo.
    2. Return Map<Acks, List<ProducerBatch>> when RecordAccumulator#drainBatchesForOneNode is called.
    3. Finally, in Sender#sendProduceRequests, read the acks information, group the batches for a node that share the same acks into one List<ProducerBatch>, and then send the requests.
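
The grouping described in steps 2 and 3 can be sketched as follows. This is an illustration under stated assumptions, not the proposed implementation: Batch is a simplified stand-in for ProducerBatch, acks is represented as a Short rather than the Acks type named above, and AcksGrouping and groupByAcks are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for illustration only; these are not Kafka's internal classes.
final class AcksGrouping {

    /** Stand-in for a ProducerBatch: only the destination topic and partition are modeled. */
    static final class Batch {
        final String topic;
        final int partition;

        Batch(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /**
     * Groups the batches drained for one node by their effective acks.
     * Topics without an override use the producer-level default.
     */
    static Map<Short, List<Batch>> groupByAcks(List<Batch> drained,
                                               Map<String, Short> topicAcks,
                                               short defaultAcks) {
        Map<Short, List<Batch>> grouped = new LinkedHashMap<>();
        for (Batch batch : drained) {
            short acks = topicAcks.getOrDefault(batch.topic, defaultAcks);
            grouped.computeIfAbsent(acks, k -> new ArrayList<>()).add(batch);
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, Short> topicAcks = new HashMap<>();
        topicAcks.put("money", (short) -1); // topic-level override

        List<Batch> drained = Arrays.asList(
            new Batch("money", 0), new Batch("temperature", 0), new Batch("money", 1));

        Map<Short, List<Batch>> grouped = groupByAcks(drained, topicAcks, (short) 1);
        // Each entry would correspond to one produce request for this node.
        grouped.forEach((acks, batches) ->
            System.out.println("acks=" + acks + " batches=" + batches.size()));
    }
}
```

In this sketch, each map entry holds the batches that can share one request, since a request carries a single acks value.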

Compatibility, Deprecation, and Migration Plan

  • The public interfaces are new, so there are no compatibility issues.
  • The ProduceRequest already contains an acks field, so no additional change is needed at the RPC level [link].

Performance issue

  • No performance regression is expected, since batches with the same acks are still grouped together when they are sent to a broker.
  • A produce request is an asynchronous operation, so the producer does not need to wait for the response.

Test Plan

  • Relevant unit tests and integration tests will be added to demonstrate the functionality.

Rejected Alternatives

  • N/A