...
Code Block | ||||
---|---|---|---|---|
| ||||
ConsumerGroupMetadata groupMetadata(); |
This lets EOS users fetch refreshed group metadata as needed.
...
Code Block |
---|
TxnOffsetCommitRequest => TransactionalId GroupId ProducerId ProducerEpoch Offsets GenerationId MemberId GroupInstanceId
  TransactionalId => String
  GroupId => String
  ProducerId => int64
  ProducerEpoch => int16
  Offsets => Map<TopicPartition, CommittedOffset>
  GenerationId => int32, default -1 // NEW
  MemberId => nullable String // NEW
  GroupInstanceId => nullable String // NEW |
If any of these fields does not match on the server side, the client is fenced immediately. An edge case is defined as:
...
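The server-side check on the new request fields can be pictured with a small sketch. This is only an illustration under stated assumptions, not the broker's implementation: the class `TxnOffsetCommitFencingSketch`, its nested types, the order of the checks, and the rule that an all-default request (generation `-1`, null member and instance IDs) skips validation are all hypothetical. The result names are borrowed from Kafka's existing group-membership errors, and their mapping to each mismatch is assumed here.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a coordinator-side check; these are NOT Kafka's real classes.
final class TxnOffsetCommitFencingSketch {

    // Names borrowed from Kafka's group-membership errors (mapping is an assumption).
    enum Result { NONE, ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID, FENCED_INSTANCE_ID }

    // The three NEW fields carried by the request.
    static final class RequestMetadata {
        final int generationId;
        final String memberId;        // nullable
        final String groupInstanceId; // nullable

        RequestMetadata(int generationId, String memberId, String groupInstanceId) {
            this.generationId = generationId;
            this.memberId = memberId;
            this.groupInstanceId = groupInstanceId;
        }
    }

    // What the coordinator currently knows about the consumer group.
    static final class GroupState {
        final int generationId;
        final Set<String> memberIds;
        final Map<String, String> staticMembers; // groupInstanceId -> current memberId

        GroupState(int generationId, Set<String> memberIds, Map<String, String> staticMembers) {
            this.generationId = generationId;
            this.memberIds = memberIds;
            this.staticMembers = staticMembers;
        }
    }

    static Result validate(RequestMetadata req, GroupState group) {
        // Assumption: all-default fields mean the caller sent no group metadata, so skip the check.
        if (req.generationId == -1 && req.memberId == null && req.groupInstanceId == null) {
            return Result.NONE;
        }
        // A static member whose instance ID is now owned by another member ID is fenced.
        if (req.groupInstanceId != null) {
            String current = group.staticMembers.get(req.groupInstanceId);
            if (current != null && !current.equals(req.memberId)) {
                return Result.FENCED_INSTANCE_ID;
            }
        }
        // The member must still belong to the group.
        if (req.memberId == null || !group.memberIds.contains(req.memberId)) {
            return Result.UNKNOWN_MEMBER_ID;
        }
        // The request must come from the current generation.
        if (req.generationId != group.generationId) {
            return Result.ILLEGAL_GENERATION;
        }
        return Result.NONE;
    }
}
```

Any result other than `NONE` corresponds to the client being fenced. A request from a stale generation, for example, would be rejected before its offsets are written.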
Code Block | ||
---|---|---|
| ||
KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
KafkaProducer producer = new KafkaProducer<>(producerConfig);
producer.initTransactions();
while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);
    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);
    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
        producer.send(record);
    // Pass the entire consumer group metadata
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
    producer.commitTransaction();
} |
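The example calls an `offsets(consumed)` helper that is not defined in the text. One plausible shape is sketched below. To stay compilable without the Kafka client library, it uses hypothetical stand-ins (`Partition` for `TopicPartition`, `Consumed` for a consumed record, and a plain `Long` in place of `OffsetAndMetadata`). The one detail it aims to show is that the offset to commit for a partition is the last consumed offset plus one, that is, the position of the next record to read.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the offsets(...) helper; the types are stand-ins, not Kafka classes.
final class OffsetsToCommitSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition.
    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Partition)) return false;
            Partition other = (Partition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Stand-in for a consumed record: only the fields the helper needs.
    static final class Consumed {
        final Partition partition;
        final long offset;

        Consumed(Partition partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For each partition, return the offset to commit: highest consumed offset + 1.
    static Map<Partition, Long> offsets(List<Consumed> consumed) {
        Map<Partition, Long> toCommit = new HashMap<>();
        for (Consumed record : consumed) {
            // Keep the largest "next offset" seen so far for this partition.
            toCommit.merge(record.partition, record.offset + 1, Long::max);
        }
        return toCommit;
    }
}
```

With the real client types, the same loop would iterate over the `ConsumerRecords` returned by `poll` and wrap each value in an `OffsetAndMetadata`.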
...