Authors: Greg Harris, Ivan Yurchenko, Jorge Quilcate, Giuseppe Lillo, Anatolii Popov, Juha Mynttinen, Josep Prat, Filip Yonov

Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-19161

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-1150: Diskless Topics introduces the concept of diskless topics. This KIP describes the core functionality of diskless topics, such as the produce and consume paths.

Glossary

  • Diskless Topic: A topic which does not append directly to block storage devices, and does not utilize direct replication.
  • Classic Topic: A topic which appends directly to block storage devices and utilizes direct replication, the present Kafka status quo.
  • Tiered Topic: A classic topic using KIP-405 Tiered Storage to move inactive segments off of block storage, but still appends to block storage devices.
  • Diskless-Ready Cluster: An Apache Kafka cluster which is configured to host Diskless Topics in addition to Classic and Tiered topics.
  • Broker Disk: A local block device available for a broker to store log directories on. May be direct-attached or network-attached.
  • Object Storage: A shared, durable, concurrent, and eventually consistent storage supporting arbitrary sized byte values and a minimal set of atomic operations: put, delete, list, and ranged get.
  • Object Key: A unique reference to an object within Object Storage.
  • Broker/Client Rack: A hint provided to Apache Kafka brokers and clients about the underlying infrastructure topology. Data transfers within a rack are assumed to have lower costs and lower latency than transfers between racks. Failures are assumed to be less correlated across racks, so durable information is persisted across multiple racks. When Apache Kafka is deployed in a hyperscaler, this is typically synonymous with “Region”, “Availability Zone”, or “Zone”.
  • Batch: A container for Kafka records and a unit of record representation in the network protocol and storage format in both classic and diskless topics.
  • WAL Segment: An object in object storage containing batches for one or more diskless topic-partitions.
  • Batch Coordinate: A reference to a record batch within a WAL Segment, at some byte range.
  • Diskless Coordinator: Component serving as source of truth about batch coordinates and WAL Segments. Establishes a global order of batches within a diskless topic, and serves coordinates to enable retrieval of batches.
  • Hyperscaler/Cloud: Widely available cloud computing vendors offering compute, storage, and networking solutions.

Proposed Changes

Overview

This KIP proposes a new kind of topic in Kafka, called a diskless topic, which has the following properties and immediate effects:

  1. Data in diskless topics is durably stored in object storage at all times. 
    1. Local segments on broker disks serve as caches and not sources of truth
    2. Remote storage may have higher latency than local disks, increasing the latency of Kafka requests and end-to-end data latency.
  2. Kafka delegates replication of diskless topics to object storage, and does not perform replication itself.
    1. Replica placement is still used to control client traffic and cache placement.
    2. Any broker may build a replica of any set of diskless partitions by contacting the diskless coordinator, lowering load on other brokers and eliminating unclean leader elections.
    3. All operators can use efficient types of storage backends, such as ones with erasure coding.
    4. Hyperscaler operators can avoid most inter-zone data replication costs.
  3. All brokers are capable of interacting with all diskless topics, and produce requests do not need to be handled by the partition leader.
    1. Produce requests are preferentially served by replicas of the partition, and do not need to be directed to the partition leader.
    2. Partition leaders are still elected to manage the ISR state, upload to tiered storage, and handle share fetches.
    3. Clusters are able to perform fine-grained client balancing across the cluster independently of topic/partition hot spots.
    4. Hyperscaler operators can avoid most inter-zone data ingress/egress costs.

In other functional aspects, diskless topics are indistinguishable from classic topics. This includes durability guarantees, ordering guarantees, transactional and non-transactional producer API, consumer API, consumer groups, share groups, data retention (deletion & compaction), and authorization mechanisms.

All existing APIs for Classic topics will also be supported with equivalent semantics for Diskless topics. Streaming applications can be moved from classic to diskless topics without changing client versions, application code, or experiencing correctness problems.

Because a single cluster can host diskless, classic, and tiered topics together, application developers are able to trade off latency and cost on a per-topic basis, while not incurring additional operational overhead of multiple clusters.

Data Flow

Diskless Topics differ from Classic Topics primarily in how appends are invoked on each replica.

Rather than appending to the leader immediately and waiting for fetches from replicas to initiate appends on all replicas, Diskless topics write data to Object Storage. This data is then retrieved by replicas, and finally appended to local segments.

In order to satisfy the requirement of multiple brokers writing to a single partition with a total order, it is necessary to decouple the handling of batch data and batch metadata. This permits low-cost replication of batch data with fewer consistency guarantees, while processing batch metadata and the state of the partition with strong consistency guarantees.

In classic topics, the broker serving the Produce request is responsible for:

  1. Validating the incoming data.
  2. Assigning offsets and timestamps.
  3. Injecting offsets/timestamps into the batch data.
  4. Writing batches to durable storage.
  5. Waiting for replication to complete.
  6. Returning the Produce response.

For diskless topics, the Diskless Coordinator is now responsible for offset/timestamp assignment (2), and each replica is responsible for injecting the offsets into the batch data (3). These components provide the effect that fetched data sent to the consumer has the same structure as data appended to a classic segment.
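As a sketch of the offset injection in step 3, assuming the v2 record batch layout (where the base offset occupies the first 8 bytes and is not covered by the batch CRC, which starts at the attributes field), a replica can inject the coordinator-assigned offset without re-serializing the records:

```java
import java.nio.ByteBuffer;

// Sketch: rewrite the base offset of a v2 record batch in place.
// In the v2 batch format the first 8 bytes hold the base offset; the CRC
// does not cover this field, so the batch stays valid after the rewrite.
public class OffsetInjector {
    static final int BASE_OFFSET_POS = 0; // int64 base offset leads the batch

    public static void injectBaseOffset(ByteBuffer batch, long assignedBaseOffset) {
        batch.putLong(batch.position() + BASE_OFFSET_POS, assignedBaseOffset);
    }

    public static long baseOffset(ByteBuffer batch) {
        return batch.getLong(batch.position() + BASE_OFFSET_POS);
    }
}
```

This is why batches can be uploaded to object storage before offsets are known: the bytes are fixed up cheaply on the read/append side.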

 

In diskless topics, we separate storage I/O from batch ordering. The data being persisted to object storage has no implicit order, and ordering is delegated to the batch coordinates. Coordinates passed from each broker to the diskless coordinator are locally ordered, and have an order only relative to other coordinates passed from that same broker. The diskless coordinator performs the append and assigns a global order to the batch coordinates in each partition.

Data Format and WAL Segment Objects

The main unit of storage in Kafka is the record batch, which is the smallest unit of data that may be produced or consumed. Kafka groups these batches together into contiguous files to manipulate them more efficiently in the backing storage, while still preserving access to the individual batches. This remains true in diskless topics.

Batches for classic topics are grouped into Segments, which each contain batches from only a single partition, and batches in log segments always have offsets and timestamps applied. In contrast, data from multiple diskless topics and partitions are mixed in single objects, called WAL Segments. This is necessary to keep the object storage write operation costs reasonable.

The batches within a WAL Segment are ordered by their layout in the object, but do not have any inherent order relative to other WAL Segments. Their logical offsets and append timestamps may be unset.

By locating data from multiple partitions together, WAL Segments prevent the number of topic-partitions from influencing object read and write behavior, and reduce the total number of individual reads and writes to the backing storage.

 

Batches are grouped by partition into contiguous parts of the object. This improves the locality of later operations in the same way as segments do for classic topics. WAL Segment objects are given globally unique (e.g. UUID) names and uploaded without coordination or conflicts.

The WAL Segment objects are immutable, similar to inactive segments in classic and tiered topics. Once an object is uploaded, its content is associated with its name for the whole lifetime of the object. This also aligns with the typical interface of object storage where random writes to an object are not supported.

A 1-byte header is prepended to the beginning of the WAL Segment, before any batch data. This header byte should always be set to 0, to indicate the version of the WAL Segment. Future KIPs may extend this header with additional fields, and increment the version number. The information present in this header will not be necessary to interpret the batch data within the remainder of the WAL Segment.
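The layout described above can be sketched as follows; the builder class, field names, and coordinate shape are illustrative, not the KIP's wire format beyond the 1-byte version header:

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Sketch: assemble a WAL segment as [version byte][batches grouped by
// partition], recording a byte-range coordinate for each appended batch.
public class WalSegmentBuilder {
    public record Coordinate(String topic, int partition, long byteOffset, int length) {}

    private final ByteArrayOutputStream buf = new ByteArrayOutputStream();
    private final List<Coordinate> coordinates = new ArrayList<>();

    public WalSegmentBuilder() {
        buf.write(0); // 1-byte version header; always 0 in this KIP
    }

    // Batches for one partition are appended contiguously; the returned
    // coordinate is what the broker later commits to the Diskless Coordinator.
    public Coordinate append(String topic, int partition, byte[] batch) {
        Coordinate c = new Coordinate(topic, partition, buf.size(), batch.length);
        buf.writeBytes(batch);
        coordinates.add(c);
        return c;
    }

    public byte[] build() { return buf.toByteArray(); }
    public List<Coordinate> coordinates() { return coordinates; }
}
```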

Produce path

Data is produced to diskless topics in the following way:

  1. Producers send Produce requests to any broker.
  2. The broker accumulates Produce requests in a buffer until exceeding some size or time limit.
  3. When enough data accumulates or the timeout elapses, the Broker creates a shared log segment and batch coordinates for all of the buffered batches.
  4. The shared log segment is uploaded to object storage and is written durably.
  5. The broker commits the batch coordinates with the Diskless Coordinator (described in detail in KIP-1164).
  6. The Diskless Coordinator assigns offsets to the written batches, persists the batch coordinates, and responds to the Broker.
  7. The broker sends responses to all Produce requests that are associated with the committed object.

This is illustrated in the following sequence diagram:

 

Each broker accumulates its buffers and uploads objects with its requested offsets independently, with no coordination. However, the Diskless Coordinator ensures that the batch coordinates are committed in order and the offsets are assigned correctly. Moreover, a broker can upload multiple objects in parallel; it only needs to commit them with the Diskless Coordinator in their upload order.
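The "parallel upload, in-order commit" requirement can be sketched with CompletableFuture chaining; OrderedCommitter and its upload/commit placeholders are illustrative, not Kafka APIs:

```java
import java.util.concurrent.CompletableFuture;

// Sketch: uploads run in parallel, but each commit is chained onto a single
// "tail" future, so commits reach the coordinator in submission order.
public class OrderedCommitter {
    private CompletableFuture<Void> commitTail = CompletableFuture.completedFuture(null);

    public synchronized CompletableFuture<Void> submit(byte[] walSegment) {
        CompletableFuture<Void> uploaded = upload(walSegment); // may run in parallel
        // commit only after (a) this segment is uploaded and (b) the previous commit finished
        commitTail = commitTail.thenCombine(uploaded, (a, b) -> (Void) null)
                               .thenCompose(x -> commit(walSegment));
        return commitTail;
    }

    // Placeholders standing in for the object storage upload and the
    // coordinator commit RPC.
    CompletableFuture<Void> upload(byte[] segment) { return CompletableFuture.completedFuture(null); }
    CompletableFuture<Void> commit(byte[] segment) { return CompletableFuture.completedFuture(null); }
}
```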

Brokers will be able to process requests containing partitions from both classic and diskless topics. When responding to these mixed-type requests, the broker will delay the response until the diskless partitions are committed.

Append latency is broadly composed of:
- Buffering: up to 250ms or 4MiB (both configurable)
- Upload to Remote Storage: P99 ~200-400ms, P50 ~100ms
- Batch Coordinates Commit: P99 ~20-50ms, P50 ~10ms (depending on the number of batches)

We are aiming for a Produce request latency of P50 ~500 ms and P99 ~1-2 s.

Error handling and segment object garbage collection

There may be errors during various steps of this process. In some cases, garbage objects (i.e. objects that are uploaded but not committed to the diskless coordinator) may be left in the object storage. Sometimes, these objects cannot be cleaned up on the spot. Consider the following scenarios:

  1. The Broker fails to upload the object. No garbage object is created. The pending Produce requests receive responses with errors. It’s safe for the producers to retry.
  2. The broker succeeds in uploading the object and tries to commit it, but does not receive a response from the diskless coordinator. A garbage object is left on the object storage. It’s not safe to clean it up on the spot without consulting the diskless coordinator, which may not be available. The garbage cleanup is delegated to a separate asynchronous process. The pending Produce requests receive responses with errors. It’s safe for the producers to retry provided they use idempotent produce.

The concrete details of the asynchronous garbage collection process are out of scope of this KIP and will be described in a separate KIP.

Idempotent Produce and Transactions

Diskless topics will support idempotent produce and transactions as classic topics do. The support for these features in classic topics depends on the interactions between three actors: clients (mostly producers, but also consumers), partition leaders, and transaction coordinators. In classic topics, many pieces of state important for these features are local to partition leaders (the log content being the source of truth). In diskless topics, batch metadata is detached from data and is stored in the Diskless Coordinator. Therefore, we aim to preserve the high-level flows for idempotent produce and transactions, replacing partition leaders with the Diskless Coordinator when needed.

Transactions will be able to span both Diskless and classic topics. We propose to focus on v2 and newer transaction versions only.

The concrete details of the support for these features are out of scope of this KIP and will be described in a separate KIP.

Replicas

Diskless topics will have the same concept of replicas as classic topics, managed by the KRaft controller.

Replicas are not filled by inter-broker replication; instead, they download data from remote storage based on metadata in the Diskless Coordinator. All replicas will monitor the log tail (active part) in order to learn about newly appended batches, download them, and append them to their local log.

Although there is no inter-broker replication, replicas will still issue FetchRequests to leaders. Leaders will respond with an empty (no records) FetchResponse. This is the mechanism for leaders to track the ISR.

Local segments on replicas will be the caching mechanism, not the persistence mechanism. They benefit fetch performance, but are safe to lose.

“In-sync replica” for Diskless topics means “in sync with the Diskless coordinator” (which is the source of truth) and not with the leader. An important implication and difference to the classic topics is that a leader is just another replica from the point of view of being caught-up with the latest data and can also be out-of-sync. This means:

  • the leader can’t perform its duties such as offloading segments to the tiered storage efficiently;
  • it’s not recommended to read from it (like from any other out-of-sync replica).

The replica reassignment will follow the same flow as in classic topics:

  1. New replicas are added to the partition's replica set.
  2. The controller waits for the new replicas to become in-sync.
  3. Old replicas are removed from the partition's replica set.

When a replica is assigned to a broker, it needs to start filling its local log from some point. There are various possible policies, such as:

  1. Start building the local log from the earliest non-tiered offset.
  2. Start building it from the latest available offset.
  3. Start building it from some configurable amount, e.g. 100 MiB prior to the current log end or earliest available offset, whichever is later.

Approach 1 may take a long time if there is a large amount of data in the Diskless Coordinator. Approach 2 may not work if the partition is idle (the local log would never be built on new replicas). Because of this, we propose to follow approach 3.
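Approach 3 reduces to a simple rule; the sketch below assumes the byte-to-offset translation (finding the offset ~100 MiB before the log end) is available from the batch coordinates:

```java
// Sketch of the proposed policy: start building the local log a bounded
// distance before the log end, but never before the earliest available offset.
public class ReplicaStartPolicy {
    // offsetAtBoundedDistance: the offset located e.g. ~100 MiB before the
    // current log end, resolved via batch coordinates (assumed available).
    public static long startOffset(long earliestAvailable, long offsetAtBoundedDistance) {
        // whichever is later
        return Math.max(earliestAvailable, offsetAtBoundedDistance);
    }
}
```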

In contrast to replicas, leaders must build the local log from the beginning in order to offload segments to tiered storage. To avoid compromising between fetch performance and this duty, caching and segment offloading to tiered storage may happen independently: cached segments will be built by the common rules, while tiered storage offloading will build its segments from the log beginning.

  • When a replica is the elected leader and does not yet have a segment containing the earliest available offset, it will open a new segment starting at the earliest available offset. This segment will be open concurrently with the active segment.
  • When a replica loses leadership, it will not evacuate any partially-built segments immediately, but let them expire according to local retention configurations. This permits leadership to be lost and recovered quickly without restarting downloading.

Consume path

Diskless will rely on the existing follower fetching mechanism (the broker returning PreferredReadReplica) in order to allow consumers to read from replicas in their racks and avoid inter-rack network traffic costs. In contrast to classic topics, non-ISR replicas will not be excluded. Instead, they will be deprioritized and used only if no in-sync replicas are available in the client rack.

The replicas will strive to serve responses from the local segments on disk. However, there are two exceptions when direct read from the remote storage will be used:

  1. When the replica is not in-sync, i.e. there's a long gap between the log end in the Diskless Coordinator and the local log.
  2. When the requested offset is older than the earliest offset available locally.

A direct read from remote storage involves:

  1. The broker determines that a fetch for a partition cannot be served from local segments.
  2. The broker queries the Diskless Coordinator for the relevant batch coordinates.
  3. The broker gets the data from the object storage.
  4. The broker injects the computed offsets and timestamps into the batches in-memory.
  5. The broker constructs and sends the Fetch response to the Consumer.

Depending on the structure of the shared log segment, a single Fetch request could be satisfied by one GET request, or may need multiple requests.
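To illustrate why one GET can often suffice: batch coordinates that are contiguous within the same WAL segment can be coalesced into a single ranged GET. The ByteRange record mirrors the one in the Public Interfaces section; the coalescing logic itself is an illustrative sketch:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: merge adjacent byte ranges within one object into fewer GETs.
public class RangeCoalescer {
    public record ByteRange(long offset, long size) {}

    // Expects ranges sorted by offset, all within the same WAL segment object.
    public static List<ByteRange> coalesce(List<ByteRange> sorted) {
        List<ByteRange> out = new ArrayList<>();
        for (ByteRange r : sorted) {
            if (!out.isEmpty()) {
                ByteRange last = out.get(out.size() - 1);
                if (last.offset() + last.size() == r.offset()) { // contiguous
                    out.set(out.size() - 1, new ByteRange(last.offset(), last.size() + r.size()));
                    continue;
                }
            }
            out.add(r);
        }
        return out;
    }
}
```

Batches interleaved with other partitions' data leave gaps between ranges, which is when multiple GET requests become necessary.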

Queues

In contrast to normal Fetch requests, ShareFetch requests can only be served by the share-partition leader, which is co-located with the partition leader and manages internal state, such as record state.

Because of this, in the scope of this KIP, queue consumers will have to send their ShareFetch requests to partition leaders and will not always be able to benefit from inter-rack traffic elimination.

Opportunistic rack-awareness for queues is out of scope of this KIP, and will be described in a separate KIP.

Moving to Tiered Storage

In order to control the number of small WAL files, the memory footprint of the diskless coordinator, and the rebuild time of local segments after a replica change, data is moved to Tiered Storage using the existing KIP-405 interfaces. Existing implementations of tiered storage RSM and RLMM can be used as-is, with no knowledge of the presence or absence of the Diskless produce path in front.

The partition leader will use existing tiered storage interfaces to copy constructed segments to tiered storage. Replicas are responsible for serving fetch requests from tiered segments, similar to classic topics.

Effectively, Diskless becomes a front-end for Local Segments, and Tiered Storage becomes the back-end, providing the same benefits for optimizing storage usage for both Diskless and Classic topics.

The concrete details of this feature are out of scope of this KIP and will be described in a separate KIP.

Cluster Metadata

Any broker is able to serve Produce and Fetch requests for any Diskless topics. However, in many cases arbitrary selection of the request receiver by the client will not be optimal from the performance point of view. We need a way to let the clients choose brokers to serve their Produce and Fetch requests optimally. Only the partition leader can serve ShareFetch requests. Taking into account these requirements and also ones that other KIPs from the Diskless initiative (e.g. KIP-1164) may have, we propose the following.

We introduce version 14 of MetadataRequest and MetadataResponse, which will allow clients to send brokers their rack.id and receive back more information about Diskless topics, such as whether a topic is Diskless (IsDiskless) and which brokers are recommended for producing to a particular Diskless partition (PreferredProduceBrokers). The definitions are provided below in the Public Interfaces section.

Newer clients will use the new API version 14 to send and receive this information. No behavior changes are expected for classic topics. For Diskless topics, newer clients will do the following:

  1. Send Produce requests to recommended produce brokers (in order and subject to availability).
  2. Send Fetch requests as for classic topics: either to the leader or to the closest replica based on the rack.
  3. Send ShareFetch requests as for classic topics, to the leader.

Older clients will use previous API versions 0-13, and the broker will not be able to tell them whether a topic is Diskless or what the recommended produce brokers are. To allow Kafka users with older clients, as well as third-party clients (which may have their own schedule for adding support for new features), to benefit from Diskless, we will allow them to append “,diskless_rack_id=<rack_id>” to their client ID. The client ID is always transferred to the broker, and the broker will take this information into account. It will respond with the matching version of the MetadataResponse; however, it will modify the true partition metadata for Diskless partitions: if the partition has replicas in the matching rack, the LeaderId will be replaced with one of them. Thus, such a client will be able to produce and consume data within its rack and avoid inter-rack network traffic costs. This, however, will prevent ShareFetch requests from being served, because only the real leader can serve them. The way out is to specify diskless_rack_id for producers and normal consumers, and not to specify it for share consumers; the latter must be separate client instances in the code.

If an older client doesn’t provide diskless_rack_id to the broker, it will receive the true metadata for Diskless partitions and will be able to use the full spectrum of Kafka API with them, albeit not necessarily optimally from the point of view of inter-rack traffic reduction.
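The broker-side extraction of this hint can be sketched as follows; the parsing details beyond the “,diskless_rack_id=<rack_id>” convention stated above are illustrative:

```java
// Sketch: extract the rack hint that older clients append to their client ID,
// e.g. "my-app,diskless_rack_id=us-east-1a".
public class ClientIdRack {
    static final String KEY = "diskless_rack_id=";

    // Returns the rack ID, or null when no hint is present
    // (in which case the broker returns the true partition metadata).
    public static String rackId(String clientId) {
        for (String part : clientId.split(",")) {
            if (part.startsWith(KEY)) return part.substring(KEY.length());
        }
        return null;
    }
}
```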

Batch deletion

There are various reasons a batch may require deletion:

  • the local size and/or time retention settings of the topic;
  • the partition truncation by the user;
  • deletion of a topic.

Since objects are immutable, logical batch deletion doesn’t change the object content. For each object, it’s necessary to track its effectively used size. This size decreases by the batch size when a batch is deleted from the object. When the used size reaches 0, the object can actually be deleted.
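A minimal sketch of this used-size accounting (class and method names are illustrative; the coordinator's actual data structures are out of scope here):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: per-object used-size accounting. Logical batch deletion subtracts
// the batch size; an object whose used size reaches zero becomes eligible
// for asynchronous physical deletion.
public class ObjectUsage {
    private final Map<String, Long> usedBytes = new HashMap<>();

    public void onObjectCommitted(String objectKey, long totalBatchBytes) {
        usedBytes.put(objectKey, totalBatchBytes);
    }

    /** Returns true when the object can be marked for deletion. */
    public boolean onBatchDeleted(String objectKey, long batchBytes) {
        long remaining = usedBytes.merge(objectKey, -batchBytes, Long::sum);
        if (remaining == 0) {
            usedBytes.remove(objectKey);
            return true;
        }
        return false;
    }
}
```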

Object deletion will be implemented as an asynchronous operation: first, the object is marked for deletion by the diskless coordinator; then a broker performs the actual deletion from the object storage. This has two advantages:

  1. The Diskless Coordinator doesn’t need to be concerned with the object storage, i.e. it needs neither the credentials nor the code to access it.
  2. The deletion could have a grace period. The grace period is a useful way to allow potentially in-progress Fetch requests to successfully finish.

To enforce the time and size retention settings of topics, a background process will periodically check metadata of diskless topics and logically delete affected batches.

It’s quite possible that for compliance reasons, a particular batch has a deadline for physical deletion. Despite the batch being logically deleted, its data are located in an object that may be kept alive by other batches, potentially forever. Therefore, batches with physical deletion deadlines must be either moved to isolated files during merging, or additional merge passes will be necessary to physically delete the data. This problem is addressed in KIP-1165.

Public Interfaces

Plugin Interfaces: Storage Backend

This interface abstracts the access methods to the object storage.

public record ByteRange(long offset, long size) {}

public interface ObjectStorage extends Configurable {
    /**
     * Uploads an object to object storage.
     * <p>An exception must be thrown in case the number of bytes streamed from
     * {@code inputStream} is different from {@code length}.
     * @param key                      key of the object to upload.
     * @param inputStream              data of the object that will be uploaded.
     * @param length                   length of the data that will be uploaded.
     * @throws StorageBackendException if there are errors during the upload.
     */
    void upload(ObjectKey key, InputStream inputStream, long length)
        throws StorageBackendException;


    /**
     * Fetch a range of an object from object storage.
     * <p>This result should be cached for later access
     * @param key                      key of the object to fetch.
     * @param range                    range of bytes within the object.
     * @throws StorageBackendException if there are errors during the fetch.
     */
    InputStream fetch(ObjectKey key, ByteRange range)
        throws StorageBackendException;

    /**
     * Idempotently delete multiple objects.
     * <p>If an object doesn't exist, the operation should still succeed
     * to be idempotent.
     * @param keys                     keys of objects to delete.
     * @throws StorageBackendException if there are other errors which prevent deletion.
     */
    void delete(Set<ObjectKey> keys)
        throws StorageBackendException;
}

Apache Kafka will not provide a production-grade implementation of this interface, because this would require Kafka to depend on third-party storage drivers. The list of potential storage backends is large, and it is not feasible to provide implementations for all of them. Implementations will be provided by third-party developers in the Kafka ecosystem.
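For illustration, a minimal in-memory backend for testing could mirror the contract above; this sketch uses String keys instead of ObjectKey, omits Configurable, and wraps I/O errors in an unchecked exception for brevity:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: in-memory object storage with the same upload/fetch/delete shape.
public class InMemoryStorage {
    public record ByteRange(long offset, long size) {}
    private final Map<String, byte[]> objects = new ConcurrentHashMap<>();

    public void upload(String key, InputStream in, long length) {
        try {
            byte[] data = in.readAllBytes();
            if (data.length != length) throw new IOException("length mismatch");
            objects.put(key, data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public InputStream fetch(String key, ByteRange range) {
        return new ByteArrayInputStream(fetchBytes(key, range));
    }

    // Convenience accessor for the ranged read.
    public byte[] fetchBytes(String key, ByteRange range) {
        byte[] data = objects.get(key);
        return Arrays.copyOfRange(data, (int) range.offset(), (int) (range.offset() + range.size()));
    }

    public void delete(Set<String> keys) {
        keys.forEach(objects::remove); // absent keys are ignored: idempotent
    }
}
```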

Configurations

Broker Configurations

  • diskless.system.enable: enables diskless support on a broker. 
  • diskless.storage.class.name: the object storage class name for diskless topics. 
  • diskless.storage.class.path: the object storage class path for diskless topics.
  • diskless.append.commit.interval.ms: defines how long to wait before closing a shared log segment object to be uploaded/committed. 
  • diskless.append.buffer.max.bytes: defines the maximum size of a shared log segment object before closing it for further upload/commit.

Topic Configurations

  • diskless.enable: Sets a topic as a diskless topic. Diskless topics can only be defined at creation time (updating a classic topic to a diskless topic is out of scope.)
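For illustration, enabling diskless support and creating a diskless topic could look as follows; the broker property names come from this KIP, while the storage class, host, and topic names are examples:

```
# server.properties
diskless.system.enable=true
diskless.storage.class.name=com.example.S3ObjectStorage   # hypothetical 3rd-party implementation

# Create a diskless topic (diskless.enable can only be set at creation time):
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic clickstream \
  --config diskless.enable=true
```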

Client Configurations

Producer:

  • client.rack: Similar to the consumer's usage of client.rack in the follower fetching feature, it defines the producer's rack so that it can align with brokers in the same rack.

Broker API

Diskless Coordinator

A new set of APIs will be required to allow brokers to contact the diskless coordinator.

These new requests are out of scope for this KIP, and will be fully defined in KIP-1164: Diskless Coordinator

Client Metadata

We propose to create version 14 of the Metadata API with the following definition:

MetadataRequest:

{
 "apiKey": 3,
 "type": "request",
 "listeners": ["broker"],
 "name": "MetadataRequest",
 "validVersions": "0-14",
 "flexibleVersions": "9+",
 "fields": [
   // In version 0, an empty array indicates "request metadata for all topics."  In version 1 and
   // higher, an empty array indicates "request metadata for no topics," and a null array is used to
   // indicate "request metadata for all topics."
   //
   // Version 2 and 3 are the same as version 1.
   //
   // Version 4 adds AllowAutoTopicCreation.
   //
   // Starting in version 8, authorized operations can be requested for cluster and topic resource.
   //
   // Version 9 is the first flexible version.
   //
   // Version 10 adds topicId and allows name field to be null. However, this functionality was not implemented on the server.
   // Versions 10 and 11 should not use the topicId field or set topic name to null.
   //
   // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is now exposed
   // by the DescribeCluster API (KIP-700).
   // Version 12 supports topic Id.
   // Version 13 supports top-level error code in the response.
   // Version 14 supports Diskless and allows clients to send their rack ID.
   { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+",
     "about": "The topics to fetch metadata for.", "fields": [
     { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." },
     { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "nullableVersions": "10+",
       "about": "The topic name." }
   ]},
   { "name": "AllowAutoTopicCreation", "type": "bool", "versions": "4+", "default": "true", "ignorable": false,
     "about": "If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so." },
   { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8-10",
     "about": "Whether to include cluster authorized operations." },
   { "name": "IncludeTopicAuthorizedOperations", "type": "bool", "versions": "8+",
     "about": "Whether to include topic authorized operations." },
   { "name": "RackId", "type":  "string", "versions": "14+", "default": "", "ignorable": true,
     "about": "Rack ID of the client making this request."}
 ]
}


Here, RackId is the new field.

MetadataResponse:


{
 "apiKey": 3,
 "type": "response",
 "name": "MetadataResponse",
 // Version 1 adds fields for the rack of each broker, the controller id, and whether or not the topic is internal.
 //
 // Version 2 adds the cluster ID field.
 //
 // Version 3 adds the throttle time.
 //
 // Version 4 is the same as version 3.
 //
 // Version 5 adds a per-partition offline_replicas field. This field specifies
 // the list of replicas that are offline.
 //
 // Starting in version 6, on quota violation, brokers send out responses before throttling.
 //
 // Version 7 adds the leader epoch to the partition metadata.
 //
 // Starting in version 8, brokers can send authorized operations for topic and cluster.
 //
 // Version 9 is the first flexible version.
 //
 // Version 10 adds topicId.
 //
 // Version 11 deprecates ClusterAuthorizedOperations. This is now exposed
 // by the DescribeCluster API (KIP-700).
 // Version 12 supports topicId.
 // Version 13 supports top-level error code in the response.
 // Version 14 supports Diskless.
 "validVersions": "0-14",
 "flexibleVersions": "9+",
 "fields": [
   { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
     "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
   { "name": "Brokers", "type": "[]MetadataResponseBroker", "versions": "0+",
     "about": "A list of brokers present in the cluster.", "fields": [
     { "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
       "about": "The broker ID." },
     { "name": "Host", "type": "string", "versions": "0+",
       "about": "The broker hostname." },
     { "name": "Port", "type": "int32", "versions": "0+",
       "about": "The broker port." },
     { "name": "Rack", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true, "default": "null",
       "about": "The rack of the broker, or null if it has not been assigned to a rack." }
   ]},
   { "name": "ClusterId", "type": "string", "nullableVersions": "2+", "versions": "2+", "ignorable": true, "default": "null",
     "about": "The cluster ID that responding broker belongs to." },
   { "name": "ControllerId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, "entityType": "brokerId",
     "about": "The ID of the controller broker." },
   { "name": "Topics", "type": "[]MetadataResponseTopic", "versions": "0+",
     "about": "Each topic in the response.", "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The topic error, or 0 if there was no error." },
     { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "12+",
       "about": "The topic name. Null for non-existing topics queried by ID. This is never null when ErrorCode is zero. One of Name and TopicId is always populated." },
     { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
       "about": "The topic id. Zero for non-existing topics queried by name. This is never zero when ErrorCode is zero. One of Name and TopicId is always populated." },
     { "name": "IsInternal", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
       "about": "True if the topic is internal." },
     { "name": "Partitions", "type": "[]MetadataResponsePartition", "versions": "0+",
       "about": "Each partition in the topic.", "fields": [
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The partition error, or 0 if there was no error." },
       { "name": "PartitionIndex", "type": "int32", "versions": "0+",
         "about": "The partition index." },
       { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
         "about": "The ID of the leader broker." },
       { "name": "LeaderEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true,
         "about": "The leader epoch of this partition." },
       { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
         "about": "The set of all nodes that host this partition." },
       { "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
         "about": "The set of nodes that are in sync with the leader for this partition." },
       { "name": "OfflineReplicas", "type": "[]int32", "versions": "5+", "ignorable": true, "entityType": "brokerId",
         "about": "The set of offline replicas of this partition." },
       { "name": "PreferredProduceBrokers", "type": "[]int32", "versions": "14+", "ignorable": true, "entityType": "brokerId",
         "about": "The ordered list of brokers to which the client is recommended to send Produce requests." }
     ]},
     { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648",
       "about": "32-bit bitfield to represent authorized operations for this topic." },
     { "name": "IsDiskless", "type": "bool", "versions": "14+", "default": "false", "ignorable": true,
       "about": "True if the topic is diskless." }
   ]},
   { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8-10", "default": "-2147483648",
     "about": "32-bit bitfield to represent authorized operations for this cluster." },
   { "name": "ErrorCode", "type": "int16", "versions": "13+", "ignorable": true,
     "about": "The top-level error code, or 0 if there was no error." }
 ]
}


Here, the topic definition has the new field IsDiskless and the partition definition has the new field PreferredProduceBrokers.
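For illustration, a version 14-aware client could use PreferredProduceBrokers to choose a produce target, falling back to the partition leader otherwise. A minimal sketch (the helper is hypothetical, not part of this KIP's protocol changes; the field names mirror the schema above):

```python
def pick_produce_broker(partition_metadata, live_brokers):
    """Pick the broker to send Produce requests to for a diskless partition.

    partition_metadata: dict mirroring MetadataResponsePartition fields.
    live_brokers: set of broker IDs the client can currently reach.
    Hypothetical client-side helper for illustration only.
    """
    # Prefer the first reachable broker from the ordered recommendation list.
    for broker_id in partition_metadata.get("PreferredProduceBrokers", []):
        if broker_id in live_brokers:
            return broker_id
    # Fall back to the leader (classic behavior) when no recommendation applies,
    # e.g. when talking to a broker older than version 14.
    return partition_metadata["LeaderId"]
```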

Monitoring

The following metrics may be useful for operators:

  1. Object upload:
    1. count and rates;
    2. object size average and percentiles;
    3. upload traffic;
    4. latency;
    5. errors.
  2. Object commit:
    1. count and rates;
    2. latency;
    3. errors.
  3. Read:
    1. count and rates;
    2. GET requests per Fetch request.
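As an illustration, the ratio metric above (GET requests per Fetch request) can be derived from two simple counters. A sketch with illustrative names, not proposed metric implementations:

```python
from dataclasses import dataclass

@dataclass
class ReadMetrics:
    # Illustrative counters for the "Read" metrics group.
    fetch_requests: int = 0
    get_requests: int = 0

    def record_fetch(self, gets_issued: int) -> None:
        # Called once per Fetch request served from diskless data,
        # with the number of object storage GETs it required.
        self.fetch_requests += 1
        self.get_requests += gets_issued

    @property
    def gets_per_fetch(self) -> float:
        # GET requests per Fetch request; 0.0 before any Fetch is observed.
        return self.get_requests / self.fetch_requests if self.fetch_requests else 0.0
```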

Command line tools

Existing tools to be adapted:

  • kafka-topics.sh must support the new topic configuration on creation and as a filter to list diskless topics only.
  • kafka-dump-log.sh must support Shared Log Segment files as input and parse their content correctly.

kafka-diskless-metadata.sh is a new tool, which does the following:

  1. Diskless topic overview:
    • the offsets;
    • the size on the object storage;
    • the total size of the objects the topic is part of;
    • the total size of objects the topic was part of, but whose batches have been deleted.
  2. Getting object metadata, including the object key, total size, and used size.
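The distinction between an object's total size and its used size can be illustrated as follows: after some of its batches are deleted, a shared object still occupies its full size in object storage until it is removed. A sketch (the helper and its inputs are illustrative, not part of the proposed tool's interface):

```python
def object_utilization(total_size_bytes, live_batch_sizes):
    """Return (used_bytes, reclaimable_bytes) for a shared log segment object.

    total_size_bytes: size of the object as stored in object storage.
    live_batch_sizes: sizes of the batches in the object that are still referenced.
    Illustrative helper for explaining the tool's output, not a real API.
    """
    used = sum(live_batch_sizes)
    return used, total_size_bytes - used
```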

Compatibility, Deprecation, and Migration Plan

Existing users upgrading to a version of Kafka with support for diskless writes will not experience any change in behavior. All broker and topic configurations will have defaults which are consistent with the existing storage model. This will be a backwards-compatible upgrade. These users may downgrade without additional steps.

Users who configure diskless brokers but no diskless topics may experience failures related to those configurations if they are invalid (e.g. a plugin is not installed or a backing service is unavailable). If the configuration passes validation and the brokers are able to start, users will experience no change in behavior. These users may downgrade without additional steps.

Users who configure diskless brokers and diskless topics will be able to produce and consume data with the same semantics and consistency model as classic topics (save for the explicitly outlined exceptions and limitations), but with higher latency than classic topics. These users will need to migrate away from and delete all diskless topics before downgrading.

Test Plan

We need to create new integration tests that would cover the following:

  • Create a diskless topic, write to and read from it → check that data is written to an object storage stub (MinIO / Localstack).
  • Set different commit intervals and check that commits happen within an error threshold of the configured interval.
  • Set different buffer max bytes values and check that buffer sizes are held within an error threshold.
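The commit-interval check above could look like the following sketch, which asserts that observed commit timestamps are spaced within a tolerance of the configured interval (the helper and its inputs are illustrative, not part of an existing test API):

```python
def intervals_within_tolerance(commit_timestamps_ms, configured_interval_ms, tolerance_ms):
    """Check that consecutive commits are spaced close to the configured interval.

    commit_timestamps_ms: monotonically increasing commit times observed in the test.
    Illustrative test helper; names and inputs are assumptions.
    """
    intervals = [b - a for a, b in zip(commit_timestamps_ms, commit_timestamps_ms[1:])]
    return all(abs(i - configured_interval_ms) <= tolerance_ms for i in intervals)
```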

We need to create new end-to-end system tests, stubbed with MinIO, that do the following:

  • Have a cluster running with both topic types.
  • Have a cluster running with diskless topics; stop and restart the cluster, and verify there is no data loss.
  • Have a cluster with only diskless topics and monitor that disk is used only for metadata.

Various failure scenarios (e.g. the Diskless Coordinator or Object Storage are inaccessible or partially accessible) should be tested extensively, with the focus primarily on correctness.

The majority of the existing produce and consume tests should be adapted to diskless topics by parameterizing the topic configuration. However, some of the current tests fall under the known limitations and exceptions.

Documentation Plan

These are the areas where we need to add documentation:

  • Quick start needs to be enhanced to explain how to create diskless topics.
  • New configuration settings need to be documented (in the corresponding Java classes).
  • Expand subsection 3.2 – Topic Configs to explain the new configuration option for topic creation.
  • Add a new subsection under section 4 – Design. Something like 4.11 – Diskless topics architecture.
  • Add a new subsection under section 6 – Operations. Something like 6.13 – Diskless topics operations. This would include:
    • A brief explanation of what the feature entails.
    • A detailed explanation of the new config options.
    • Some deployment configurations and options (hybrid vs segregated brokers).

Rejected Alternatives

Use Tiered Storage as-is

Tiered storage only affects the behavior of inactive segments, while active segments continue to use block storage and replication. It is possible to avoid inter-zone replication by setting replication.factor=1, but this causes the topic to inherit the durability of block storage, which may experience correlated failures within a single rack.

Tiered Storage as-is forces a trade-off between durability and replication costs.

Using “Aggressive Tiering”, an operator may configure their cluster to roll active segments quickly, reducing the total active segment size. This also shrinks the window during which data is stored non-durably, such that it might be possible to delay acks until a segment is uploaded to tiered storage. However, because tiered storage uploads individual segments, these requests can incur a large I/O cost. In many cases, this will negate the positive effects of shrinking or eliminating the block storage.

Tiered Storage as-is forces a trade-off between durability and excessive I/O overhead.
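To make the I/O overhead concrete, consider a back-of-envelope comparison of per-partition segment uploads against per-broker shared log segment uploads. All figures below are illustrative assumptions, not measurements:

```python
# Hypothetical back-of-envelope comparison; every figure is an assumption.
partitions = 1_000
brokers = 10
upload_interval_s = 0.25  # how often data must reach object storage to bound ack latency

# Aggressive tiering uploads individual per-partition segments, so the PUT
# rate scales with the partition count.
tiered_puts_per_s = partitions / upload_interval_s

# Shared log segments batch all partitions handled by a broker into one
# object per commit interval, so the PUT rate scales with the broker count.
diskless_puts_per_s = brokers / upload_interval_s
```

Under these assumptions, per-partition uploads produce two orders of magnitude more PUT requests than per-broker shared objects at the same upload interval.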

Tiered topics still have leaders, and under the Kafka protocol, producers must produce to the leader, even if the leader is in a different rack, incurring cross-rack transfer costs. To mitigate this, all producers and brokers can be placed in a single rack.

Tiered storage as-is forces a trade-off between availability and producer ingress costs.

In summary, inter-AZ traffic can be avoided, but at the cost of losing the availability of producers and brokers, as well as the availability and durability of topic data. The local storage requirement can be minimized, with the penalties explained above, but not fully eliminated. The authors are not aware of any mechanism that would improve tiered storage enough to eliminate these shortcomings.

Because Tiered Storage metadata is abstracted at segment granularity and segments are limited to a single partition, the overall cost of the system cannot be optimized, and operators are forced to sacrifice very important guarantees in order to unlock cost savings. Diskless topics maintain these guarantees while still unlocking cost savings.

File system driver

One may argue that we could keep the Kafka code unchanged or minimally changed, and instead use a sophisticated file system driver. Brokers would use a local broker disk backed by the driver, writing segment files and other file types to it and reading from them normally. The driver would be responsible for redirecting the I/O to object storage. Native Kafka replication could be disabled by setting replication.factor=1. This approach comes with a number of challenges that make us reject it.

Mapping block storage operations to object storage directly is inefficient and requires assumptions about access patterns. For example, object storages don’t widely support random writes or appends, so objects would need to be overwritten often. This would result in poor performance and high costs in hyperscalers, which charge per request.

There are log-structured file systems implemented on top of object storage, but these are either proprietary or immature. And because they operate at a lower level of abstraction than Diskless Shared Log Segments, they are not able to take advantage of application-specific access patterns and optimizations.

A file system driver also does not cover rack awareness, so modifying the Kafka code would still be required to implement a similar feature. Kafka would also need to be aware of, and collaborate with, an external component as low-level as the file system driver so that brokers could access data they have not replicated. Kafka is not prepared to manage multiple writers on a single local file system.

This type of solution would also be less accessible and portable across operating systems: kernel-space or user-space drivers would need to be implemented and supported for every operating system that Kafka supports. This also requires a specific skill set that may not be widely represented in the present Kafka community.

Coordinator-less approaches

It seems possible to partially or even fully eliminate inter-zone traffic without introducing the centralized Diskless coordinator for batch management. A number of proposals were made, such as:

  1. Make partition leaders coordinators of their own partitions (proposed here). Any broker can handle Produce requests. Partition leaders write batch metadata to the log instead of data. Followers replicate the metadata log as usual. Data is consumed from remote storage.
  2. Produce requests are handled by partition leaders, but remote storage is the replication medium (proposed in KIP-1176).
  3. Produce requests are handled by any broker and remote storage is the replication medium (proposed here).

These approaches fully or partially achieve the goal of inter-zone traffic elimination set by KIP-1150. They lean more towards the classic topic design, while the current KIP takes a more "revolutionary" approach, which brings additional benefits. In this KIP, the stateful (coordinator) and stateless (replica) components are explicitly separated. This allows for greater flexibility and scalability for current and potential future tasks. For example, stateless replicas can be added to and removed from the cluster easily, without the need to rebalance data stored on local disks, because the data lives in object storage. Object storage also normally has better durability than local disks. The potential of the new design could be developed further, e.g. towards using serverless compute available in cloud environments for various cluster tasks. This, however, remains out of the scope of this KIP.
