...
Code Block |
---|
interface Producer {
/**
* Sends a list of specified offsets to the consumer group coordinator, and also marks
* those offsets as part of the current transaction. These offsets will be considered
* committed only if the transaction is committed successfully. The committed offset should
* be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
* <p>
* This method should be used when you need to batch consumed and produced messages
* together, typically in a consume-transform-produce pattern. Thus, the specified
* {@code consumerGroupId} should be the same as config parameter {@code group.id} of the used
* {@link KafkaConsumer consumer}. Note that the consumer should have {@code enable.auto.commit=false}
* and should also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
* {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
*
* This API won't deprecate the existing {@link KafkaProducer#sendOffsetsToTransaction(Map, String) sendOffsets} API, because standalone
* mode EOS applications still rely on it. If the broker doesn't support the new underlying transactional API, the call is automatically
* downgraded to ignore consumer metadata, and a warning is logged.
*
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message
* format used for the offsets topic on the broker does not support transactions
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized. See the exception for more details
* @throws org.apache.kafka.common.errors.IllegalGenerationException if the passed in consumer metadata has illegal generation
* @throws org.apache.kafka.common.errors.FencedInstanceIdException if the passed in consumer metadata has a fenced group.instance.id
* @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
* other unexpected error
*/
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException, IllegalGenerationException, FencedInstanceIdException; // NEW
} |
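
The Javadoc above requires each committed offset to be `lastProcessedMessageOffset + 1`. The sketch below shows one way to derive those offsets from a processed batch. It uses only the JDK: `ConsumedRecord` and `nextOffsets` are hypothetical names introduced for illustration, and the `topic-partition` string key stands in for Kafka's `TopicPartition`. In a real application the result would be converted to a `Map<TopicPartition, OffsetAndMetadata>` and passed to `sendOffsetsToTransaction` together with the consumer's group metadata.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetsToCommit {

    /** Hypothetical stand-in for a consumed record (not a Kafka class). */
    static final class ConsumedRecord {
        final String topic;
        final int partition;
        final long offset;

        ConsumedRecord(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * Returns, per "topic-partition" key, the offset of the next message to
     * consume: the highest processed offset in that partition plus one.
     */
    static Map<String, Long> nextOffsets(List<ConsumedRecord> processed) {
        Map<String, Long> next = new HashMap<>();
        for (ConsumedRecord r : processed) {
            String key = r.topic + "-" + r.partition;
            // Keep the maximum so the result does not depend on record order.
            next.merge(key, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<ConsumedRecord> batch = Arrays.asList(
                new ConsumedRecord("orders", 0, 41),
                new ConsumedRecord("orders", 0, 42),
                new ConsumedRecord("orders", 1, 7));
        // Partition 0 was processed through offset 42, partition 1 through offset 7.
        System.out.println(nextOffsets(batch));
    }
}
```

Taking the maximum per partition, rather than the last value seen, keeps the result correct even if the batch is iterated out of order.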
...
- As with Streams, upgrade the brokers to version 2.5
- Change `sendOffsetsToTransaction` to the new version. Note that if the broker version detected through the txn offset commit protocol is lower than 2.5, the underlying call is automatically downgraded to ignore consumer metadata, and a warning is logged.
- Rolling bounce the application
...