Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/dlrkd9qnd2p4kysy26fygrh885mdm0lo
Vote thread: https://lists.apache.org/thread/0nvz4td8xvqrqkno9vlf4l6nf8xcvqz1
JIRA: KAFKA-12999
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Lazy initialization for RecordHeader was introduced in KAFKA-10438, improving performance but also creating unexpected side effects.
Since the Consumer is not thread-safe, the same assumption naturally extends to ConsumerRecord. However, users often assume that read-only access across threads is safe.
With lazy initialization, this assumption no longer holds, and users may encounter NullPointerException.
So far, three concurrency-related issues (KAFKA-12999, KAFKA-17725, KAFKA-18470) have been reported with respect to RecordHeader data access.
This KIP proposes to guarantee thread safety for read-only access to RecordHeader, eliminating the risk of NullPointerException.
Note that if other threads modify the value field in RecordHeader (a mutable byte[]), thread-safety is not guaranteed.
Public Interfaces
org.apache.kafka.common.header.internals.RecordHeader class will be updated to be thread-safe.
Proposed Changes
Use double-checked locking and volatile to make RecordHeader thread-safe.
This ensures that synchronization happens only during initialization, and once initialized, subsequent accesses do not acquire any locks.
Note that valueBuffer should also be declared volatile to reduce unnecessary synchronization attempts.
 public class RecordHeader implements Header {
     private ByteBuffer keyBuffer;
-    private String key;
-    private ByteBuffer valueBuffer;
-    private byte[] value;
+    private volatile String key;
+    private volatile ByteBuffer valueBuffer;
+    private volatile byte[] value;
     ...
     public String key() {
         if (key == null) {
+            synchronized (this) {
+                if (key == null) {
                     key = Utils.utf8(keyBuffer, keyBuffer.remaining());
                     keyBuffer = null;
+                }
+            }
         }
         return key;
     }
     public byte[] value() {
         if (value == null && valueBuffer != null) {
+            synchronized (this) {
+                if (value == null && valueBuffer != null) {
                     value = Utils.toArray(valueBuffer);
                     valueBuffer = null;
+                }
+            }
         }
         return value;
     }
 }
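The pattern in the diff above can also be tried in isolation. The following is a minimal sketch, assuming a hypothetical stand-in class named LazyHeaderSketch (it is not the Kafka RecordHeader and uses StandardCharsets instead of Kafka's Utils). It also copies the volatile field into a local variable, a common refinement that is not part of the proposed change.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for illustration only; not the Kafka RecordHeader class.
final class LazyHeaderSketch {
    private ByteBuffer keyBuffer;   // read and cleared only while holding the lock
    private volatile String key;    // volatile so the lock-free fast path sees a fully built String

    LazyHeaderSketch(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    String key() {
        String result = key;                 // first check, no lock
        if (result == null) {
            synchronized (this) {
                result = key;                // second check, under the lock
                if (result == null) {
                    result = StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString();
                    key = result;            // publish the decoded value
                    keyBuffer = null;        // release the buffer once decoded
                }
            }
        }
        return result;
    }
}
```

Once key is set, every later call returns from the first check without entering the synchronized block, which is the property the proposal relies on.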
JMH Benchmark: Current Implementation (non-thread-safe) vs. Double-Checked Locking (thread-safe)
The benchmark was executed on an Apple M4 Max system with 48 GB RAM.
The thread-safe version adds a little overhead during the first initialization.
After the initial lazy initialization, subsequent accesses do not incur any locking or additional cost, so the steady-state performance remains essentially identical to the non-thread-safe version.
Benchmark code
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class RecordHeaderSingleThreadBenchmark {
    private RecordHeader header;

    @Setup(Level.Iteration)
    public void setup() {
        byte[] valueBytes = new byte[1000];
        ByteBuffer keyBuffer = ByteBuffer.wrap("key".getBytes());
        ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes);
        header = new RecordHeader(keyBuffer, valueBuffer);
    }

    @Benchmark
    public String benchmarkKey() {
        return header.key();
    }

    @Benchmark
    public byte[] benchmarkValue() {
        return header.value();
    }
}
Result
The thread-safe double-checked locking implementation introduces negligible overhead (~0.3 ns per operation in this benchmark) for both key() and value().
Current Implementation (non-thread-safe)
Benchmark                                         Mode  Cnt  Score   Error  Units
RecordHeaderSingleThreadBenchmark.benchmarkKey    avgt   15  0.457 ± 0.023  ns/op
RecordHeaderSingleThreadBenchmark.benchmarkValue  avgt   15  0.451 ± 0.020  ns/op
Double-Checked Locking (thread-safe)
Benchmark                                         Mode  Cnt  Score   Error  Units
RecordHeaderSingleThreadBenchmark.benchmarkKey    avgt   15  0.774 ± 0.010  ns/op
RecordHeaderSingleThreadBenchmark.benchmarkValue  avgt   15  0.773 ± 0.008  ns/op
JMH Benchmark: Double-Checked Locking vs. Full-Method Synchronization
Full-Method Synchronization means that the entire key() / value() method is synchronized.
This benchmark uses the code from the section above, run with 8 threads, to compare performance.
Benchmark code
 public class RecordHeaderBenchmark {
     ...
     @Benchmark
+    @Threads(8)
     public String benchmarkKey() {
         return header.key();
     }
     @Benchmark
+    @Threads(8)
     public byte[] benchmarkValue() {
         return header.value();
     }
 }
Result
Double-Checked Locking is significantly faster (roughly 276× to 286×) than full-method synchronization.
Double-Checked Locking
Benchmark                             Mode  Cnt  Score   Error  Units
RecordHeaderBenchmark.benchmarkKey    avgt   15  0.854 ± 0.011  ns/op
RecordHeaderBenchmark.benchmarkValue  avgt   15  0.846 ± 0.005  ns/op
Full-Method Synchronization
Benchmark                             Mode  Cnt    Score    Error  Units
RecordHeaderBenchmark.benchmarkKey    avgt   15  244.625 ± 36.088  ns/op
RecordHeaderBenchmark.benchmarkValue  avgt   15  233.566 ± 49.321  ns/op
Compatibility, Deprecation, and Migration Plan
Making RecordHeader thread-safe does not break any compatibility.
Test Plan
Unit tests will be written to verify that no NullPointerException occurs.
The benchmark described above will also be included. Successful completion of the benchmark without any NullPointerException will demonstrate that RecordHeader is thread-safe.
Additionally, the benchmark will confirm that the double-checked locking implementation performs significantly better than full-method synchronization.
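One possible shape for such a unit test is sketched below. The helper class ConcurrentReadCheck and its method countFailures are hypothetical names introduced for illustration; the helper has no Kafka dependency and simply calls an accessor from several threads released at the same moment, counting calls that throw or return null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical test helper for illustration; not part of Kafka.
final class ConcurrentReadCheck {
    // Calls the accessor once from each of `threads` threads, all released together,
    // and returns how many calls threw a RuntimeException or returned null.
    static <T> int countFailures(Supplier<T> accessor, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        try {
            Callable<Boolean> task = () -> {
                start.await();                      // wait until every task is queued
                try {
                    return accessor.get() != null;  // true means the read succeeded
                } catch (RuntimeException e) {      // e.g. NullPointerException
                    return false;
                }
            };
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                results.add(pool.submit(task));
            }
            start.countDown();                      // release all threads at once
            int failures = 0;
            for (Future<Boolean> f : results) {
                if (!f.get()) {
                    failures++;
                }
            }
            return failures;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With Kafka on the classpath, a test could build a fresh RecordHeader from ByteBuffers for each round and assert that countFailures(header::key, 8) and countFailures(header::value, 8) both return 0. Because races are timing-dependent, a single passing round is weak evidence; repeating the round many times with a new header each time makes a failure more likely to surface.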
Rejected Alternatives
Full-Method Synchronization
Although it seems simple, it introduces significant overhead to key() and value() on every method invocation.
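For comparison, the rejected alternative can be sketched as follows, using a hypothetical stand-in class named FullySynchronizedHeaderSketch rather than the Kafka RecordHeader. Every call acquires the monitor, including calls made long after the key has been decoded, which is where the per-invocation cost comes from.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for illustration only; not the Kafka RecordHeader class.
final class FullySynchronizedHeaderSketch {
    private ByteBuffer keyBuffer;
    private String key;   // no volatile needed: every access holds the monitor

    FullySynchronizedHeaderSketch(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    // The whole method is synchronized, so even already-initialized reads take the lock.
    synchronized String key() {
        if (key == null) {
            key = StandardCharsets.UTF_8.decode(keyBuffer).toString();
            keyBuffer = null;
        }
        return key;
    }
}
```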