...
The main addition of this KIP is a new variant of the current initTransactions
API current sendOffsetsToTransaction API which gives us access to the consumer group states, such as member state and generation.id.
Code Block |
---|
interface Producer { /** * Should pass in the entire consumer state for new API. */ void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException, IllegalGenerationException; // NEW } |
...
We shall set `transaction.timout.ms` default to 10000 ms (10 seconds) on Kafka Streams. For non-stream users, we highly recommend you to do the same if you want to use the new semantics.
Offset Fetch Request
We will add a new error code for consumer to wait for pending transaction clearance. In order to be able to return corresponding exceptions for old/new clients, we shall also bump the OffsetFetch protocol version.
...
In the meantime, this offset fetch back-off should be only applied to EOS use cases, not general offset fetch use case such as admin client access. we We shall also define a flag within offset fetch request so that we only trigger back-off logic when the request is involved in EOS, I.E on isolation level read_committed.
...
To help get access to consumer state for txn producer, consumer will expose a new API for some of its internal states as an opaque struct. This is already done by KIP-429, and we just take showcase the some high level information structure here for convenience.
Code Block | ||
---|---|---|
| ||
ConsumerGroupMetadata { public String groupId(); public int generationId(); public String memberId(); public Optional<String> groupInstanceId(); } |
...
And here is a recommended new transactional API non-EOS usage example:
Code Block | ||
---|---|---|
| ||
Set<String> topics = buildSubscription(); KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig); KafkaProducer producer = new KafkaProducer(); producer.initTransactions(); while (true) { // Read some records from the consumer and collect the offsets to commit ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time. Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed); // Do some processing and build the records we want to produce List<ProducerRecord> processed = process(consumed); // Write the records and commit offsets under a single transaction producer.beginTransaction(); for (ProducerRecord record : processed) producer.send(record); // Pass the entire consumer group metadata producer.sendOffsetsToTransaction(consumedOffsets, consumer.getMetadata()); producer.commitTransaction(); } |
...
Following the above principle, Kafka Streams uses version probing to solve the perform online upgrade problem. Step by step guides are:
- Broker must be upgraded to 2.4 first. This means the `inter.broker.protocol.version` (IBP) has to be set to the latest. Any produce request with higher version will automatically get fenced because of no support.
- Upgrade the stream application binary and choose to set UPGRADE_FROM_CONFIG config to 2.3 or lower. Do the first rolling bounce, and make sure the group is stable with every instance on 2.4 binary.
- Just removeRemove/unset that config, to make application point UPGRADE_FROM_CONFIG, in order to point the application to actual Kafka client version 2.4. Do a second rolling bounce and now the application officially starts using new thread producer for EOS.
...