...
Acknowledgement errors are delivered to a new kind of callback, called an acknowledgement commit callback, which can optionally be registered with a KafkaShareConsumer
.
If the application uses
KafkaShareConsumer.commitSync()
to commit its acknowledgements, the results of the acknowledgements are returned to the application.
If the application uses
KafkaShareConsumer.commitAsync()
or KafkaShareConsumer.poll(Duration)
to commit its acknowledgements, the results of the acknowledgements are only delivered if there is an acknowledgement commit callback registered.
The acknowledgement commit callback is called on the application thread and it is not permitted to call the methods of KafkaShareConsumer
with the exception of KafkaShareConsumer.wakeup()
. It can be called during any of KafkaShareConsumer.poll()
, KafkaShareConsumer.commitAsync()
, KafkaShareConsumer.commitSync()
and KafkaShareConsumer.close()
.
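The delivery rule above can be illustrated with a small toy model. This is only a sketch: `AckResultDelivery`, `setCallback` and the offset-keyed result map are hypothetical stand-ins invented for illustration, not the Kafka client classes or signatures. The model also delivers asynchronous results immediately, whereas the text above says the real callback may run during a later `poll()`, `commitAsync()`, `commitSync()` or `close()`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

// Toy model only: hypothetical stand-in types, not the Kafka client API.
final class AckResultDelivery {
    // Pending acknowledgement results keyed by record offset; empty Optional means success.
    private final Map<Long, Optional<Exception>> pending = new LinkedHashMap<>();
    // Stand-in for the acknowledgement commit callback; null when none is registered.
    private BiConsumer<Long, Optional<Exception>> callback;

    void setCallback(BiConsumer<Long, Optional<Exception>> cb) {
        this.callback = cb;
    }

    // Records the outcome of acknowledging one record (errorOrNull == null means success).
    void acknowledge(long offset, Exception errorOrNull) {
        pending.put(offset, Optional.ofNullable(errorOrNull));
    }

    // Models the synchronous commit: results are returned directly to the caller.
    Map<Long, Optional<Exception>> commitSync() {
        Map<Long, Optional<Exception>> results = new LinkedHashMap<>(pending);
        pending.clear();
        return results;
    }

    // Models the asynchronous commit: results reach the application only via the callback.
    void commitAsync() {
        if (callback != null) {
            pending.forEach(callback);
        }
        pending.clear(); // without a callback the results are simply not delivered
    }
}
```

In this model, an application that relies on the asynchronous path and never registers a callback has no way of learning that an acknowledgement failed, which is the practical consequence of the rule described above.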
Example - Acknowledging a batch of records (implicit acknowledgement)
...
In this example, each record processed is separately acknowledged using a call to the new KafkaShareConsumer
.acknowledge(ConsumerRecord, AcknowledgeType)
method. The AcknowledgeType
argument indicates whether the record was processed successfully or not. In this case, the bad records are rejected, meaning that they’re not eligible for further delivery attempts. For a permanent error such as a deserialization or semantic error, this is appropriate. For a transient error which might not affect a subsequent processing attempt, AcknowledgeType.RELEASE
is more appropriate because the record remains eligible for further delivery attempts.
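The choice between rejecting and releasing a record can be sketched as a small decision function. The enum below is a local stand-in that mirrors the three acknowledgement outcomes discussed here, and the `TransientProcessingException` class and `choose` method are hypothetical names introduced only for illustration. Treating every unclassified failure as permanent is an assumption of this sketch, not something the text prescribes.

```java
// Sketch only: local stand-in enum and a hypothetical exception type, not Kafka client code.
final class AckTypeChooser {
    enum AcknowledgeType { ACCEPT, RELEASE, REJECT }

    // Hypothetical marker for failures that might succeed on a later delivery attempt.
    static class TransientProcessingException extends RuntimeException {
        TransientProcessingException(String message) {
            super(message);
        }
    }

    // Maps the outcome of processing one record to an acknowledgement type.
    // failure == null means the record was processed successfully.
    static AcknowledgeType choose(RuntimeException failure) {
        if (failure == null) {
            return AcknowledgeType.ACCEPT;   // processed successfully
        }
        if (failure instanceof TransientProcessingException) {
            return AcknowledgeType.RELEASE;  // stays eligible for another delivery attempt
        }
        return AcknowledgeType.REJECT;       // assumed permanent: no further delivery attempts
    }
}
```

An application with a different error taxonomy might prefer to release by default and reject only known-permanent errors; the trade-off is between retrying records that can never succeed and discarding records that could have.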
...
The batch contains no records, in which case the application just polls again. The call to
KafkaShareConsumer
.commitAsync()
does nothing because the batch was empty.
All of the records in the batch are processed successfully. The calls to
KafkaShareConsumer
.acknowledge(ConsumerRecord, AcknowledgeType.ACCEPT)
mark all records in the batch as successfully processed.
One of the records encounters an exception. The call to
KafkaShareConsumer
.acknowledge(ConsumerRecord, AcknowledgeType.REJECT)
rejects that record. Earlier records in the batch have already been marked as successfully processed. The call to KafkaShareConsumer
.commitAsync()
commits the acknowledgements, but the records after the failed record remain Acquired as part of the same delivery attempt and will be presented to the application in response to another poll.
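The three cases can be traced with a toy model of the per-record outcome. This is a sketch under stated assumptions: the batch is represented as a list of booleans (`true` meaning processing succeeds), the states are plain strings, and `BatchOutcome.process` is a hypothetical helper rather than part of the consumer API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: traces what happens to each record in one batch; not Kafka client code.
final class BatchOutcome {
    // Returns the state of each record after the batch is handled and acknowledgements committed.
    static List<String> process(List<Boolean> batch) {
        List<String> states = new ArrayList<>();
        boolean stopped = false;
        for (boolean succeeds : batch) {
            if (stopped) {
                states.add("ACQUIRED"); // never acknowledged; presented again on a later poll
            } else if (succeeds) {
                states.add("ACCEPT");   // marked as successfully processed
            } else {
                states.add("REJECT");   // the failing record is rejected
                stopped = true;         // processing of this batch ends at the first failure
            }
        }
        return states;
    }
}
```

For example, `BatchOutcome.process(List.of(true, false, true))` yields `[ACCEPT, REJECT, ACQUIRED]`, matching the third case: the first record is accepted, the second is rejected, and the third remains acquired for the same delivery attempt.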
...