Status

Current state: Under Discussion

Discussion thread: Link

JIRA: KAFKA-20387

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

RecordHeader stores its value as byte[]. This is the right wire representation, but it makes the API cumbersome for the types most commonly put in headers (string, int, boolean, ...):

Construction is tedious:

// String
record.headers().add(new RecordHeader("traceId", traceId.getBytes(UTF_8)));

// Integer
record.headers().add(new RecordHeader("retryCount", ByteBuffer.allocate(4).putInt(3).array()));

// Boolean
record.headers().add("isRetry",
    new byte[] { (byte) (isRetry ? 1 : 0) });

Reading is tedious:

String traceId = new String(header.value(), UTF_8);
int retryCount = ByteBuffer.wrap(header.value()).getInt();
boolean isRetry = header.value()[0] != 0;

Keys and values on ProducerRecord have configurable serializers. Headers have nothing, so every team writes its own wrappers (LoggableRecordHeader, utility methods, byte constants) to make headers usable.

Public Interfaces

Add static factories and typed add overloads for construction, and typed accessors for reading, covering the common types used in headers. The internal representation remains byte[], with no wire format changes.


Header.java
public interface Header {

    /**
     * Returns the key of the header.
     *
     * @return the header's key; must not be null.
     */
    String key();

    /**
     * Returns the value of the header.
     *
     * @return the header's value; may be null.
     */
    byte[] value();

    // Read accessors
    public String utf8StringValue();
    public String stringValue(Charset charset);
    public int intValue();
    public long longValue();
    public byte byteValue();
    public short shortValue();
    public float floatValue();
    public double doubleValue();
    public boolean booleanValue();
}


Headers.java
public interface Headers extends Iterable<Header> {

    public Headers add(String key, String value);
    public Headers add(String key, String value, Charset charset);
    public Headers add(String key, int value);
    public Headers add(String key, long value);
    public Headers add(String key, byte value);
    public Headers add(String key, short value);
    public Headers add(String key, float value);
    public Headers add(String key, double value);
    public Headers add(String key, boolean value);

    // rest of the methods
}


RecordHeader
public static RecordHeader ofUtf8String(String key, String value) { ... }
public static RecordHeader ofString(String key, String value, Charset charset) { ... }
public static RecordHeader ofInt(String key, int value) { ... }
public static RecordHeader ofLong(String key, long value) { ... }
public static RecordHeader ofByte(String key, byte value) { ... }
public static RecordHeader ofShort(String key, short value) { ... }
public static RecordHeader ofFloat(String key, float value) { ... }
public static RecordHeader ofDouble(String key, double value) { ... }
public static RecordHeader ofBoolean(String key, boolean value) { ... }
public static RecordHeader ofBytes(String key, byte[] value) { ... }

Typed Accessors

public String utf8StringValue() { ... }
public String stringValue(Charset charset) { ... }
public int intValue() { ... }
public long longValue() { ... }
public byte byteValue() { ... }
public short shortValue() { ... }
public float floatValue() { ... }
public double doubleValue() { ... }
public boolean booleanValue() { ... }

Every accessor follows the same pattern: deserialize the stored byte[] on demand, return the result, and keep no additional state on the instance.

The existing byte[] value() method is unchanged.

Proposed Changes

Although RecordHeader is defined under the internals/ package, it is a public class, so nothing prevents users from referencing it directly. To properly support typed values, we add new overloaded methods on Header and Headers.

Each ofXXX method serializes the value to byte[] internally using the same encoding as the corresponding Kafka Serializer (UTF-8 for strings, big-endian for numerics, single byte 0x01/0x00 for boolean).
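
The encodings described above can be sketched as follows. This is a minimal illustration rather than the proposed implementation: HeaderEncodingSketch and its method names are hypothetical, and only the byte layouts (charset-encoded strings, big-endian numerics, a single 0x01/0x00 byte for booleans) are taken from the text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not part of Kafka: it only demonstrates the byte layouts.
final class HeaderEncodingSketch {

    // Strings are encoded with the given charset; a null string stays a null byte[].
    static byte[] encodeString(String value, Charset charset) {
        return value == null ? null : value.getBytes(charset);
    }

    // int: 4 bytes, big-endian (ByteBuffer's default byte order).
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // long: 8 bytes, big-endian.
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // boolean: a single byte, 0x01 for true and 0x00 for false.
    static byte[] encodeBoolean(boolean value) {
        return new byte[] { (byte) (value ? 0x01 : 0x00) };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encodeInt(3)));          // [0, 0, 0, 3]
        System.out.println(Arrays.toString(encodeBoolean(true)));   // [1]
        System.out.println(Arrays.toString(encodeString("ab", StandardCharsets.UTF_8))); // [97, 98]
    }
}
```

The remaining numeric types (byte, short, float, double) would follow the same ByteBuffer pattern with their respective widths.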

Null handling: ofString(key, null, charset), ofUtf8String(key, null), and ofBytes(key, null) create a header with a null byte[] value, consistent with the existing RecordHeader(String, byte[]) constructor, which accepts null. Primitive factories (ofInt, ofLong, etc.) cannot accept null since they take primitives.

Each typed accessor deserializes the internal byte[]. It throws IllegalStateException if the byte array is the wrong size for the requested type (e.g. calling intValue() on a 7-byte array). Calling a primitive typed accessor on a header with a null value also throws IllegalStateException. The string accessors are the exception: stringValue() on a null value returns null.
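
The read-side rules can be sketched in the same spirit. HeaderDecodingSketch, requireLength, and the exception messages are hypothetical names chosen for illustration; the behavior shown (IllegalStateException on a null or wrong-size array for primitives, null passed through for strings) is an assumption based on the description above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

// Hypothetical helper, not part of Kafka: it illustrates the validation rules only.
final class HeaderDecodingSketch {

    // Rejects null values and arrays whose length does not match the requested type.
    static void requireLength(byte[] value, int expected, String type) {
        if (value == null) {
            throw new IllegalStateException("Header value is null; cannot read it as " + type);
        }
        if (value.length != expected) {
            throw new IllegalStateException(
                "Expected " + expected + " bytes for " + type + " but found " + value.length);
        }
    }

    // int: exactly 4 bytes, read big-endian.
    static int intValue(byte[] value) {
        requireLength(value, Integer.BYTES, "int");
        return ByteBuffer.wrap(value).getInt();
    }

    // long: exactly 8 bytes, read big-endian.
    static long longValue(byte[] value) {
        requireLength(value, Long.BYTES, "long");
        return ByteBuffer.wrap(value).getLong();
    }

    // boolean: exactly 1 byte; any non-zero byte is treated as true in this sketch.
    static boolean booleanValue(byte[] value) {
        requireLength(value, 1, "boolean");
        return value[0] != 0;
    }

    // Strings have no fixed size, and a null value is returned as null.
    static String stringValue(byte[] value, Charset charset) {
        return value == null ? null : new String(value, charset);
    }
}
```

Whether booleanValue() should accept any non-zero byte or only 0x01 is not specified in the text; the sketch mirrors the manual read shown in the Motivation section.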

Compatibility, Deprecation, and Migration Plan

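Fully backward compatible for code that uses the built-in RecordHeader and RecordHeaders: all changes are additive (new static factories, new add overloads, and new accessor methods). The existing RecordHeader(String, byte[]) constructor and byte[] value() are unchanged. Custom implementations of Header or Headers will need to implement the new interface methods, as discussed under Rejected Alternatives.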

Test Plan

  • Unit tests for each static factory (ofUtf8String, ofString, ofInt, ofLong, ofByte, ofShort, ofFloat, ofDouble, ofBoolean, ofBytes)
  • Unit tests for each typed accessor, including wrong-size byte array errors
  • Unit tests for wrong-type reading (e.g. create with ofInt, read with stringValue())
  • Unit tests for null values: ofUtf8String(key, null), ofString(key, null, charset), ofBytes(key, null), and typed accessors on a null value
  • Unit tests for boundary values (Integer.MAX_VALUE, Integer.MIN_VALUE, Long.MAX_VALUE, Double.NaN, Double.POSITIVE_INFINITY, etc.)
  • Unit tests for empty string (ofUtf8String(key, ""))
  • Unit tests for Unicode strings (multi-byte characters, emojis)
  • Unit tests verifying byte[] value() is unchanged
  • Integration tests verifying headers created with static factories survive produce/consume round-trip
  • Integration tests verifying typed accessors work on headers received from consumer

Rejected Alternatives

Configurable header serde on producer/consumer. A config mapping header keys to serializer classes adds complexity and couples header semantics to client config. The common types (string, int, long, short, float, double, boolean) cover the vast majority of header usage and don't need configuration.

Default methods on the Header/Headers interface. While default methods would avoid compilation errors after upgrading to a newer AK version, a compile-time failure is preferable to silently failing for implementers. If users roll their own version of record headers, they must be aware of the type-aware methods and consciously adapt them to their use cases.
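
To make the "silently failing" concern concrete, here is a minimal sketch of what could happen if intValue() were a default method. SketchHeader and LittleEndianHeader are hypothetical types invented for this example; the little-endian layout is an assumption standing in for any custom encoding a third-party implementation might use.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical stand-in for Header, with intValue() provided as a default method.
interface SketchHeader {
    String key();
    byte[] value();

    // The default assumes the big-endian layout used by the built-in implementation.
    default int intValue() {
        return ByteBuffer.wrap(value()).getInt();
    }
}

// Hypothetical custom implementation written before intValue() existed.
// It stores ints little-endian, so the inherited default decodes them incorrectly.
final class LittleEndianHeader implements SketchHeader {
    private final String key;
    private final byte[] value;

    LittleEndianHeader(String key, int v) {
        this.key = key;
        this.value = ByteBuffer.allocate(Integer.BYTES)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putInt(v)
                .array();
    }

    @Override public String key() { return key; }
    @Override public byte[] value() { return value; }
}

final class DefaultMethodSketch {
    public static void main(String[] args) {
        SketchHeader header = new LittleEndianHeader("retryCount", 1);
        // Compiles and runs without error, but the result is not 1.
        System.out.println(header.intValue()); // 16777216
    }
}
```

With an abstract method instead, LittleEndianHeader would fail to compile until its author supplied an intValue() that matches its own encoding.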

Generic serde parameter stored on RecordHeader instance. new RecordHeader(key, value, serializer) makes RecordHeader serde-aware and stores extra references. The static factory approach keeps RecordHeader as a (String, byte[]) tuple with no new instance state. The generic of() / value() methods accept a class, instantiate it, use it, and discard it.

Moving RecordHeader/RecordHeaders into the public API. Moving the classes out of the internals package would break compatibility and force a major version change. The default implementation has lived in the internals package since the classes were introduced, and we aim to preserve this design choice.
