...
Code Block | ||
---|---|---|
| ||
String consumerGroupId = "group"; Set<String> topics = buildSubscription(); KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(groupId)); KafkaProducer producer = new KafkaProducer(buildProducerConfig(groupId)); // passing in consumer group id as transactional.group.id producer.initTransactions(new GroupAssignment()); consumer.subscribe(topics, new ConsumerRebalanceListener() { void onPartitionsAssigned(Collection<TopicPartition> partitions) { // some operation } }); while (true) { Generation generation = consumer.generation(); // Read some records from the consumer and collect the offsets to commit ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed); // Do some processing and build the records we want to produce List<ProducerRecord> processed = process(consumed); // Write the records and commit offsets under a single transaction producer.beginTransaction(); for (ProducerRecord record : processed) producer.send(record); try { producer.sendOffsetsToTransaction(consumedOffsets, consumer.generation().generationId); } catch (IllegalGenerationException e) { throw e; // fail the zombie member if generation doesn't match } producer.commitTransaction(); } |
...