...
```java
/**
 * Get the last committed offset for the given partition (whether the commit happened by this process or
 * another). This offset will be used as the position for the consumer in the event of a failure.
 * <p>
 * This call will block to do a remote call to get the latest committed offsets from the server.
 *
 * @param partition The partition to check
 * @param timeout   The maximum duration of the method
 * @param timeunit  The unit of time which timeout refers to
 * @return The last committed offset and metadata or null if there was no prior commit
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
 *             configured groupId. See the exception for more details
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
 * @throws org.apache.kafka.common.errors.TimeoutException if the method exceeds the given maximum time
 */
@Override
public OffsetAndMetadata committed(TopicPartition partition, final long timeout, final TimeUnit timeunit) {
    acquireAndEnsureOpen();
    final long totalWaitTime = determineWaitTimeInMilliseconds(timeout, timeunit);
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets =
            coordinator.fetchCommittedOffsets(Collections.singleton(partition), totalWaitTime);
        return offsets.get(partition);
    } finally {
        release();
    }
}

/**
 * Commit the specified offsets for the specified list of topics and partitions.
 * <p>
 * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
 * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
 * should not be used. The committed offset should be the next message your application will consume,
 * i.e. lastProcessedMessageOffset + 1.
 * <p>
 * This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
 * encountered (in which case it is thrown to the caller).
 * <p>
 * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
 * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
 *
 * @param offsets  A map of offsets by partition with associated metadata
 * @param timeout  The maximum duration of the method
 * @param timeunit The unit of time which timeout refers to
 * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
 *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
 *             or if there is an active group with the same groupId which is using group management.
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
 *             configured groupId. See the exception for more details
 * @throws java.lang.IllegalArgumentException if the committed offset is negative
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
 *             is too large or if the topic does not exist).
 */
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final long timeout, final TimeUnit timeunit) {
    final long totalWaitTime = determineWaitTimeInMilliseconds(timeout, timeunit);
    acquireAndEnsureOpen();
    try {
        coordinator.commitOffsetsSync(new HashMap<>(offsets), totalWaitTime);
    } finally {
        release();
    }
}
```
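Both methods above funnel their `(timeout, TimeUnit)` pair through `determineWaitTimeInMilliseconds`, whose body is not shown in this KIP. A minimal sketch, assuming the helper simply validates the timeout and converts it to milliseconds (the name is taken from the code above; the validation details are an assumption, not Kafka's actual implementation):

```java
import java.util.concurrent.TimeUnit;

public class WaitTime {
    // Hypothetical sketch of the helper used by committed() and commitSync()
    // above: reject negative timeouts, then normalize to milliseconds.
    public static long determineWaitTimeInMilliseconds(long timeout, TimeUnit timeunit) {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must be non-negative");
        }
        return timeunit.toMillis(timeout);
    }
}
```

Normalizing to milliseconds once at the API boundary lets the internal coordinator calls work with a single wait-time representation.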
Proposed Changes
Another change shall be made to KafkaConsumer#poll(), due to its call to updateFetchPositions(), which blocks indefinitely. To combat this, poll() will automatically return once the time limit is hit.
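The bounded poll() behavior described above can be modeled with a small self-contained sketch using plain java.util.concurrent rather than Kafka's internals (all names here are illustrative, not from the actual KafkaConsumer):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedPoll {
    // Toy model of the proposed behavior: wait at most `timeout` for the
    // first element, then drain whatever else is already available. If
    // nothing arrives before the deadline, return empty instead of
    // blocking indefinitely.
    public static <T> List<T> poll(BlockingQueue<T> queue, long timeout, TimeUnit unit)
            throws InterruptedException {
        List<T> out = new ArrayList<>();
        T first = queue.poll(timeout, unit); // bounded wait, never indefinite
        if (first == null) {
            return out; // time limit hit: return nothing
        }
        out.add(first);
        queue.drainTo(out); // non-blocking drain of the rest
        return out;
    }
}
```

The key design point mirrored here is that hitting the deadline is not an error for poll(): the caller simply gets an empty result and may retry.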
These calls into Kafka internals will block indefinitely, a problem similar to the one faced by other methods such as commitSync(), committed(), and position(). To avoid blocking indefinitely, the new methods will take a user-determined timeout defining the maximum amount of time for which a method may block.
Regarding the policy of what happens when the time limit is exceeded:

1. KafkaConsumer#poll(), since it returns offsets, will simply return nothing.
2. A TimeoutException will be thrown if the time spent in position() exceeds requestTimeoutMs; the same applies to the other time-bounded methods.
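Policy 2 above can be sketched with plain java.util.concurrent: a lookup that would otherwise block forever is bounded, and exceeding the limit surfaces a TimeoutException to the caller. This is a toy model of the contract, not Kafka's implementation, and the method name merely echoes position():

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedCall {
    // Toy model of policy 2: block for at most `timeout`, and throw
    // TimeoutException (rather than waiting indefinitely) if the
    // underlying lookup has not completed in time.
    public static long position(CompletableFuture<Long> offsetLookup, long timeout, TimeUnit unit)
            throws ExecutionException, InterruptedException, TimeoutException {
        return offsetLookup.get(timeout, unit); // throws TimeoutException on expiry
    }
}
```

Callers of the time-bounded methods are thus expected to catch TimeoutException and decide whether to retry or give up, instead of hanging.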
Compatibility, Deprecation, and Migration Plan
...