...
```java
Set<String> topics = buildSubscription();
KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
KafkaProducer producer = new KafkaProducer();

// Subscribe so the consumer can join the group with the topics built above
consumer.subscribe(topics);

// Will access consumer internal state. Only called once in the app's life cycle after first rebalance.
// Note that this is a blocking call until consumer successfully joins the group.
producer.initTransactions(consumer);

while (true) {
  // Read some records from the consumer and collect the offsets to commit.
  // This will be the fencing point if there are pending offsets for the first time.
  ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
  Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

  // Do some processing and build the records we want to produce
  List<ProducerRecord> processed = process(consumed);

  // Write the records and commit offsets under a single transaction
  producer.beginTransaction();
  for (ProducerRecord record : processed)
    producer.send(record);

  try {
    producer.sendOffsetsToTransaction(consumedOffsets);
  } catch (IllegalGenerationException e) {
    throw e; // fail the zombie member if generation doesn't match
  }

  producer.commitTransaction();
}
```
The main points are the following:
- Consumer group id becomes a config value on the producer.
- A producer calling the new initTransactions(consumer) will be blocked until the consumer successfully joins the group and gets a valid generation for the first time.
- Generation.id will be used for group coordinator fencing.
- We no longer need to close the producer after a rebalance.
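
To make the generation-based fencing in the points above more concrete, here is a minimal, self-contained sketch. It is a toy model only and not Kafka's coordinator code: `ToyGroupCoordinator`, `StaleGenerationException`, `rebalance`, and `commitTxnOffsets` are hypothetical names introduced for illustration, and the assumption is simply that the coordinator rejects any transactional offset commit whose generation does not match the group's current one.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Toy model of generation-based fencing; all names here are hypothetical. */
public class GenerationFencingSketch {

    /** Thrown when a commit carries a generation other than the group's current one. */
    static class StaleGenerationException extends RuntimeException {
        StaleGenerationException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the group coordinator's view of a single group. */
    static class ToyGroupCoordinator {
        private int currentGeneration = 0;
        private final Map<String, Long> committedOffsets = new HashMap<>();

        /** Each completed rebalance bumps the generation and returns the new id. */
        int rebalance() {
            return ++currentGeneration;
        }

        /** Accepts the commit only if the caller's generation is the current one. */
        void commitTxnOffsets(int memberGeneration, Map<String, Long> offsets) {
            if (memberGeneration != currentGeneration) {
                throw new StaleGenerationException(
                    "generation " + memberGeneration + " != current " + currentGeneration);
            }
            committedOffsets.putAll(offsets);
        }

        /** Returns the last committed offset for a partition, or null if none. */
        Long committed(String partition) {
            return committedOffsets.get(partition);
        }
    }

    public static void main(String[] args) {
        ToyGroupCoordinator coordinator = new ToyGroupCoordinator();

        // A member that joined in the first generation and then stalled (the zombie).
        int zombieGeneration = coordinator.rebalance();
        // The group rebalances again; live members now hold the newer generation.
        int liveGeneration = coordinator.rebalance();

        // The live member's commit is accepted.
        coordinator.commitTxnOffsets(liveGeneration, Collections.singletonMap("topic-0", 42L));

        // The zombie's commit is rejected, so it cannot overwrite newer progress.
        try {
            coordinator.commitTxnOffsets(zombieGeneration, Collections.singletonMap("topic-0", 7L));
        } catch (StaleGenerationException e) {
            System.out.println("fenced: " + e.getMessage());
        }

        System.out.println("committed offset: " + coordinator.committed("topic-0"));
    }
}
```

In this sketch the producer itself never has to be closed or recreated on rebalance: the stale member is failed at commit time because its generation no longer matches.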
...