Status
Current state: "Vote"
Discussion thread: Thread
Vote Thread: Thread
JIRA: KAFKA-17826
Motivation
The current design of the offsetsForTimes() method in KafkaConsumer has a clear flaw.
According to the documentation:
@return a mapping from partition to the timestamp and offset of the first message with timestamp greater than or equal to the target timestamp. {@code null} will be returned for the partition if there is no such message.
This design introduces inconsistency risks: developers often assume that if Map.containsKey() returns true, the corresponding value is non-null. This can lead to unexpected NullPointerExceptions when null checks are omitted.
Code example:
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);

// Dangerous operation - may cause an NPE if a partition maps to null
offsets.forEach((tp, offset) -> consumer.seek(tp, offset.offset()));

// Developers must write defensive code instead
offsets.forEach((tp, offset) -> {
    if (offset != null) {
        consumer.seek(tp, offset.offset());
    }
});
Null can mean failure, can mean success, can mean almost anything. Using something other than null makes your meaning clear.
In Google's Guava documentation, it is stated that in application code, null often leads to confusion, hard-to-debug errors, and undesirable ambiguities. For example, when Map.get() returns null, it could indicate either that the value is explicitly null or that the key does not exist. Therefore, when designing API specifications, we should avoid passing null values.
At the language level, Java's newer SDK design discourages null values in collections (List, Set, Map). Recent APIs like Stream, Optional, and collection factory methods (List.of(), Set.of(), Map.of()) explicitly reject nulls.
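This null-hostility is easy to demonstrate with plain JDK classes (no Kafka dependency; the map contents are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class NullValueAmbiguity {
    public static void main(String[] args) {
        // Java 9+ collection factories reject null keys and values outright.
        try {
            Map.of("topic-0", (Long) null);
        } catch (NullPointerException e) {
            System.out.println("Map.of rejected a null value");
        }

        // A plain HashMap still accepts null values, producing exactly the
        // ambiguity described above: containsKey() is true while get()
        // returns null, so the two signals disagree.
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", null);
        System.out.println(offsets.containsKey("topic-0")); // true
        System.out.println(offsets.get("topic-0") == null); // true
    }
}
```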
At the specification level, similar methods elsewhere in the consumer do not behave this way, and neither does the Admin API. This is the only method whose logic differs from the others.
For these reasons, we should revise this flawed design and stop using null as a value in the returned Map.
For most developers, this change will have minimal impact. The primary reason is that in typical usage scenarios, this method is used to retrieve offsets for the given partitions by timestamp. Therefore, the production code would look something like this:
if (returnedOffsets.get(topicPartition) == null) {
    // do nothing
} else {
    // do something
}
In this scenario, having no key or a null value is effectively the same. Both cases represent the absence of data, and thus, developers would not be impacted by the removal of null values.
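The claimed equivalence can be checked with a plain HashMap (illustrative String keys in place of TopicPartition):

```java
import java.util.HashMap;
import java.util.Map;

public class AbsentVsNull {
    public static void main(String[] args) {
        // Current behavior: the partition is present but mapped to null.
        Map<String, Long> withNull = new HashMap<>();
        withNull.put("topic-0", null);

        // Proposed behavior: the partition is simply omitted.
        Map<String, Long> withoutKey = new HashMap<>();

        // The common get()-based null check cannot tell the two apart,
        // so code written this way is unaffected by the change.
        System.out.println(withNull.get("topic-0") == null);   // true
        System.out.println(withoutKey.get("topic-0") == null); // true
    }
}
```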
This is the only scenario that would be affected:
if (returnedOffsets.containsKey(topicPartition)) {
    if (returnedOffsets.get(topicPartition) == null) {
        // the only scenario that is impacted
    }
} else {
    // key absent - unaffected
}
For a real-world example, developers typically do not check for null when using these methods. If the returned value is null, the code throws a NullPointerException (NPE), posing a risk during development. Example: https://github.com/issa-khodadadi/kafka-master.
// Auto-unboxing the Long returned by get() throws an NPE if the value is null
long startOffset = consumer.beginningOffsets(Collections.singleton(topicPartition)).get(topicPartition);
long endOffset = consumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition);
details.put("offsets", Map.of("start", startOffset, "end", endOffset, "size", endOffset - startOffset));
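The failure mode in the snippet above is auto-unboxing: assigning the Long returned by Map.get() to a primitive long throws an NPE when the value is null or the key is absent. A minimal Kafka-free illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class UnboxingNpe {
    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);

        long present = offsets.get("topic-0"); // fine: unboxes 42L
        System.out.println(present);

        try {
            // get() returns null for a missing (or null-valued) key;
            // unboxing that null into a primitive long throws an NPE.
            long absent = offsets.get("topic-1");
            System.out.println(absent);
        } catch (NullPointerException e) {
            System.out.println("NPE from auto-unboxing a null Long");
        }
    }
}
```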
Such defensive code is rarely written in production, and this style does not align with the design philosophy of Java maps. Given this, the change has more advantages than disadvantages.
Public Interfaces
To ensure backward compatibility, we will preserve the existing public API signatures. The change will be limited to internal implementation logic and configuration controls.
Consumer
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout)
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout)
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions)
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout)
Admin
While the returned map entries are never null, a value of -1 is used to indicate the absence of an offset. However, from the user's perspective, -1 and null effectively carry the same meaning: no offset is available. To ensure consistency in how offsets are represented across the API, we propose to change this behavior and eliminate the use of -1.
default ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
    return listOffsets(topicPartitionOffsets, new ListOffsetsOptions());
}

ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);
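To see why a -1 sentinel and an absent entry carry the same information, compare the two styles with plain JDK types (Long values stand in for the Admin result objects):

```java
import java.util.Map;

public class SentinelVsAbsent {
    public static void main(String[] args) {
        // Current behavior: the caller must know about the magic value -1.
        Map<String, Long> withSentinel = Map.of("topic-0", 42L, "topic-1", -1L);
        withSentinel.forEach((tp, off) -> {
            if (off != -1L) {
                System.out.println(tp + " -> " + off);
            }
        });

        // Proposed behavior: partitions without an offset are simply omitted,
        // so no sentinel check is needed when iterating.
        Map<String, Long> proposed = Map.of("topic-0", 42L);
        proposed.forEach((tp, off) -> System.out.println(tp + " -> " + off));
    }
}
```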
New Configuration
To help developers migrate more smoothly to the new consumer behavior, we are introducing a new configuration that controls whether these methods may return null entries. To avoid introducing a second deprecation cycle, this new configuration, allow.null.offsets.entries, will be marked as @Deprecated from the start.
By default, this configuration is set to true, preserving current behavior. However, a new warning log will be introduced to inform developers:
- For Consumer: "Kafka consumer APIs will stop returning null entries in version 5.0. To prepare for this change, refer to KIP-1140 and the allow.null.offsets.entries configuration documentation."
- For Admin: "Kafka admin APIs will stop returning -1 entries in version 5.0. To prepare for this change, refer to KIP-1140 and the allow.null.offsets.entries configuration documentation."
@Deprecated(since = "4.1")
public static final String ALLOW_NULL_OFFSETS_ENTRIES_CONFIG = "allow.null.offsets.entries";
@Deprecated(since = "4.1")
public static final boolean DEFAULT_ALLOW_NULL_OFFSETS_ENTRIES = true;
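A sketch of how an application might opt in to the new behavior during the transition. The config key is the one proposed by this KIP; the surrounding consumer settings are illustrative only:

```java
import java.util.Properties;

public class OptInExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "example-group");           // illustrative
        // Proposed transitional flag: setting it to false opts in to the
        // Kafka 5.0 behavior early, so the offset-related methods stop
        // returning null entries for unresolved partitions.
        props.put("allow.null.offsets.entries", "false");
        System.out.println(props.getProperty("allow.null.offsets.entries"));
    }
}
```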
Proposed Changes
This will be implemented in Kafka 4.X. For example,
/**
 * skip...
 *
 * <p>
 * This method does not change the current consumer position of the partitions.
 * <p>
 * If the offset for a specific partition cannot be found or the timeout is zero, the corresponding value will be null.
 * <strong>Note: Kafka 5.0 will remove the behavior of returning null values.</strong>
 * skip...
 */
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
New/deprecated configuration in admin also implemented in Kafka 4.X. For example,
@Deprecated(since = "4.1")
public static final String ALLOW_NULL_OFFSETS_ENTRIES_CONFIG = "allow.null.offsets.entries";
@Deprecated(since = "4.1")
public static final boolean DEFAULT_ALLOW_NULL_OFFSETS_ENTRIES = true;
@Deprecated(since = "4.1")
private static final String ALLOW_NULL_OFFSETS_ENTRIES_DOC = "Specifies whether to allow -1 entries in certain public admin APIs. " +
    "The following methods are affected: " +
    "<code>listOffsets(Map)</code> and <code>listOffsets(Map, ListOffsetsOptions)</code>. " +
    "If set to <code>true</code>, -1 entries are permitted, and the behavior of these methods remains unchanged. " +
    "If set to <code>false</code>, -1 entries are not allowed, and these methods will not return -1 values. " +
    "This configuration acts as a transitional bridge. Starting from Kafka 5.0, these methods will no longer return -1 " +
    "entries by default, and this configuration will be removed.";
New/deprecated configuration in consumer also implemented in Kafka 4.X. For example,
@Deprecated(since = "4.1")
private static final String ALLOW_NULL_OFFSETS_ENTRIES_DOC = "Specifies whether to allow null entries in certain public consumer APIs. " +
"The following methods are affected: " +
"<code>Consumer.committed(Set)</code>, <code>Consumer.committed(Set, Duration)</code>, " +
"<code>Consumer.offsetsForTimes(Map)</code>, <code>Consumer.offsetsForTimes(Map, Duration)</code>, " +
"<code>Consumer.beginningOffsets(Collection)</code>, <code>Consumer.beginningOffsets(Collection, Duration)</code>, " +
"<code>Consumer.endOffsets(Collection)</code>, and <code>Consumer.endOffsets(Collection, Duration)</code>. " +
"If set to <code>true</code>, null entries are permitted, and the behavior of these methods remains unchanged. " +
"If set to <code>false</code>, null entries are not allowed, and these methods will not return null values. " +
"This configuration acts as a transitional bridge. Starting from Kafka 5.0, these methods will no longer return null " +
"entries by default, and this configuration will be removed.";
This feature is planned for Kafka 4.x. For instance, in the case of offsetsForTimes(), as mentioned earlier, the method will rely on a boolean flag to determine whether null values should be returned.
static Map<TopicPartition, OffsetAndTimestamp> buildListOffsetsResult(
        final Map<TopicPartition, Long> timestampsToSearch,
        final Map<TopicPartition, ListOffsetData> fetchedOffsets,
        BiFunction<TopicPartition, ListOffsetData, OffsetAndTimestamp> resultMapper,
        boolean allowNullOffsetsEntries
) {
    HashMap<TopicPartition, OffsetAndTimestamp> offsetsResults = new HashMap<>(timestampsToSearch.size());
    if (allowNullOffsetsEntries) {
        // initialize the result map with a null value for every requested partition
        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet())
            offsetsResults.put(entry.getKey(), null);
    }
    for (Map.Entry<TopicPartition, ListOffsetData> entry : fetchedOffsets.entrySet()) {
        ListOffsetData offsetData = entry.getValue();
        // overwrite with the fetched value where one exists
        offsetsResults.put(entry.getKey(), resultMapper.apply(entry.getKey(), offsetData));
    }
    return offsetsResults;
}
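The effect of the flag can be demonstrated with a simplified, self-contained stand-in: String keys and Long values replace the Kafka types, and the helper mirrors the structure of buildListOffsetsResult above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class BuildResultDemo {
    // Simplified stand-in for buildListOffsetsResult: pre-fill with nulls
    // when the transitional flag allows null entries; otherwise return only
    // the partitions that actually have fetched offsets.
    static Map<String, Long> build(Set<String> requested, Map<String, Long> fetched, boolean allowNullEntries) {
        Map<String, Long> result = new HashMap<>(requested.size());
        if (allowNullEntries) {
            for (String tp : requested) {
                result.put(tp, null);
            }
        }
        result.putAll(fetched); // fetched values overwrite the null placeholders
        return result;
    }

    public static void main(String[] args) {
        Set<String> requested = Set.of("topic-0", "topic-1");
        Map<String, Long> fetched = Map.of("topic-0", 42L);

        // Current behavior: both keys present, one mapped to null.
        Map<String, Long> legacy = build(requested, fetched, true);
        System.out.println(legacy.size());                 // 2
        System.out.println(legacy.get("topic-1") == null); // true

        // Proposed behavior: the unresolved partition is omitted entirely.
        Map<String, Long> proposed = build(requested, fetched, false);
        System.out.println(proposed.size());                 // 1
        System.out.println(proposed.containsKey("topic-1")); // false
    }
}
```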
In Kafka 5.0, we can eliminate the loop that initializes the map with null values and instead build the result map directly from the fetchedOffsets data.
static Map<TopicPartition, OffsetAndTimestamp> buildListOffsetsResult(
        final Map<TopicPartition, ListOffsetData> fetchedOffsets,
        BiFunction<TopicPartition, ListOffsetData, OffsetAndTimestamp> resultMapper) {
    HashMap<TopicPartition, OffsetAndTimestamp> offsetsResults = new HashMap<>(fetchedOffsets.size());
    for (Map.Entry<TopicPartition, ListOffsetData> entry : fetchedOffsets.entrySet()) {
        ListOffsetData offsetData = entry.getValue();
        offsetsResults.put(entry.getKey(), resultMapper.apply(entry.getKey(), offsetData));
    }
    return offsetsResults;
}
Compatibility, Deprecation, and Migration Plan
While the method signatures remain unchanged, several internal classes will require updates to eliminate null values from the returned maps.
- ClassicKafkaConsumer
- AsyncKafkaConsumer
- OffsetFetcher
- OffsetFetcherUtils
While the method signatures remain unchanged, several internal classes will require updates to eliminate -1 values from the returned maps.
- Admin
These modifications will primarily focus on:
- Adjusting the underlying logic to prevent null values from ever being introduced into the returned maps.
- Adjusting the underlying logic to prevent -1 values from ever being introduced into the returned maps.
The Migration plan will be:
- Update the existing Javadoc to explain why these methods return null or -1, and explicitly note that the null-returning behavior will be removed in Kafka 5.0.
- Introduce a new configuration, allow.null.offsets.entries, to control whether null entries are returned. This config is marked as @Deprecated from the beginning.
- When Kafka 5.0 is released, remove this Javadoc note and eliminate the null-returning logic from the code.
- Delete the allow.null.offsets.entries configuration and all associated handling.
Test Plan
In Kafka 4.x, we need to test the new configuration and ensure that it does not affect existing tests.
In Kafka 5.0, we should update all related tests to ensure these methods no longer return null or -1 values.
Rejected Alternatives
Using a Java Optional instead of a raw class as the Map value; there are some considerations:
Advantage:
- Users are forced to handle the case where a value does not exist
- Users retrieve the same TopicPartition keys as with the old method
Disadvantage:
- Adds an extra wrapper layer, making code more verbose
- May cause confusion: Maps already have mechanisms for handling missing values (such as getOrDefault() and computeIfAbsent())
- Still ambiguous: an empty Optional can mean success or failure
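The verbosity concern can be seen by comparing the rejected Optional-valued map with a map that simply omits absent entries (plain JDK types for illustration):

```java
import java.util.Map;
import java.util.Optional;

public class OptionalVerbosity {
    public static void main(String[] args) {
        // Rejected alternative: every read requires an extra unwrapping step.
        Map<String, Optional<Long>> optional = Map.of(
                "topic-0", Optional.of(42L),
                "topic-1", Optional.empty());
        optional.forEach((tp, off) ->
                off.ifPresent(o -> System.out.println(tp + " -> " + o)));

        // Chosen design: absent partitions are simply not in the map,
        // so iteration needs no wrapper handling at all.
        Map<String, Long> plain = Map.of("topic-0", 42L);
        plain.forEach((tp, off) -> System.out.println(tp + " -> " + off));
    }
}
```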
Add a new method which does not return null values; there are some considerations:
Advantage:
- Users can use these non-null-returning methods starting in Kafka 4.X.
Disadvantage:
- Adding new methods just to avoid returning null would be too big a change; the cost is disproportionate to the benefit.
Add a new method which controls whether null values are returned, without a deprecation cycle; there are some considerations:
Advantage:
- Users can use these non-null-returning methods starting in Kafka 4.X.
Disadvantage:
- This approach would extend the deprecation cycle, so we have decided to reject it.