Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
There are many use cases where, from the business logic point of view, it is acceptable for a produced record to be appended to any partition of a multi-partition topic. The current default partitioning algorithm takes advantage of this: when neither the partition nor the key is set, it periodically rotates the target partition, also taking partition load statistics into account.
This KIP is motivated by the fact that in a typical cloud Kafka setup:
- The cloud region has multiple availability zones (AZs).
- The cluster brokers are uniformly distributed across AZs.
- Partition leaders are also distributed across AZs.
- Clients, particularly producers, are also distributed across AZs.
- Inter-AZ network traffic is not free with many cloud providers, so it is desirable to reduce it.
In cloud environments, the Kafka concept of a rack is used to identify the AZ.
If producers and partition leaders are more or less uniformly distributed across AZs and the target partition is periodically rotated at random, it is possible to limit the candidate target partitions to those whose leaders are located in the same AZ as the producer itself. This drastically reduces the inter-AZ network traffic originating from this producer, which yields cost savings. Reduced network latency may be an additional benefit.
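As a rough, back-of-the-envelope estimate under the uniformity assumptions above (with Z, a symbol introduced here, denoting the number of AZs, leaders spread evenly across them, and the partition chosen without regard to rack), the expected fraction of a producer's traffic that crosses an AZ boundary is:

```latex
f_{\text{cross}} = \frac{Z - 1}{Z}, \qquad Z = 3 \;\Rightarrow\; f_{\text{cross}} = \frac{2}{3} \approx 67\%
```

With rack-aware selection this fraction drops to roughly zero as long as nearby partitions are available. The estimate covers only producer-to-leader traffic; replication between brokers in different AZs is not affected by this change.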
For this to be stable, some important conditions must hold. The distribution of producers and partition leaders across availability zones should be uniform enough to avoid unhealthy imbalances. For example, if there are two producers in two of three AZs and three partitions with one leader in each AZ, the two producers will produce only to their nearby partitions, leaving the third partition empty. For the same reason, producers should generate similar load. The degree of uniformity needed depends heavily on the cluster and its ability to compensate for and absorb imbalances. Naturally, these conditions do not always hold; in such cases the proposed algorithm should not be used (as described later, the new logic will be behind configuration). Imbalances should also be monitored.
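To make the imbalance example concrete, here is a small, self-contained Java simulation. It is only an illustrative sketch, not part of the proposed implementation: the class and method names are hypothetical, round-robin stands in for the real periodic partition rotation, and load balancing is ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RackImbalanceExample {

    // Counts the records each partition receives when every producer writes
    // only to partitions whose leader shares its rack (AZ), cycling through
    // those "nearby" partitions round-robin. A producer with no nearby
    // partition falls back to all partitions.
    static int[] simulate(List<String> producerRacks,
                          List<String> leaderRacks,
                          int recordsPerProducer) {
        int[] counts = new int[leaderRacks.size()];
        for (String producerRack : producerRacks) {
            List<Integer> nearby = new ArrayList<>();
            for (int p = 0; p < leaderRacks.size(); p++) {
                if (leaderRacks.get(p).equals(producerRack)) {
                    nearby.add(p);
                }
            }
            if (nearby.isEmpty()) {
                for (int p = 0; p < leaderRacks.size(); p++) {
                    nearby.add(p);
                }
            }
            for (int i = 0; i < recordsPerProducer; i++) {
                counts[nearby.get(i % nearby.size())]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two producers in two of three AZs; one partition leader per AZ.
        int[] counts = simulate(List.of("az-a", "az-b"),
                                List.of("az-a", "az-b", "az-c"),
                                1000);
        System.out.println(Arrays.toString(counts)); // [1000, 1000, 0]
    }
}
```

The partition led from az-c receives nothing, even though the producers' combined load would otherwise be spread across all three partitions.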
Proposed Changes
The KIP proposes to extend the current default partitioner algorithm, namely the RecordAccumulator and BuiltInPartitioner classes, in the following way.
If automatic partitioning is needed (i.e. no record partition or key is specified, or partitioner.ignore.keys is set to true):
- If the client rack is specified and there are partition leaders in the same rack, select the next partition from those “nearby” partitions. The current algorithm is followed in all other aspects, including load balancing.
- Select the next partition from all partitions following the current algorithm in the following cases:
  - the client rack is not specified;
  - there are no partitions with leaders in the same rack as the client;
  - all the “nearby” partitions are unavailable;
  - partitioner.rack.aware is false.
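The selection rules above can be sketched in Java as follows. This is a simplified illustration under stated assumptions, not the actual RecordAccumulator/BuiltInPartitioner code: the PartitionView record and the candidates/nextPartition methods are hypothetical, the fallback only approximates the current behavior (prefer available partitions, else use all), and the load-based weighting of the current algorithm is replaced by a uniform random pick.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class RackAwareSelectionSketch {

    // Hypothetical, simplified view of a partition: its id, the rack of its
    // current leader (null if unknown) and whether the leader is available.
    record PartitionView(int partition, String leaderRack, boolean available) {}

    // Returns the set of partitions the next partition is chosen from.
    static List<PartitionView> candidates(List<PartitionView> partitions,
                                          String clientRack,
                                          boolean rackAware) {
        if (rackAware && clientRack != null && !clientRack.isEmpty()) {
            List<PartitionView> nearbyAvailable = new ArrayList<>();
            for (PartitionView p : partitions) {
                if (p.available() && clientRack.equals(p.leaderRack())) {
                    nearbyAvailable.add(p);
                }
            }
            if (!nearbyAvailable.isEmpty()) {
                return nearbyAvailable;
            }
        }
        // Fallback (rack-awareness off, no client rack, no nearby leaders,
        // or all nearby partitions unavailable): prefer available partitions
        // among all partitions, otherwise use every partition.
        List<PartitionView> available = new ArrayList<>();
        for (PartitionView p : partitions) {
            if (p.available()) {
                available.add(p);
            }
        }
        return available.isEmpty() ? partitions : available;
    }

    // Picks uniformly at random from the candidates. The current built-in
    // algorithm also takes partition load into account; that is omitted here.
    static int nextPartition(List<PartitionView> partitions,
                             String clientRack,
                             boolean rackAware) {
        List<PartitionView> c = candidates(partitions, clientRack, rackAware);
        return c.get(ThreadLocalRandom.current().nextInt(c.size())).partition();
    }

    public static void main(String[] args) {
        List<PartitionView> partitions = List.of(
                new PartitionView(0, "az-a", true),
                new PartitionView(1, "az-b", true),
                new PartitionView(2, "az-c", true),
                new PartitionView(3, "az-a", false));
        // For a client in az-a, partition 0 is the only nearby available one.
        System.out.println(nextPartition(partitions, "az-a", true)); // 0
    }
}
```

Separating candidate selection from the final pick keeps the rack-aware logic a pure filter in front of the existing selection step, which matches the intent that the current algorithm is otherwise unchanged.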
The new behavior will be controlled by configuration.
Public Interfaces
Configuration Keys
Key Name | Description | Valid Values | Default Value |
---|---|---|---|
partitioner.rack.aware | Controls whether the default partitioner is rack-aware. This has no effect when a custom partitioner is used. | true, false | false |
client.rack (already exists for consumers) | Specifies the rack of the client. | strings | |
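A minimal sketch of how a producer might opt in, assuming the key names in the table above are adopted as proposed. It only builds a java.util.Properties object so that it stays self-contained; the broker address and AZ id are hypothetical. No partitioner.class is set, because the new flag only affects the default partitioner.

```java
import java.util.Properties;

public class RackAwareProducerConfigExample {

    // Builds producer properties that opt in to the proposed rack-aware
    // partitioning. "partitioner.rack.aware" is the key proposed by this KIP;
    // "client.rack" is assumed to carry the producer's AZ id, as it already
    // does for consumers.
    static Properties producerProps(String bootstrapServers, String rack) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("partitioner.rack.aware", "true");
        props.put("client.rack", rack);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and AZ id.
        Properties props = producerProps("broker-1:9092", "use1-az1");
        System.out.println(props.getProperty("partitioner.rack.aware")); // true
        System.out.println(props.getProperty("client.rack"));            // use1-az1
    }
}
```

These properties would then be passed to a KafkaProducer; only records that go through automatic partitioning (no partition or key, or partitioner.ignore.keys set to true) would be affected.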
Documentation
The introduced changes will be reflected in the documentation, namely in the documentation of the partitioner.class config, which explains the algorithm to the user.
Compatibility, Deprecation, and Migration Plan
Migrating to the new version will have no impact on clients as the default configuration value keeps the old behavior.
The behavior will remain configurable for the foreseeable future.
No special migration process or tool is needed to migrate to the new version.
Test Plan
The proposed change can be tested at the unit level (the RecordAccumulator and BuiltInPartitioner classes).
Testing at the integration level seems excessive, as the (internal) interface for partitioning is small and clearly defined.
Rejected Alternatives
None.