Status

Current state: "Accepted"

Discussion thread: here 

JIRA: KAFKA-5925 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-107 provides a way to delete all the messages before a specified offset in a topic partition once they are no longer needed, without relying on the time-based and size-based log retention policies. The corresponding protocol request and response messages (DeleteRecords API, key 21) are already implemented, but they are used only by the “legacy” Scala Admin Client and are not exposed by the new Java Admin Client API. This KIP adds this feature to the new Admin Client API.

Note that this KIP is related to KIP-107.

Public Interfaces

The AdminClient API will gain a new method (plus an overload that accepts options):

deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete)

Proposed Changes

AdminClient: deleteRecords()

public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete)
public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete, DeleteRecordsOptions options)

Where:

TopicPartition comes from the org.apache.kafka.common package.

RecordsToDelete, DeleteRecordsOptions, DeleteRecordsResult and DeletedRecords are defined as follows.

/**
 * Options for {@link AdminClient#deleteRecords(Map, DeleteRecordsOptions)}.
 */
public class DeleteRecordsOptions extends AbstractOptions<DeleteRecordsOptions> {

}
 
/**
 * Describes the records to delete in a call to {@link AdminClient#deleteRecords(Map)}.
 */
public class RecordsToDelete {
    private long offset;

    /**
     * Delete all the records before the given {@code offset}.
     *
     * @param offset    the offset before which all records will be deleted
     */
    public static RecordsToDelete beforeOffset(long offset) { ... }
}

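/**
 * The result of the {@link AdminClient#deleteRecords(Map)} call.
 */
public class DeleteRecordsResult {
    // package access constructor

    // one future per requested partition, completed with the partition's new low watermark
    public Map<TopicPartition, KafkaFuture<DeletedRecords>> values() { ... }

    // completes when the deletion has succeeded on every requested partition
    public KafkaFuture<Void> all() { ... }
}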
 
/**
 * Represents information about deleted records.
 */
public class DeletedRecords {
    private final long lowWatermark;

    /**
     * Create an instance of this class with the provided parameters.
     *
     * @param lowWatermark  "low watermark" for the topic partition on which the deletion was executed
     */
    public DeletedRecords(long lowWatermark) {
        this.lowWatermark = lowWatermark;
    }

    /**
     * Return the "low watermark" for the topic partition on which the deletion was executed.
     */
    public long lowWatermark() {
        return lowWatermark;
    }
}
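To make the relationship between the per-partition futures and the aggregate future concrete, here is a minimal sketch of how a DeleteRecordsResult-like class could behave. It is not the Kafka implementation: CompletableFuture stands in for KafkaFuture, TopicPartition is a local record so the block compiles without kafka-clients, and the lowWatermarks() helper is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own
record TopicPartition(String topic, int partition) {}

// Mirrors the DeletedRecords class defined above
final class DeletedRecords {
    private final long lowWatermark;

    DeletedRecords(long lowWatermark) { this.lowWatermark = lowWatermark; }

    long lowWatermark() { return lowWatermark; }
}

// Sketch of DeleteRecordsResult, with CompletableFuture playing the role of KafkaFuture
final class DeleteRecordsResult {
    private final Map<TopicPartition, CompletableFuture<DeletedRecords>> futures;

    DeleteRecordsResult(Map<TopicPartition, CompletableFuture<DeletedRecords>> futures) {
        this.futures = futures;
    }

    // One future per requested partition, completed with that partition's low watermark
    Map<TopicPartition, CompletableFuture<DeletedRecords>> values() {
        return futures;
    }

    // Completes once every partition future has completed,
    // exceptionally if at least one of them failed
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]));
    }

    // Hypothetical helper: waits for all partitions, then collects their low watermarks.
    // join() throws a CompletionException if any partition failed.
    Map<TopicPartition, Long> lowWatermarks() {
        all().join();
        Map<TopicPartition, Long> watermarks = new HashMap<>();
        futures.forEach((tp, future) -> watermarks.put(tp, future.join().lowWatermark()));
        return watermarks;
    }
}
```

Keeping a future per partition lets a caller inspect partial success: if one partition fails, the aggregate future fails, yet the futures of the other partitions still carry their low watermarks.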

In DeleteRecordsResult, each DeletedRecords value returned by values() exposes, through lowWatermark(), the low watermark of its topic partition as described in KIP-107.
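The low watermark semantics can be illustrated with a deliberately simplified, single-replica model. The PartitionLog class below is hypothetical: it treats the low watermark as the log start offset of one replica, whereas in a replicated partition the reported value also depends on the other replicas. It only accepts offsets between 0 and the high watermark and does not model any special offset values.

```java
// Hypothetical single-replica model of one partition: the low watermark is the
// log start offset, i.e. the offset of the first record that is still readable
final class PartitionLog {
    private long logStartOffset;
    private final long highWatermark;

    PartitionLog(long logStartOffset, long highWatermark) {
        this.logStartOffset = logStartOffset;
        this.highWatermark = highWatermark;
    }

    // Simplified handling of RecordsToDelete.beforeOffset(offset) for this partition;
    // returns the value DeletedRecords.lowWatermark() would report in this model
    long deleteRecordsBefore(long offset) {
        // This model only accepts offsets between 0 and the high watermark
        if (offset < 0 || offset > highWatermark) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        // Deletion never restores records: the start of the log only moves forward
        logStartOffset = Math.max(logStartOffset, offset);
        return logStartOffset;
    }

    long lowWatermark() {
        return logStartOffset;
    }
}
```

With a high watermark of 100, deleting before offset 40 reports a low watermark of 40; a later request to delete before offset 10 still reports 40, because records below 40 are already gone.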

The deleteRecords() method will require the same authorization as deleteTopics(): its operation type is DELETE and its resource type is TOPIC.
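The rule above can be sketched as a per-topic check. This is a minimal illustration under stated assumptions, not Kafka's authorizer: TopicAcl and DeleteRecordsAuthorization are hypothetical names, ACLs are an in-memory set of exact matches, and wildcard or prefixed ACLs, deny rules, and operations such as ALL that also grant DELETE are ignored. Because the check is per topic, every partition of a topic ends up on the same side of the split; the unauthorized partitions are the ones whose futures would be expected to fail with an authorization error.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Stand-in for org.apache.kafka.common.TopicPartition
record TopicPartition(String topic, int partition) {}

// Hypothetical ACL entry: principal may perform operation on the named TOPIC resource
record TopicAcl(String principal, String operation, String topic) {}

final class DeleteRecordsAuthorization {
    private final Set<TopicAcl> acls;

    DeleteRecordsAuthorization(Set<TopicAcl> acls) {
        this.acls = acls;
    }

    // Same requirement as deleteTopics(): DELETE on the TOPIC resource
    boolean canDeleteRecords(String principal, String topic) {
        return acls.contains(new TopicAcl(principal, "DELETE", topic));
    }

    // Splits requested partitions into authorized (true) and unauthorized (false)
    Map<Boolean, List<TopicPartition>> split(String principal, Collection<TopicPartition> partitions) {
        return partitions.stream()
                .collect(Collectors.partitioningBy(tp -> canDeleteRecords(principal, tp.topic())));
    }
}
```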

Compatibility, Deprecation, and Migration Plan

This is a new API and won't directly affect existing users.

It is intended to replace the deleteRecordsBefore() method provided by the Scala-based Admin Client, so existing users of that method should migrate to the new Java-based Admin Client for this feature.
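For existing callers, migration is mostly a matter of reshaping the request. A minimal sketch, assuming the caller already holds the partition-to-offset mapping it used to pass to deleteRecordsBefore(): TopicPartition and RecordsToDelete are local stand-ins so the block compiles without kafka-clients, the beforeOffset() accessor is a hypothetical addition used only for checking, and DeleteRecordsMigration is a hypothetical helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for org.apache.kafka.common.TopicPartition
record TopicPartition(String topic, int partition) {}

// Stand-in for RecordsToDelete as specified above
final class RecordsToDelete {
    private final long offset;

    private RecordsToDelete(long offset) { this.offset = offset; }

    static RecordsToDelete beforeOffset(long offset) { return new RecordsToDelete(offset); }

    // Hypothetical accessor, not part of the proposed API above
    long beforeOffset() { return offset; }
}

final class DeleteRecordsMigration {
    // Converts a partition -> offset map into the argument expected by deleteRecords(),
    // keeping the caller's iteration order
    static Map<TopicPartition, RecordsToDelete> toRecordsToDelete(Map<TopicPartition, Long> offsets) {
        Map<TopicPartition, RecordsToDelete> request = new LinkedHashMap<>();
        offsets.forEach((tp, offset) -> request.put(tp, RecordsToDelete.beforeOffset(offset)));
        return request;
    }
}
```

With kafka-clients on the classpath, the converted map would then be passed to deleteRecords() on the Java AdminClient.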

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
