...
```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class MyProcessor implements Processor<KEY_TYPE, VALUE_TYPE> {

    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(final KEY_TYPE key, final VALUE_TYPE value) {
        this.context.offset(); // returns the current record's offset
        // you can also access #partition(), #timestamp(), and #topic()
    }

    // other methods omitted for brevity
}
```
Why do I get an IllegalStateException when accessing record metadata?
If you attach a new Processor/Transformer/ValueTransformer to your topology using a corresponding supplier, make sure that the supplier returns a new instance each time get() is called. If it returns the same object, a single Processor/Transformer/ValueTransformer is shared across multiple tasks, which results in an IllegalStateException with the error message "This should not happen as topic() should only be called while a record is processed" (depending on the method you call, the message may name partition(), offset(), or timestamp() instead of topic()).
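The difference between a correct and an incorrect supplier can be sketched without any Kafka dependency. In the minimal example below, StandInProcessor, freshSupplier(), sharedSupplier(), and returnsDistinctInstances() are hypothetical names chosen for illustration, and java.util.function.Supplier stands in for the Kafka Streams supplier interfaces. It only demonstrates the instance-sharing problem, not the actual Kafka Streams behavior.

```java
import java.util.function.Supplier;

public class SupplierSketch {

    // Hypothetical stand-in for a Processor/Transformer/ValueTransformer implementation.
    static final class StandInProcessor {
        // Per-task state, analogous to the ProcessorContext handed over in init().
        private String taskId;

        void init(final String taskId) {
            this.taskId = taskId;
        }

        String taskId() {
            return taskId;
        }
    }

    // Correct: every get() call creates a new instance, so each task owns its own object.
    static Supplier<StandInProcessor> freshSupplier() {
        return StandInProcessor::new;
    }

    // Incorrect: every get() call hands out the same object, so tasks would share state.
    static Supplier<StandInProcessor> sharedSupplier() {
        final StandInProcessor shared = new StandInProcessor();
        return () -> shared;
    }

    // Returns true if two consecutive get() calls yield different objects.
    static boolean returnsDistinctInstances(final Supplier<StandInProcessor> supplier) {
        return supplier.get() != supplier.get();
    }

    public static void main(final String[] args) {
        System.out.println(returnsDistinctInstances(freshSupplier()));  // prints "true"
        System.out.println(returnsDistinctInstances(sharedSupplier())); // prints "false"
    }
}
```

Applied to a real topology, this would correspond to passing something like MyProcessor::new or () -> new MyProcessor() as the supplier, rather than a lambda that captures an instance created beforehand.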
Producers
How should I set metadata.broker.list?
...