Status
Current state: Under Discussion
Discussion thread: link
JIRA: KAFKA-20384
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The Kafka consumer's only extension point before records reach application code is ConsumerInterceptor.onConsume(), which operates on the entire ConsumerRecords batch. There is no per-record lifecycle hook.
This means the client has no concept of when an individual record's processing starts or ends. It knows when records are fetched and iterated, but iteration and processing are not the same thing: a record may be iterated on the poll thread and then dispatched to a worker thread, or buffered for processing outside the poll loop.
This gap has forced major frameworks and instrumentation libraries to each build their own per-record processing hooks:
- OpenTelemetry and Datadog wrap Iterator<ConsumerRecord> to create per-record spans. The span opens on next() and closes on the following hasNext(). This treats iteration as processing: when records are dispatched to a thread pool, the span lifetime is microseconds (iteration), not the actual processing time. OTel's own source comments: "this may potentially create problems if this iterator is used from different threads. But at the moment we cannot do much about this."
- Spring Kafka built RecordInterceptor<K, V> (since 2.7) with per-record intercept(), success(), failure(), and afterRecord() hooks, but these are only available within Spring's container.
- SmallRye Reactive Messaging, the engine behind the quarkus-messaging-kafka extension and the standard within Quarkus, wraps each record in IncomingKafkaRecord with per-record ack()/nack() and built-in per-record OTel tracing, because the client doesn't provide it.
- Micronaut Kafka manages per-record lifecycle internally via OffsetStrategy.SYNC_PER_RECORD and per-record Acknowledgement injection.
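The iterator-wrapping problem can be modeled in a few lines. This is a simplified sketch, not the OpenTelemetry or Datadog source: TracingIterator and IteratorSpanDemo are hypothetical, and a "span" is just an entry in an event log. Workers are held back by a latch so the ordering is deterministic: every span has opened and closed before any record is processed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of iterator wrapping: a span opens on next() and closes on the following hasNext().
class TracingIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private final List<String> events;
    private T current; // record whose "span" is currently open

    TracingIterator(Iterator<T> delegate, List<String> events) {
        this.delegate = delegate;
        this.events = events;
    }

    @Override
    public boolean hasNext() {
        closeCurrentSpan(); // the previous record's span ends here, whatever the worker is doing
        return delegate.hasNext();
    }

    @Override
    public T next() {
        closeCurrentSpan();
        current = delegate.next();
        events.add("open " + current);
        return current;
    }

    private void closeCurrentSpan() {
        if (current != null) {
            events.add("close " + current);
            current = null;
        }
    }
}

class IteratorSpanDemo {
    // Returns the event log as it was after the loop (before any worker ran), then the full log.
    static List<List<String>> run(List<String> records) {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        TracingIterator<String> it = new TracingIterator<>(records.iterator(), events);
        while (it.hasNext()) {
            String record = it.next();
            pool.submit(() -> {
                release.await(); // "processing" is held back until the loop has finished
                events.add("processed " + record);
                return null;
            });
        }
        List<String> beforeProcessing = new ArrayList<>(events);
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.of(beforeProcessing, new ArrayList<>(events));
    }
}
```

With two records, the log after the loop is already "open r0, close r0, open r1, close r1": both spans ended on the poll thread while no processing had happened yet.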
Five projects, one root cause. This KIP provides a client-level hook on which these implementations can consolidate.
The primary beneficiaries are frameworks that control the processing loop: they call newContext() on behalf of the application, so tracing/MDC processors configured at the client level work across Spring, Quarkus, and Micronaut without framework-specific integration.
Use Cases
1. Distributed Tracing
Today, the OTel/Datadog span lifetime equals the time between next() calls on the poll thread. With this KIP, the span brackets the actual processing, on the thread that performs it:
for (ConsumerRecord<K, V> record : records) {
    executor.submit(() -> {
        try (ConsumerRecordContext ctx = record.newContext()) {
            process(record); // span covers actual processing, on the worker thread
        }
    });
}
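A minimal sketch of what a tracing-style processor could look like, assuming the interfaces proposed below. SimpleRecord is a hypothetical stand-in for ConsumerRecord, TimingProcessor is hypothetical, and a "span" is reduced to a start timestamp taken in createContext() and a duration recorded in close(); a real implementation would start and end an OTel span instead. The worker calls processor.createContext(record) directly where the proposal would call record.newContext().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Stand-ins for the proposed interfaces (configure()/close() on the processor omitted).
interface ConsumerRecordContext extends AutoCloseable {
    @Override
    void close();
}

interface ConsumerRecordContextProcessor {
    ConsumerRecordContext createContext(SimpleRecord record);
}

// Hypothetical stand-in for ConsumerRecord.
final class SimpleRecord {
    final String topic;
    final long offset;

    SimpleRecord(String topic, long offset) {
        this.topic = topic;
        this.offset = offset;
    }
}

// Hypothetical processor: the "span" starts in createContext() and ends in close().
final class TimingProcessor implements ConsumerRecordContextProcessor {
    final List<Long> durationsNanos = new CopyOnWriteArrayList<>();
    final List<String> threads = new CopyOnWriteArrayList<>();

    @Override
    public ConsumerRecordContext createContext(SimpleRecord record) {
        long start = System.nanoTime();
        threads.add(Thread.currentThread().getName()); // thread that opened the span
        return () -> durationsNanos.add(System.nanoTime() - start);
    }
}

final class TracingDemo {
    static TimingProcessor run(List<SimpleRecord> records) {
        TimingProcessor processor = new TimingProcessor();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<?>> pending = new ArrayList<>();
        for (SimpleRecord record : records) {
            pending.add(executor.submit(() -> {
                // Worker thread; in the proposal this would be record.newContext().
                try (ConsumerRecordContext ctx = processor.createContext(record)) {
                    Thread.sleep(20); // simulated process(record)
                }
                return null;
            }));
        }
        try {
            for (Future<?> f : pending) {
                f.get();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return processor;
    }
}
```

Because the context is created and closed on the worker, each recorded duration covers the simulated 20 ms of processing rather than the time between iterator calls.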
2. Logging Context (MDC)
Today, teams typically set and clear MDC fields per record by hand, with try/finally boilerplate. With an MDCContextProcessor configured, record.newContext() handles it.
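One possible shape for such an MDCContextProcessor, as a sketch under stated assumptions: FakeMdc is a hypothetical thread-local stand-in for SLF4J's MDC (a real implementation would call org.slf4j.MDC.put/get/remove), the field names kafka.topic, kafka.partition and kafka.offset are illustrative, and SimpleRecord stands in for ConsumerRecord. The context restores whatever values were present before, so nested contexts do not clobber outer ones.

```java
import java.util.HashMap;
import java.util.Map;

interface ConsumerRecordContext extends AutoCloseable {
    @Override
    void close();
}

interface ConsumerRecordContextProcessor {
    ConsumerRecordContext createContext(SimpleRecord record);
}

// Hypothetical stand-in for ConsumerRecord.
final class SimpleRecord {
    final String topic;
    final int partition;
    final long offset;

    SimpleRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical stand-in for SLF4J's MDC: a per-thread map of logging fields.
final class FakeMdc {
    private static final ThreadLocal<Map<String, String>> FIELDS = ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) { FIELDS.get().put(key, value); }
    static String get(String key) { return FIELDS.get().get(key); }
    static void remove(String key) { FIELDS.get().remove(key); }
}

// Hypothetical MDCContextProcessor: sets per-record fields, restores prior values on close().
final class MDCContextProcessor implements ConsumerRecordContextProcessor {
    @Override
    public ConsumerRecordContext createContext(SimpleRecord record) {
        Map<String, String> fields = Map.of(
                "kafka.topic", record.topic,
                "kafka.partition", Integer.toString(record.partition),
                "kafka.offset", Long.toString(record.offset));
        Map<String, String> previous = new HashMap<>(); // HashMap permits null values
        fields.forEach((key, value) -> {
            previous.put(key, FakeMdc.get(key));
            FakeMdc.put(key, value);
        });
        // Thread-local state: close() must run on the thread that called createContext().
        return () -> previous.forEach((key, value) -> {
            if (value == null) {
                FakeMdc.remove(key);
            } else {
                FakeMdc.put(key, value);
            }
        });
    }
}
```

Since MDC is thread-local, a context of this kind only makes sense when it is created and closed on the thread that does the logging.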
3. Multi-Threaded Context Propagation
Today, teams manually plumb MDC maps and parent spans across thread boundaries or resort to hacks like Datadog's subList(i, i+1) workaround to trigger TracingList instrumentation on worker threads. With this KIP, record.newContext() works on any thread.
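To illustrate "works on any thread", here is a sketch in which newContext() is called on the worker rather than on the poll thread. SimpleRecord is a hypothetical stand-in for ConsumerRecord whose newContext() is wired directly to a single hypothetical processor that publishes the record's offset as thread-local state; nothing has to be copied across the thread boundary by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

interface ConsumerRecordContext extends AutoCloseable {
    @Override
    void close();
}

// Hypothetical stand-in for ConsumerRecord with the proposed newContext() factory method.
final class SimpleRecord {
    static final ThreadLocal<Long> CURRENT_OFFSET = new ThreadLocal<>();
    final long offset;

    SimpleRecord(long offset) {
        this.offset = offset;
    }

    ConsumerRecordContext newContext() {
        Long previous = CURRENT_OFFSET.get();
        CURRENT_OFFSET.set(offset); // visible only on the calling thread
        return () -> CURRENT_OFFSET.set(previous);
    }
}

final class WorkerDispatchDemo {
    // Returns, per record, the offset each worker observed in its own thread-local state.
    static List<Long> run(List<SimpleRecord> records) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<Future<Long>> results = new ArrayList<>();
        for (SimpleRecord record : records) { // poll thread: dispatch only
            results.add(executor.submit(() -> {
                try (ConsumerRecordContext ctx = record.newContext()) { // worker thread
                    return SimpleRecord.CURRENT_OFFSET.get();
                }
            }));
        }
        List<Long> observed = new ArrayList<>();
        try {
            for (Future<Long> f : results) {
                observed.add(f.get());
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return observed;
    }
}
```

Each worker sees its own record's offset, and the poll thread's thread-local state is never touched.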
Public Interfaces
ConsumerRecordContextProcessor
package org.apache.kafka.clients.consumer;

import java.io.Closeable;
import java.util.Map;
import org.apache.kafka.common.Configurable;

/**
 * A processor that establishes per-record processing context for consumer records.
 *
 * <p>Implementations are instantiated once during consumer creation via the
 * {@code context.processors} configuration and receive the full consumer
 * configuration through {@link #configure(Map)}. For each record processed,
 * {@link ConsumerRecord#newContext()} calls {@link #createContext(ConsumerRecord)}
 * to obtain a fresh {@link ConsumerRecordContext} instance.
 *
 * <p>Processors are stateless with respect to individual records. Per-record
 * state belongs on the {@link ConsumerRecordContext}, not the processor. The processor
 * acts as a factory: initialized once, producing contexts per record.
 *
 * <p>Multiple processors may be configured and are chained in declared order.
 * Each processor's context is created and closed independently.
 */
public interface ConsumerRecordContextProcessor extends Configurable, Closeable {
    /**
     * Create a new context for processing a single record. All setup (span creation, MDC injection,
     * etc.) happens here. The returned context's {@link ConsumerRecordContext#close()} will be called
     * when processing completes.
     *
     * @param record the record about to be processed
     * @return a new ConsumerRecordContext instance; must not return null
     */
    ConsumerRecordContext createContext(ConsumerRecord<?, ?> record);
}
Instantiated once during consumer creation via the context.processors config. Processors are stateless: per-record state belongs on the returned context.
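A sketch of a complete processor against this interface. To compile without kafka-clients, Configurable is re-declared with the same single method as org.apache.kafka.common.Configurable, and SimpleRecord stands in for ConsumerRecord. AuditContextProcessor is hypothetical; it reads client.id (a standard consumer config) to show configure() receiving the consumer configuration, and its log exists only for the demonstration. Per-record data is captured by the returned context, not stored on the processor.

```java
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

// Stand-ins mirroring org.apache.kafka.common.Configurable and the proposed interfaces.
interface Configurable {
    void configure(Map<String, ?> configs);
}

interface ConsumerRecordContext extends AutoCloseable {
    @Override
    void close();
}

interface ConsumerRecordContextProcessor extends Configurable, Closeable {
    ConsumerRecordContext createContext(SimpleRecord record);
}

// Hypothetical stand-in for ConsumerRecord.
final class SimpleRecord {
    final String topic;
    final long offset;

    SimpleRecord(String topic, long offset) {
        this.topic = topic;
        this.offset = offset;
    }
}

// Hypothetical processor: configured once, then a factory of per-record contexts.
final class AuditContextProcessor implements ConsumerRecordContextProcessor {
    final List<String> log = new CopyOnWriteArrayList<>();
    private volatile String clientId = "unknown";

    @Override
    public void configure(Map<String, ?> configs) {
        Object id = configs.get("client.id"); // the full consumer config is passed in
        if (id != null) {
            clientId = id.toString();
        }
    }

    @Override
    public ConsumerRecordContext createContext(SimpleRecord record) {
        // Per-record state lives in the returned context (captured here), not in processor fields.
        String key = clientId + " " + record.topic + "@" + record.offset;
        log.add("start " + key);
        return () -> log.add("end " + key);
    }

    @Override
    public void close() {
        log.add("processor closed"); // called once, when the consumer is closed
    }
}
```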
ConsumerRecordContext
package org.apache.kafka.clients.consumer;

/**
 * Per-record processing context with deterministic lifecycle.
 *
 * <p>A ConsumerRecordContext is created by
 * {@link ConsumerRecordContextProcessor#createContext(ConsumerRecord)}, which performs all
 * setup (span creation, MDC injection, etc.). The only remaining contract is
 * {@link #close()}, called when processing completes.
 *
 * <p>The intended usage is with try-with-resources:
 * <pre>{@code
 * try (ConsumerRecordContext ctx = record.newContext()) {
 *     // process the record synchronously
 * }
 * }</pre>
 *
 * <p>For async processing, the caller is responsible for closing the context
 * when the async operation completes:
 * <pre>{@code
 * ConsumerRecordContext ctx = record.newContext();
 * CompletableFuture.runAsync(() -> process(record))
 *     .whenComplete((r, ex) -> ctx.close());
 * }</pre>
 */
@FunctionalInterface
public interface ConsumerRecordContext extends AutoCloseable {
    ConsumerRecordContext NO_OP = () -> {};

    /**
     * Close this context and release any resources.
     *
     * <p>Called exactly once when processing completes. Implementations
     * should clean up any state established during context creation.
     *
     * <p>Exceptions thrown from this method are caught and logged by the
     * consumer. They do not affect other contexts in the chain.
     */
    @Override
    void close();
}
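The async pattern from the Javadoc can be checked in isolation. In this sketch AsyncCloseDemo and its counting context are hypothetical stand-ins for a context returned by record.newContext(). Because whenComplete runs its action on both normal and exceptional completion, the context is closed once whether processing succeeds or throws.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

interface ConsumerRecordContext extends AutoCloseable {
    ConsumerRecordContext NO_OP = () -> {};

    @Override
    void close();
}

final class AsyncCloseDemo {
    static final AtomicInteger closes = new AtomicInteger();

    // Hypothetical counting context standing in for one created by record.newContext().
    static ConsumerRecordContext newContext() {
        return closes::incrementAndGet;
    }

    // Processing runs asynchronously; the context is closed when it completes, success or failure.
    static CompletableFuture<Void> processAsync(Runnable process) {
        ConsumerRecordContext ctx = newContext();
        return CompletableFuture.runAsync(process)
                .whenComplete((ignored, error) -> ctx.close());
    }
}
```

Joining the returned future waits for the whenComplete stage, so the context is guaranteed to be closed by the time join() returns or throws.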
Changes to ConsumerRecord
Add a newContext() method and a package-private constructor that accepts an Optional<ContextProcessors>, where ContextProcessors is an internal class in consumer/internals that encapsulates the list of configured context processors:
// Package-private: used internally by CompletedFetch / ShareCompletedFetch
ConsumerRecord(..., Optional<ContextProcessors> contextProcessors) {
    this.contextProcessors = contextProcessors;
}

// Public
public ConsumerRecordContext newContext() {
    if (contextProcessors.isEmpty()) {
        return ConsumerRecordContext.NO_OP;
    }
    return contextProcessors.get().createContext(this);
}
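The factory behaviour of newContext() can be sketched with stand-ins. Here SimpleRecord models ConsumerRecord, and ContextProcessors models the internal class; both are hypothetical, and the composite close deliberately has no error isolation in this sketch. Records built through the existing constructors get Optional.empty() and therefore NO_OP, while every call on a record with processors produces a fresh context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

interface ConsumerRecordContext extends AutoCloseable {
    ConsumerRecordContext NO_OP = () -> {};

    @Override
    void close();
}

interface ConsumerRecordContextProcessor {
    ConsumerRecordContext createContext(SimpleRecord record);
}

// Hypothetical stand-in for the internal ContextProcessors class.
final class ContextProcessors {
    private final List<ConsumerRecordContextProcessor> processors;

    ContextProcessors(List<ConsumerRecordContextProcessor> processors) {
        this.processors = List.copyOf(processors);
    }

    ConsumerRecordContext createContext(SimpleRecord record) {
        List<ConsumerRecordContext> contexts = new ArrayList<>();
        for (ConsumerRecordContextProcessor processor : processors) {
            contexts.add(processor.createContext(record));
        }
        return () -> contexts.forEach(ConsumerRecordContext::close); // no error isolation here
    }
}

// Hypothetical stand-in for ConsumerRecord with the proposed field and method.
final class SimpleRecord {
    private final long offset;
    private final Optional<ContextProcessors> contextProcessors;

    SimpleRecord(long offset) { // models the existing constructors: no processors
        this(offset, Optional.empty());
    }

    SimpleRecord(long offset, Optional<ContextProcessors> contextProcessors) {
        this.offset = offset;
        this.contextProcessors = contextProcessors;
    }

    long offset() {
        return offset;
    }

    // A factory: every call returns a fresh, independent context.
    ConsumerRecordContext newContext() {
        if (contextProcessors.isEmpty()) {
            return ConsumerRecordContext.NO_OP;
        }
        return contextProcessors.get().createContext(this);
    }
}
```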
Proposed Changes
Overview
This KIP introduces ConsumerRecordContextProcessor, a per-record complement to the batch-level ConsumerInterceptor. Each processor creates an AutoCloseable ConsumerRecordContext, so setup and teardown are deterministic via try-with-resources.
Configuration
| Property | Type | Default | Description |
|---|---|---|---|
| context.processors | LIST | "" | ConsumerRecordContextProcessor class names, chained in declared order |
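For illustration, a consumer configured with two processors might look like this. bootstrap.servers, group.id and the deserializer keys are standard consumer configs; context.processors is the key proposed here and, like other LIST configs, is written as a comma-separated string. Both processor class names are hypothetical.

```java
import java.util.Properties;

final class ContextProcessorConfigExample {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "orders-service");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Proposed config: processors are chained in declared order (class names are hypothetical).
        props.put("context.processors",
                "com.example.TracingContextProcessor,com.example.MDCContextProcessor");
        return props;
    }
}
```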
Changes to ConsumerRecord
Existing public constructors are unchanged: for records built through them, contextProcessors defaults to Optional.empty() and newContext() returns NO_OP.
The ContextProcessors reference is threaded from KafkaConsumer through AbstractFetch -> CompletedFetch / ShareCompletedFetch -> ConsumerRecord.
Memory overhead: one object reference per record, pointing to the same shared instance for all records from the same consumer. For a batch of 10,000 records, that is roughly 40 KB of references with compressed oops (4 bytes each) or 80 KB without (8 bytes each), all pointing to one object. When no processors are configured, every record references the shared Optional.empty() singleton, so no additional objects are allocated.
newContext() may be called multiple times on the same record. Each call creates a fresh, independent ConsumerRecordContext. The method is a factory, not one-time initialization.
Chaining and Failure Semantics
Multiple processors are composed into a CompositeContext. Exceptions thrown from createContext() or close() are caught and logged: a failure in one processor never prevents the others from executing, and newContext() never throws.
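A sketch of how a CompositeContext could provide these guarantees. The class shape is hypothetical, the record is typed as Object in place of ConsumerRecord<?, ?>, and an errors list stands in for the consumer's logger. The section does not specify close order; closing in reverse creation order, like nested try-with-resources, is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

interface ConsumerRecordContext extends AutoCloseable {
    ConsumerRecordContext NO_OP = () -> {};

    @Override
    void close();
}

interface ConsumerRecordContextProcessor {
    ConsumerRecordContext createContext(Object record); // Object stands in for ConsumerRecord<?, ?>
}

// Hypothetical composite: one context per processor, failures isolated and "logged".
final class CompositeContext implements ConsumerRecordContext {
    private final List<ConsumerRecordContext> contexts;
    private final List<String> errors; // stands in for the consumer's logger

    private CompositeContext(List<ConsumerRecordContext> contexts, List<String> errors) {
        this.contexts = contexts;
        this.errors = errors;
    }

    static ConsumerRecordContext create(List<ConsumerRecordContextProcessor> processors,
                                        Object record, List<String> errors) {
        List<ConsumerRecordContext> contexts = new ArrayList<>(processors.size());
        for (ConsumerRecordContextProcessor processor : processors) { // declared order
            try {
                ConsumerRecordContext ctx = processor.createContext(record);
                if (ctx != null) { // contract says non-null; be defensive anyway
                    contexts.add(ctx);
                }
            } catch (RuntimeException e) {
                errors.add("createContext failed: " + e.getMessage()); // log and continue
            }
        }
        return new CompositeContext(contexts, errors);
    }

    @Override
    public void close() {
        // Assumption: reverse creation order, so the innermost context is closed first.
        for (int i = contexts.size() - 1; i >= 0; i--) {
            try {
                contexts.get(i).close();
            } catch (RuntimeException e) {
                errors.add("close failed: " + e.getMessage());
            }
        }
    }
}
```

With a working tracing processor, a processor whose createContext() throws, and an MDC processor whose close() throws, processing still runs inside both working contexts and both are closed; the two failures are only recorded.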
Integration with ConsumerInterceptor
Complementary, not competing. Interceptors fire in poll() (batch level). Context processors fire when the application calls newContext() (per record, caller-driven). ConsumerInterceptor is not deprecated.
Compatibility, Deprecation, and Migration Plan
- Frameworks (Spring Kafka, SmallRye, Micronaut): Call record.newContext() in their listener containers / processing pipelines. Their existing per-record hooks can delegate to newContext() internally, giving applications client-level context processors for free.
- Instrumentation libraries (OTel, Datadog): Ship a ConsumerRecordContextProcessor implementation. For framework-managed consumers (the majority of production deployments), no bytecode instrumentation is needed because the framework calls newContext(). For direct Kafka client users, OTel/DD can instrument newContext() as a single stable hook instead of wrapping iterators.
- Applications: No changes required. Applications that don't configure context.processors see no behavioral change. Applications doing manual MDC/tracing setup can migrate incrementally by configuring a processor and removing the manual code.
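As a rough illustration of the framework bullet, a listener container could open the client-level context around its own per-record hooks. ListenerContainer and ContextAwareRecord are hypothetical (ContextAwareRecord stands in for a ConsumerRecord exposing the proposed newContext()), and the before/success/failure entries only mimic hooks like Spring's RecordInterceptor; real containers have richer error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

interface ConsumerRecordContext extends AutoCloseable {
    ConsumerRecordContext NO_OP = () -> {};

    @Override
    void close();
}

// Hypothetical stand-in for a ConsumerRecord that exposes the proposed newContext().
interface ContextAwareRecord {
    ConsumerRecordContext newContext();
}

// Hypothetical framework container: the client-level context wraps the framework's own hooks.
final class ListenerContainer<R extends ContextAwareRecord> {
    private final Consumer<R> listener;
    final List<String> hookLog = new ArrayList<>();

    ListenerContainer(Consumer<R> listener) {
        this.listener = listener;
    }

    void dispatch(R record) {
        try (ConsumerRecordContext ctx = record.newContext()) {
            hookLog.add("before"); // e.g. the framework's existing intercept() hook
            try {
                listener.accept(record);
                hookLog.add("success");
            } catch (RuntimeException e) {
                hookLog.add("failure"); // e.g. the framework's existing failure() hook
            }
        }
    }
}
```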
Compatibility
Fully backwards compatible. No protocol changes and no changes to existing Consumer/ConsumerRecord public constructors. The default value of context.processors is empty.
Test Plan
- Unit tests for processor configuration, instantiation, and composite chaining
- Unit tests for newContext() with and without configured processors
- Unit tests for multiple newContext() calls on the same record
- Unit tests for exception isolation in multi-processor chains
- Unit tests for thread safety (concurrent newContext() calls from multiple threads)
- Integration tests for context lifecycle through poll -> process -> close
- Integration tests for multi-threaded dispatch with per-thread context
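The thread-safety item in the plan could take roughly this shape. It is a sketch against a hypothetical CountingRecord stand-in, not against the real ConsumerRecord: many threads call newContext() on the same record at once, released together by a latch, and the check passes only if no call throws and every opened context is closed.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

interface ConsumerRecordContext extends AutoCloseable {
    @Override
    void close();
}

// Hypothetical record whose newContext() delegates to a shared, counting processor.
final class CountingRecord {
    final AtomicInteger opened = new AtomicInteger();
    final AtomicInteger closed = new AtomicInteger();

    ConsumerRecordContext newContext() {
        opened.incrementAndGet();
        return closed::incrementAndGet;
    }
}

final class ConcurrentNewContextCheck {
    // Calls newContext() on the SAME record from many threads; true if all contexts opened and closed.
    static boolean run(int threads, int callsPerThread) {
        CountingRecord record = new CountingRecord();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        Set<Throwable> failures = ConcurrentHashMap.newKeySet();
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all threads together to maximize overlap
                    for (int i = 0; i < callsPerThread; i++) {
                        try (ConsumerRecordContext ctx = record.newContext()) {
                            // no-op processing
                        }
                    }
                } catch (Throwable e) {
                    failures.add(e);
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        int expected = threads * callsPerThread;
        return failures.isEmpty() && record.opened.get() == expected && record.closed.get() == expected;
    }
}
```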
Rejected Alternatives
Document a recommended pattern and leave this at the framework level. This is the status quo, which has produced the separate OTel, Datadog, Spring Kafka, and SmallRye implementations. Documenting a pattern doesn't give frameworks a common hook, doesn't let client-level config drive context setup, and doesn't fix the iterator-wrapping hacks in instrumentation libraries.
Extend ConsumerInterceptor with per-record methods. Interceptors fire inside poll(), on the polling thread, before records reach application code, not during processing. An onRecord() hook invoked there would have the same iteration-vs-processing conflation. It would also mix record transformation (onConsume() returns modified records) with context lifecycle.
Consumer#newContext(record) instead of ConsumerRecord#newContext(). This couples context creation to the consumer instance. Worker threads would need a Consumer reference, which is misleading because KafkaConsumer is not thread-safe. The record is the natural unit of dispatch.
Context as a wrapper around the record. Forces applications to work with a different type, breaking existing method signatures.
Callback-based lifecycle (onBefore/onAfter). Doesn't support async processing, and provides neither resource scoping nor try-with-resources.