...
DefaultPartitioner and UniformStickyPartitioner will be deprecated; until removed, they'll behave the same way as they do today.
- Users that don't specify a custom partitioner would get the new behavior automatically.
- Users that explicitly specify DefaultPartitioner or UniformStickyPartitioner would get a deprecation warning but see no change of behavior. To get the updated behavior, they would need to update their configuration correspondingly: remove the partitioner.class setting, and optionally set partitioner.ignore.keys to 'true' if replacing UniformStickyPartitioner.

Partitioner.onNewBatch will be deprecated.
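For illustration, a producer that today sets UniformStickyPartitioner explicitly could migrate along these lines (a sketch based on the property names proposed above, not final syntax):

```properties
# Before: explicit partitioner class, to be deprecated
# partitioner.class=org.apache.kafka.clients.producer.UniformStickyPartitioner

# After: remove partitioner.class so the new built-in partitioning logic
# is used, and ignore keys to keep the uniform-sticky semantics
partitioner.ignore.keys=true
```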
Test Results
Test Setup
3 kafka brokers on a local machine, one topic with 3 partitions, RF=1, one partition on each broker:
Topic: foo Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Offline:
Topic: foo Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: foo Partition: 2 Leader: 2 Replicas: 2 Isr: 2 Offline:
Kafka-0 has 20ms sleep injected for each produce response.
The Kafka producer settings are default other than those explicitly mentioned.
Test Series 1
bin/kafka-producer-perf-test.sh --topic foo --num-records 122880 --print-metrics --record-size 512 --throughput 2048 --producer.config producer.properties
The test produces ~120K records of 512 bytes each (~60 MB total) with a throughput throttle of 2048 rec/sec (1 MB/sec).
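The record-count and throttle numbers line up arithmetically (a quick sanity check, not part of the original test):

```python
record_size = 512      # bytes per record (--record-size)
num_records = 122880   # --num-records
throttle_rps = 2048    # --throughput, records/sec

total_mb = num_records * record_size / (1024 * 1024)
throttle_mb_s = throttle_rps * record_size / (1024 * 1024)

print(total_mb)       # total payload in MB
print(throttle_mb_s)  # throttle in MB/sec
```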
Summary
| Partitioner Config | Throughput | Avg Latency | P99 Latency | P99.9 Latency |
|---|---|---|---|---|
| Old DefaultPartitioner | 0.84 MB/s | 4072.74 ms | 10992 ms | 11214 ms |
| partitioner.adaptive.partitioning.enable=false | 1 MB/s | 49.92 ms | 214 ms | 422 ms |
| New logic with all default settings | 1 MB/s | 40.06 ms | 154 ms | 220 ms |
| partitioner.availability.timeout.ms=5 | 1 MB/s | 36.29 ms | 150 ms | 184 ms |
Old DefaultPartitioner (current implementation)
122880 records sent, 1724.873666 records/sec (0.84 MB/sec), 4072.74 ms avg latency, 11246.00 ms max latency, 3870 ms 50th, 10221 ms 95th, 10992 ms 99th, 11214 ms 99.9th.
The current implementation skews load toward the slowest broker and only manages to handle ~0.85 MB/s, so the latency grows over time.
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0} : 46826262.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1} : 9207276.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2} : 9004193.000
The Kafka-0 broker takes ~5x more bytes than the other 2 brokers, becoming the bottleneck for the cluster and potentially skewing downstream data distribution.
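The ~5x figure follows directly from the per-node outgoing-byte-total metrics above (a quick computation for illustration, not part of the original output):

```python
# outgoing-byte-total per node, copied from the producer metrics above
bytes_per_node = {
    "node-0": 46_826_262,
    "node-1": 9_207_276,
    "node-2": 9_004_193,
}

total = sum(bytes_per_node.values())
for node, b in sorted(bytes_per_node.items()):
    print(f"{node}: {b / total:6.1%} of traffic")

# node-0 vs. the average of the other two brokers
skew = bytes_per_node["node-0"] * 2 / (bytes_per_node["node-1"] + bytes_per_node["node-2"])
print(f"node-0 takes {skew:.1f}x the load of an average healthy broker")
```

The skew comes out at roughly 5x, matching the observation above.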
New Uniform Partitioner (partitioner.adaptive.partitioning.enable=false)
122880 records sent, 2043.198484 records/sec (1.00 MB/sec), 49.92 ms avg latency, 795.00 ms max latency, 8 ms 50th, 150 ms 95th, 214 ms 99th, 422 ms 99.9th.
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0} : 22237614.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1} : 21606034.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2} : 22152273.000
All brokers take roughly the same load. The slowest broker isn't overloaded anymore, so the cluster is pulling 1 MB/sec and latencies are more under control.
New Default Partitioner (all settings are default)
122880 records sent, 2045.477245 records/sec (1.00 MB/sec), 40.06 ms avg latency, 817.00 ms max latency, 7 ms 50th, 141 ms 95th, 154 ms 99th, 220 ms 99.9th.
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0} : 19244362.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1} : 23589010.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2} : 23321818.000
Here the adaptive logic sends less data to the slower broker, so faster brokers can take more load and the latencies are better. The data distribution isn't skewed too much, just enough to adjust load to broker capacity.
New Default Partitioner with partitioner.availability.timeout.ms=5
122880 records sent, 2044.218196 records/sec (1.00 MB/sec), 36.29 ms avg latency, 809.00 ms max latency, 6 ms 50th, 138 ms 95th, 150 ms 99th, 184 ms 99.9th.
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0} : 17431944.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1} : 25781879.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2} : 22977244.000
Here the adaptive logic is more responsive to latency and sends even less data to the slower broker, which increases the mix of data processed by faster brokers. Note that it took me a few experiments to find a value that made a difference, so this confirms the design decision that this logic should be off by default and only turned on after tuning for a specific configuration and workload.
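For reference, the tuned run above corresponds to a producer configuration along these lines (the value 5 is specific to this setup, per the note above):

```properties
# partitioner.adaptive.partitioning.enable defaults to true;
# the availability timeout is opt-in and workload-specific
partitioner.availability.timeout.ms=5
```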
Test Series 2
In this test series, the throughput throttle is increased to 2MB/sec.
bin/kafka-producer-perf-test.sh --topic foo --num-records 122880 --print-metrics --record-size 512 --throughput 4096 --producer.config producer.properties
The test produces ~120K records of 512 bytes each (~60 MB total) with a throughput throttle of 4096 rec/sec (2 MB/sec).
New Uniform Partitioner (partitioner.adaptive.partitioning.enable=false)
122880 records sent, 3789.317873 records/sec (1.85 MB/sec), 426.24 ms avg latency, 2506.00 ms max latency, 8 ms 50th, 2065 ms 95th, 2408 ms 99th, 2468 ms 99.9th.
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0} : 22396882.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1} : 21652393.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2} : 21355134.000
Here the brokers take the same load; the slowest broker appears to be maxed out, so the cluster can only take 1.85 MB/s and latencies grow.
New Default Partitioner (all settings are default)
122880 records sent, 4078.327249 records/sec (1.99 MB/sec), 34.66 ms avg latency, 785.00 ms max latency, 4 ms 50th, 143 ms 95th, 167 ms 99th, 297 ms 99.9th.
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0} : 14866064.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1} : 25176418.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2} : 25581116.000
The adaptive logic manages to redistribute the load to faster brokers, sustaining the 2 MB/sec throughput with stable latencies. Note that here a bigger skew in the distribution (vs. the 1 MB/sec throttle) was needed to match load to broker capacity, showing that the adaptive logic applies just enough skew to keep latencies stable.
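The bigger skew can be read off the metrics as node-0's share of total outgoing bytes under adaptive partitioning in each series (computed here for illustration):

```python
# outgoing-byte-total per node for the adaptive (default) runs,
# copied from the producer metrics in each test series
series_1mb = {"node-0": 19_244_362, "node-1": 23_589_010, "node-2": 23_321_818}
series_2mb = {"node-0": 14_866_064, "node-1": 25_176_418, "node-2": 25_581_116}

for label, nodes in [("1 MB/s throttle", series_1mb), ("2 MB/s throttle", series_2mb)]:
    share = nodes["node-0"] / sum(nodes.values())
    print(f"{label}: node-0 carries {share:.1%} of the traffic")
```

Node-0's share drops from roughly 29% at the 1 MB/s throttle to roughly 23% at 2 MB/s, i.e. the partitioner shifts more load away from the slow broker as overall demand rises.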
Rejected Alternatives
KafkaProducer to Explicitly Check for DefaultPartitioner
...