...
```java
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId); // DEPRECATED

void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets); // NEW
```
The key to this proposal is allowing a single transaction coordinator to see all members working in the same application group. It can then maintain the invariant that only one producer is allowed to make progress at any time for a particular input partition. We could effectively deprecate the `transactional.id` config because we no longer use it for revoking ongoing transactions; instead we would rely on the consumer group id. To enable this, we need two protocol changes. First, we need to update the FindCoordinator API to support lookup of the transaction coordinator using the consumer group id. Second, we need to extend the InitProducerId API to support consumer group-aware initialization.
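As a rough sketch of what the group-based coordinator lookup could mean in practice (the class and method names below are hypothetical, plain Java with no Kafka dependencies): today the coordinator for a `transactional.id` is found by hashing it onto a partition of the internal transaction log, and the consumer group id could be mapped the same way.

```java
// Hypothetical sketch: map a consumer group id onto a partition of the
// internal transaction log; the broker leading that partition acts as the
// transaction coordinator. This mirrors how transactional.id is mapped today.
class CoordinatorLookup {
    static int coordinatorPartitionFor(String consumerGroupId, int numTransactionLogPartitions) {
        // Mask the sign bit so the result is always a valid partition index
        return (consumerGroupId.hashCode() & 0x7fffffff) % numTransactionLogPartitions;
    }
}
```

Because the mapping is deterministic, every member of the same group resolves to the same coordinator, which is exactly what lets that coordinator enforce the single-writer-per-partition invariant.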
...
```
InitProducerIdRequest => TransactionalId TransactionTimeoutMs ConsumerGroupId ConsumerGroupGeneration AssignedPartitions
  TransactionalId => NullableString
  TransactionTimeoutMs => Int64
  ConsumerGroupId => NullableString          // NEW
  ConsumerGroupGeneration => Int32           // NEW
  AssignedPartitions => [Topic [Partition]]  // NEW
    Topic => String
    Partition => Int32

InitProducerIdResponse => ThrottleTimeMs ErrorCode ProducerId ProducerEpoch
  ThrottleTimeMs => Int64
  ErrorCode => Int16
  ProducerId => Int64
  ProducerEpoch => Int16
```
The new InitProducerId API accepts either a user-configured transactional id or a consumer group id together with a generation id. When a consumer group id is provided, the transaction coordinator checks whether there are any ongoing transactions that include the assigned partitions. If there are, those transactions are aborted and the corresponding producers are fenced by bumping their epochs. Once the transactions are complete, the call returns. In other words, the coordinator honors the consumer group id and allocates a new producer id every time initTransactions is called.
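To make the coordinator-side behavior concrete, here is a minimal model of that flow (plain Java, no Kafka dependencies; the class, fields, and method names are all hypothetical, not the broker's actual implementation): abort any ongoing transactions on the assigned partitions, fence their producers by bumping past their epochs, and hand out a fresh producer id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model (not Kafka code) of coordinator-side InitProducerId
// handling when a consumer group id and assigned partitions are provided.
class GroupTransactionCoordinator {
    // Ongoing transaction per input partition: epoch of the owning producer
    private final Map<String, Integer> ongoingTxnEpoch = new HashMap<>();
    private long nextProducerId = 0L;

    static final class ProducerIdAndEpoch {
        final long producerId;
        final int epoch;
        ProducerIdAndEpoch(long producerId, int epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    // Abort any ongoing transactions covering the assigned partitions, fence
    // their producers by bumping past their epochs, then allocate a new
    // producer id for the caller.
    ProducerIdAndEpoch initProducerId(Set<String> assignedPartitions) {
        int maxAbortedEpoch = -1;
        for (String partition : assignedPartitions) {
            Integer epoch = ongoingTxnEpoch.remove(partition); // abort it
            if (epoch != null)
                maxAbortedEpoch = Math.max(maxAbortedEpoch, epoch);
        }
        return new ProducerIdAndEpoch(nextProducerId++, maxAbortedEpoch + 1);
    }

    void beginTransaction(String partition, ProducerIdAndEpoch producer) {
        ongoingTxnEpoch.put(partition, producer.epoch);
    }
}
```

A zombie producer that left a transaction open on a reassigned partition ends up with an epoch lower than the newly allocated one, so its subsequent writes can be rejected.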
Fencing zombies
A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we include the consumer group generation. Once the coordinator observes a generation bump for a group, it will refuse to handle requests from the previous generation. The only thing other group members can do is call InitProducerId themselves. This in fact would be the common case since transactions will usually be completed before a consumer joins a rebalance.
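The generation check itself can be sketched as follows (hypothetical names, plain Java): the coordinator records the latest generation it has observed for each group and refuses InitProducerId requests that carry an older one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of generation-based zombie fencing for InitProducerId.
class GenerationFencer {
    private final Map<String, Integer> latestGeneration = new HashMap<>();

    // Returns true if the request is accepted. A request carrying a
    // generation older than one already observed comes from a zombie that
    // fell out of the group, and is refused.
    boolean tryInitProducerId(String groupId, int generationId) {
        int latest = latestGeneration.getOrDefault(groupId, -1);
        if (generationId < latest)
            return false; // zombie: a newer generation was already observed
        latestGeneration.put(groupId, generationId);
        return true;
    }
}
```

Once any member reports a bumped generation, every straggler from the previous generation is locked out until it rejoins the group and calls InitProducerId with the new generation.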
...
To avoid concurrent processing during an upgrade, we also want to introduce a new exception that lets the consumer back off:
```java
PENDING_TRANSACTION(86, "There are pending transactions that need to be cleared before proceeding.", PendingTransactionException::new),
```
We will discuss this in more detail in the Compatibility section.
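Since the error is retriable, a client would typically handle it with a simple backoff loop. The sketch below models only that retry logic in plain Java; the `PendingTransactionException` class here is a stand-in for the real client exception, and `initWithBackoff` is a hypothetical helper, not proposed API.

```java
// Stand-in for the PendingTransactionException named above (a sketch, not
// the real client code).
class PendingTransactionException extends RuntimeException {
    PendingTransactionException(String message) { super(message); }
}

class TransactionInitializer {
    // Retry a (hypothetical) initialization call until pending transactions
    // have cleared, backing off between attempts; give up after maxRetries.
    static boolean initWithBackoff(Runnable init, int maxRetries, long backoffMs)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            try {
                init.run();
                return true; // pending transactions cleared
            } catch (PendingTransactionException e) {
                Thread.sleep(backoffMs); // back off, then retry
            }
        }
        return false;
    }
}
```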
Example
Below we provide an example of a simple read-process-write loop with consumer group-aware EOS processing.
```java
String consumerGroupId = "group";
Set<String> topics = buildSubscription();
KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(consumerGroupId));
KafkaProducer producer = new KafkaProducer(buildProducerConfig());

consumer.subscribe(topics, new ConsumerRebalanceListener() {
    void onPartitionsAssigned(Collection<TopicPartition> partitions, int generationId) {
        // On assignment, call initTransactions() in order to ensure any
        // transactions involving committed offsets from the assigned partitions
        // have been completed
        producer.initTransactions(new TxnProducerIdentity(partitions, consumerGroupId, generationId));
    }
});

while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
        producer.send(record);
    producer.sendOffsetsToTransaction(consumedOffsets, consumerGroupId);
    producer.commitTransaction();
}
```
...