DUE TO SPAM, SIGN-UP IS DISABLED. Goto Selfserve wiki signup and request an account.
...
The calls to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) are simply updating the state map in the KafkaConsumer. It is only once KafkaShareConsumer.commitAsync() is called that the acknowledgements are committed by sending the new state information to the share-partition leader.
Example - Per-record acknowledgement, ending processing of the batch on an error (explicit acknowledgement)
In this example, the application stops processing the batch when it encounters an exception.
| Code Block | ||
|---|---|---|
| ||
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myshare");
props.setProperty("share.acknowledgement.mode", "explicit");
KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // Return a batch of acquired records
for (ConsumerRecord<String, String> record : records) {
try {
doProcessing(record);
consumer.acknowledge(record, AcknowledgeType.ACCEPT); // Mark the record as processed successfully
} catch (Exception e) {
consumer.acknowledge(record, AcknowledgeType.REJECT); // Mark this record as unprocessable
}
}
consumer.commitAsync(); // Commit the acknowledgements of the acknowledged records only
} |
There are the following cases in this example:
The batch contains no records, in which case the application just polls again. The call to
KafkaShareConsumer.commitAsync()just does nothing because the batch was empty.All of the records in the batch are processed successfully. The calls to
KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType.ACCEPT)mark all records in the batch as successfully processed.One of the records encounters an exception. The call to
KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType.REJECT)rejects that record. Earlier records in the batch have already been marked as successfully processed. The call toKafkaShareConsumer.commitAsync()commits the acknowledgements.
Access control
Share group access control is performed on the GROUP resource type, just the same as consumer groups, with the same rules for the actions checked. A share group is just a new kind of group.
...