...
In order to pass the group generationId to `initTransactions`, we need to expose it from the consumer. We propose to add a new function call on the consumer to expose the generation info:

```java
Generation generation();
```
With this proposal, the transactional id is no longer needed for proper fencing, but the coordinator still needs a way to identify producers individually as they are executing new transactions. There are two options: continue using the transactional id, or use the producer id which is generated by the coordinator in InitProducerId. Either way, the main challenge is authorization. We currently use the transactional id to authorize transactional operations. In this KIP, we could instead utilize the consumer group id for authorization. The producer must still provide a transactional id if it is working in standalone mode, though.
We also need to change the on-disk format for transaction state in order to persist both the consumer group id and the assigned partitions. We propose to use a separate record type in order to store the group assignment. Transaction state records will not change.
...
```java
String groupId = "group";
Set<String> topics = buildSubscription();
KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(groupId));
KafkaProducer producer = new KafkaProducer(buildProducerConfig(groupId)); // passing in consumer group id

producer.initTransactions(new GroupAssignment());

while (true) {
  // Read some records from the consumer and collect the offsets to commit
  ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
  Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

  // Do some processing and build the records we want to produce
  List<ProducerRecord> processed = process(consumed);

  // Write the records and commit offsets under a single transaction
  producer.beginTransaction();
  for (ProducerRecord record : processed)
    producer.send(record);

  try {
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.generation().generationId);
  } catch (IllegalGenerationException e) {
    throw e; // fail the zombie member if generation doesn't match
  }

  producer.commitTransaction();
}
```
The main points are the following:
- Consumer group id becomes a config value on producer.
- Generation.id will be used for group coordinator fencing.
- We no longer need to close the producer after a rebalance. We can call `initTransactions` multiple times.
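
To make the generation-based fencing in the points above concrete, here is a minimal in-memory sketch of the idea: the coordinator tracks the current generation per group and rejects an offset commit that carries an older one. It is only a toy model under stated assumptions. `GenerationFencingSketch`, `ToyGroupCoordinator`, `StaleGenerationException`, and their methods are hypothetical names invented for illustration; they are not Kafka APIs and not part of this proposal.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of generation fencing; none of these names are Kafka APIs.
public class GenerationFencingSketch {

    /** Thrown when a commit carries a generation other than the group's current one. */
    static class StaleGenerationException extends RuntimeException {
        StaleGenerationException(String message) {
            super(message);
        }
    }

    /** Minimal in-memory stand-in for the group coordinator's per-group state. */
    static class ToyGroupCoordinator {
        private final Map<String, Integer> currentGeneration = new HashMap<>();
        private final Map<String, Long> committedOffsets = new HashMap<>();

        /** Simulates a completed rebalance: bumps and returns the group's generation. */
        int rebalance(String groupId) {
            return currentGeneration.merge(groupId, 1, Integer::sum);
        }

        /** Accepts the offset only if the caller's generation matches the current one. */
        void commitOffset(String groupId, int generationId, String partition, long offset) {
            int current = currentGeneration.getOrDefault(groupId, 0);
            if (generationId != current) {
                throw new StaleGenerationException(
                    "generation " + generationId + " does not match current generation " + current);
            }
            committedOffsets.put(groupId + "/" + partition, offset);
        }

        /** Returns the last accepted offset, or null if nothing was committed. */
        Long committed(String groupId, String partition) {
            return committedOffsets.get(groupId + "/" + partition);
        }
    }

    public static void main(String[] args) {
        ToyGroupCoordinator coordinator = new ToyGroupCoordinator();

        // A member joins and commits under the first generation.
        int oldGeneration = coordinator.rebalance("group");
        coordinator.commitOffset("group", oldGeneration, "topic-0", 10L);

        // A rebalance happens; the member still holding the old generation is now a zombie.
        int newGeneration = coordinator.rebalance("group");
        try {
            coordinator.commitOffset("group", oldGeneration, "topic-0", 5L);
        } catch (StaleGenerationException e) {
            System.out.println("fenced: " + e.getMessage());
        }

        // The member with the current generation commits normally.
        coordinator.commitOffset("group", newGeneration, "topic-0", 20L);
        System.out.println("committed offset: " + coordinator.committed("group", "topic-0"));
    }
}
```

In this sketch the zombie's commit is rejected without touching the stored offset, which mirrors why the example above rethrows on a generation mismatch rather than retrying.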
Compatibility, Deprecation, and Migration Plan
...