...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class KafkaTransactionsExample {

  public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

    // Register the transactional.id; this fences older producers that use the same id.
    producer.initTransactions();

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        producer.beginTransaction();

        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord);
        }

        // Commit the consumed offsets as part of the same transaction.
        producer.sendOffsetsToTransaction(getUncommittedOffsets());
        producer.commitTransaction();
      }
    }
  }
} |
As one can see, the first thing a producer does on startup is register its identity through the initTransactions API. The transaction coordinator uses this step to fence producers that share the same transactional.id and to ensure that previous transactions have completed. In the above template, we call consumer.poll() to get data; the very first time we do so, the consumer needs to know the input topic offset, which it obtains through a FetchOffset call to the group coordinator. With transactional processing, some offsets may be "pending", i.e. they are part of an ongoing transaction. When the consumer's isolation.level is `read_committed`, the broker that receives a FetchOffset request will only expose the offset position up to the "latest stable offset" (LSO), which is the largest offset that has already been committed. Since we rely on a unique transactional.id to revoke stale transactions, we believe any pending transaction will be aborted when the producer calls initTransactions again. In normal use cases such as Kafka Streams, we will also explicitly close the producer to send out an EndTransaction request, to make sure we start from a clean state.
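The fencing behaviour described above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `CoordinatorSketch` and its methods are hypothetical names, not Kafka classes, and the model assumes the coordinator tracks one epoch per transactional.id that is bumped on every initTransactions call.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a transaction coordinator. Not Kafka's implementation.
class CoordinatorSketch {
    // Assumption: one monotonically increasing epoch per transactional.id.
    private final Map<String, Integer> epochs = new HashMap<>();
    private final Map<String, Boolean> openTxn = new HashMap<>();

    // Models initTransactions(): abort any pending transaction, then bump the epoch.
    int initTransactions(String transactionalId) {
        openTxn.put(transactionalId, false);
        return epochs.merge(transactionalId, 1, Integer::sum);
    }

    // Returns false when the caller holds a stale epoch, i.e. it has been fenced.
    boolean beginTransaction(String transactionalId, int epoch) {
        if (epochs.getOrDefault(transactionalId, 0) != epoch) {
            return false;
        }
        openTxn.put(transactionalId, true);
        return true;
    }

    boolean hasOpenTransaction(String transactionalId) {
        return openTxn.getOrDefault(transactionalId, false);
    }
}

class FencingDemo {
    public static void main(String[] args) {
        CoordinatorSketch coordinator = new CoordinatorSketch();

        // The first producer instance registers and leaves a transaction open.
        int oldEpoch = coordinator.initTransactions("app-1");
        coordinator.beginTransaction("app-1", oldEpoch);

        // A restarted instance registers with the same transactional.id.
        int newEpoch = coordinator.initTransactions("app-1");

        System.out.println(coordinator.hasOpenTransaction("app-1"));        // false: pending txn aborted
        System.out.println(coordinator.beginTransaction("app-1", oldEpoch)); // false: old instance fenced
        System.out.println(coordinator.beginTransaction("app-1", newEpoch)); // true
    }
}
```

The model deliberately ignores timeouts and persistence; it only shows why re-registering the same transactional.id both clears the pending transaction and locks out the older instance.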
...
We shall set the default of `transaction.timeout.ms` to 10000 ms (10 seconds).
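For reference, a minimal sketch of setting this value explicitly in a producer configuration. The class name `TxnProducerConfigSketch`, the bootstrap address, and the transactional.id value are placeholders chosen for illustration; only the configuration keys themselves are taken from the Kafka producer.

```java
import java.util.Properties;

// Illustrative helper; the class and method names are hypothetical.
class TxnProducerConfigSketch {

    static Properties producerConfig(String transactionalId) {
        Properties props = new Properties();
        // Placeholder broker address, replace with a real cluster.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Identity used by the transaction coordinator for fencing.
        props.setProperty("transactional.id", transactionalId);
        // 10 seconds, the value proposed above.
        props.setProperty("transaction.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerConfig("my-app-1");
        System.out.println(props.getProperty("transaction.timeout.ms"));
    }
}
```

The resulting `Properties` object can be passed as `producerConfig` when constructing the producer in the template above.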
Offset Fetch Request
...
We need to bump OffsetFetchRequest version to include isolation level:
Code Block |
---|
OffsetFetchRequest => Partitions GroupId IsolationLevel
Partitions => List<TopicPartition>
GroupId => String
IsolationLevel => int8 // NEW |
When isolation.level is set to `read_committed`, the consumer is required to back off until all pending offsets are cleared.
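The back-off behaviour can be sketched with a small in-memory model. Everything here is hypothetical and for illustration only: `OffsetStoreSketch` stands in for the group coordinator, an empty result stands in for "offsets are still pending, retry later", the `onRetry` hook exists only so the demo can complete the pending transaction between attempts, and the isolation level ids (0 and 1) are assumed values.

```java
import java.util.OptionalLong;

// Hypothetical model of the offset state kept by a group coordinator for one partition.
class OffsetStoreSketch {
    static final byte READ_UNCOMMITTED = 0; // assumed id
    static final byte READ_COMMITTED = 1;   // assumed id

    private long committedOffset = -1L;
    private long pendingOffset = -1L; // -1 means no pending transactional offset

    void addPending(long offset) {
        pendingOffset = offset;
    }

    // Completes the ongoing transaction: commit promotes the pending offset, abort drops it.
    void completePending(boolean commit) {
        if (commit && pendingOffset >= 0) {
            committedOffset = pendingOffset;
        }
        pendingOffset = -1L;
    }

    // Empty result: the caller must back off and retry.
    OptionalLong fetchOffset(byte isolationLevel) {
        if (isolationLevel == READ_COMMITTED && pendingOffset >= 0) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(committedOffset);
    }
}

class BackoffDemo {
    // Retries the fetch with a fixed back-off until pending offsets are cleared.
    static long fetchWithBackoff(OffsetStoreSketch store, int maxAttempts,
                                 long backoffMs, Runnable onRetry) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            OptionalLong result = store.fetchOffset(OffsetStoreSketch.READ_COMMITTED);
            if (result.isPresent()) {
                return result.getAsLong();
            }
            onRetry.run();
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while backing off", e);
            }
        }
        throw new IllegalStateException("pending offsets not cleared after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.addPending(42L);
        // The hook commits the pending transaction after the first failed attempt.
        long offset = fetchWithBackoff(store, 3, 10L, () -> store.completePending(true));
        System.out.println(offset); // 42
    }
}
```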
Fence Zombie
A zombie process may invoke InitProducerId after falling out of the consumer group. To distinguish zombie requests, we need to leverage the group coordinator to fence out-of-generation clients.
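One way to picture generation-based fencing is the following in-memory sketch. It assumes, purely for illustration, that each InitProducerId request carries the generation id the client last observed and that the group coordinator rejects any request whose generation is not the current one. `GroupCoordinatorSketch` and `handleInitProducerId` are hypothetical names, not part of the proposal's API.

```java
// Hypothetical model of a consumer group's generation tracking.
class GroupCoordinatorSketch {
    private int generationId = 0;

    // Each completed rebalance starts a new generation.
    int rebalance() {
        return ++generationId;
    }

    boolean isCurrent(int memberGenerationId) {
        return memberGenerationId == generationId;
    }
}

class ZombieFencingDemo {
    // Accepts the request only if the sender still belongs to the current generation.
    static boolean handleInitProducerId(GroupCoordinatorSketch group, int requestGenerationId) {
        return group.isCurrent(requestGenerationId);
    }

    public static void main(String[] args) {
        GroupCoordinatorSketch group = new GroupCoordinatorSketch();

        int zombieGeneration = group.rebalance();  // the member joins in generation 1
        int currentGeneration = group.rebalance(); // it falls out; the group moves to generation 2

        System.out.println(handleInitProducerId(group, zombieGeneration));  // false: fenced
        System.out.println(handleInitProducerId(group, currentGeneration)); // true
    }
}
```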
...