Status
Current state: Accepted
Discussion thread: part 1 and current discussion
JIRA: KAFKA-16368
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
With the upcoming release of 4.0, we have an opportunity to revisit the constraints and defaults for various Kafka configurations. Each proposed configuration change in this KIP has a specific motivation listed next to it.
Public Interfaces
Note that the implementation of this KIP is split across major versions 4.0 and 5.0.
| # | Configuration name | Default Value | Diff in constraints (only lists the modified constraints) | Target release version |
|---|---|---|---|---|
| 1 | segment.ms / log.roll.ms | Unchanged: 7 days | Previous: min = 1 ms. New: min = 1 min. | 5.0 |
| 2 | segment.bytes / log.segment.bytes | Unchanged: | Previous: New: | 4.0 |
| 3 | num.recovery.threads.per.data.dir | Previous: 1 New: 2 | Unchanged | 4.0 |
| 4 | segment.index.bytes / log.index.size.max.bytes | Unchanged: 10 MB | Previous: New: | 5.0 |
| 5 | linger.ms | Previous: 0 New: 5 | Unchanged | 4.0 |
| 6 | max.compaction.lag.ms / log.cleaner.max.compaction.lag.ms | Unchanged: | Previous: New: | 5.0 |
| 7 | message.timestamp.after.max.ms / log.message.timestamp.after.max.ms | Previous: New: | Unchanged | 4.0 |
| 9 | remote.log.manager.copier.thread.pool.size | Previous: -1 New: 10 | Previous: (-1, 1, 2, ...) New: (1, 2, ...) | 4.0 |
| 10 | remote.log.manager.expiration.thread.pool.size | Previous: -1 New: 10 | Previous: (-1, 1, 2, ...) New: (1, 2, ...) | 4.0 |
| 11 | remote.log.manager.thread.pool.size | Previous: 10 New: 2 | Unchanged | 4.0 |
Proposed Changes
Please refer to the table above for an overview of the proposed changes. The changes to defaults and the new constraints apply to both the broker-level configs and the corresponding topic-level configs. Sometimes the same property is named slightly differently at the broker level and the topic level; for example, log.cleaner.max.compaction.lag.ms is the broker-level config and max.compaction.lag.ms is the corresponding topic-level config.
#1 #2 #4 #6 Config(s): segment.ms / segment.bytes / segment.index.bytes / max.compaction.lag.ms
Motivation for the change
Users of Apache Kafka sometimes inadvertently configure certain settings with very small values. This can lead to the creation of an excessive number of log segments, which in turn can exhaust the system’s file descriptors or the operating system’s memory-mapped file (mmap) limit. These resource exhaustion scenarios degrade system performance and can lead to critical failures in production environments.
Additionally, the current minimum value for the segment.index.bytes (log.index.size.max.bytes) configuration is technically invalid for the OffsetIndex. The smallest possible entry in this index requires 8 bytes (see relevant code reference), yet the configuration allows a minimum value of 4 bytes. As a result, attempting to use the current minimum value leads to exceptions during operation, further complicating system stability.
Proposed change
The proposed changes introduce a revised set of minimum values for the affected configurations, chosen to balance practical system requirements with operational efficiency. The new minimum values ensure:
- Resource Safety: Prevent configurations that could overwhelm the operating system’s resources, such as file descriptors or mmap limits.
- Technical Validity: Align configuration constraints with the actual requirements of the OffsetIndex, avoiding invalid states or exceptions during runtime.
- Operational Stability: Reduce the likelihood of accidental misconfigurations that can disrupt production environments.
These constraints are carefully selected based on both the theoretical minimums required by Kafka’s internal mechanics and practical considerations for system resource utilization.
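To illustrate how the tightened minimums interact with topic-level overrides, here is a minimal sketch using the Java AdminClient. The bootstrap address, topic name, and partition/replication counts are placeholders; the segment.ms value shown is the proposed new minimum of one minute from the table above.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithSegmentConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Topic-level override kept at the proposed minimum: segment.ms may no
            // longer be set below one minute once the new constraint is enforced.
            NewTopic topic = new NewTopic("example-topic", 3, (short) 1)
                    .configs(Map.of("segment.ms", "60000"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```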
#3 Config(s): num.recovery.threads.per.data.dir
Motivation for the change
After an unclean shutdown, a Kafka broker must perform log recovery before restarting. This process can be time-consuming, particularly in environments with a large number of partitions (e.g., thousands). The current configuration defaults may result in a single-threaded or minimally parallelized recovery process, leading to extended downtime as each partition's log segments are recovered sequentially or with limited concurrency.
This delay can significantly impact availability and throughput in Kafka clusters, especially in scenarios where unclean shutdowns are not uncommon due to infrastructure issues.
Proposed change
Increase the default value of num.recovery.threads.per.data.dir to 2, enabling greater parallelism and faster recovery times.
#5 Config(s): linger.ms
Motivation for the change
The current default value of linger.ms is 0, which means Kafka sends out a message as soon as it is available. While this setting is designed to minimize latency, it is counterproductive in scenarios with a high volume of messages: Kafka's architecture is optimized for handling large batches of messages, since batching reduces the overhead associated with processing individual messages.
When linger.ms is set to 0, Kafka effectively sends one message per batch in these cases, leading to a significant increase in the number of small requests. This results in inefficient utilization of network and server resources, as Kafka performs better when it can process larger, consolidated batches of messages. This misalignment between the default configuration and Kafka's optimization results in unnecessary overhead and performance degradation under high-throughput workloads.
Proposed change
Increase the default value of linger.ms to 5 ms. This value represents a reasonable compromise between maintaining low-latency messaging (within single-digit milliseconds) and ensuring efficient batching of messages to avoid overwhelming Kafka with a flood of small requests.
Key advantages of this change:
- Improved Throughput: Larger batches of messages will allow Kafka to process requests more efficiently, improving overall system throughput.
- Balanced Latency: A 5 ms default ensures latency remains low while taking advantage of Kafka's batch optimization.
- Resource Optimization: Reducing the number of small requests will minimize strain on the network and brokers, leading to more stable operations under high message loads.
- Better Out-of-the-Box Performance: With the new default, Kafka will perform more optimally without requiring users to manually tune the configuration.
By adopting this change, Kafka's default behavior will align better with its design principles, improving performance for the majority of use cases while still allowing users to adjust the configuration if they have specific low-latency requirements.
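For applications that do want the old behaviour, the setting remains a single producer override. Below is a minimal sketch, assuming a local broker at localhost:9092 and a topic named example-topic, that pins linger.ms back to 0 for a latency-sensitive producer; removing the override leaves the proposed 5 ms default in effect.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class LowLatencyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Explicitly restore the old behaviour for a latency-sensitive workload;
        // removing this line leaves the proposed 5 ms default in effect.
        props.put(ProducerConfig.LINGER_MS_CONFIG, "0");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
        }
    }
}
```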
#7 Config(s): message.timestamp.after.max.ms
Motivation for the change
See: KIP-937: Improve Message Timestamp Validation for background on this configuration. A user can accidentally send messages with a CreateTime provided in nanoseconds instead of milliseconds. In such cases, the current default value leads to aggressive segment rotation.
Notably, we are not modifying log.message.timestamp.before.max.ms because there is a legitimate use case for sending records with past timestamps, such as change data capture (CDC), where records are extracted from a database and replayed onto Kafka.
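The sketch below (topic name is a placeholder) contrasts a CreateTime accidentally supplied in nanoseconds with the millisecond value Kafka expects; with the current permissive default, the nanosecond record is accepted with a timestamp far in the future.

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.ProducerRecord;

public class TimestampUnitsExample {
    public static void main(String[] args) {
        long nowMillis = System.currentTimeMillis();
        long nowNanos = TimeUnit.MILLISECONDS.toNanos(nowMillis); // epoch time, but in nanoseconds

        // The mistake: the broker interprets CreateTime as milliseconds since the epoch,
        // so a nanosecond value is read as a timestamp far in the future.
        ProducerRecord<String, String> wrong =
                new ProducerRecord<>("example-topic", null, nowNanos, "key", "value");

        // Correct usage: CreateTime expressed in milliseconds since the epoch.
        ProducerRecord<String, String> right =
                new ProducerRecord<>("example-topic", null, nowMillis, "key", "value");
    }
}
```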
#9 #10 #11 Config(s): remote.log.manager.copier.thread.pool.size / remote.log.manager.expiration.thread.pool.size / remote.log.manager.thread.pool.size
Motivation for the change
KIP-950 proposed to split the thread pool used by RemoteLogManager into copy and expiration thread pools. With KIP-956, the changes landed in v3.9. The default value for the copier and expiration thread pools was set to -1, so that the value is derived from the existing remote.log.manager.thread.pool.size for backward compatibility. See PR#17499.
Now, we have three thread pools:
1. remote.log.manager.thread.pool.size - handles the RLMFollowerTask, which reads the highest-uploaded remote offset for follower partitions. This pool was planned to be deprecated after KIP-950 but was retained to handle this task.
2. remote.log.manager.copier.thread.pool.size - handles the RLMCopyTask, which copies segments to remote storage for leader partitions.
3. remote.log.manager.expiration.thread.pool.size - handles the RLMExpirationTask, which deletes expired remote segments for leader partitions.
Since a broker acts as the leader for some partitions and as a follower for others, deriving the copier and expiration thread pool sizes from remote.log.manager.thread.pool.size should be avoided: the follower task is lightweight (it only fetches the highest-uploaded remote offset), whereas the leader tasks are heavyweight (they copy and delete segments). Moreover, if a user was running with 10 threads in v3.8, the v3.9 server will start 30 threads.
Proposed change
To provide clear configuration and reduce the need for manual tuning, the following changes are proposed:
- Change the default value of the copier and expiration thread pool sizes from -1 to 10. This also ensures smooth validation of dynamic thread-count changes (each step may only move the value between x/2 and x*2); see the sketch after this list.
- Change the default value of remote.log.manager.thread.pool.size from 10 to 2, as its task is lightweight: it only fetches the highest-uploaded remote offset for follower partitions.
- Deprecate the logic that derives the value from another thread pool (the transitive config dependency), so that the configured thread-pool values become independent of each other.
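As a minimal sketch of the dynamic-resize scenario mentioned in the first bullet (the broker id "0" and bootstrap address are placeholders, and this assumes the pool sizes remain dynamically updatable as described above), the following uses the Java AdminClient to resize the copier pool within the allowed x/2 to x*2 step:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ResizeRemoteLogCopierPool {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");

            // With the proposed default of 10, a single dynamic update may move the
            // value anywhere between 5 (10 / 2) and 20 (10 * 2).
            AlterConfigOp resizeCopier = new AlterConfigOp(
                    new ConfigEntry("remote.log.manager.copier.thread.pool.size", "20"),
                    AlterConfigOp.OpType.SET);

            admin.incrementalAlterConfigs(Map.of(broker, List.of(resizeCopier))).all().get();
        }
    }
}
```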
Compatibility, Deprecation, and Migration Plan
This change introduces a backward-incompatible adjustment, meaning that during a rolling upgrade to Kafka 4.0 from a previous version, brokers configured with invalid values for the affected settings will fail to start and throw a ConfigException.
Rationale for Accepting the Risk:
- Upgrade Documentation Review: It is standard practice to expect users to carefully review the upgrade documentation before transitioning to a major version. Kafka 4.0 already includes other backward-incompatible changes that users must address, making this change part of a broader upgrade process.
- Reasonable Constraints: The new constraints are designed to prevent configurations that can lead to system instability. These constraints align with Kafka’s operational best practices, and, as per the author’s knowledge and community discussions, there are no known use cases that would require values outside the newly enforced boundaries.
- Minimal Impact on Valid Configurations: Configurations adhering to best practices will remain unaffected, ensuring that users already following recommended guidelines experience a seamless upgrade.
Risk Mitigation Strategies:
To minimize the impact on existing users, the following measures will be implemented:
- Prominent Documentation: These changes will be clearly and prominently highlighted in the "Upgrading to Kafka 4.0" section of the documentation. This will include detailed explanations of the new constraints, examples of valid configurations, and guidance on how to identify and correct invalid settings.
- Pre-Upgrade Validation Tool (not part of this KIP): A validation tool or script (if applicable) can be provided to help users check their existing configurations for compatibility with Kafka 4.0 before initiating the upgrade process. This would proactively identify potential issues; a rough sketch follows below.
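The following is a hypothetical sketch of such a pre-upgrade check, not part of this KIP; it only covers the segment.ms / log.roll.ms minimum from the table above, and a real tool would cover every tightened constraint.

```java
import java.io.FileReader;
import java.util.Properties;

// Hypothetical pre-upgrade check (not part of this KIP): flags broker properties that
// would violate the tightened minimums before a rolling upgrade is attempted.
public class PreUpgradeConfigCheck {
    public static void main(String[] args) throws Exception {
        Properties brokerConfig = new Properties();
        try (FileReader reader = new FileReader(args[0])) { // path to server.properties
            brokerConfig.load(reader);
        }

        // Only log.roll.ms is checked here as an example.
        String logRollMs = brokerConfig.getProperty("log.roll.ms");
        long minLogRollMs = 60_000L; // proposed minimum of one minute
        if (logRollMs != null && Long.parseLong(logRollMs) < minLogRollMs) {
            System.out.printf("log.roll.ms=%s is below the new minimum of %d ms%n",
                    logRollMs, minLogRollMs);
        }
    }
}
```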
By enforcing these constraints and providing robust support for the upgrade process, this change ensures a more stable and predictable Kafka system for all users while minimizing the impact on existing deployments.
Test Plan
Appropriate unit tests will be added for the newly added constraints.
Rejected Alternatives
- Set the default for io.threads, network.threads and recovery threads based on the number of cores instead of a fixed value. We chose to keep this change out of the scope of this KIP because finding the exact formula based on the number of cores requires some research, which doesn't align with the timeline for this KIP.