Authors: Greg Harris, Ivan Yurchenko, Jorge Quilcate, Giuseppe Lillo, Anatolii Popov, Juha Mynttinen, Josep Prat, Filip Yonov

Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-19161

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-1150: Diskless Topics introduces the concept of diskless topics, and KIP-1163: Diskless Core describes how Diskless topics work. One of the principal components that was mentioned but not described is the Diskless coordinator. This KIP closes that gap and describes the Diskless coordinator (DC) in detail.

Role of Diskless coordinator

The Diskless coordinator manages metadata specific to Diskless topics, such as batches, WAL files, producer and transaction state for Diskless topics. Brokers rely on DC to perform operations on Diskless topics. For example:

  1. When a broker writes a WAL file, it sends a commit request to DC so that DC stores the metadata about the file and its batches.
  2. When a broker serves a consumer, it queries DC for partition offsets and for the existing batches, along with the files where those batches are located.
  3. When a partition is deleted or truncated, DC deletes the corresponding batches and marks WAL files as free.

To perform this and other operations, brokers need to use the request-response API of the Diskless coordinator.

Proposed Changes

This KIP proposes the following:

  1. The Diskless coordinator broker API.
  2. The approach to the Diskless coordinator implementation based on topics and the coordinator framework.

Diskless coordinator API

The Diskless coordinator API will be a part of the broker API. However, it is not expected to be used directly by client applications. It will require the CLUSTER_ACTION permission.

The Diskless coordinator will expose the following operations:

  1. DisklessCreatePartitions: Create new partitions. It can be used both for creating new topics and for increasing the number of partitions in an existing one.
  2. DisklessDeleteTopics: Delete existing topics and their partitions.
  3. DisklessCommitFile: Commit a WAL file with batches. During this operation, various checks are performed (e.g. producer idempotence and transactional checks) and offsets are assigned to batches.
  4. DisklessDeleteRecords: Delete records from the beginning of partitions by advancing the log start offset.
  5. DisklessListOffsets: List offsets for specified timestamps, including special timestamps for “earliest”, “latest” for finding the log start offset and high watermark. This is a read-only operation with strict consistency (i.e. it should reflect the state after applying all the previous mutating operations).
  6. DisklessFindBatches: Find batches in the specified partitions starting from the specified offset. This is a read-only operation with less strict consistency requirements (i.e. could be served from stale state and by followers).
  7. DisklessDescribeFile: Check whether the coordinator has live batches in a WAL file.

The request-response protocol for these operations will be explained in the Public interfaces section.

Some operations remain out of scope of this KIP, such as the ones for transaction management or offloading to tiered storage. They will be explained in the corresponding KIPs.

Topic-based implementation

The KIP proposes to use a new internal Kafka topic __diskless_metadata with multiple partitions as the primary storage. The implementation, deployment, and operation models will be built around this fact.

Partitioning of __diskless_metadata

The __diskless_metadata topic will have multiple partitions, effectively creating multiple coordinators and sharding the space of user partitions for scalability. Following the current terminology, we will say “Diskless coordinators” and “the Diskless coordinator for partition N” when referring to the Diskless code and (meta-)data associated with these partitions.

The __diskless_metadata partitions and Diskless coordinators will be independent in all senses: placement, Diskless operations, etc.

User partitions will be assigned to Diskless coordinators / metadata partitions during creation. This mapping will be stored as a part of partition metadata. Adding partitions to __diskless_metadata will be possible to increase the total capacity of the cluster, but only newly created user partitions will be able to take advantage of the new metadata partitions / Diskless coordinators.

Moving a user partition from one Diskless coordinator to another is possible, but remains out of scope of this KIP.

The default number of partitions will be TBD.
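The assignment described above can be sketched as follows. This is a hypothetical illustration: the least-loaded policy and all names are assumptions, as the KIP only states that the mapping is fixed at creation time and stored in partition metadata.

```python
# Hypothetical sketch of assigning newly created user partitions to
# __diskless_metadata partitions / Diskless coordinators.
def assign_partitions(new_partitions, n_metadata_partitions, load):
    """Assign each new user partition to the least-loaded metadata partition."""
    mapping = {}
    for tp in new_partitions:
        target = min(range(n_metadata_partitions), key=lambda p: load[p])
        mapping[tp] = target   # persisted as part of the partition metadata
        load[target] += 1
    return mapping

# Only partitions created after __diskless_metadata grows can land on the
# new, initially empty metadata partitions.
load = {0: 2, 1: 0, 2: 1}
mapping = assign_partitions(["orders-0", "orders-1"], 3, load)
```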

Leadership and deployment

The implementation will follow the model set by other coordinators in Kafka.

__diskless_metadata partition leaders will be the coordinators. They will serve as the entry point for all mutating operations on the corresponding user partitions. Brokers that need to communicate with a coordinator will discover it using the FindCoordinator API.

It will be possible to control which brokers host the coordinators through partition placement of the __diskless_metadata topic. If necessary, this will allow dedicating a subset of brokers to serving as Diskless coordinators.

Local state machine

The source of truth for the coordinator will be the __diskless_metadata topic. Similar to other coordinators in Kafka, the Diskless coordinator will need a local state machine that materializes the metadata log to facilitate performing operations.

In contrast to other coordinators, the expected size of the DC state is large (up to hundreds of megabytes or even gigabytes), so it’s impractical to keep it in memory. We propose using SQLite to locally materialize the metadata log. SQLite is a widely used, open-source, ACID-compliant embedded DBMS. Using SQLite offers structured querying and indexing of the state, allowing fast and easy access to the Diskless metadata without implementing all the data structures and algorithms needed to manage the state on disk.

The SQLite DB will be a local metadata cache, not the source of truth. It can be dropped when needed and rebuilt from the log and snapshots.
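As a rough illustration of what such a materialization could look like: the table layout, column names, and index below are assumptions for demonstration only, since the KIP does not prescribe a concrete schema.

```python
import sqlite3

# Illustrative sketch of materializing batch metadata in local SQLite.
db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE batches (
    topic_id        TEXT    NOT NULL,
    partition_index INTEGER NOT NULL,
    base_offset     INTEGER NOT NULL,
    last_offset     INTEGER NOT NULL,
    object_key      TEXT    NOT NULL,  -- WAL file containing the batch
    byte_offset     INTEGER NOT NULL,  -- position of the batch in the file
    size            INTEGER NOT NULL
);
-- Index supporting DisklessFindBatches-style lookups.
CREATE INDEX batches_by_offset
    ON batches (topic_id, partition_index, base_offset);
""")
db.execute("INSERT INTO batches VALUES ('t1', 0,   0,  99, 'wal-1', 0, 4096)")
db.execute("INSERT INTO batches VALUES ('t1', 0, 100, 199, 'wal-2', 0, 4096)")

# Find the batches covering offsets >= 100, as DisklessFindBatches would.
rows = db.execute(
    "SELECT object_key, base_offset FROM batches "
    "WHERE topic_id = 't1' AND partition_index = 0 AND last_offset >= 100 "
    "ORDER BY base_offset").fetchall()
```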

How operations are performed

The high-level idea of how operations are performed is simple and resembles what happens in the KRaft controller:

  1. Check the request against the current state (e.g. whether the operation can be performed at all, fully or partially, etc.)
  2. For mutating operations, append one or several metadata records to the log.
  3. Wait for the metadata records to be replicated to the followers.
  4. Reply to the client.

However, the question is in which order to wait for metadata record replication and to apply the changes to the local state. There are two somewhat conflicting requirements:

  1. We want to perform operations in a pipelined manner to reduce operation latency, i.e. to be able to start working on the next operation while the current one is still waiting to be replicated. This requires the local state to be consistent with the previous pending (non-replicated) operations.
  2. We don’t want to modify the local state until we’re sure it’s consistent with the log, to reduce recovery time in case of failures (i.e. to reduce the probability of rebuilding the state from scratch). This requires the local state to be updated only with replicated operations.

While these two requirements contradict each other, they can be reconciled by running the pre-operation checks against the committed local state and the pending operations together.

From the coordinator point of view, the API request processing logic will be the following:

  1. The broker API receives a request (for example, DisklessCommitFile).
  2. The broker checks that it hosts the coordinator (i.e. it is the leader of the corresponding partition of __diskless_metadata) and passes the request to it; otherwise, a NOT_COORDINATOR error is returned.
  3. The coordinator speculatively applies the current pending operations to the committed local state and performs the necessary checks against the resulting speculative state. The speculative application can be done either fully in memory or within a to-be-rolled-back SQLite transaction. At this point, the request may turn out to be fully or partially erroneous, which will be reflected in the response.
  4. The coordinator generates one or more metadata records and publishes them to the local log. These records become pending for the future operations until they are committed to the local state. If there are multiple records, they will be appended to the local log atomically within the same batch. The record formats are explained in the Public interfaces section.
  5. The coordinator waits until these records are replicated to in-sync replicas (the acks=all way), i.e. until the high watermark advances past them.
  6. The coordinator applies the replicated records to the local state for real.
  7. The coordinator sends the response to the client.

Read-only operations that require a consistent up-to-date view of data (e.g. DisklessListOffsets) will skip step 4, but will still wait for the preceding records to be replicated. Some other read-only operations could be served from a stale view of data (e.g. DisklessFindBatches) and could be served right away after step 2, also by a follower.

When an operation is applied to the local state, the corresponding offset in the metadata log is stored atomically with it, to facilitate recovery from a known point.
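The pipelined flow above can be sketched as follows. Names and structure are assumptions, not the actual implementation: checks run against the committed state plus pending (appended but not yet replicated) records, and the committed state advances only after replication.

```python
# Minimal sketch of the pipelined request flow of the Diskless coordinator.
class Coordinator:
    def __init__(self):
        self.committed = {}  # partition -> next offset to assign
        self.pending = []    # (partition, last_offset) appended to the log

    def _speculative_next_offset(self, partition):
        # Step 3: committed state with pending operations applied on top.
        offset = self.committed.get(partition, 0)
        for p, last in self.pending:
            if p == partition:
                offset = last + 1
        return offset

    def commit_batch(self, partition, record_count):
        base = self._speculative_next_offset(partition)
        # Step 4: append the metadata record; it stays pending for now.
        self.pending.append((partition, base + record_count - 1))
        return base

    def on_replicated(self):
        # Steps 5-6: the high watermark advanced; apply records for real.
        for partition, last in self.pending:
            self.committed[partition] = last + 1
        self.pending.clear()

c = Coordinator()
base1 = c.commit_batch("t1-0", 100)  # first batch starts at offset 0
base2 = c.commit_batch("t1-0", 50)   # pipelined: sees the pending batch
c.on_replicated()
```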

Metadata log size management

By far the biggest contributor to the total metadata log size will be metadata records about committed batches and WAL files. The Diskless system is designed to work in cooperation with the tiered storage system by periodically combining batches from Diskless topics into Kafka segments and offloading them to tiered storage. This means that even with infinite data retention, the lifetime of each batch and WAL file metadata inside DC is finite and determined by user configuration (mainly segment.ms and segment.bytes).

The primary mechanism to keep the metadata log size contained will be snapshotting and pruning of the log. Periodically, the leader will take snapshots of the local state asynchronously and the followers will be able to fetch these snapshots. Once a metadata log offset is covered by a snapshot, the log up to that offset can be pruned. This mechanism is identical to the one in KRaft (see KIP-630).

In KRaft, snapshots are created from an in-memory metadata image. In the Diskless coordinator, the local state will be stored in an SQLite database. To make an asynchronous snapshot point-in-time consistent, SQLite read-only transactions or the backup API will be used.
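The backup-API option mentioned above is available directly in Python's sqlite3 module (Connection.backup). The in-memory databases and table contents below are illustrative; in practice the snapshot would be written to a file that followers can fetch.

```python
import sqlite3

# Sketch of a point-in-time consistent snapshot via the SQLite backup API.
state = sqlite3.connect(":memory:")
state.execute("CREATE TABLE files (object_key TEXT, status TEXT)")
state.execute("INSERT INTO files VALUES ('wal-1', 'live')")
state.commit()

snapshot = sqlite3.connect(":memory:")
state.backup(snapshot)  # copies the database consistently, page by page

rows = snapshot.execute("SELECT object_key, status FROM files").fetchall()
```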

WAL file management

Technically it is possible (however undesirable; we discuss below how to avoid this) that a WAL file contains data for Diskless partitions that belong to different Diskless coordinators, i.e. there is no single owner that controls the file’s lifetime.

To overcome this difficulty, we propose the following approach:

  1. When a broker is about to commit a file, it gathers the IDs of all the DCs it is going to send commit requests to into a list of owners. The list of owners is randomly permuted.
  2. The list of owners is included as a field in the DisklessCommitFile request. Now each DC knows which other DCs claim the file.
  3. The first DC in the owner list becomes the owner.
  4. Once the last batch for the file in this DC is deleted, the DC hands the file over to the next owner in the list.
  5. Once this happens to the last owner, the file is deleted.

The handover and deletion operations must not interfere with other DC activity, i.e. they must be performed asynchronously. The status of the file is changed in the local state (but not in the metadata log, because this information is already implicitly present there and the followers will know it) and a background worker is started for the corresponding operation (handover or deletion). If the target broker for handover or the remote storage is not available, the background worker will retry the operation indefinitely. If the DC leadership changes, the background workers on the current broker will be stopped and started again on the new leader (as the new leader has been reading the same metadata log and knows the status of files).

When a file is to be deleted, it makes sense to allow for some grace period rather than deleting it right away, so that consumers can finish possible ongoing reads from it.
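The handover chain can be sketched as follows. Function and field names are hypothetical; the KIP only defines the steps.

```python
# Hypothetical sketch of the ownership handover chain for a shared WAL file.
def on_last_batch_deleted(file, dc_id):
    """Called on a DC once its last live batch in the file is deleted."""
    owners = file["owners"]          # randomly permuted list from the commit
    if owners[0] != dc_id:
        return "not-owner"           # only the current owner acts
    owners.pop(0)                    # hand the file over to the next owner
    if owners:
        return f"handover-to-{owners[0]}"  # done by a retrying background worker
    return "delete-after-grace-period"     # last owner deletes the file

f = {"owners": [7, 3, 5]}
r1 = on_last_batch_deleted(f, 3)  # DC 3 is not the current owner yet
r2 = on_last_batch_deleted(f, 7)  # DC 7 hands the file over to DC 3
```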

Orphan files

It’s possible that a broker uploads a WAL file but fails to commit it with any DC and also fails to do on-the-spot cleanup. The most obvious example is a broker crash right after uploading. These files become dead weight, making users pay for extra storage. To deal with these orphan files, a special background worker will periodically scan the remote storage to find such files and mark them for deletion. The algorithm is the following:

  1. Before each scan, the worker will ask each DC for the timestamp of the oldest file it has batches in. The grace period will be added to the oldest of all the timestamps, forming the timestamp threshold.
  2. The worker will scan the remote storage for the files older than the threshold.
  3. As an additional safety measure, the worker will ask each DC whether particular files are known to it.
  4. If a file is not known to any DC, it will be physically deleted by the worker.

The scan frequency must be configurable. An orphan file is expected to be a rare event, so the default scan frequency should be correspondingly low.
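The scan algorithm above can be sketched as follows; the helper shapes and names are assumptions. A file is deleted only if it is older than the threshold and unknown to every DC.

```python
# Sketch of the orphan WAL file scan.
def find_orphans(remote_files, dc_oldest_ts, dc_known_files, grace):
    # Step 1: the grace period is added to the oldest timestamp across DCs.
    threshold = min(dc_oldest_ts) + grace
    orphans = []
    for object_key, uploaded_at in remote_files.items():
        if uploaded_at >= threshold:
            continue  # step 2: only consider files older than the threshold
        # Step 3: safety measure, ask each DC whether it knows the file.
        if any(object_key in known for known in dc_known_files):
            continue
        orphans.append(object_key)  # step 4: unknown to all DCs, delete it
    return orphans

orphans = find_orphans(
    remote_files={"wal-old": 100, "wal-known": 150, "wal-new": 900},
    dc_oldest_ts=[150, 200],
    dc_known_files=[{"wal-known"}, {"wal-new"}],
    grace=50,
)
```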

Reducing multi-DC commit operations

Any broker can handle a Produce request for any Diskless partition. As mentioned above, this opens the possibility for a WAL file to contain data from partitions belonging to different Diskless coordinators. In this case, the broker will have to commit this file against multiple DCs. This situation is undesirable because in the worst case committing one WAL file will result in n_dcs (number of DCs) outbound network commit calls, which increases the chances for partial failure and higher tail latencies. This section is dedicated to alleviating this problem.

The problem is partially alleviated by the fact that one broker may host several DCs, so logical commit requests can be bundled together into fewer physical ones. However, the worst case is still bad enough: the upper bound of outbound network commit calls will be n_brokers (number of brokers).

If we direct producers the right way, we will be able to concentrate Diskless Produce requests so that brokers need to do fewer network commit calls. KIP-1163 proposes to expand the Metadata request/response for newer clients, particularly to add the PreferredProduceBrokers field. The value of this field will be dynamically calculated on the broker side.

For each Diskless coordinator, one or several brokers in each rack will be selected as “produce gateways”. This means that when a producer needs to send a Produce request to a Diskless partition managed by this DC, it will be encouraged to send it to particular brokers depending on the rack, not just to any broker.

Provided there are enough brokers in the cluster, we can make sure that each broker is the produce gateway for only one DC (in its rack). Let’s consider some scenarios to clarify the idea.

Scenario 1: the number of brokers is less than the number of DCs.
Racks: 3. Brokers: 3. DCs: 12.
For each DC, we select one broker in each rack as the produce gateway. That means each broker will be a gateway for 12 DCs (12 DCs * 1 * 3 racks / 3 brokers). Since there are fewer brokers than DCs, each broker will have to do up to n_brokers outbound network commit calls per WAL file.

Scenario 2: same, just more brokers.
Racks: 3. Brokers: 6. DCs: 12.
For each DC, we select one broker in each rack as the produce gateway. That means each broker will be a gateway for 6 DCs (12 DCs * 1 * 3 racks / 6 brokers). There are still fewer brokers than DCs, so each broker will have to do up to n_brokers outbound network commit calls per WAL file.

Scenario 3: the number of brokers is equal to the number of DCs.
Racks: 3. Brokers: 12. DCs: 12.
For each DC, we select one broker in each rack as the produce gateway. That means, each broker will be a gateway for 3 DCs (12 DCs * 1 * 3 racks / 12 brokers). Each broker will have to do up to n_brokers outbound network commit calls per WAL file.

Scenario 4: the number of brokers is greater than the number of DCs.
Racks: 3. Brokers: 24. DCs: 12.
For each DC, we select one broker in each rack as the produce gateway. That means each broker will be a gateway for 2 DCs (12 DCs * 1 * 3 racks / 24 brokers = 1.5, rounded up). Each broker will have to do up to 2 outbound network commit calls per WAL file.

Scenario 5: same, just more brokers.
Racks: 3. Brokers: 36. DCs: 12.
For each DC, we select one broker in each rack as the produce gateway. That means each broker will be a gateway for 1 DC (12 DCs * 1 * 3 racks / 36 brokers). Each broker will have to do up to 1 outbound network commit call per WAL file.

Scenario 6: same, just more brokers.
Racks: 3. Brokers: 108. DCs: 12.
For each DC, we select three brokers in each rack as produce gateways. That means each broker will be a gateway for 1 DC (12 DCs * 3 * 3 racks / 108 brokers). Each broker will have to do up to 1 outbound network commit call per WAL file.

The number of produce gateways per DC per rack can be calculated as max(1, n_brokers / n_dcs / n_racks).

Thus, provided there are enough brokers, each broker will have to do fewer than n_brokers outbound network commit calls per WAL file, going down to 1. The saturation point is where n_brokers = n_dcs * n_racks.

A further important optimization is possible. In scenarios before the saturation point (too few brokers), we can mirror co-location of DCs by co-location of their produce gateways. That means if DCs dc1, dc2, dc3 are co-located on some broker, their produce gateways should also be co-located on some brokers in other racks. With this, we lower the upper bound of outbound network commit calls per WAL file from n_brokers to n_racks.

Last but not least, sending requests to DCs on the same broker doesn’t need to be a network call, so the upper bound is in any case n_brokers - 1 or n_racks - 1. This matters because a DC can be its own produce gateway in its own rack.
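The scenarios above can be checked with a small calculation. This follows the formula max(1, n_brokers / n_dcs / n_racks) with integer division; the helper names are illustrative.

```python
# Worked check of the produce-gateway sizing from the scenarios above.
def gateways_per_dc_per_rack(n_brokers, n_dcs, n_racks):
    return max(1, n_brokers // (n_dcs * n_racks))

def dcs_per_broker(n_brokers, n_dcs, n_racks):
    # Total gateway slots (DCs * gateways per rack * racks) spread over all
    # brokers, rounded up when they don't divide evenly (scenario 4).
    slots = n_dcs * gateways_per_dc_per_rack(n_brokers, n_dcs, n_racks) * n_racks
    return -(-slots // n_brokers)

# Scenario 3: 12 brokers  -> each broker is a gateway for 3 DCs.
# Scenario 4: 24 brokers  -> 2 DCs per broker (36 slots / 24 = 1.5, rounded up).
# Scenario 6: 108 brokers -> 3 gateways per DC per rack, 1 DC per broker.
```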

Public Interfaces

The proposed public interface changes are subject to change during the community discussion.

Diskless coordinator API

DisklessCreatePartitions

DisklessCreatePartitions
{
  "apiKey": 93,
  "type": "request",
  "listeners": ["controller"],
  "name": "DisklessCreatePartitionsRequest",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]DisklessCreatablePartitions", "versions": "0+",
      "about": "Each topic where to create partitions.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
        "about": "The unique topic ID." },
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "NumPartitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions to create for the topic." }
    ]},
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." }
  ]
}

{
  "apiKey": 93,
  "type": "response",
  "name": "DisklessCreatePartitionsResponse",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]DisklessCreatablePartitionsResult", "versions": "0+",
      "about": "Results for each topic.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
        "about": "The unique topic ID." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true,
        "about": "The error message, or null if there was no error." }
    ]}
  ]
}

DisklessDeleteTopics

DisklessDeleteTopics
{
  "apiKey": 94,
  "type": "request",
  "listeners": ["controller"],
  "name": "DisklessDeleteTopicsRequest",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]DisklessDeletableTopicState", "versions": "0+",
      "about": "Each topic to delete.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
        "about": "The unique topic ID." }
    ]},
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The length of time in milliseconds to wait for the deletions to complete." }
  ]
}

{
  "apiKey": 94,
  "type": "response",
  "name": "DisklessDeleteTopicsResponse",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DisklessDeletableTopicResult", "versions": "0+",
      "about": "The results for each topic we tried to delete.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
        "about": "The unique topic ID." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The deletion error, or 0 if the deletion succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
        "about": "The error message, or null if there was no error." }
    ]}
  ]
}

DisklessCommitFile

DisklessCommitFile
{
  "apiKey": 95,
  "type": "request",
  "listeners": ["controller"],
  "name": "DisklessCommitFileRequest",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ObjectKey", "type": "string", "versions": "0+",
      "about": "The object key identifying the file in object storage." },
    { "name": "ObjectFormat", "type": "int8", "versions": "0+",
      "about": "The format of the object (e.g. 1 = WRITE_AHEAD_MULTI_SEGMENT)." },
    { "name": "FileSize", "type": "int64", "versions": "0+",
      "about": "The total size of the file in bytes." },
    { "name": "OwnerIds", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
      "about": "The list of Diskless Coordinator IDs that share ownership of this file." },
    { "name": "Batches", "type": "[]DisklessCommitBatch", "versions": "0+",
      "about": "The batches contained in the file to commit.", "fields": [
      { "name": "Magic", "type": "int8", "versions": "0+",
        "about": "The record batch magic byte." },
      { "name": "RequestId", "type": "int32", "versions": "0+",
        "about": "The ID of the original produce request this batch belongs to." },
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique ID of the topic this batch belongs to." },
      { "name": "PartitionIndex", "type": "int32", "versions": "0+",
        "about": "The partition index this batch belongs to." },
      { "name": "ByteOffset", "type": "int32", "versions": "0+",
        "about": "The byte offset of this batch within the file." },
      { "name": "Size", "type": "int32", "versions": "0+",
        "about": "The size of the batch in bytes." },
      { "name": "BaseOffset", "type": "int64", "versions": "0+",
        "about": "The base offset of the record batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0+",
        "about": "The last offset of the record batch." },
      { "name": "BatchMaxTimestamp", "type": "int64", "versions": "0+",
        "about": "The maximum timestamp in the batch." },
      { "name": "TimestampType", "type": "int8", "versions": "0+",
        "about": "The timestamp type (0 = CreateTime, 1 = LogAppendTime)." },
      { "name": "ProducerId", "type": "int64", "versions": "0+",
        "about": "The producer ID, or -1 if not set." },
      { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
        "about": "The producer epoch, or -1 if not set." },
      { "name": "BaseSequence", "type": "int32", "versions": "0+",
        "about": "The base sequence number, or -1 if not set." },
      { "name": "LastSequence", "type": "int32", "versions": "0+",
        "about": "The last sequence number, or -1 if not set." }
    ]},
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." }
  ]
}

{
  "apiKey": 95,
  "type": "response",
  "name": "DisklessCommitFileResponse",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Batches", "type": "[]DisklessCommitBatchResult", "versions": "0+",
      "about": "Results for each batch, in the same order as the request batches.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
        "about": "The error message, or null if there was no error." },
      { "name": "AssignedBaseOffset", "type": "int64", "versions": "0+",
        "about": "The base offset assigned by the coordinator, or -1 on error." },
      { "name": "LogAppendTime", "type": "int64", "versions": "0+",
        "about": "The log append timestamp, or -1 if not applicable." },
      { "name": "LogStartOffset", "type": "int64", "versions": "0+",
        "about": "The log start offset of the partition, or -1 on error." },
      { "name": "IsDuplicate", "type": "bool", "versions": "0+",
        "about": "True if this batch was a duplicate of an already committed batch." }
    ]}
  ]
}

DisklessDeleteRecords

DisklessDeleteRecords
{
  "apiKey": 96,
  "type": "request",
  "listeners": ["controller"],
  "name": "DisklessDeleteRecordsRequest",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]DisklessDeleteRecordsTopic", "versions": "0+",
      "about": "Each topic that we want to delete records from.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DisklessDeleteRecordsPartition", "versions": "0+",
        "about": "Each partition that we want to delete records from.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
          "about": "The partition index." },
        { "name": "Offset", "type": "int64", "versions": "0+",
          "about": "The deletion offset. All records with offsets less than this value will be deleted." }
      ]}
    ]},
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait for the deletion to complete, in milliseconds." }
  ]
}

{
  "apiKey": 96,
  "type": "response",
  "name": "DisklessDeleteRecordsResponse",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]DisklessDeleteRecordsTopicResult", "versions": "0+",
      "about": "Each topic that we wanted to delete records from.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DisklessDeleteRecordsPartitionResult", "versions": "0+",
        "about": "Each partition that we wanted to delete records from.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
          "about": "The partition index." },
        { "name": "LowWatermark", "type": "int64", "versions": "0+",
          "about": "The partition low water mark." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The deletion error code, or 0 if the deletion succeeded." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

DisklessListOffsets

DisklessListOffsets
{
  "apiKey": 97,
  "type": "request",
  "listeners": ["controller"],
  "name": "DisklessListOffsetsRequest",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]DisklessListOffsetsTopic", "versions": "0+",
      "about": "Each topic in the request.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DisklessListOffsetsPartition", "versions": "0+",
        "about": "Each partition in the request.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
          "about": "The partition index." },
        { "name": "Timestamp", "type": "int64", "versions": "0+",
          "about": "The timestamp to query. Use -2 for earliest, -1 for latest, -3 for max timestamp, -4 for earliest local, -5 for latest tiered." }
      ]}
    ]},
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." }
  ]
}

{
  "apiKey": 97,
  "type": "response",
  "name": "DisklessListOffsetsResponse",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]DisklessListOffsetsTopicResponse", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DisklessListOffsetsPartitionResponse", "versions": "0+",
        "about": "Each partition in the response.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "Timestamp", "type": "int64", "versions": "0+", "default": "-1",
          "about": "The timestamp associated with the returned offset." },
        { "name": "Offset", "type": "int64", "versions": "0+", "default": "-1",
          "about": "The returned offset." }
      ]}
    ]}
  ]
}
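To illustrate the `Timestamp` sentinel semantics above, here is a simplified, hypothetical sketch of how a broker might resolve a per-partition offset query against the coordinator's batch metadata. The sentinel values match the request schema; the function name, the batch tuple layout, and the resolution logic are illustrative only (in particular, a real implementation would resolve a timestamp to the exact record offset inside a batch, not just the batch base offset).

```python
# Sentinel values from the DisklessListOffsetsRequest Timestamp field.
EARLIEST, LATEST, MAX_TIMESTAMP = -2, -1, -3

def resolve_offset(timestamp, log_start_offset, high_watermark, batches):
    """Resolve a ListOffsets-style query (hypothetical sketch).

    batches: list of (base_offset, last_offset, max_timestamp) tuples,
    sorted by offset, as the coordinator would track them per partition.
    """
    if timestamp == EARLIEST:
        return log_start_offset
    if timestamp == LATEST:
        return high_watermark
    if timestamp == MAX_TIMESTAMP:
        # Offset of the batch holding the largest timestamp seen so far.
        best = max(batches, key=lambda b: b[2], default=None)
        return best[1] if best else -1
    # Regular timestamp query: first batch whose max timestamp reaches it.
    for base_offset, last_offset, max_ts in batches:
        if max_ts >= timestamp:
            return base_offset
    return -1

batches = [(0, 9, 1000), (10, 19, 2000), (20, 29, 1500)]
print(resolve_offset(EARLIEST, 0, 30, batches))
print(resolve_offset(1500, 0, 30, batches))
```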

DisklessFindBatches


{
  "apiKey": 98,
  "type": "request",
  "listeners": ["controller"],
  "name": "DisklessFindBatchesRequest",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "FetchMaxBytes", "type": "int32", "versions": "0+",
      "about": "Maximum bytes to fetch across all partitions." },
    { "name": "MaxBatchesPerPartition", "type": "int32", "versions": "0+",
      "about": "Maximum number of batches per partition to return." },
    { "name": "Topics", "type": "[]DisklessFindBatchesTopic", "versions": "0+",
      "about": "Each topic in the request.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
        "about": "The topic ID." },
      { "name": "Partitions", "type": "[]DisklessFindBatchesPartition", "versions": "0+",
        "about": "Each partition in the request.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
          "about": "The partition index." },
        { "name": "Offset", "type": "int64", "versions": "0+",
          "about": "The offset to start fetching from." },
        { "name": "MaxPartitionFetchBytes", "type": "int32", "versions": "0+",
          "about": "Maximum bytes to fetch for this partition." }
      ]}
    ]},
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." }
  ]
}

{
  "apiKey": 98,
  "type": "response",
  "name": "DisklessFindBatchesResponse",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]DisklessFindBatchesTopicResponse", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
        "about": "The topic ID." },
      { "name": "Partitions", "type": "[]DisklessFindBatchesPartitionResponse", "versions": "0+",
        "about": "Each partition in the response.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error code, or 0 if there was no error." },
        { "name": "LogStartOffset", "type": "int64", "versions": "0+", "default": "-1",
          "about": "The log start offset of the partition, or -1 if unknown." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+", "default": "-1",
          "about": "The high watermark of the partition, or -1 if unknown." },
        { "name": "Batches", "type": "[]DisklessFindBatchesBatchInfo", "versions": "0+",
          "about": "The batches found for this partition.", "fields": [
          { "name": "BatchId", "type": "int64", "versions": "0+",
            "about": "The batch ID." },
          { "name": "ObjectKey", "type": "string", "versions": "0+",
            "about": "The object key in object storage containing this batch." },
          { "name": "Magic", "type": "int8", "versions": "0+",
            "about": "The record batch magic byte." },
          { "name": "ByteOffset", "type": "int64", "versions": "0+",
            "about": "The byte offset of the batch within the object." },
          { "name": "ByteSize", "type": "int64", "versions": "0+",
            "about": "The size of the batch in bytes." },
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "The base offset of the record batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of the record batch." },
          { "name": "LogAppendTimestamp", "type": "int64", "versions": "0+",
            "about": "The log append timestamp." },
          { "name": "BatchMaxTimestamp", "type": "int64", "versions": "0+",
            "about": "The maximum timestamp in the batch." },
          { "name": "TimestampType", "type": "int8", "versions": "0+",
            "about": "The timestamp type (0 = CreateTime, 1 = LogAppendTime)." }
        ]}
      ]}
    ]}
  ]
}
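The `ObjectKey`/`ByteOffset`/`ByteSize` triple in each returned batch gives a broker everything it needs to issue ranged reads against object storage. As a hypothetical sketch (the function name and plan format are illustrative, not a proposed API), a broker could group the batches by object and coalesce adjacent byte ranges into a single ranged GET:

```python
from collections import defaultdict

def plan_ranged_reads(batches):
    """Turn DisklessFindBatchesResponse batch coordinates into HTTP Range
    headers (hypothetical sketch).

    batches: dicts carrying ObjectKey, ByteOffset, ByteSize, mirroring the
    response schema. Adjacent ranges within one object are merged so that
    contiguous batches cost a single object-storage request.
    """
    by_object = defaultdict(list)
    for b in batches:
        by_object[b["ObjectKey"]].append((b["ByteOffset"], b["ByteSize"]))
    plans = {}
    for key, ranges in by_object.items():
        ranges.sort()
        merged = []
        for start, size in ranges:
            if merged and merged[-1][0] + merged[-1][1] == start:
                # Contiguous with the previous range: extend it.
                merged[-1] = (merged[-1][0], merged[-1][1] + size)
            else:
                merged.append((start, size))
        plans[key] = [f"bytes={s}-{s + n - 1}" for s, n in merged]
    return plans

batches = [
    {"ObjectKey": "wal/0001", "ByteOffset": 0, "ByteSize": 100},
    {"ObjectKey": "wal/0001", "ByteOffset": 100, "ByteSize": 50},
    {"ObjectKey": "wal/0002", "ByteOffset": 500, "ByteSize": 200},
]
print(plan_ranged_reads(batches))
```

The two adjacent batches in `wal/0001` coalesce into one range, which matters because object-storage pricing and latency are dominated by request count rather than bytes transferred.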

DisklessDescribeFiles

{
  "apiKey": 100,
  "type": "request",
  "listeners": ["controller"],
  "name": "DisklessDescribeFilesRequest",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Files", "type": "[]DisklessDescribeFileEntry", "versions": "0+",
      "about": "Each file to describe.", "fields": [
      { "name": "ObjectKey", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The object key identifying the file in object storage." }
    ]},
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." }
  ]
}

{
  "apiKey": 100,
  "type": "response",
  "name": "DisklessDescribeFilesResponse",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Files", "type": "[]DisklessDescribeFileResult", "versions": "0+",
      "about": "Results for each file.", "fields": [
      { "name": "ObjectKey", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The object key identifying the file in object storage." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
        "about": "The error message, or null if there was no error." },
      { "name": "HasLiveBatches", "type": "bool", "versions": "0+",
        "about": "True if the file still has live batches tracked by this coordinator." }
    ]}
  ]
}
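The `HasLiveBatches` flag is what makes an orphan-file scan possible: a scanner can list object storage, ask the coordinator which keys still have live batches, and delete files that have stayed unreferenced longer than the grace period (see `diskless.coordinator.orphan.scan.grace.period.ms`). The following is a hypothetical sketch of that loop; the function names and state layout are illustrative only:

```python
def scan_orphans(listed_keys, describe_files, first_seen, grace_period_ms, now_ms):
    """One pass of a hypothetical orphan-file scan.

    describe_files: callable mapping a list of object keys to
    {key: has_live_batches}, mimicking DisklessDescribeFilesResponse.
    first_seen: mutable dict recording when a key was first observed
    without live batches (the grace-period clock).
    Returns the keys that are safe to delete on this pass.
    """
    live = describe_files(listed_keys)
    to_delete = []
    for key in listed_keys:
        if live.get(key, False):
            first_seen.pop(key, None)  # still referenced: reset the clock
            continue
        seen = first_seen.setdefault(key, now_ms)
        if now_ms - seen >= grace_period_ms:
            to_delete.append(key)
    return to_delete

first_seen = {}
live_map = {"wal/a": True, "wal/b": False}
describe = lambda keys: {k: live_map.get(k, False) for k in keys}
# First pass: wal/b has no live batches but is still within the grace period.
print(scan_orphans(["wal/a", "wal/b"], describe, first_seen, 1000, 0))
# After the grace period elapses, wal/b becomes deletable.
print(scan_orphans(["wal/a", "wal/b"], describe, first_seen, 1000, 1000))
```

The grace period protects files that were just uploaded but whose commit request has not yet reached the coordinator, so the scan never races a concurrent commit.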

Metadata records

DisklessPartitionCreatedRecord

{
  "apiKey": 100,
  "type": "data",
  "name": "DisklessPartitionCreatedRecord",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID." },
    { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The topic name." },
    { "name": "PartitionIndex", "type": "int32", "versions": "0+",
      "about": "The partition index." }
  ]
}

DisklessTopicDeletedRecord

{
  "apiKey": 101,
  "type": "data",
  "name": "DisklessTopicDeletedRecord",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID." }
  ]
}

DisklessFileCommittedRecord

TODO: optimize for storing fewer bytes per record

{
  "apiKey": 102,
  "type": "data",
  "name": "DisklessFileCommittedRecord",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ObjectKey", "type": "string", "versions": "0+",
      "about": "The object key identifying the file in object storage." },
    { "name": "ObjectFormat", "type": "int8", "versions": "0+",
      "about": "The format of the object (e.g. 1 = WRITE_AHEAD_MULTI_SEGMENT)." },
    { "name": "FileSize", "type": "int64", "versions": "0+",
      "about": "The total size of the file in bytes." },
    { "name": "UploaderBrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker that uploaded the file." },
    { "name": "OwnerIds", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
      "about": "The list of Diskless Coordinator IDs that share ownership of this file." },
    { "name": "Topics", "type": "[]DisklessCommittedTopic", "versions": "0+",
      "about": "Each topic with accepted batches in this file.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DisklessCommittedPartition", "versions": "0+",
        "about": "Each partition with accepted batches for this topic.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "Batches", "type": "[]DisklessCommittedBatch", "versions": "0+",
          "about": "The accepted batches for this partition.", "fields": [
          { "name": "Magic", "type": "int8", "versions": "0+",
            "about": "The record batch magic byte." },
          { "name": "ByteOffset", "type": "int32", "versions": "0+",
            "about": "The byte offset of this batch within the file." },
          { "name": "ByteSize", "type": "int32", "versions": "0+",
            "about": "The size of the batch in bytes." },
          { "name": "AssignedBaseOffset", "type": "int64", "versions": "0+",
            "about": "The base offset assigned by the coordinator." },
          { "name": "AssignedLastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset assigned by the coordinator." },
          { "name": "LogAppendTimestamp", "type": "int64", "versions": "0+",
            "about": "The log append timestamp assigned by the coordinator." },
          { "name": "BatchMaxTimestamp", "type": "int64", "versions": "0+",
            "about": "The maximum timestamp in the batch." },
          { "name": "TimestampType", "type": "int8", "versions": "0+",
            "about": "The timestamp type (0 = CreateTime, 1 = LogAppendTime)." },
          { "name": "ProducerId", "type": "int64", "versions": "0+",
            "about": "The producer ID, or -1 if not set." },
          { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
            "about": "The producer epoch, or -1 if not set." },
          { "name": "BaseSequence", "type": "int32", "versions": "0+",
            "about": "The base sequence number, or -1 if not set." },
          { "name": "LastSequence", "type": "int32", "versions": "0+",
            "about": "The last sequence number, or -1 if not set." }
        ]}
      ]}
    ]}
  ]
}
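Since the record carries the coordinator-assigned offsets (`AssignedBaseOffset`/`AssignedLastOffset`), replaying it is enough to rebuild the per-partition batch index and high watermark. A hypothetical sketch of such a replay step (the state layout and function name are illustrative, not part of this KIP):

```python
def apply_file_committed(state, record):
    """Replay one DisklessFileCommittedRecord into in-memory coordinator
    state (hypothetical sketch).

    state: {(topic_id, partition_index): {"high_watermark": int,
                                          "batches": [(base, last, object_key)]}}
    record: a dict mirroring the DisklessFileCommittedRecord schema.
    """
    for topic in record["Topics"]:
        for partition in topic["Partitions"]:
            key = (topic["TopicId"], partition["PartitionIndex"])
            p = state.setdefault(key, {"high_watermark": 0, "batches": []})
            for b in partition["Batches"]:
                p["batches"].append(
                    (b["AssignedBaseOffset"], b["AssignedLastOffset"],
                     record["ObjectKey"]))
                # The high watermark is one past the last committed offset.
                p["high_watermark"] = max(p["high_watermark"],
                                          b["AssignedLastOffset"] + 1)
    return state

record = {
    "ObjectKey": "wal/0001",
    "Topics": [{"TopicId": "t1", "Partitions": [{
        "PartitionIndex": 0,
        "Batches": [{"AssignedBaseOffset": 0, "AssignedLastOffset": 9},
                    {"AssignedBaseOffset": 10, "AssignedLastOffset": 14}],
    }]}],
}
print(apply_file_committed({}, record))
```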

DisklessRecordsDeletedRecord

{
  "apiKey": 103,
  "type": "data",
  "name": "DisklessRecordsDeletedRecord",
  // Version 0 is the initial version.
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID." },
    { "name": "PartitionIndex", "type": "int32", "versions": "0+",
      "about": "The partition index." },
    { "name": "NewLogStartOffset", "type": "int64", "versions": "0+",
      "about": "The new log start offset after deletion. All batches with last offset less than this value are removed." }
  ]
}
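Replaying this record is a pure filter over the partition's batch index, following the record's contract: every batch whose last offset is below the new log start offset is dropped. A minimal hypothetical sketch:

```python
def apply_records_deleted(batches, new_log_start_offset):
    """Apply a DisklessRecordsDeletedRecord to a partition's batch index
    (hypothetical sketch).

    batches: list of (base_offset, last_offset) tuples sorted by offset.
    Keeps exactly the batches whose last offset is >= the new log start
    offset, matching the NewLogStartOffset contract above.
    """
    return [(base, last) for base, last in batches
            if last >= new_log_start_offset]

# A batch straddling the new log start offset (10, 19) is kept whole;
# fully-deleted batches become candidates for WAL file cleanup.
print(apply_records_deleted([(0, 9), (10, 19), (20, 29)], 15))
```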

Configuration

Broker configuration

More to be added as the discussion unfolds.

Configuration: Description

diskless.metadata.topic.num.partitions: The number of Diskless metadata partitions, which also determines the number of Diskless coordinators.
diskless.coordinator.orphan.scan.interval.ms: The interval at which the coordinator scans for orphan files.
diskless.coordinator.orphan.scan.grace.period.ms: The grace period granted to deletable files before they are removed.

Monitoring

More to be added as the discussion unfolds.

Compatibility, Deprecation, and Migration Plan

There is no impact on existing users: no existing behavior will change, and no migration will be required.

Test Plan

The feature will be thoroughly tested with unit, integration, and system tests. We will also carry out performance testing, both to understand the performance of the Diskless coordinator itself and to understand its impact on the brokers that host it.

Rejected Alternatives

Use external system

To keep Kafka self-sufficient, we do not propose using an external system for managing Diskless metadata.

Use cluster KRaft quorum

It is possible to store Diskless metadata in the same KRaft quorum that manages the cluster metadata. This was rejected because it would mean mixing low-throughput, relatively slow-changing cluster metadata with high-throughput Diskless metadata, which could create performance issues for critical operations such as partition leadership changes.

Use separate KRaft quorum

Instead of using the cluster KRaft quorum, it is possible to run a separate quorum exclusively for Diskless metadata. The proposed solution borrows some useful mechanisms from KRaft, such as snapshotting; however, we see no benefit in running a full KRaft quorum over using a regular Kafka topic for this task.

