...
Code Block |
---|
interface Producer { /** * ThisShould APIpass shallin bethe called forentire consumer groupstate awarefor transactionalnew producersAPI. */ void initTransactions(Consumer<byte[], byte[]> consumer)sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException, IllegalGenerationException; // NEW /** * No longer need to pass in the consumer group id in a case where we already get access to the consumer state. */ void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) throws ProducerFencedException, IllegalGenerationException; // NEW } |
Shrink transactional.timeout
} |
Shrink transactional.timeout
We shall set `transaction.timout.ms` default to 10000 ms (10 seconds) on Kafka Streams.
...
Code Block | ||
---|---|---|
| ||
Set<String> topics = buildSubscription(); KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig); KafkaProducer producer = new KafkaProducer(); ConsumerGroupMetadata groupMetadata // Will access consumer internal state. Only called once in the app's life cycle. // Note that this is a blocking call until consumer successfully joins the group.= null; consumer.subscribe(topics, new ConsumerRebalanceListener() { void onGroupRejoined(ConsumerGroupMetadata updatedMetadata) { consumerGroupMetadata = updatedMetadata; } }); producer.initTransactions(consumer); while (true) { // Read some records from the consumer and collect the offsets to commit ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time. Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed); // Do some processing and build the records we want to produce List<ProducerRecord> processed = process(consumed); // Write the records and commit offsets under a single transaction producer.beginTransaction(); for (ProducerRecord record : processed) producer.send(record); ) producer.send(record); if (groupMetadata == null) { throw new IllegalStateException("Consumer poll should be blocked until successfully joined the group and get updated metadata"); } producer.sendOffsetsToTransaction(consumedOffsets, groupMetadata); producer.commitTransaction(); } |
...
- User must be utilizing both consumer and producer as a complete EOS application,
- User needs to store transactional offsets inside Kafka group coordinator, not in any other external system for the sake of fencing,
- Need to call the new producer.initTransactions(consumer); which passes in a consumer struct for state access during initialization,
- Producer no longer needs to call sendOffsetsToTransaction(offsets, consumerGroupIdgroupMetadata) because we will to be able to access consumer group id internally. Instead just pass offsets as single parameter. fence properly
- User needs to configure a referencing copy of the group metadata.
Compatibility, Deprecation, and Migration Plan
...