Status

Current state: Under Discussion

Discussion thread: link

JIRA: KAFKA-20384

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka consumer's only extension point before records reach application code is ConsumerInterceptor.onConsume(), which operates on the entire ConsumerRecords batch. There is no per-record lifecycle hook.

This means the client has no concept of when an individual record's processing starts or ends. It knows when records are fetched and iterated, but iteration and processing are not the same thing: a record may be iterated on the poll thread and then dispatched to a worker thread, or buffered for processing outside the poll loop.

This gap has forced every major framework and instrumentation library to independently build per-record processing hooks:

  • OpenTelemetry and Datadog wrap Iterator<ConsumerRecord> to create per-record spans. The span opens on next() and closes on the following hasNext(). This treats iteration as processing: when records are dispatched to a thread pool, the span lifetime is microseconds (iteration), not the actual processing time. OTel's own source comments: "this may potentially create problems if this iterator is used from different threads. But at the moment we cannot do much about this."

  • Spring Kafka built RecordInterceptor<K, V> (since 2.7) with per-record intercept(), success(), failure(), and afterRecord() hooks, but these are only available within Spring's listener container.

  • SmallRye Reactive Messaging, the engine behind the quarkus-messaging-kafka extension and the standard within Quarkus, wraps each record in IncomingKafkaRecord with per-record ack()/nack() and built-in OTel tracing per record, because the client doesn't provide it.

  • Micronaut Kafka manages per-record lifecycle internally via OffsetStrategy.SYNC_PER_RECORD and per-record Acknowledgement injection.

Five projects, one root cause. This KIP proposes a client-level hook that these projects could consolidate on.

The primary beneficiaries are frameworks that control the processing loop - they call newContext() on behalf of the application, so tracing/MDC processors configured at the client level work across Spring, Quarkus, and Micronaut without framework-specific integration.

Use Cases

1. Distributed Tracing

Today, OTel/DD span lifetime = time between next() calls on the poll thread. With this KIP:

for (ConsumerRecord<K, V> record : records) {
    executor.submit(() -> {
        try (ConsumerRecordContext ctx = record.newContext()) {
            process(record); // span covers actual processing, on the worker thread
        }
    });
}

2. Logging Context (MDC)

Today, every team manually sets/clears MDC fields per record with try/finally boilerplate. With a configured MDCContextProcessor, record.newContext() handles it.
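A minimal sketch of what such a processor might look like, to make the setup/teardown pairing concrete. Neither the proposed interfaces nor SLF4J are available in a standalone snippet, so Rec, RecordContext, and the ThreadLocal-backed Mdc class below are hypothetical stand-ins for ConsumerRecord, ConsumerRecordContext, and org.slf4j.MDC. The MDC key names are assumptions as well.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.slf4j.MDC: a per-thread key/value map.
final class Mdc {
    private static final ThreadLocal<Map<String, String>> CTX =
            ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) { CTX.get().put(key, value); }
    static String get(String key) { return CTX.get().get(key); }
    static void remove(String key) { CTX.get().remove(key); }
}

// Hypothetical stand-in for the parts of ConsumerRecord this sketch reads.
final class Rec {
    final String topic;
    final int partition;
    final long offset;

    Rec(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical stand-in for the proposed ConsumerRecordContext.
interface RecordContext extends AutoCloseable {
    @Override
    void close();
}

// Sketch of an MDC processor: set fields on creation, clear them on close.
final class MdcContextProcessorSketch {
    RecordContext createContext(Rec rec) {
        Mdc.put("kafka.topic", rec.topic);
        Mdc.put("kafka.partition", Integer.toString(rec.partition));
        Mdc.put("kafka.offset", Long.toString(rec.offset));
        // The returned context undoes exactly what was set up above.
        return () -> {
            Mdc.remove("kafka.topic");
            Mdc.remove("kafka.partition");
            Mdc.remove("kafka.offset");
        };
    }
}
```

Because this sketch keeps its state in a thread-local, the context has to be created and closed on the thread that processes the record. A processor that closes on a different thread would need to carry its state on the context object instead.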

3. Multi-Threaded Context Propagation

Today, teams manually plumb MDC maps and parent spans across thread boundaries or resort to hacks like Datadog's subList(i, i+1) workaround to trigger TracingList instrumentation on worker threads. With this KIP, record.newContext() works on any thread.

Public Interfaces

ConsumerRecordContextProcessor

package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.Configurable;

import java.io.Closeable;
import java.util.Map;

/**
 * A processor that establishes per-record processing context for consumer records.
 *
 * <p>Implementations are instantiated once during consumer creation via the
 * {@code context.processors} configuration and receive the full consumer
 * configuration through {@link #configure(Map)}. Each call to
 * {@link ConsumerRecord#newContext()} invokes {@link #createContext(ConsumerRecord)}
 * to obtain a fresh {@link ConsumerRecordContext} instance.
 *
 * <p>Processors are stateless with respect to individual records. Per-record
 * state belongs on the {@link ConsumerRecordContext}, not the processor. The processor
 * acts as a factory: initialized once, producing contexts per record.
 *
 * <p>Multiple processors may be configured and are chained in declared order.
 * Each processor's context is created and closed independently.
 */
public interface ConsumerRecordContextProcessor extends Configurable, Closeable {
    /**
     * Create a new context for processing a single record. All setup (span creation, MDC injection,
     * etc.) happens here. The returned context's {@link ConsumerRecordContext#close()} will be called
     * when processing completes.
     *
     * @param record the record about to be processed
     * @return a new ConsumerRecordContext instance; must not return null
     */
    ConsumerRecordContext createContext(ConsumerRecord<?, ?> record);
}


Instantiated once during consumer creation via the context.processors config. Stateless per record: state belongs on the returned context.

ConsumerRecordContext

/**
 * Per-record processing context with deterministic lifecycle.
 *
 * <p>A ConsumerRecordContext is created by
 * {@link ConsumerRecordContextProcessor#createContext(ConsumerRecord)}, which performs all
 * setup (span creation, MDC injection, etc.). The only remaining contract is
 * {@link #close()}, called when processing completes.
 *
 * <p>The intended usage is with try-with-resources:
 * <pre>{@code
 * try (ConsumerRecordContext ctx = record.newContext()) {
 *     // process the record synchronously; the context closes when this block exits
 * }
 * }</pre>
 *
 * <p>For async processing, the caller is responsible for closing the context
 * when the async operation completes:
 * <pre>{@code
 * ConsumerRecordContext ctx = record.newContext();
 * CompletableFuture.runAsync(() -> process(record))
 *     .whenComplete((r, ex) -> ctx.close());
 * }</pre>
 */
@FunctionalInterface
public interface ConsumerRecordContext extends AutoCloseable {

    ConsumerRecordContext NO_OP = () -> {};

    /**
     * Close this context and release any resources.
     *
     * <p>Called exactly once when processing completes. Implementations
     * should clean up any state established during context creation.
     *
     * <p>Exceptions thrown from this method are caught and logged by the
     * consumer. They do not affect other contexts in the chain.
     */
    @Override
    void close();
}

Changes to ConsumerRecord

Add a public newContext() method and a package-private constructor that accepts ContextProcessors, a package-private class in consumer/internals that encapsulates the list of configured context processors:

ConsumerRecord
// Package-private; used internally by CompletedFetch / ShareCompletedFetch
ConsumerRecord(..., Optional<ContextProcessors> contextProcessors) {
    this.contextProcessors = contextProcessors;
}

// Public
public ConsumerRecordContext newContext() {
    if (contextProcessors.isEmpty()) {
        return ConsumerRecordContext.NO_OP;
    }
    return contextProcessors.get().createContext(this);
}

Proposed Changes

Overview

This KIP adds ConsumerRecordContextProcessor, a per-record complement to the batch-level ConsumerInterceptor. Each processor creates an AutoCloseable ConsumerRecordContext for deterministic setup/teardown via try-with-resources.

Configuration

Property | Type | Default | Description
context.processors | LIST | "" | ConsumerRecordContextProcessor class names, chained in declared order
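As an illustration of how an application might set this property, the sketch below builds consumer properties with two processors. The class names com.example.TracingContextProcessor and com.example.MdcContextProcessor, the bootstrap address, and the group id are hypothetical placeholders. Only the context.processors key comes from this KIP.

```java
import java.util.Properties;

final class ContextProcessorConfigExample {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "orders-service");          // placeholder group
        // Proposed config from this KIP: a comma-separated list of processor
        // class names, chained in the order listed. Both names are hypothetical.
        props.put("context.processors",
                "com.example.TracingContextProcessor,com.example.MdcContextProcessor");
        return props;
    }
}
```

With this configuration, the tracing processor's context would be created before the MDC processor's context on each newContext() call.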

Changes to ConsumerRecord

Existing public constructors are unchanged. Records built through them default contextProcessors to Optional.empty(), so newContext() returns NO_OP.

The ContextProcessors reference is threaded from KafkaConsumer through AbstractFetch -> CompletedFetch / ShareCompletedFetch -> ConsumerRecord.

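Memory overhead: one object reference per record, pointing to the same shared ContextProcessors instance for all records from the same consumer. For a batch of 10,000 records, that is up to 80 KB of references to one object (assuming 8-byte references; roughly half that with compressed object pointers). When no processors are configured, the field holds an empty Optional rather than a processor chain, so no per-record objects are allocated.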

newContext() may be called multiple times on the same record. Each call creates a fresh, independent ConsumerRecordContext. The method is a factory, not one-time initialization.

Chaining and Failure Semantics

Multiple processors are composed into a CompositeContext. Exceptions in createContext() or close() are caught and logged -- a failure in one processor never prevents others from executing. newContext() never throws.
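The isolation behavior described above can be sketched as follows. This is an illustration under stated assumptions, not the proposed implementation: SketchContext is a hypothetical stand-in for ConsumerRecordContext, each processor is modeled as a plain Function, logging goes to System.err, and closing in reverse creation order is an assumption (the KIP does not specify close order).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the proposed ConsumerRecordContext.
interface SketchContext extends AutoCloseable {
    @Override
    void close();
}

// Sketch of a composite: one failing processor must not affect the others.
final class CompositeContextSketch implements SketchContext {
    private final List<SketchContext> delegates;

    private CompositeContextSketch(List<SketchContext> delegates) {
        this.delegates = delegates;
    }

    // Each "processor" is modeled as a function from record to context.
    static <R> SketchContext create(List<Function<R, SketchContext>> processors, R record) {
        List<SketchContext> created = new ArrayList<>();
        for (Function<R, SketchContext> processor : processors) {
            try {
                created.add(processor.apply(record));
            } catch (RuntimeException e) {
                // Log and continue so later processors still run.
                System.err.println("createContext failed: " + e);
            }
        }
        return new CompositeContextSketch(created);
    }

    @Override
    public void close() {
        // Assumption: close in reverse creation order, like nested try-with-resources.
        for (int i = delegates.size() - 1; i >= 0; i--) {
            try {
                delegates.get(i).close();
            } catch (RuntimeException e) {
                // Log and continue so the remaining contexts are still closed.
                System.err.println("close failed: " + e);
            }
        }
    }
}
```

In this sketch a processor whose creation step throws contributes no context at all, so only contexts that were successfully created are closed.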

Integration with ConsumerInterceptor

Complementary, not competing. Interceptors fire on poll() (batch level). Context processors fire when the application calls newContext() (per record, caller-driven). ConsumerInterceptor is not deprecated.

Compatibility, Deprecation, and Migration Plan

  • Frameworks (Spring Kafka, SmallRye, Micronaut): Call record.newContext() in their listener containers / processing pipelines. Their existing per-record hooks can delegate to newContext() internally, giving applications client-level context processors for free.
  • Instrumentation libraries (OTel, Datadog): Ship a ConsumerRecordContextProcessor implementation. For framework-managed consumers (the majority of production deployments), no bytecode instrumentation is needed because the framework calls newContext(). For direct Kafka client users, OTel/DD can instrument newContext() as a single stable hook instead of wrapping iterators.
  • Applications: No changes required. Applications that don't configure context.processors see no behavioral change. Applications doing manual MDC/tracing setup can migrate incrementally by configuring a processor and removing the manual code.

Compatibility

Fully backwards compatible. No protocol changes, no changes to Consumer/ConsumerRecord public constructors. Default value for context.processors is empty.

Test Plan

  • Unit tests for processor configuration, instantiation, and composite chaining
  • Unit tests for newContext() with/without configured processors
  • Unit tests for multiple newContext() calls on the same record
  • Unit tests for exception isolation in multi-processor chains
  • Unit tests for thread-safety (concurrent newContext() from multiple threads)
  • Integration tests for context lifecycle through poll -> process -> close
  • Integration tests for multi-threaded dispatch with per-thread context

Rejected Alternatives

Document a recommended pattern and leave this at the framework level. That is the status quo, which produced the divergent OTel, Datadog, Spring Kafka, and SmallRye implementations. Documenting the pattern doesn't give frameworks a common hook, doesn't let client-level config drive context setup, and doesn't fix the iterator-wrapping hacks in instrumentation libraries.

Extend ConsumerInterceptor with per-record methods. Interceptors fire during iteration inside CompletedFetch, not during processing. Adding onRecord() would have the same iteration-vs-processing conflation. It would also mix record transformation (onConsume returns modified records) with context lifecycle.

Consumer#newContext(record) instead of ConsumerRecord#newContext(). Couples context init to the consumer instance. Worker threads would need a Consumer reference, which is misleading because KafkaConsumer is not thread-safe. The record is the natural unit of dispatch.

Context as a wrapper around the record. Forces applications to work with a different type, breaking existing method signatures.

Callback-based lifecycle (onBefore/onAfter). Doesn't support async processing, no resource scoping, no try-with-resources.
