Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/dlrkd9qnd2p4kysy26fygrh885mdm0lo

Vote thread: https://lists.apache.org/thread/0nvz4td8xvqrqkno9vlf4l6nf8xcvqz1

JIRA: KAFKA-12999

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KAFKA-10438 introduced lazy initialization of the key and value in RecordHeader. This improved performance but also had an unexpected side effect.
Because KafkaConsumer is not thread-safe, it is natural to assume that ConsumerRecord is not thread-safe either. However, users often assume that read-only access to a record from multiple threads is safe.
With lazy initialization, this assumption no longer holds: when several threads call key() or value() on the same header for the first time, one of them may throw a NullPointerException.
So far, three issues related to concurrent access to RecordHeader have been reported (KAFKA-12999, KAFKA-17725, KAFKA-18470).
This KIP makes read-only access to RecordHeader thread-safe, eliminating the risk of NullPointerException.
Note that value() returns a mutable byte[]; if any thread modifies the contents of that array, thread-safety is not guaranteed.
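The failure mode can be illustrated with a simplified, hypothetical model of the pre-KIP key() logic. It is not Kafka code: JDK decoding stands in for Kafka's Utils.utf8, and the afterNullCheck hook exists only to force one interleaving deterministically. The first reader passes the key == null check, a second thread then completes initialization and sets keyBuffer to null, and the first reader dereferences the now-null keyBuffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of the pre-KIP lazy key() logic (not Kafka code).
// The afterNullCheck hook exists only to force one problematic interleaving deterministically.
final class UnsafeLazyKey {
    private ByteBuffer keyBuffer;
    private String key;
    Runnable afterNullCheck = () -> { };

    UnsafeLazyKey(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    String key() {
        if (key == null) {
            afterNullCheck.run(); // in real code, another thread can be scheduled here
            // If another thread has already finished, keyBuffer is now null and this line throws NPE.
            key = StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString();
            keyBuffer = null;
        }
        return key;
    }
}

final class CheckThenActRace {
    /** Returns true if the forced interleaving ends in a NullPointerException. */
    static boolean reproduce() {
        UnsafeLazyKey header = new UnsafeLazyKey(ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)));
        header.afterNullCheck = () -> {
            header.afterNullCheck = () -> { };      // the second reader runs without the hook
            Thread other = new Thread(header::key); // initializes key and nulls keyBuffer
            other.start();
            try {
                other.join();                       // join makes the other thread's writes visible
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            header.key();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```

In real code there is no hook and the window between the check and the dereference is small, so whether the exception occurs depends on thread scheduling.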

Public Interfaces

The org.apache.kafka.common.header.internals.RecordHeader class will be made thread-safe for concurrent read-only access. Its method signatures do not change.

Proposed Changes

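Use double-checked locking with volatile fields to make the lazy initialization in RecordHeader thread-safe.
A thread acquires the lock only while a field is still uninitialized; once key or value has been initialized, subsequent calls return it after a volatile read, without acquiring any lock.
valueBuffer is also declared volatile because value() reads it outside the lock in the first check; without volatile, a thread could see a stale non-null valueBuffer and take the lock unnecessarily. keyBuffer is read only inside the synchronized block, so it does not need to be volatile.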

Proposed RecordHeader Change
public class RecordHeader implements Header {
    private ByteBuffer keyBuffer;
-   private String key;
-   private ByteBuffer valueBuffer;
-   private byte[] value;

+   private volatile String key;
+   private volatile ByteBuffer valueBuffer;
+   private volatile byte[] value;

...

    public String key() {
        if (key == null) {
+           synchronized (this) {
+               if (key == null) {
                    key = Utils.utf8(keyBuffer, keyBuffer.remaining());
                    keyBuffer = null;
+               }
+           }
        }
        return key;
    }

    public byte[] value() {
        if (value == null && valueBuffer != null) {
+           synchronized (this) {
+               if (value == null && valueBuffer != null) {
                    value = Utils.toArray(valueBuffer);
                    valueBuffer = null;
+               }
+           }
        }
        return value;
    }
}
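The diff above is a fragment. The following is a self-contained sketch of the same double-checked locking pattern, under stated assumptions: the class names LazyHeader and FirstAccessStress are hypothetical, and JDK calls (StandardCharsets.UTF_8.decode, ByteBuffer.get on a duplicate) stand in for Kafka's Utils.utf8 and Utils.toArray. FirstAccessStress repeatedly creates a fresh header and releases several threads at once on its first key()/value() call, which is the situation behind the reported NullPointerExceptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the patched RecordHeader (not Kafka code). JDK calls replace
// Utils.utf8 / Utils.toArray and read the buffers through duplicates, without consuming them.
final class LazyHeader {
    private ByteBuffer keyBuffer;            // read only inside the synchronized block
    private volatile String key;
    private volatile ByteBuffer valueBuffer; // read in the unsynchronized first check
    private volatile byte[] value;

    LazyHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
        this.keyBuffer = keyBuffer;
        this.valueBuffer = valueBuffer;
    }

    String key() {
        if (key == null) {                   // first check: lock-free fast path
            synchronized (this) {
                if (key == null) {           // second check: another thread may have finished
                    key = StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString();
                    keyBuffer = null;
                }
            }
        }
        return key;
    }

    byte[] value() {
        if (value == null && valueBuffer != null) {
            synchronized (this) {
                if (value == null && valueBuffer != null) {
                    ByteBuffer copy = valueBuffer.duplicate();
                    byte[] bytes = new byte[copy.remaining()];
                    copy.get(bytes);
                    value = bytes;           // publish value before clearing valueBuffer
                    valueBuffer = null;
                }
            }
        }
        return value;
    }
}

final class FirstAccessStress {
    /** Races `threads` readers on the first key()/value() call of a fresh header, `rounds` times. */
    static boolean run(int rounds, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int r = 0; r < rounds; r++) {
                LazyHeader header = new LazyHeader(
                        ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)),
                        ByteBuffer.wrap(new byte[] {1, 2, 3}));
                CountDownLatch start = new CountDownLatch(1);
                List<Future<Boolean>> results = new ArrayList<>();
                for (int t = 0; t < threads; t++) {
                    results.add(pool.submit(() -> {
                        start.await();       // release all readers at the same moment
                        byte[] v = header.value();
                        return "key".equals(header.key()) && v != null && v.length == 3;
                    }));
                }
                start.countDown();
                for (Future<Boolean> f : results) {
                    if (!f.get()) {
                        return false;
                    }
                }
            }
            return true;
        } catch (InterruptedException | ExecutionException e) {
            return false;                    // an NPE in a reader arrives here as ExecutionException
        } finally {
            pool.shutdownNow();
        }
    }
}
```

A stress check like this can only fail to find a race, so passing it is evidence rather than proof of thread safety. The order of the two writes in value() matters: value is published before valueBuffer is cleared, so a reader that sees valueBuffer == null on the fast path also sees the initialized value.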

JMH Benchmark: Current Implementation (non-thread-safe) vs. Double-Checked Locking (thread-safe)

The benchmark was executed on an Apple M4 Max system with 48 GB RAM.
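Because the header is created once per iteration (@Setup(Level.Iteration)), only the first call in each iteration performs the lazy initialization; nearly every measured call takes the already-initialized path.
On that path the thread-safe version acquires no lock and its only extra work is a volatile read, so the difference between the two versions reflects this small steady-state cost rather than the one-time initialization.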

Benchmark code

Record Header Single Thread Benchmark
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class RecordHeaderSingleThreadBenchmark {

    private RecordHeader header;

    @Setup(Level.Iteration)
    public void setup() {
        byte[] valueBytes = new byte[1000];
        ByteBuffer keyBuffer = ByteBuffer.wrap("key".getBytes());
        ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes);
        header = new RecordHeader(keyBuffer, valueBuffer);
    }

    @Benchmark
    public String benchmarkKey() {
        return header.key();
    }

    @Benchmark
    public byte[] benchmarkValue() {
        return header.value();
    }
}

Result

The thread-safe double-checked locking implementation adds a negligible overhead of about 0.3 ns per call (0.774 vs. 0.457 ns/op for key() and 0.773 vs. 0.451 ns/op for value()).

Current Implementation (non-thread-safe)

Current Implementation single thread result
Benchmark                                         Mode  Cnt  Score   Error  Units
RecordHeaderSingleThreadBenchmark.benchmarkKey    avgt   15  0.457 ± 0.023  ns/op
RecordHeaderSingleThreadBenchmark.benchmarkValue  avgt   15  0.451 ± 0.020  ns/op

Double-Checked Locking (thread-safe)

Double-Checked Locking single thread result
Benchmark                                         Mode  Cnt  Score   Error  Units
RecordHeaderSingleThreadBenchmark.benchmarkKey    avgt   15  0.774 ± 0.010  ns/op
RecordHeaderSingleThreadBenchmark.benchmarkValue  avgt   15  0.773 ± 0.008  ns/op

JMH Benchmark: Double-Checked Locking vs. Full-Method Synchronization

Full-Method Synchronization means declaring the entire key() and value() methods as synchronized.
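For reference, the full-method synchronization variant can be sketched as follows. This is a hypothetical reconstruction (the exact baseline code is not shown in this KIP), with JDK calls standing in for Kafka's Utils helpers and the class name FullySynchronizedHeader chosen for illustration. Every call enters the monitor, even after the fields are initialized.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical full-method synchronization baseline (not Kafka code).
final class FullySynchronizedHeader {
    // No volatile needed: assuming safe publication, every access after
    // construction happens under the same lock.
    private ByteBuffer keyBuffer;
    private String key;
    private ByteBuffer valueBuffer;
    private byte[] value;

    FullySynchronizedHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
        this.keyBuffer = keyBuffer;
        this.valueBuffer = valueBuffer;
    }

    // The lock is taken on every call, including after initialization.
    synchronized String key() {
        if (key == null) {
            key = StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString();
            keyBuffer = null;
        }
        return key;
    }

    synchronized byte[] value() {
        if (value == null && valueBuffer != null) {
            ByteBuffer copy = valueBuffer.duplicate();
            byte[] bytes = new byte[copy.remaining()];
            copy.get(bytes);
            value = bytes;
            valueBuffer = null;
        }
        return value;
    }
}
```

This version is simpler to reason about, but concurrent readers serialize on the lock for every call, which is the cost the multi-threaded benchmark measures.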

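This benchmark reuses the code from the previous section, annotating each benchmark method with @Threads(8) so that eight threads call key() and value() concurrently on the same shared header (@State(Scope.Benchmark)).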

Benchmark code

RecordHeaderBenchmark
public class RecordHeaderBenchmark {

...

    @Benchmark
+   @Threads(8)
    public String benchmarkKey() {
        return header.key();
    }

    @Benchmark
+   @Threads(8)
    public byte[] benchmarkValue() {
        return header.value();
    }
}

Result

With 8 threads, Double-Checked Locking is roughly 276 to 286 times faster than full-method synchronization (244.625 / 0.854 ≈ 286 for key(), 233.566 / 0.846 ≈ 276 for value()).

Double-Checked Locking

Double-Checked Locking benchmark result
Benchmark                             Mode  Cnt  Score   Error  Units
RecordHeaderBenchmark.benchmarkKey    avgt   15  0.854 ± 0.011  ns/op
RecordHeaderBenchmark.benchmarkValue  avgt   15  0.846 ± 0.005  ns/op

Full-Method Synchronization

Full-Method Synchronization benchmark result
Benchmark                             Mode  Cnt    Score    Error  Units
RecordHeaderBenchmark.benchmarkKey    avgt   15  244.625 ± 36.088  ns/op
RecordHeaderBenchmark.benchmarkValue  avgt   15  233.566 ± 49.321  ns/op

Compatibility, Deprecation, and Migration Plan

Making RecordHeader thread-safe does not change any method signatures or the values returned by key() and value(), so the change is fully backward compatible.

Test Plan

  • Unit tests will verify that concurrent calls to key() and value() on the same RecordHeader do not throw NullPointerException.

  • The benchmarks described above will also be included. Completing the multi-threaded benchmark without any NullPointerException will provide additional evidence that RecordHeader is thread-safe.

  • Additionally, the benchmark will confirm that the double-checked locking implementation performs significantly better than full-method synchronization.

Rejected Alternatives

Full-Method Synchronization

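Although simpler, it acquires the lock on every call to key() and value(), even after initialization has completed. In the 8-thread benchmark above, this made it roughly 276 to 286 times slower than double-checked locking.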
