...
The transaction coordinator currently uses the initTransactions
API to fence producers using the same transactional.id and to ensure that previous transactions have been completed. In the above template, we call consumer.poll() to get data; the very first time we do so, the consumer needs to know the input topic offset, which it obtains through a FetchOffset call to the group coordinator. With transactional processing, some offsets may be "pending", i.e. part of an ongoing transaction. Upon receiving a FetchOffset request, when the consumer's isolation.level is `read_committed` the broker will cap the exposed offset position at the "last stable offset" (LSO), the largest offset below which all transactions have been completed. Since we rely on the unique transactional.id to revoke stale transactions, any pending transaction will be aborted when the producer calls initTransactions again. In normal use cases such as Kafka Streams, we also explicitly close the producer to send out an EndTransaction request, to make sure we start from a clean state.
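As a small illustrative sketch, the read_committed behavior described above is controlled entirely by the consumer's isolation.level setting. The property keys below are real Kafka client configs; the bootstrap server and group id are placeholders:

```java
import java.util.Properties;

public class ReadCommittedConfig {
    public static Properties consumerConfig() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        p.setProperty("group.id", "my-group");                // placeholder
        // With read_committed, fetches are capped at the last stable offset
        // (LSO), so records from pending transactions are never returned.
        p.setProperty("isolation.level", "read_committed");
        return p;
    }
}
```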
...
Code Block

interface Producer {
    /**
     * This API shall be called for consumer group aware transactional producers.
     */
    void initTransactions(Consumer<byte[], byte[]> consumer); // NEW

    /**
     * No longer needs the consumer group id to be passed in, since we already
     * have access to the consumer state.
     */
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets)
        throws ProducerFencedException, IllegalGenerationException; // NEW
}
...
Shrink transactional.timeout
We shall set the `transaction.timeout.ms` default to 10000 ms (10 seconds).
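For illustration, a producer config reflecting the proposed default; `transaction.timeout.ms` and `transactional.id` are real client config keys, while the broker address and the transactional id value are placeholders:

```java
import java.util.Properties;

public class EosProducerConfig {
    public static Properties producerConfig(String txnId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        p.setProperty("transactional.id", txnId);
        // Proposed default: 10 seconds, down from the previous 60 seconds,
        // so that hanging transactions release partitions sooner.
        p.setProperty("transaction.timeout.ms", "10000");
        return p;
    }
}
```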
...
Fence zombie
A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we need to leverage the group coordinator to fence clients that are no longer on the current generation.
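To make the fencing idea concrete, here is a simplified, hypothetical sketch of the coordinator-side generation check. The class, method, and error names are illustrative only and do not correspond to actual broker code; the point is that any request carrying a stale generation is rejected:

```java
import java.util.HashMap;
import java.util.Map;

public class GenerationFence {
    public enum Error { NONE, ILLEGAL_GENERATION, UNKNOWN_MEMBER }

    private final Map<String, Integer> memberGeneration = new HashMap<>();
    private int currentGeneration = 0;

    // Invoked when the group completes a rebalance and bumps the generation.
    public void onRebalanceComplete(String memberId) {
        currentGeneration++;
        memberGeneration.put(memberId, currentGeneration);
    }

    // Invoked when a transactional request carries the consumer's member id
    // and generation: unknown members and stale generations are fenced.
    public Error validate(String memberId, int generation) {
        Integer known = memberGeneration.get(memberId);
        if (known == null) {
            return Error.UNKNOWN_MEMBER;
        }
        if (generation != currentGeneration) {
            return Error.ILLEGAL_GENERATION;
        }
        return Error.NONE;
    }
}
```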
...
Code Block
KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
KafkaProducer producer = new KafkaProducer<>(producerConfig);
// Will access consumer internal state. Only called once in the app's life cycle after first rebalance.
// Note that this is a blocking call until consumer successfully joins the group.
producer.initTransactions(consumer);
while (true) {
// Read some records from the consumer and collect the offsets to commit
ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);
// Do some processing and build the records we want to produce
List<ProducerRecord> processed = process(consumed);
// Write the records and commit offsets under a single transaction
producer.beginTransaction();
for (ProducerRecord record : processed)
producer.send(record);
producer.sendOffsetsToTransaction(consumedOffsets);
producer.commitTransaction();
}
...
- Broker must be upgraded to 2.4 first. This means the `inter.broker.protocol.version` (IBP) has to be set to the latest. Any produce request with a higher version will automatically be fenced, since older brokers do not support it.
- Upgrade the stream application binary and set the UPGRADE_FROM_CONFIG config to 2.3 or lower. Do the first rolling bounce, and make sure the group is stable with every instance on the 2.4 binary.
- Remove/unset that config, so the application points to the actual Kafka client version 2.4. Do the second rolling bounce; the application now officially starts using the new thread producer for EOS.
The reason for doing two rolling bounces is that the old transactional producer doesn't have access to the consumer generation, so the group coordinator has no effective way to fence old zombies. During the first rolling bounce, the task producer opts in to accessing the consumer state and sends TxnOffsetCommitRequest with the generation. With this foundational change, it is much safer to execute step 3.
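The two bounces differ only in the upgrade.from override. A minimal sketch of the per-bounce Streams config overrides (`upgrade.from` is the real StreamsConfig key behind UPGRADE_FROM_CONFIG; all other application configs stay unchanged across both bounces):

```java
import java.util.Properties;

public class UpgradePath {
    // First rolling bounce: run the new 2.4 binary, but tell Streams that
    // the group may still contain members on the older protocol.
    public static Properties firstBounceOverrides() {
        Properties p = new Properties();
        p.setProperty("upgrade.from", "2.3"); // StreamsConfig.UPGRADE_FROM_CONFIG
        return p;
    }

    // Second rolling bounce: drop the override entirely so the application
    // switches to the new thread producer for EOS.
    public static Properties secondBounceOverrides() {
        return new Properties(); // no upgrade.from set
    }
}
```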
...