Authors: Greg Harris, Ivan Yurchenko, Jorge Quilcate, Giuseppe Lillo, Anatolii Popov, Juha Mynttinen, Josep Prat, Filip Yonov

Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-19161

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-1150: Diskless Topics introduces the concept of diskless topics. KIP-1163: Diskless Core describes the core functionality of diskless topics, such as the produce and consume paths. However, KIP-1163 left the Batch Coordinator interface and reference implementation unspecified. This KIP fills that gap.

Glossary

  • Diskless Topic: A topic which does not append to or serve from block storage devices.
  • Object Storage: A shared, durable, concurrent, and eventually consistent storage supporting arbitrary sized byte values and a minimal set of atomic operations: put, delete, list, and ranged get.
  • Batch: A container for Kafka records and a unit of record representation in the network protocol and storage format in both status quo Kafka and diskless topics.
  • Shared Log Segment Object: An object containing a shared log segment for one or more diskless topic-partitions on the object storage. Contains record batches similar to classic Kafka topics.
  • Batch Coordinate: A reference to a record batch within a shared log segment object, at some byte range.
  • Object Compaction: Distinct from log compaction. A background asynchronous process which reads from and writes to multiple shared log segment objects to manage already-written objects.
  • Diskless Manager: The set of components internal to the broker that handle diskless topic operations, such as creating objects, uploading objects to object storage, etc.

Proposed Changes

This KIP has two purposes:

  1. Define the Batch Coordinator plugin interface.
  2. Describe the reference topic-based implementation of this interface that will be shipped with Kafka.

The KIP proposes that Kafka provide a production-grade implementation, but also that the mechanism be pluggable so that users and operators can provide a Batch Coordinator implementation suitable for their use cases. Among the possible implementations are:

  • backed by a Kafka topic;
  • backed by a stand-alone database;
  • backed by the KRaft quorum;
  • backed by object storage itself.

Batch Coordinator

The Batch Coordinator is the source of truth about objects, partitions, and batches in diskless topics. It does the following:

  1. Chooses the total ordering for writes, assigning offsets without gaps or duplicates.
  2. Serves requests for log offsets.
  3. Serves requests for batch coordinates.
  4. Serves requests for atomic operations (creating partitions, deleting topics, records, etc.)
  5. Manages data expiry and soft deletion.
  6. Manages object physical deletion (performed by brokers).

The three main entities the Batch Coordinator is concerned with are objects, partitions, and batches. We’re interested mainly in the following information about objects:

  1. The object key: the object name/path of the object in the object storage.
  2. The object state: “uploaded” or “deleted” (for soft deletion).
  3. The total size and the actually used size, to track when the object becomes empty and can be deleted.

About partitions, we’re interested mainly in the following:

  1. The topic ID, topic name, and partition number.
  2. The log start offset and the high watermark: to track the beginning and end of the partition.

About batches, we’re interested mainly in the following:

  1. The reference to the log (the topic ID + partition).
  2. The reference to the object where the batch is located.
  3. The base and last offsets.
  4. The byte offset in the object.
  5. The batch size in bytes.
  6. The timestamps: log append time and batch max timestamp.
  7. The information necessary to support idempotent producers.

We propose to make the Batch Coordinator pluggable. The plugin interface is described in the Public Interfaces section.

Batch Coordinator implemented with an internal topic

This is a reference implementation that will be shipped with Kafka. It uses an internal, non-compacted topic called __diskless-metadata for storing diskless metadata. This topic is created by default with one partition, but the number of partitions can be increased later (see the Scaling the number of Batch Coordinators section).

The reference implementation of the Batch Coordinator is based upon the coordinator runtime framework.

Given the nature of diskless topics, clients can fetch messages from these topics from every broker in the cluster. This characteristic requires that batch coordinates can be retrieved efficiently, without contacting the leader of the metadata partition for every fetch request.

In order to solve this problem, the Batch Coordinator makes use of read-only coordinators. Read-only Batch Coordinators are run on the brokers that are in-sync replicas for the __diskless-metadata topic. They can serve read requests, but cannot serve write requests.

For simplicity, let’s assume there’s only one partition, __diskless-metadata-0. The Batch Coordinator instance will run on the broker that is the leader of this partition, while read-only Batch Coordinator instances will run on all the brokers that are in-sync replicas for this partition. The Batch Coordinator will handle all the requests that modify the state (such as batch commit and delete requests), while the read-only Batch Coordinators can serve requests that only need to read the state.

The topic is the storage and replication medium, but it cannot be directly queried in a performant way. For this, the state needs to be materialized locally. This, in turn, poses another challenge: the state can become too large to fit into memory, which means the materialization mechanism must be backed by disk. This set of requirements suggests using an embedded disk-based database engine. We propose to use SQLite: an open-source, self-contained SQL database engine that can be embedded into an application. It is widely used in the software industry and can be easily integrated into any codebase. Using SQLite offers structured querying and indexing of the state, allowing fast and easy access to the Batch Coordinator state without having to implement all the data structures and algorithms needed to manage the state on disk.
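
To make this concrete, below is a minimal, illustrative sketch of how such a materialized state could look when backed by SQLite via JDBC. The table, column, and class names are assumptions for illustration only and are not part of this proposal; the reference implementation may organize the state differently.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

// A minimal sketch (not the reference implementation) of how Batch Coordinator state
// could be materialized in an embedded SQLite database. All names are illustrative.
public class DisklessStateStoreSketch implements AutoCloseable {

    public record BatchCoordinate(String objectKey, long byteOffset, long byteSize,
                                  long baseOffset, long lastOffset) { }

    private final Connection connection;

    public DisklessStateStoreSketch(String path) throws SQLException {
        // Assumes a SQLite JDBC driver (e.g. org.xerial:sqlite-jdbc) on the classpath;
        // modern drivers register themselves automatically.
        this.connection = DriverManager.getConnection("jdbc:sqlite:" + path);
        try (Statement stmt = connection.createStatement()) {
            stmt.execute("CREATE TABLE IF NOT EXISTS objects (" +
                "object_key TEXT PRIMARY KEY, state TEXT NOT NULL, " +
                "total_size INTEGER NOT NULL, used_size INTEGER NOT NULL)");
            stmt.execute("CREATE TABLE IF NOT EXISTS batches (" +
                "topic_id TEXT NOT NULL, partition_index INTEGER NOT NULL, " +
                "base_offset INTEGER NOT NULL, last_offset INTEGER NOT NULL, " +
                "object_key TEXT NOT NULL, byte_offset INTEGER NOT NULL, byte_size INTEGER NOT NULL, " +
                "PRIMARY KEY (topic_id, partition_index, base_offset))");
            // An index by last offset lets a fetch offset be resolved to batch coordinates
            // without scanning the whole partition.
            stmt.execute("CREATE INDEX IF NOT EXISTS batches_by_last_offset " +
                "ON batches (topic_id, partition_index, last_offset)");
        }
    }

    // Resolve the batch coordinates that cover a given fetch offset for one partition.
    public List<BatchCoordinate> findBatches(String topicId, int partitionIndex, long fetchOffset)
            throws SQLException {
        String sql = "SELECT object_key, byte_offset, byte_size, base_offset, last_offset " +
            "FROM batches WHERE topic_id = ? AND partition_index = ? AND last_offset >= ? " +
            "ORDER BY base_offset";
        List<BatchCoordinate> result = new ArrayList<>();
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
            stmt.setString(1, topicId);
            stmt.setInt(2, partitionIndex);
            stmt.setLong(3, fetchOffset);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    result.add(new BatchCoordinate(rs.getString(1), rs.getLong(2),
                        rs.getLong(3), rs.getLong(4), rs.getLong(5)));
                }
            }
        }
        return result;
    }

    @Override
    public void close() throws SQLException {
        connection.close();
    }
}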

Protocol

The Diskless Manager can find the necessary Batch Coordinator with the FindCoordinator RPC, using a new key type FindCoordinatorRequest.CoordinatorType.BATCH. The key format is “operation:topicId:partition”, where “operation” can be either “read” or “write”. When “write” is specified, only the leader Batch Coordinator is returned. When “read” is specified, any of the Batch Coordinators can be returned, including the read-only ones.
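
As an illustration, a hypothetical helper (not part of the proposed interfaces) for constructing such coordinator keys could look as follows.

import org.apache.kafka.common.Uuid;

// A hypothetical helper showing how the Diskless Manager could construct the
// FindCoordinator key for the new BATCH coordinator type.
public final class BatchCoordinatorKeys {

    private BatchCoordinatorKeys() { }

    // "operation" is "write" when the leader Batch Coordinator is required
    // (e.g. committing batches) and "read" when any read-only coordinator will do.
    public static String key(String operation, Uuid topicId, int partition) {
        return operation + ":" + topicId + ":" + partition;
    }
}

// Example keys:
//   key("write", topicId, 0) -> "write:<topicId>:0"  (routed to the leader coordinator)
//   key("read",  topicId, 0) -> "read:<topicId>:0"   (may be served by a read-only coordinator)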

The Diskless Manager uses inter-broker RPCs to communicate with a Batch Coordinator, using a new set of APIs: 

  • InitDisklessTopics
  • CommitBatches
  • DeleteDisklessTopics
  • DeleteDisklessRecords
  • FindBatches
  • ListDisklessOffsets

Scaling the number of Batch Coordinators

The number of Batch Coordinators can be dynamically increased by creating new partitions for the __diskless-metadata topic.

Topic-partitions are assigned to one specific Batch Coordinator. This mapping is stored in the KRaft metadata.
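
For illustration, the sketch below shows how a broker could keep this mapping in memory as it replays the KRaft metadata log; the class and method names are assumptions, not part of this proposal.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicIdPartition;

// An illustrative sketch (assumed names) of a broker-side cache of the topic-partition
// to Batch Coordinator partition mapping learned from
// TopicPartitionToBatchCoordinatorMapping records in the KRaft metadata log.
public class BatchCoordinatorAssignmentCache {

    private final Map<TopicIdPartition, Integer> assignments = new ConcurrentHashMap<>();

    // Applied when a TopicPartitionToBatchCoordinatorMapping record is replayed.
    public void assign(TopicIdPartition topicIdPartition, int batchCoordinatorPartition) {
        assignments.put(topicIdPartition, batchCoordinatorPartition);
    }

    // Returns the __diskless-metadata partition whose coordinator (leader or read-only)
    // should be contacted for the given diskless topic-partition.
    public int coordinatorPartitionFor(TopicIdPartition topicIdPartition) {
        Integer partition = assignments.get(topicIdPartition);
        if (partition == null) {
            throw new IllegalStateException("No Batch Coordinator assigned for " + topicIdPartition);
        }
        return partition;
    }
}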


Risks

The __diskless-metadata topic health is essential for the topic-based Batch Coordinator to function correctly. This creates some risks when operating this type of Batch Coordinator:

  1. If the topic becomes non-writable (e.g. too few in-sync replicas), the Coordinator cannot perform operations that modify the metadata. This involves, first and foremost, committing shared log segment objects. That means that in case of an outage, the write path will stall. In the case of a multi-partition __diskless-metadata topic, the outage will be contained to the affected partitions only.
  2. If the topic becomes fully unavailable, Batch Coordinator instances would still be able to serve read-only operations from their local materialized state, which isn’t guaranteed to be up-to-date.
  3. If the topic is deleted (e.g. by accident), the metadata is lost and the previously written data becomes inaccessible.

Public Interfaces

Batch Coordinator pluggable interface

public record CreateTopicAndPartitionsRequest(
    Uuid topicId,
    String topicName,
    int numPartitions) {
}

public record CommitBatchRequest(
    int requestId,
    TopicIdPartition topicIdPartition,
    int byteOffset,
    int size,
    long baseOffset,
    long lastOffset,
    long batchMaxTimestamp,
    TimestampType messageTimestampType,
    long producerId,
    short producerEpoch,
    int baseSequence,
    int lastSequence) {
}

public record CommitBatchResponse(
    Errors errors,
    long assignedBaseOffset,
    long logAppendTime,
    long logStartOffset,
    boolean isDuplicate,
    CommitBatchRequest request) {
}

public record FindBatchRequest(
    TopicIdPartition topicIdPartition,
    long offset,
    int maxPartitionFetchBytes) {
}

public record FindBatchResponse(
    Errors errors,
    List<BatchInfo> batches,
    long logStartOffset,
    long highWatermark) {
}

public record BatchInfo(
    long batchId,
    String objectKey,
    BatchMetadata metadata) {
}

public record BatchMetadata(
    TopicIdPartition topicIdPartition,
    long byteOffset,
    long byteSize,
    long baseOffset,
    long lastOffset,
    long logAppendTimestamp,
    long batchMaxTimestamp,
    TimestampType timestampType,
    long producerId,
    short producerEpoch,
    int baseSequence,
    int lastSequence) {
}

public record ListOffsetsRequest(
    TopicIdPartition topicIdPartition,
    long timestamp
) {

    public static final long EARLIEST_TIMESTAMP = org.apache.kafka.common.requests.ListOffsetsRequest.EARLIEST_TIMESTAMP;
    public static final long LATEST_TIMESTAMP = org.apache.kafka.common.requests.ListOffsetsRequest.LATEST_TIMESTAMP;
    public static final long MAX_TIMESTAMP = org.apache.kafka.common.requests.ListOffsetsRequest.MAX_TIMESTAMP;
    public static final long EARLIEST_LOCAL_TIMESTAMP = org.apache.kafka.common.requests.ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP;
    public static final long LATEST_TIERED_TIMESTAMP = org.apache.kafka.common.requests.ListOffsetsRequest.LATEST_TIERED_TIMESTAMP;
}

public record ListOffsetsResponse(
    Errors errors,
    TopicIdPartition topicIdPartition,
    long timestamp,
    long offset) {
}

public record DeleteRecordsRequest(
    TopicIdPartition topicIdPartition,
    long offset) {
}

public record DeleteRecordsResponse(
    Errors errors,
    long lowWatermark) {
}

public record FileToDelete(
    String objectKey,
    Instant markedForDeletionAt) {
}

public record DeleteFilesRequest(
    Set<String> objectKeyPaths) {
}

public interface BatchCoordinator extends Closeable, Configurable {
    /**
     * This operation is called when a Diskless partition 
     * (or a topic with one or more partitions) is created in the cluster.
     * The Batch Coordinator initializes the corresponding logs.
     *
     * @throws KafkaException if an unexpected error occurs
     */
    void createTopicAndPartitions(
        Set<CreateTopicAndPartitionsRequest> requests);

    /**
     * This operation is called by a broker after uploading the 
     * shared log segment object to the object storage.
     *
     * <p>The Batch Coordinator:
     * <ol>
     * <li>Performs the necessary checks for idempotent produce.
     * <li>Accordingly increases the high watermark of the affected logs.
     * <li>Assigns offsets to the batches.
     * <li>Saves the batch and object metadata.
     * <li>Returns the result to the broker.
     *
     * @throws KafkaException if an unexpected error occurs
     */
    List<CommitBatchResponse> commitFile(
        String objectKey,
        int uploaderBrokerId,
        long fileSize,
        List<CommitBatchRequest> batches);

    /**
     * This operation is called by a broker when it needs to serve a Fetch request.
     * <p>The Batch Coordinator collects the batch coordinates to satisfy
     * this request and sends the response back to the broker.
     *
     * @throws KafkaException if an unexpected error occurs
     */
    List<FindBatchResponse> findBatches(
        List<FindBatchRequest> findBatchRequests,
        int fetchMaxBytes);


    /**
     * This operation allows the broker to get the information about log offsets:
     * earliest, latest, etc. The operation is a read-only operation.
     *
     * @throws KafkaException if an unexpected error occurs
     */
    List<ListOffsetsResponse> listOffsets(
        List<ListOffsetsRequest> requests);

    /**
     * This operation is called when a partition needs to be truncated by the user.
     * <p>The Batch Coordinator:
     * <ol>
     * <li>Modifies the log start offset for the affected partitions (logs).
     * <li>Deletes the batches that are no longer needed due to this truncation.
     * <li>If some objects become empty after deleting these batches,
     *     they are marked for deletion as well.
     *
     * @throws KafkaException if an unexpected error occurs
     */
    List<DeleteRecordsResponse> deleteRecords(
        List<DeleteRecordsRequest> requests);

    /**
     * This operation is called when topics are deleted. 
     * It’s similar to deleting records, but all the associated batches 
     * are deleted and the log metadata are deleted as well.
     *
     * @throws KafkaException if an unexpected error occurs
     */
    void deleteTopics(
        Set<Uuid> topicIds);

    /**
     * This operation allows a broker to get a list of soft deleted objects 
     * for asynchronous physical deletion from the object storage.
     *
     * @throws KafkaException if an unexpected error occurs
     */
    List<FileToDelete> getFilesToDelete();

    /**
     * This operation informs the Batch Coordinator that certain soft deleted
     * objects were also deleted physically from the object storage.
     * <p>The Batch Coordinator removes all metadata about these objects.
     *
     * @throws KafkaException if an unexpected error occurs
     */
    void deleteFiles(
        DeleteFilesRequest request);

    /**
     * This operation allows a broker to check whether the given object
     * can be safely deleted physically from the object storage.
     *
     * @throws KafkaException if an unexpected error occurs
     */
    boolean isSafeToDeleteFile(
        String objectKey);
}
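
For illustration, the following sketch shows how the Diskless Manager on a broker might drive this plugin: committing the batches of an uploaded shared log segment object and later resolving a fetch offset into batch coordinates. The variable names, values, and surrounding wiring are assumptions; only the BatchCoordinator calls and record types come from the interface above.

import java.util.List;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.record.TimestampType;

// An illustrative sketch of how the Diskless Manager could use the plugin after
// uploading a shared log segment object to object storage.
public class DisklessManagerSketch {

    public void commitAndFetch(BatchCoordinator coordinator,
                               TopicIdPartition tp,
                               String objectKey,
                               int uploaderBrokerId,
                               long fileSize) {
        // 1. Commit the single batch contained in the uploaded object. The coordinator
        //    assigns the final log offsets, so the request carries the offsets as written
        //    in the uploaded batch header.
        CommitBatchRequest commit = new CommitBatchRequest(
            0,                          // requestId
            tp,
            0,                          // byte offset of the batch within the object
            1024,                       // batch size in bytes
            0L, 9L,                     // base/last offsets in the uploaded batch (10 records)
            System.currentTimeMillis(), // batch max timestamp
            TimestampType.CREATE_TIME,
            -1L, (short) -1, -1, -1     // no idempotent producer information in this sketch
        );
        List<CommitBatchResponse> committed =
            coordinator.commitFile(objectKey, uploaderBrokerId, fileSize, List.of(commit));
        long assignedBaseOffset = committed.get(0).assignedBaseOffset();

        // 2. Later, a Fetch request starting at the assigned offset is resolved into
        //    batch coordinates (object key + byte range) that the broker reads directly
        //    from object storage.
        List<FindBatchResponse> found = coordinator.findBatches(
            List.of(new FindBatchRequest(tp, assignedBaseOffset, 1 << 20)),
            1 << 25);
        for (BatchInfo batch : found.get(0).batches()) {
            String key = batch.objectKey();
            long rangeStart = batch.metadata().byteOffset();
            long rangeEnd = rangeStart + batch.metadata().byteSize() - 1;
            System.out.printf("read %s bytes %d-%d%n", key, rangeStart, rangeEnd);
        }
    }
}
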

BatchCoordinator topic-based implementation

FindCoordinator API

The KIP introduces version 7.

Request schema

Version 7 adds the new key type FindCoordinatorRequest.CoordinatorType.BATCH with value 3, with the key format "operation:topicId:partition". "operation" can be either "write" or "read", indicating whether the Batch Coordinator needs to write new metadata (write) or only needs to read it (read).

{
  "apiKey": 10,
  "type": "request",
  "listeners": ["broker"],
  "name": "FindCoordinatorRequest",
  // Version 1 adds KeyType.
  //
  // Version 2 is the same as version 1.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds support for batching via CoordinatorKeys (KIP-699)
  //
  // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
  //
  // Version 6 adds support for share groups (KIP-932).
  // For key type SHARE (2), the coordinator key format is "groupId:topicId:partition".
  //
  // Version 7 adds support for the batch coordinator (KIP-1164).
  // For key type BATCH (3), the coordinator key format is "operation:topicId:partition", where operation is "read" or "write".
  "validVersions": "0-7",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "Key", "type": "string", "versions": "0-3",
      "about": "The coordinator key." },
    { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false,
      "about": "The coordinator key type. (group, transaction, share, batch)." },
    { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+",
      "about": "The coordinator keys." }
  ]
}

Response schema

Version 7 is the same as version 6.

InitDisklessTopics API

Request schema

{
  "apiKey": 93,
  "type": "request",
  "listeners": ["broker"],
  "name": "InitDisklessTopics",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]InitDisklessTopic", "versions": "0+",
      "about": "Topics to initialize.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique ID of this topic." },
      { "name": "NumPartitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions of this topic" }
      ]
    }
  ]
}

Response schema

{
  "apiKey": 93,
  "type": "response",
  "listeners": ["broker"],
  "name": "InitDisklessTopics",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Responses", "type": "[]InitDisklessTopicResult", "versions": "0+",
      "about": "The results for each topic we tried to initialize.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID."},
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The initialization error, or 0 if the initialization succeeded." }
    ]}
  ]
}

CommitBatches API

Request schema

{
  "apiKey": 94,
  "type": "request",
  "listeners": ["broker"],
  "name": "CommitBatches",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the requesting broker." },
    { "name": "Batches", "type": "[]Batches", "versions": "0+",
      "about": "Batches to commit.", "fields": [
      { "name": "Topics", "type": "[]CommitBatchesTopic", "versions": "0+",
        "about": "Each topic that we want to commit batches for.", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]CommitBatchesTopicPartition", "versions": "0+",
          "about": "Each partition that we want to commit batches for.", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },
          { "name": "ObjectKey", "type": "string", "versions": "0+",
            "about": "The key of this object that contains this batch." }
          { "name": "ByteOffset", "type": "int32", "versions": "0+",
            "about": "The starting byte offset for this batch within the object."},
          { "name": "Size", "type": "int64", "versions": "0+",
            "about": "Size of the batch in bytes."},
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "The base offset of this batch."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this batch (inclusive)."},
          { "name": "BatchMaxTimestamp", "type": "int64", "versions": "0+",
            "about": "Max timestamp or log append time of this batch."},
          { "name": "MessageTimestampType", "type": "int8", "versions": "0+",
            "about": "The message timestamp type."},
          { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId", 
            "about": "ProducerId of this batch."},
          { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
            "about": "The current epoch associated with the producer ID."},
          { "name": "BaseSequence", "type": "int32", "versions": "0+",
            "about": "Base sequence number of this batch."},
          { "name": "LastSequence", "type": "int32", "versions": "0+",
            "about": "Last sequence number of this batch."}
        ]}
      ]}
    ]}
  ]
}

Response schema

{
  "apiKey": 94,
  "type": "response",
  "listeners": ["broker"],
  "name": "CommitBatches",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]CommitBatchesTopicResults", "versions": "0+",
      "about": "The result for each topic.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "Partitions", "type": "[]CommitBatchesTopicPartitionResults", "versions": "0+",
        "about": "The result for each partition.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The result error, or zero if there was no error." },
        { "name": "AssignedBaseOffset", "type": "int64", "versions": "0+",
          "about": "The assigned base offset."},
        { "name": "LogAppendTime", "type": "int64", "versions": "0+",
          "about": "The timestamp returned by broker after appending the messages."},
        { "name": "LogStartOffset", "type": "int64", "versions": "0+",
          "about": "The log start offset."},
        { "name": "IsDuplicate", "type": "bool", "versions": "0+",
          "about": "Whether the batch was already written by the same producer id."}
      ]}
    ]}
  ]
}

DeleteDisklessTopics API

Request schema

{
  "apiKey": 95,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteDisklessTopics",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
     { "name": "Topics", "type": "[]DeleteDisklessTopic", "versions": "0+",
       "about": "The ID of the topics to delete.", "fields": [
       { "name": "TopicId", "type": "uuid", "versions": "0+",
         "about": "The unique topic ID." }
     ]}
   ]
}

Response schema

{
  "apiKey": 95,
  "type": "response",
  "listeners": ["broker"],
  "name": "DeleteDisklessTopics",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Responses", "type": "[]DeleteDisklessTopicResult", "versions": "0+",
      "about": "The results for each topic we tried to delete.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID."},
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The deletion error, or 0 if the deletion succeeded." }
    ]}
  ]
}

DeleteDisklessRecords API

Request schema

{
  "apiKey": 96,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteDisklessRecords",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]DeleteDisklessTopicRecords", "versions": "0+",
      "about": "Each topic that we want to delete records from.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "Partitions", "type": "[]DeleteDisklessPartitionRecords", "versions": "0+",
        "about": "Each partition that we want to delete records from.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "Offset", "type": "int64", "versions": "0+",
          "about": "The deletion offset." }
      ]}
    ]}
  ]
}

Response schema

{
  "apiKey": 96,
  "type": "response",
  "listeners": ["broker"],
  "name": "DeleteDisklessRecords",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Responses", "type": "[]DeleteDisklessRecordsResult", "versions": "0+",
      "about": "The results for each topic we tried to delete records for.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]DeleteDisklessPartitionRecordsResult", "versions": "0+",
        "about": "The results for each topic we tried to delete records for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The deletion error, or 0 if the record deletion succeeded." }
      ]}
    ]}
  ]
}

FindBatches API

Request schema

{
  "apiKey": 97,
  "type": "request",
  "listeners": ["broker"],
  "name": "FindDisklessBatches",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]FindDisklessBatchTopic", "versions": "0+",
      "about": "Each topic that we want to find batches for.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "Partitions", "type": "[]FindDisklessBatchTopicPartition", "versions": "0+",
        "about": "Each partition that we want to find batches for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "Offset", "type": "int64", "versions": "0+",
          "about": "The starting offset." },
        { "name": "MaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch for this the topic partition." }
      ]}
    ]},
    { "name": "MaxBytes", "type": "int32", "versions": "0+",
        "about": "The maximum bytes to fetch for all the topic partition." }
  ]
}

Response schema

{
  "apiKey": 97,
  "type": "response",
  "listeners": ["broker"],
  "name": "FindDisklessBatches",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Responses", "type": "[]FindDisklessBatchTopicResult", "versions": "0+",
      "about": "The results for each topic we searched batches for.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FindDisklessBatchTopicPartitionResult", "versions": "0+",
        "about": "The results for each topic we searched batches for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error, or 0 if the search succeeded." },
        { "name": "LogStartOffset", "type": "int64", "versions": "0+",
          "about": "The current log start offset."},
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark."},
        { "name": "ObjectKey", "type": "string", "versions": "0+",
            "about": "The key of this object that contains this batch." }
        { "name": "ByteOffset", "type": "int32", "versions": "0+",
          "about": "The starting byte offset for this batch within the object."},
        { "name": "Size", "type": "int64", "versions": "0+",
          "about": "Size of the batch in bytes."},
        { "name": "BaseOffset", "type": "int64", "versions": "0+",
          "about": "The base offset of this batch."},
        { "name": "LastOffset", "type": "int64", "versions": "0+",
          "about": "The last offset of this batch (inclusive)."},
        { "name": "BatchMaxTimestamp", "type": "int64", "versions": "0+",
          "about": "Max timestamp or log append time of this batch."},
        { "name": "MessageTimestampType", "type": "int8", "versions": "0+",
          "about": "The message timestamp type"},
        { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId", 
          "about": "ProducerId of this batch."},
        { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
          "about": "The current epoch associated with the producer ID."},
        { "name": "BaseSequence", "type": "int32", "versions": "0+",
          "about": "Base sequence number of this batch."},
        { "name": "LastSequence", "type": "int32", "versions": "0+",
          "about": "Last sequence number of this batch."}
      ]}
    ]}
  ]
}

ListDisklessOffsets API

Request schema

{
  "apiKey": 98,
  "type": "request",
  "listeners": ["broker"],
  "name": "ListDisklessOffsets",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]ListDisklessOffsetsTopic", "versions": "0+",
      "about": "Each topic that we want to list offsets for.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "Partitions", "type": "[]ListDisklessOffsetsTopicPartition", "versions": "0+",
        "about": "Each partition that we want to list offsets for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "Timestamp", "type": "int64", "versions": "0+",
          "about": "The current timestamp." }
      ]}
    ]}
  ]
}

Response schema

{
  "apiKey": 98,
  "type": "response",
  "listeners": ["broker"],
  "name": "DeleteDisklessRecords",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Responses", "type": "[]ListDisklessOffsetsTopicResult", "versions": "0+",
      "about": "The results for each topic we tried to list offsets for.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]ListDisklessOffsetsTopicPartitionResult", "versions": "0+",
        "about": "The results for each topic we tried to list offsets for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The deletion error, or 0 if the record list offsets succeeded." },
        { "name": "Timestamp", "type": "int64", "versions": "0+",
          "about": "The timestamp associated with the returned offset." },
        { "name": "Offset", "type": "int64", "versions": "0+",
          "about": "The returned offset." },
      ]}
    ]}
  ]
}

AssignPartitionToBatchCoordinator API

Request schema

{
  "apiKey": 99,
  "type": "request",
  "listeners": ["controller"],
  "name": "AssignPartitionToBatchCoordinator",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
   { "name": "Topics", "type": "[]AssignTopicToBatchCoordinator", "versions": "0+",
      "about": "Each topic that we want to assign Batch Coordinator to.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "Partitions", "type": "[]AssignTopicPartitionToBatchCoordinator", "versions": "0+",
        "about": "Each partition that we want to assign Batch Coordinator to.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "BatchCoordinatorPartitionIndex", "type": "int32", "versions": "0+",
          "about": "The Batch Coordinator partition index." }
      ]}
    ]}   
  ]
}

Response schema

{
  "apiKey": 99,
  "type": "response",
  "listeners": ["broker"],
  "name": "AssignPartitionToBatchCoordinator",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Responses", "type": "[]AssignTopicToBatchCoordinatorResult", "versions": "0+",
      "about": "The results for each topic we tried to assign Batch Coordinator to.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]ListDisklessOffsetsTopicPartitionResult", "versions": "0+",
        "about": "The results for each topic we tried to assign Batch Coordinator to.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error, or 0 if the assignment succeeded." }
      ]}
    ]}
  ]
}

TopicPartitionToBatchCoordinatorMapping

This new KRaft metadata record stores the assignment of a diskless topic-partition to a Batch Coordinator partition (see the Scaling the number of Batch Coordinators section).

{
  "apiKey": 29,
  "type": "metadata",
  "name": "TopicPartitionToBatchCoordinatorMapping",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
   { "name": "PartitionIndex", "type": "int32", "versions": "0+",
     "about": "The partition index." },
   { "name": "TopicId", "type": "uuid", "versions": "0+",
     "about": "The unique ID of the topic." },
   { "name": "BatchCoordinatorPartitionIndex", "type": "int32", "versions": "0+",
     "about": "The Batch Coordinator partition index." }
  ]
}

Future Work

Having many diskless topics with infinite retention would mean that metadata keeps growing infinitely. This leads to metadata partitions having to store large amounts of metadata, which also impacts Batch Coordinator startup time. Enabling Tiered Storage on the __diskless-metadata topic is a possible way to mitigate these issues. Another possible way to solve these problems is to introduce the concept of Diskless Metadata Snapshots. These would be periodic snapshots of a metadata partition that can be stored on object storage, so that the internal topic only needs to store the deltas compared to the last available snapshot.

Compatibility, Deprecation, and Migration Plan

The Batch Coordinator is only needed for diskless topics, so no migration is needed.

Test Plan

The default implementation of the Batch Coordinator will be tested in depth with unit tests. Performance tests will be done to understand how many requests a single Batch Coordinator can sustain, which is a useful metric for deciding how many Batch Coordinators a deployment needs.

Tests of the Batch Coordinator in an integrated system will be performed through the integration and system tests that are planned for KIP-1163.

Rejected Alternatives

Topic-based Batch Coordinator using high number of partitions by default

In this option the Batch Coordinator topic is created by default with a high number of partitions, for example 50, like the other coordinators already available.

The advantage of this option is that the assignment of a topic-partition to its Batch Coordinator can be static, removing the need for storing the assignment inside KRaft metadata.

The drawback of this option is the high fan-out of requests from brokers to Batch Coordinators. Having 50 partitions means that there are 50 Batch Coordinators, which drastically increases the likelihood of one broker having to contact every broker in the cluster every time a new Shared Log Segment Object is created. Every Shared Log Segment Object contains batches for multiple topic-partitions, so committing an object requires contacting all the Batch Coordinators assigned to the topic-partitions present in the object.

This KIP instead proposes using only one Batch Coordinator by default to avoid paying the high fan-out cost even when it’s not necessary for a cluster to have many Batch Coordinators.

Use an external storage for metadata

One can argue that a Kafka topic is not the most convenient medium for storing this type of metadata. Using a relational or key-value database may make it more convenient to implement and provide certain performance benefits. We reject this alternative because Kafka tends to be self-sufficient and not rely on external systems (see e.g. the ZooKeeper removal in KIP-500). However, the pluggability of the batch coordinator allows using such implementations.
