...
Code Block | ||
---|---|---|
| ||
// Consume-transform-produce example: every loop iteration polls a batch, processes it,
// then sends the produced records and the consumed offsets inside a single producer
// transaction.
Set<String> topics = buildSubscription();
KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(groupId));
KafkaProducer producer = new KafkaProducer();
consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Intentionally empty: this example takes no action on revocation. The method is
        // present because ConsumerRebalanceListener declares it and an anonymous class
        // must implement every abstract method of the interface.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        producer.initTransactions(consumer); // Will access consumer internal state. Only called once in the app's life cycle.
        // NOTE(review): this callback is invoked on each partition assignment, so the
        // "only called once" statement relies on initTransactions(consumer) tolerating
        // repeated calls (or on a guard not shown here) -- confirm against the proposal.
    }
});
while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed) {
        producer.send(record);
    }
    try {
        producer.sendOffsetsToTransaction(consumedOffsets);
    } catch (IllegalGenerationException e) {
        // The rethrow does not alter control flow; the catch exists to spell out which
        // exception signals a generation mismatch and that it is treated as fatal.
        throw e; // fail the zombie member if generation doesn't match
    }
    producer.commitTransaction();
} |
...