Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
DeleteRecords advances a partition's logStartOffset to free storage. Today the request only completes after every alive replica has truncated past the requested offset. There is no way to ask for weaker acknowledgement.
This model creates two problems:
A single slow replica stalls the request. If one alive replica is slow to fetch (GC pause, slow disk, transient network), the partition's low watermark cannot advance and the leader holds the request in delayedDeleteRecordsPurgatory until it times out — even though the leader has already truncated locally and the cluster would have eventually finished.
The model is over-strict for transient data. Bulk retention-sweep tooling over many partitions, and transit topics whose source of truth lives elsewhere, use DeleteRecords for operational trimming, not as a durability boundary. For these callers, waiting on every follower is pure overhead — and when one follower stalls, it is an availability hit for no incremental safety.
The existing TimeoutMs does not provide async delete: Setting a small TimeoutMs on a v2 request does not turn DeleteRecords into a leader-only operation — it only bounds how long the leader waits in purgatory before giving up. When the timeout fires, the broker returns REQUEST_TIMED_OUT to the client, even though the leader has already truncated locally. Callers cannot distinguish "leader truncated, followers slow" from "leader failed to truncate at all", so they have no safe way to treat the timeout as success. The timeout is a deadline, not an acknowledgement mode; achieving async semantics requires an explicit signal in the request and a successful response carrying the leader's new logStartOffset.
Why LeaderOnly is a bool, not acks-style
A natural question is whether the new field should mirror Produce's acks (int8). The boolean shape is the right fit for two reasons specific to DeleteRecords:
DeleteRecords always needs a response. Unlike Produce, where acks=0 enables fire-and-forget for high-throughput writers, DeleteRecords is an admin operation: the caller needs per-partition success/error and the resulting offset to make any follow-up decision. A fire-and-forget mode is not merely unused here; it does not make sense for this API. Borrowing the acks shape would advertise a wire value (acks=0) that the broker must always reject with INVALID_REQUIRED_ACKS.
DeleteRecords has only two meaningful modes. The caller waits either for the leader alone or for all alive replicas, so a boolean covers every valid choice without reserving unused values.
Public Interfaces
DeleteRecordsRequest.json gains a LeaderOnly boolean
diff --git a/clients/src/main/resources/common/message/DeleteRecordsRequest.json b/clients/src/main/resources/common/message/DeleteRecordsRequest.json
index 969efd63e9..5e97d3350a 100644
--- a/clients/src/main/resources/common/message/DeleteRecordsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteRecordsRequest.json
@@ -21,7 +21,10 @@
// Version 1 is the same as version 0.
// Version 2 is the first flexible version.
- "validVersions": "0-2",
+
+ // Version 3 adds the LeaderOnly field (KIP-XXXX) to allow callers to opt in
+ // to leader-only acknowledgement.
+ "validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "Topics", "type": "[]DeleteRecordsTopic", "versions": "0+",
@@ -37,6 +40,8 @@
]}
]},
{ "name": "TimeoutMs", "type": "int32", "versions": "0+",
- "about": "How long to wait for the deletion to complete, in milliseconds." }
+ "about": "How long to wait for the deletion to complete, in milliseconds." },
+ { "name": "LeaderOnly", "type": "bool", "versions": "3+", "default": "false",
+ "about": "If false (default), the response is returned only after every alive replica has truncated to the requested offset. If true, the response is returned as soon as the leader has truncated locally." }
]
}
DeleteRecordsResponse.json adds a LeaderLogStartOffset field
diff --git a/clients/src/main/resources/common/message/DeleteRecordsResponse.json b/clients/src/main/resources/common/message/DeleteRecordsResponse.json
index bfc0a56390..ac06e0ad01 100644
--- a/clients/src/main/resources/common/message/DeleteRecordsResponse.json
+++ b/clients/src/main/resources/common/message/DeleteRecordsResponse.json
@@ -20,7 +20,9 @@
// Starting in version 1, on quota violation, brokers send out responses before throttling.
// Version 2 is the first flexible version.
- "validVersions": "0-2",
+
+ // Version 3 adds the LeaderLogStartOffset field (KIP-XXXX)
+ "validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -34,7 +36,9 @@
{ "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
"about": "The partition index." },
{ "name": "LowWatermark", "type": "int64", "versions": "0+",
- "about": "The partition low water mark." },
+ "about": "The partition low water mark, i.e. the minimum logStartOffset across all alive replicas at the time of the response. With LeaderOnly=true, this may be below the requested offset until followers propagate." },
+ { "name": "LeaderLogStartOffset", "type": "int64", "versions": "3+",
+ "about": "The leader's local logStartOffset at the time of the response. On success this is at least the requested offset. When LeaderOnly was set in the request, this is the field to consult, since LowWatermark may lag until followers catch up." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The deletion error code, or 0 if the deletion succeeded." }
]}
Add LeaderOnly to DeleteRecordsOptions
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsOptions.java
index 320d3d8cb6..b2ec6712e5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsOptions.java
@@ -24,4 +24,28 @@ import java.util.Map;
*/
public class DeleteRecordsOptions extends AbstractOptions<DeleteRecordsOptions> {
+ private boolean leaderOnly = false;
+
+ /**
+ * Whether the broker should respond as soon as the leader has truncated locally,
+ * without waiting for follower acknowledgement.
+ *
+ * <ul>
+ * <li>{@code false} (default): the broker waits for every alive replica to advance its
+ * log start offset past the requested offset. This is the historical behavior and
+ * is required for use cases that rely on cross-replica deletion guarantees
+ * (e.g. privacy / compliance flows).</li>
+ * <li>{@code true}: the broker responds as soon as the leader has truncated locally.
+ * Suitable when deletion is an operational optimization rather than a correctness
+ * guarantee.</li>
+ * </ul>
+ */
+ public DeleteRecordsOptions leaderOnly(boolean leaderOnly) {
+ this.leaderOnly = leaderOnly;
+ return this;
+ }
+
+ public boolean leaderOnly() {
+ return leaderOnly;
+ }
}
DeletedRecords adds a leaderLogStartOffset field
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeletedRecords.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeletedRecords.java
index 97d83a7775..580c2a7e5a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeletedRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeletedRecords.java
@@ -23,14 +23,29 @@ package org.apache.kafka.clients.admin;
public class DeletedRecords {
private final long lowWatermark;
+ private final long leaderLogStartOffset;
/**
- * Create an instance of this class with the provided parameters.
+ * Create an instance of this class with only the low watermark known. The
+ * leader's local logStartOffset is reported as -1 (unknown), which is the
+ * case for responses from brokers that do not support DeleteRecords v3.
*
* @param lowWatermark "low watermark" for the topic partition on which the deletion was executed
*/
public DeletedRecords(long lowWatermark) {
+ this(lowWatermark, -1L);
+ }
+
+ /**
+ * Create an instance of this class with the provided parameters.
+ *
+ * @param lowWatermark "low watermark" for the topic partition on which the deletion was executed
+ * @param leaderLogStartOffset the leader's local logStartOffset at the time of the response, or -1 if unknown
+ * (as returned by brokers that support only DeleteRecords v2 or older).
+ */
+ public DeletedRecords(long lowWatermark, long leaderLogStartOffset) {
this.lowWatermark = lowWatermark;
+ this.leaderLogStartOffset = leaderLogStartOffset;
}
/**
@@ -39,4 +54,13 @@ public class DeletedRecords {
public long lowWatermark() {
return lowWatermark;
}
+
+ /**
+ * Return the leader's local logStartOffset at the time of the response.
+ * On success this is at least the requested offset. Returns -1 if the broker did not
+ * include this field (e.g. responses from DeleteRecords v2 or earlier).
+ */
+ public long leaderLogStartOffset() {
+ return leaderLogStartOffset;
+ }
}
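As a rough illustration of how a caller might read the two offsets, the sketch below uses a hypothetical stand-in class, `DeletedRecordsView`, rather than the real `DeletedRecords`. The helper methods `leaderTruncatedTo` and `allReplicasTruncatedTo` are illustrative names only and are not part of this proposal. It assumes, as the Javadoc above states, that -1 means the broker did not report the leader's offset.

```java
/** Hypothetical stand-in exposing the two accessors proposed for DeletedRecords. */
final class DeletedRecordsView {
    /** Value reported when the broker did not include LeaderLogStartOffset. */
    static final long UNKNOWN_OFFSET = -1L;

    private final long lowWatermark;
    private final long leaderLogStartOffset;

    DeletedRecordsView(long lowWatermark, long leaderLogStartOffset) {
        this.lowWatermark = lowWatermark;
        this.leaderLogStartOffset = leaderLogStartOffset;
    }

    long lowWatermark() {
        return lowWatermark;
    }

    long leaderLogStartOffset() {
        return leaderLogStartOffset;
    }

    /** True when the leader reported its offset and it has reached the requested offset. */
    boolean leaderTruncatedTo(long requestedOffset) {
        return leaderLogStartOffset != UNKNOWN_OFFSET
            && leaderLogStartOffset >= requestedOffset;
    }

    /** True when the low watermark (minimum across alive replicas) has reached the requested offset. */
    boolean allReplicasTruncatedTo(long requestedOffset) {
        return lowWatermark >= requestedOffset;
    }
}
```

With a leader-only response such as `new DeletedRecordsView(40L, 100L)` for a requested offset of 100, the first check passes while the second does not, which is the "leader truncated, followers still catching up" state that a v2 response cannot express.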
Rename lowWatermarks() to partitionResults() because the result now also carries LeaderLogStartOffset
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
index 061403eeb1..70b154f7ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
@@ -35,12 +35,24 @@ public class DeleteRecordsResult {
/**
* Return a map from topic partition to futures which can be used to check the status of
- * individual deletions.
+ * individual deletions. Each future resolves to a {@link DeletedRecords} carrying both
+ * the partition's low watermark and the leader's local logStartOffset.
*/
- public Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks() {
+ public Map<TopicPartition, KafkaFuture<DeletedRecords>> partitionResults() {
return futures;
}
+ /**
+ * @deprecated Use {@link #partitionResults()} instead. The returned futures
+ * resolve to a {@link DeletedRecords} that carries more than just the low
+ * watermark (notably {@link DeletedRecords#leaderLogStartOffset()}), so the
+ * {@code lowWatermarks} name no longer describes the contents.
+ */
+ @Deprecated(since = "4.4.0")
+ public Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks() {
+ return partitionResults();
+ }
+
/**
* Return a future which succeeds only if all the records deletions succeed.
*/
Proposed Changes
- Bump DeleteRecordsRequest.json and DeleteRecordsResponse.json to v3 to support non-blocking (leader-only) DeleteRecords operations.
- KafkaApis.handleDeleteRecordsRequest reads data.leaderOnly and forwards it to ReplicaManager.deleteRecords. No validation needed.
- ReplicaManager.deleteRecords accepts a leaderOnly argument:
  - leaderOnly = true: skip delayedDeleteRecordsPurgatory; respond immediately. In the response, LowWatermark is the current lowWatermarkIfLeader (which may be below the requested offset) and LeaderLogStartOffset is the leader's local logStartOffset after truncation.
  - leaderOnly = false: identical to today, with LeaderLogStartOffset populated from the leader's logStartOffset at the time the request completes.
- When leader truncation fails (disk error, not leader, unknown partition, etc.): ErrorCode is set accordingly, LeaderLogStartOffset = -1, LowWatermark = -1. Same as v2.
- Admin client passes DeleteRecordsOptions.leaderOnly straight through to DeleteRecordsRequestData.setLeaderOnly. DeletedRecords exposes leaderLogStartOffset.
- Admin client behavior against older brokers:
  - leaderOnly = false: downgrades to the broker's max version (v0–v2). DeletedRecords.leaderLogStartOffset returns -1.
  - leaderOnly = true against a broker that does not support v3: the per-partition future fails with UnsupportedVersionException. The failure is non-retriable and there is no silent downgrade.
- The kafka-delete-records CLI supports a leaderOnly flag and prints the new leaderLogStartOffset field.
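The ReplicaManager bullets above can be read as a small decision table. The following is a minimal sketch of that table, not the broker implementation: `DeleteRecordsDecision`, `PartitionResponse`, and `afterLeaderTruncation` are hypothetical names, and the sketch assumes that a default (leaderOnly = false) request whose low watermark has already reached the requested offset completes without waiting.

```java
import java.util.Optional;

final class DeleteRecordsDecision {
    static final short NONE = 0;             // error code 0 means success, per the response schema
    static final long UNKNOWN_OFFSET = -1L;  // reported for both offsets when truncation fails

    /** Hypothetical per-partition response carrying the three v3 fields. */
    static final class PartitionResponse {
        final long lowWatermark;
        final long leaderLogStartOffset;
        final short errorCode;

        PartitionResponse(long lowWatermark, long leaderLogStartOffset, short errorCode) {
            this.lowWatermark = lowWatermark;
            this.leaderLogStartOffset = leaderLogStartOffset;
            this.errorCode = errorCode;
        }
    }

    private DeleteRecordsDecision() { }

    /**
     * Decides what happens once the leader has attempted its local truncation.
     * An empty result means the request would wait in purgatory for followers.
     */
    static Optional<PartitionResponse> afterLeaderTruncation(
            boolean leaderOnly, long requestedOffset,
            long leaderLogStartOffset, long lowWatermark, short errorCode) {
        if (errorCode != NONE) {
            // Leader truncation failed: report the error with both offsets set to -1.
            return Optional.of(new PartitionResponse(UNKNOWN_OFFSET, UNKNOWN_OFFSET, errorCode));
        }
        if (leaderOnly || lowWatermark >= requestedOffset) {
            // Leader-only mode, or followers already caught up (assumed): respond now.
            return Optional.of(new PartitionResponse(lowWatermark, leaderLogStartOffset, NONE));
        }
        // Default mode with lagging followers: wait for the low watermark to advance.
        return Optional.empty();
    }
}
```

For example, `afterLeaderTruncation(true, 100L, 100L, 40L, (short) 0)` yields an immediate response with LowWatermark 40 and LeaderLogStartOffset 100, while the same call with `leaderOnly = false` yields an empty result, standing in for enrolment in purgatory.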
Compatibility, Deprecation, and Migration Plan
Wire (negotiated via ApiVersionsResponse):
- Old client → any broker: unchanged.
- New client → v3 broker: leaderOnly honoured.
- New client → old broker:
  - If leaderOnly is true, UnsupportedVersionException is thrown.
  - If leaderOnly is false, the client falls back to v0-v2.
Admin client:
- DeleteRecordsResult.lowWatermarks() → partitionResults(); the old name is kept and deprecated because the new LeaderLogStartOffset makes it misleading.
- DeletedRecords gains leaderLogStartOffset(). It returns -1 when the response came from a v2-or-older broker.
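The wire compatibility rules above amount to a small version-selection check on the client. The sketch below is one way to express it under stated assumptions: `DeleteRecordsVersionChooser`, `chooseVersion`, and `UnsupportedVersionSketch` are hypothetical names (the last stands in for Kafka's UnsupportedVersionException), and the real admin client negotiates versions through its own request-builder machinery rather than a helper like this.

```java
final class DeleteRecordsVersionChooser {
    /** First DeleteRecords version that carries the LeaderOnly field. */
    static final short LEADER_ONLY_MIN_VERSION = 3;

    /** Hypothetical stand-in for Kafka's UnsupportedVersionException. */
    static final class UnsupportedVersionSketch extends RuntimeException {
        UnsupportedVersionSketch(String message) {
            super(message);
        }
    }

    private DeleteRecordsVersionChooser() { }

    /**
     * Picks the highest version both sides support. A leaderOnly request is
     * never downgraded below v3; it fails instead.
     */
    static short chooseVersion(boolean leaderOnly, short brokerMaxVersion, short clientMaxVersion) {
        short usable = (short) Math.min(brokerMaxVersion, clientMaxVersion);
        if (leaderOnly && usable < LEADER_ONLY_MIN_VERSION) {
            throw new UnsupportedVersionSketch(
                "leaderOnly requires DeleteRecords v" + LEADER_ONLY_MIN_VERSION
                    + " but only v" + usable + " is available");
        }
        return usable;
    }
}
```

Failing instead of downgrading keeps the caller from mistaking a wait-for-all-replicas response for a leader-only one.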
Test Plan
- Default behaviour regression. Existing DeleteRecordsRequestTest runs with leaderOnly unset and must still pass.
- Leader-only happy path. A new integration test issues DeleteRecords with leaderOnly = true and asserts errorCode == NONE and leaderLogStartOffset >= requestedOffset.
- Leader-only skips purgatory. A ReplicaManager unit test asserts that deleteRecords(..., leaderOnly = true) invokes the response callback synchronously and does not enroll a DelayedDeleteRecords.
- Admin client wiring. Unit-test the request and response sides of the admin path.
Rejected Alternatives
- Tagged Field on v2: Adding LeaderOnly as a tagged field would avoid the version bump, but the capability could not be advertised via ApiVersionsResponse. A new client sending leaderOnly=true to an old broker would silently fall back to wait-all-replicas, with no way to tell the two kinds of response apart. LeaderLogStartOffset is also meaningful enough to deserve a versioned field rather than a tag.
- Acks-style Enum Instead of Boolean LeaderOnly: Using an acks-style enum (like Produce) was considered and rejected. DeleteRecords has only two meaningful modes: leader-only, and all alive replicas.