Comment: Minor corrections for acknowledgement commit callback

...

Acknowledgement errors are delivered to a new kind of callback called an acknowledgement commit callback, which can optionally be registered with a KafkaShareConsumer.

  • If the application uses KafkaShareConsumer.commitSync() to commit its acknowledgements, the results of the acknowledgements are returned to the application.

  • If the application uses KafkaShareConsumer.commitAsync() or KafkaShareConsumer.poll(Duration) to commit its acknowledgements, the results of the acknowledgements are only delivered if there is an acknowledgement commit callback registered.

The acknowledgement commit callback is called on the application thread. It is not permitted to call the methods of KafkaShareConsumer, with the exception of KafkaShareConsumer.wakeup(). The callback can be invoked during any of KafkaShareConsumer.poll(), KafkaShareConsumer.commitAsync(), KafkaShareConsumer.commitSync() and KafkaShareConsumer.close().
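The delivery rule above can be illustrated with a toy model. This is only a sketch: AckResultRouting, register and the String results are hypothetical stand-ins invented for illustration, not the Kafka client classes or their signatures. It shows a synchronous commit handing its result straight back, and an asynchronous commit dropping its result unless a callback has been registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: NOT the Kafka client API. All names here are hypothetical.
final class AckResultRouting {
    // Stand-in for the optional acknowledgement commit callback.
    private Consumer<String> commitCallback;

    void register(Consumer<String> callback) {
        this.commitCallback = callback;
    }

    // Models a synchronous commit: the result is returned to the caller.
    String commitSync(String result) {
        return result;
    }

    // Models an asynchronous commit: the result reaches the application
    // only if a callback is registered. Returns whether it was delivered.
    boolean commitAsync(String result) {
        if (commitCallback == null) {
            return false; // no callback registered, so the result is not delivered
        }
        commitCallback.accept(result);
        return true;
    }

    public static void main(String[] args) {
        AckResultRouting consumer = new AckResultRouting();
        System.out.println(consumer.commitAsync("first result"));

        List<String> seen = new ArrayList<>();
        consumer.register(seen::add);
        consumer.commitAsync("second result");
        System.out.println(seen);
    }
}
```

In the model, only the second asynchronous result is observed, because the callback was registered after the first commit.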

Example - Acknowledging a batch of records (implicit acknowledgement)

...

In this example, each record processed is separately acknowledged using a call to the new KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method. The AcknowledgeType argument indicates whether the record was processed successfully or not. In this case, the bad records are rejected, meaning that they are not eligible for further delivery attempts. For a permanent error such as a semantic error, this is appropriate. For a transient error which might not affect a subsequent processing attempt, AcknowledgeType.RELEASE is more appropriate because the record remains eligible for further delivery attempts.
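The choice between rejecting and releasing a record can be sketched as a small decision function. This is a minimal illustration under stated assumptions: the Ack enum merely mirrors the ACCEPT, RELEASE and REJECT names used in this section, and TransientProcessingException is a hypothetical application-defined marker for retryable failures, not part of Kafka.

```java
// Sketch only: stand-in types, not the Kafka client classes.
final class AckChooser {
    // Mirrors the acknowledgement type names used in the text.
    enum Ack { ACCEPT, RELEASE, REJECT }

    // Hypothetical marker for errors that may succeed on a later attempt.
    static final class TransientProcessingException extends RuntimeException {
        TransientProcessingException(String message) {
            super(message);
        }
    }

    // Maps the outcome of processing one record to an acknowledgement type.
    static Ack choose(Throwable error) {
        if (error == null) {
            return Ack.ACCEPT;   // processed successfully
        }
        if (error instanceof TransientProcessingException) {
            return Ack.RELEASE;  // record stays eligible for another delivery attempt
        }
        return Ack.REJECT;       // permanent error: no further delivery attempts
    }
}
```

How an application distinguishes transient from permanent failures is its own decision; the exception-type check here is just one possible convention.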

...

  1. The batch contains no records, in which case the application just polls again. The call to KafkaShareConsumer.commitAsync() just does nothing because the batch was empty.

  2. All of the records in the batch are processed successfully. The calls to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType.ACCEPT) mark all records in the batch as successfully processed.

  3. One of the records encounters an exception. The call to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType.REJECT) rejects that record. Earlier records in the batch have already been marked as successfully processed. The call to KafkaShareConsumer.commitAsync() commits the acknowledgements, but the records after the failed record remain Acquired as part of the same delivery attempt and will be presented to the application in response to another poll.
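The three cases above can be traced with a small simulation. It is a sketch that assumes the processing loop stops at the first failing record, as case 3 implies. Records are modelled as plain strings, and BatchAckSketch, processBatch and the failure predicate are hypothetical names for illustration, not Kafka APIs. The returned map holds the acknowledgements that would be committed; any record missing from it is still acquired.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy simulation only: NOT the Kafka client API.
final class BatchAckSketch {
    enum Ack { ACCEPT, REJECT }

    // Walks the batch in order and records one acknowledgement per processed record.
    static Map<String, Ack> processBatch(List<String> batch, Predicate<String> failsProcessing) {
        Map<String, Ack> acks = new LinkedHashMap<>();
        for (String record : batch) {
            if (failsProcessing.test(record)) {
                acks.put(record, Ack.REJECT);
                break; // later records are left unacknowledged and stay acquired
            }
            acks.put(record, Ack.ACCEPT);
        }
        // This is the point at which the real application would call commitAsync().
        return acks;
    }

    public static void main(String[] args) {
        // Case 1: empty batch, nothing to acknowledge.
        System.out.println(processBatch(List.of(), r -> false));
        // Case 2: every record is accepted.
        System.out.println(processBatch(List.of("a", "b", "c"), r -> false));
        // Case 3: "b" fails, so "c" is not acknowledged in this pass.
        System.out.println(processBatch(List.of("a", "b", "c"), r -> r.equals("b")));
    }
}
```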

...