Authors: Greg Harris, Ivan Yurchenko, Jorge Quilcate, Giuseppe Lillo, Anatolii Popov, Juha Mynttinen, Josep Prat, Filip Yonov
Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-19161
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-1150: Diskless Topics introduces the concept of diskless topics. This KIP describes their core functionality, such as the produce and consume paths.
Glossary
- Diskless Topic: A topic which does not append directly to block storage devices, and does not utilize direct replication.
- Classic Topic: A topic which appends directly to block storage devices and utilizes direct replication, the present Kafka status quo.
- Tiered Topic: A classic topic using KIP-405 Tiered Storage to move inactive segments off of block storage, but still appends to block storage devices.
- Diskless-Ready Cluster: An Apache Kafka cluster which is configured to host Diskless Topics in addition to Classic and Tiered topics.
- Broker Disk: A local block device available for a broker to store log directories on. May be direct-attached or network-attached.
- Object Storage: A shared, durable, concurrent, and eventually consistent storage supporting arbitrary sized byte values and a minimal set of atomic operations: put, delete, list, and ranged get.
- Object Key: A unique reference to an object within Object Storage.
- Broker/Client Rack: A hint provided to Apache Kafka brokers and clients about the underlying infrastructure topology. Data transfers within a rack are assumed to have lower costs and lower latency than transfers between racks. Failures are assumed to be less correlated across racks, so durable information is persisted across multiple racks. When Apache Kafka is deployed in a hyperscaler, this is typically synonymous with “Region”, “Availability Zone”, or “Zone”.
- Batch: A container for Kafka records and a unit of record representation in the network protocol and storage format in both classic and diskless topics.
- WAL Segment: An object on the object storage containing batches for one or more diskless topic-partitions.
- Batch Coordinate: A reference to a record batch within a WAL Segment, at some byte range.
- Diskless Batch Coordinator: Component serving as source of truth about batch coordinates and WAL Segments. Establishes a global order of batches within a diskless topic, and serves coordinates to enable retrieval of batches.
- Hyperscaler/Cloud: Widely available cloud computing vendors offering compute, storage, and networking solutions.
Proposed Changes
Overview
This KIP proposes a new kind of topic in Kafka, the diskless topic, which has the following properties and immediate effects:
- Data in diskless topics is durably stored in object storage at all times.
- Local segments on broker disks serve as caches and not sources of truth.
- Remote storage may have higher latency than local disks, increasing the latency of Kafka requests and end-to-end data latency.
- Kafka delegates replication of diskless topics to object storage, and does not perform replication itself.
- Classic concepts of replication, ISR, and unclean leader elections do not apply.
- All operators can use efficient types of storage backends, such as ones with erasure coding.
- Hyperscaler operators can avoid most inter-zone data replication costs.
- All brokers are capable of interacting with all diskless topics, and no broker is uniquely considered the leader of a partition.
- Classic replica leader election for diskless topics does not affect produce request functionality, and all requests may use any broker as if it were the leader.
- Clusters are able to perform fine-grained client balancing across the cluster independently of topic/partition hot spots.
- Any broker may build a replica of any set of diskless partitions by contacting the batch coordinator, without any other replicas being live.
- Hyperscaler operators can avoid most inter-zone data ingress/egress costs.
Similar to Tiered Topics, Diskless Topics will not initially support compaction. In the future, once compaction for Tiered Topics is designed and implemented, Diskless Topics will gain compaction support as well.
In other functional aspects, diskless topics are indistinguishable from classic topics. This includes durability guarantees, ordering guarantees, transactional and non-transactional producer API, consumer API, consumer groups, share groups, data retention, and authorization mechanisms. Streaming applications can be moved from classic to diskless topics without changing client versions, application code, or experiencing correctness problems.
Because a single cluster can host diskless, classic, and tiered topics together, application developers are able to trade off latency and cost on a per-topic basis, while not incurring additional operational overhead of multiple clusters.
Data Flow
Diskless Topics differ from Classic Topics primarily in how appends are invoked on each replica.
Rather than appending to the leader immediately and waiting for fetches from replicas to initiate appends on all replicas, Diskless topics write data to Object Storage. This data is then retrieved by replicas, and finally appended to local segments.
In order to satisfy the requirement of multiple brokers writing to a single partition with a total order, it is necessary to decouple the handling of batch data and batch metadata. This permits low-cost replication of batch data with fewer consistency guarantees, while processing batch metadata and the state of the partition with strong consistency guarantees.
In classic topics, the broker serving the Produce request is responsible for:
1. Validating the incoming data.
2. Assigning offsets and timestamps.
3. Injecting offsets/timestamps into the batch data.
4. Writing batches to durable storage.
5. Waiting for replication to complete.
6. Returning the Produce response.
For diskless topics, the Batch Coordinator is now responsible for offset/timestamp assignment (step 2), and each replica is responsible for injecting the offsets into the batch data (step 3). Together, these components ensure that data fetched by consumers has the same structure as data appended to a classic segment.
In diskless topics, we separate storage I/O from batch ordering. The data being persisted to object storage has no implicit order, and ordering is delegated to the batch coordinates. Coordinates passed from each broker to the batch coordinator are locally ordered, and have an order only relative to other coordinates passed from that same broker. The batch coordinator performs the append and assigns a global order to the batch coordinates in each partition.
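For illustration only, a batch coordinate could look roughly like the following record; the field names are hypothetical, and the actual schema is owned by the batch coordinator described in KIP-1164.

// Hypothetical shape of a batch coordinate: it points at a byte range inside an
// immutable WAL segment object and carries the per-partition placement information
// the coordinator needs in order to assign a global order and offsets.
public record BatchCoordinate(
    String walSegmentObjectKey,   // globally unique WAL segment object name (e.g. a UUID)
    String topic,
    int partition,
    long byteOffset,              // where the batch starts inside the object
    int byteLength,               // size of the batch in bytes
    int recordCount               // used by the coordinator to assign the offset range
) {}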
Data Format and WAL Segment Objects
The main unit of storage in Kafka is the record batch, which is also the smallest unit of data that may be produced or consumed. Kafka groups these batches together into contiguous files to more efficiently manipulate them in the backing storage, while still preserving access to the individual batches. This remains true in diskless topics.
Batches for classic topics are grouped into Segments, which each contain batches from only a single partition, and batches in log segments always have offsets and timestamps applied. In contrast, data from multiple diskless topics and partitions are mixed in single objects, called WAL Segments. This is necessary to keep the object storage write operation costs reasonable.
The batches within a WAL Segment are ordered by their layout in the object, but do not have any inherent order relative to other WAL Segments. Their logical offsets and append timestamps may be unset.
By locating data from multiple partitions together, WAL Segments prevent the number of topic-partitions from influencing object read and write behavior, and reduce the total number of individual reads and writes to the backing storage.
Batches are grouped by partition into contiguous parts of the object. This improves the locality of later operations in the same way as segments do for classic topics. WAL Segment objects are given globally unique (e.g. UUID) names and uploaded without coordination or conflicts.
The WAL Segment objects are immutable, similar to inactive segments in classic and tiered topics. Once an object is uploaded, its content is associated with its name for the whole lifetime of the object. This also aligns with the typical interface of object storage where random writes to an object are not supported.
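As a rough sketch (class and method names are illustrative, not the proposed implementation), building a WAL segment amounts to concatenating the buffered batches grouped by partition and recording a coordinate for each batch; it builds on the hypothetical BatchCoordinate record above.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.common.TopicPartition;

// Illustrative only: concatenates buffered batches grouped by partition into a
// single WAL segment object and records a coordinate (byte range) per batch.
public class WalSegmentBuilder {

    public record BufferedBatch(byte[] payload, int recordCount) {}
    public record BuiltSegment(String objectKey, byte[] data, List<BatchCoordinate> coordinates) {}

    public static BuiltSegment build(Map<TopicPartition, List<BufferedBatch>> buffered) throws IOException {
        String objectKey = UUID.randomUUID().toString();       // globally unique, no coordination needed
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        List<BatchCoordinate> coordinates = new ArrayList<>();

        // Batches of the same partition are laid out contiguously, mirroring the
        // locality that classic segments give to a single partition.
        for (Map.Entry<TopicPartition, List<BufferedBatch>> entry : buffered.entrySet()) {
            for (BufferedBatch batch : entry.getValue()) {
                long byteOffset = out.size();
                out.write(batch.payload());
                coordinates.add(new BatchCoordinate(
                        objectKey,
                        entry.getKey().topic(),
                        entry.getKey().partition(),
                        byteOffset,
                        batch.payload().length,
                        batch.recordCount()));
            }
        }
        return new BuiltSegment(objectKey, out.toByteArray(), coordinates);
    }
}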
Produce path
Data is produced to diskless topics in the following way:
- Producers send Produce requests to any broker.
- The broker accumulates Produce requests in a buffer until exceeding some size or time limit.
- When enough data accumulates or the timeout elapses, the broker creates a shared log segment (WAL segment) and batch coordinates for all of the buffered batches.
- The shared log segment is uploaded to object storage, where it is stored durably.
- The broker commits the batch coordinates with the Batch Coordinator (described in detail in KIP-1164).
- The Batch Coordinator assigns offsets to the written batches, persists the batch coordinates, and responds to the Broker.
- The broker sends responses to all Produce requests that are associated with the committed object.
[Sequence diagram: produce path for diskless topics]
Each broker accumulates its buffers and uploads its objects independently, with no coordination between brokers. The Batch Coordinator ensures that the batch coordinates are committed in order and that offsets are assigned correctly. Moreover, a broker can upload multiple objects in parallel; it only needs to commit them with the Batch Coordinator in their original order.
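A minimal sketch of this broker-side flow, building on the hypothetical WalSegmentBuilder above and assuming a commit method on a batch coordinator client (whose real interface is specified in KIP-1164) as well as the ObjectStorage plugin interface defined under Public Interfaces below.

import java.io.ByteArrayInputStream;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.apache.kafka.common.TopicPartition;

// Illustrative produce path: build the WAL segment from the accumulated buffer,
// upload it durably, then commit the coordinates so that offsets can be assigned.
public class DisklessAppender {

    // Hypothetical coordinator client; its real counterpart is specified in KIP-1164.
    public interface BatchCoordinatorClient {
        void commit(List<BatchCoordinate> coordinates) throws Exception;
    }

    private final ObjectStorage objectStorage;               // plugin interface, see Public Interfaces
    private final BatchCoordinatorClient coordinator;
    private final Function<String, ObjectKey> keyFactory;    // ObjectKey construction is implementation-specific

    public DisklessAppender(ObjectStorage objectStorage,
                            BatchCoordinatorClient coordinator,
                            Function<String, ObjectKey> keyFactory) {
        this.objectStorage = objectStorage;
        this.coordinator = coordinator;
        this.keyFactory = keyFactory;
    }

    // Called when the buffer exceeds diskless.append.buffer.max.bytes or
    // diskless.append.commit.interval.ms elapses.
    public void flush(Map<TopicPartition, List<WalSegmentBuilder.BufferedBatch>> buffer) throws Exception {
        WalSegmentBuilder.BuiltSegment segment = WalSegmentBuilder.build(buffer);

        // 1. Upload first: only durably stored data may be committed.
        objectStorage.upload(keyFactory.apply(segment.objectKey()),
                new ByteArrayInputStream(segment.data()),
                segment.data().length);

        // 2. Commit the coordinates; the coordinator assigns the global offsets. Uploads
        //    may run in parallel, but commits must preserve this broker's local order.
        coordinator.commit(segment.coordinates());

        // 3. Only now are the pending Produce requests answered (not shown).
    }
}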
Brokers will be able to process requests containing partitions from both classic and diskless topics in the same request. The response to such a mixed request will be delayed until the diskless partitions are committed.
Append latency is broadly composed of:
- Buffering: up to 250ms or 8MiB (both configurable)
- Upload to Remote Storage: P99 ~200-400ms, P50 ~100ms
- Batch Coordinates Commit: P99 ~20-50ms, P50 ~10ms (depending on the number of batches)
We are aiming for a Produce request latency of P50 ~500 ms and P99 ~1-2 seconds.
Error handling and segment object garbage collection
There may be errors during various steps of this process. In some cases, garbage objects (i.e. objects that are uploaded but not committed to the batch coordinator) may be left in the object storage. Sometimes, these objects cannot be cleaned up on the spot. Consider the following scenarios:
- The Broker fails to upload the object. No garbage object is created. The pending Produce requests receive responses with errors. It’s safe for the producers to retry.
- The broker succeeds in uploading the object and tries to commit it, but does not receive a response from the batch coordinator. A garbage object is left on the object storage. It’s not safe to clean it up on the spot without consulting the batch coordinator, which may not be available, so garbage cleanup is delegated to a separate asynchronous process. The pending Produce requests receive responses with errors. It’s safe for the producers to retry, provided they use idempotent produce.
The concrete details of the asynchronous garbage collection process are out of scope of this KIP and will be described in a separate KIP.
Caching in local segments
Once data is persisted in WAL segments and the batch coordinator, the next step is to populate a cache with the data in its finalized order as determined by the batch coordinator. Each broker will be assigned as a replica for a subset of partitions in the cluster, using the existing KRaft metadata mechanisms. The broker will issue a read operation for each object containing at least one replicated partition. Once the read completes, the broker will inject record offsets as determined by the batch coordinator, and append the finalized batch to a local log segment on-disk. This will build the typical indexes used to optimize fetch queries and interactions with segments.
Distinct from Classic topics, these local segments are not required to be durable, as durable storage of data is delegated to the batch coordinator and WAL segments.
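A minimal sketch of the cache-building step, assuming the hypothetical BatchCoordinate record above and the ObjectStorage/ByteRange plugin types defined under Public Interfaces below; the real code path goes through the existing log layer and builds the usual indexes.

import java.io.InputStream;
import java.util.function.Function;

// Illustrative only: for each committed batch of a replicated partition, fetch its
// byte range from the WAL segment, stamp the coordinator-assigned base offset into
// the batch, and append it to the local (non-durable) segment.
public class DisklessReplicaFetcher {

    // Simplified stand-in for the local segment/log and its index building.
    public interface LocalLog {
        void append(long assignedBaseOffset, byte[] batchWithOffsetsApplied);
    }

    private final ObjectStorage objectStorage;
    private final Function<String, ObjectKey> keyFactory;   // ObjectKey construction is implementation-specific

    public DisklessReplicaFetcher(ObjectStorage objectStorage, Function<String, ObjectKey> keyFactory) {
        this.objectStorage = objectStorage;
        this.keyFactory = keyFactory;
    }

    public void apply(BatchCoordinate coordinate, long assignedBaseOffset, LocalLog log) throws Exception {
        ByteRange range = new ByteRange(coordinate.byteOffset(), coordinate.byteLength());
        try (InputStream in = objectStorage.fetch(keyFactory.apply(coordinate.walSegmentObjectKey()), range)) {
            byte[] batch = in.readAllBytes();
            // In the real code path the assigned offsets and timestamps are written into
            // the record-batch header so the on-disk format matches classic segments.
            log.append(assignedBaseOffset, batch);
        }
    }
}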
Consume path
Data is consumed from diskless topics from the local on-disk segments. This is essentially the same as consuming from a follower for classic topics. It also permits use of the kernel page cache, freeing memory from the consume path for the produce-path buffers, and makes use of the indexes built during the cache-building step. This decouples the Fetch request rate from the batch coordinator operation rate.
Moving to Tiered Storage
In order to control the number of small WAL files, the memory footprint of the batch coordinator, and the rebuild time of local segments after a replica change, data is moved to Tiered Storage using the existing KIP-405 interfaces. Existing tiered storage RSM and RLMM implementations can be used as-is, with no knowledge of the presence or absence of the diskless produce path in front of them.
In order to use the existing interfaces, one broker per partition is chosen as the leader. This leader is permitted to copy segments to tiered storage. Replicas are responsible for serving fetch requests from tiered segments, similar to classic topics.
Effectively, Diskless becomes a front-end for Local Segments, and Tiered Storage becomes the back-end, providing the same benefits for optimizing storage usage for both Diskless and Classic topics.
Batch deletion
There are various reasons a batch may require deletion:
- the local size and/or time retention settings of the topic;
- the partition truncation by the user;
- deletion of a topic.
Since objects are immutable, logical batch deletion doesn’t change the object content. For each object, it’s necessary to track its effectively used size. This size decreases by the batch size when a batch is deleted from the object. When the used size reaches 0, the object can actually be deleted.
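A minimal sketch of this used-size bookkeeping, with hypothetical names; the real accounting lives in the batch coordinator.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative used-size accounting: logical batch deletion only decrements the
// counter of its containing object; the object itself is marked for deferred
// physical deletion once nothing references it anymore.
public class ObjectUsageTracker {

    private final Map<String, Long> usedBytesByObject = new ConcurrentHashMap<>();

    public void onBatchCommitted(String objectKey, long batchSizeBytes) {
        usedBytesByObject.merge(objectKey, batchSizeBytes, Long::sum);
    }

    // Returns true if the object no longer holds any live batches and may be marked
    // for deletion (the actual removal happens asynchronously, after a grace period).
    public boolean onBatchDeleted(String objectKey, long batchSizeBytes) {
        long remaining = usedBytesByObject.merge(objectKey, -batchSizeBytes, Long::sum);
        if (remaining <= 0) {
            usedBytesByObject.remove(objectKey);
            return true;
        }
        return false;
    }
}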
Object deletion will be implemented as an asynchronous operation: first, the object is marked for deletion by the batch coordinator, and later a broker performs the actual deletion from the object storage. This has two advantages:
- The Batch Coordinator doesn’t need to be concerned with the object storage, i.e. it doesn’t need credentials or code to access it.
- The deletion can have a grace period, which allows potentially in-progress Fetch requests to finish successfully.
To enforce the time and size retention settings of topics, a background process will periodically check metadata of diskless topics and logically delete affected batches.
It’s quite possible that, for compliance reasons, a particular batch has a deadline for physical deletion. Even though such a batch is logically deleted, its data is located in an object that may be kept alive by other batches, potentially forever. Therefore, batches with physical deletion deadlines must either be moved to isolated files during merging, or additional merge passes will be necessary to physically delete the data. This problem is addressed in KIP-1165.
Public Interfaces
Plugin Interfaces
Apache Kafka will provide pluggable interfaces which are intended to abstract functionality that may require contacting an external service.
Batch Coordinator
This interface abstracts the batch-order consensus mechanism needed to establish the order of batches within each partition and, more generally, to keep track of the metadata. This interface is out of scope for this KIP and is discussed in KIP-1164.
Storage Backend
This interface abstracts the access methods to the object storage.
public record ByteRange(long offset, long size) {}

public interface ObjectStorage extends Configurable {

    /**
     * Uploads an object to object storage.
     * <p>An exception must be thrown in case the number of bytes streamed from
     * {@code inputStream} is different from {@code length}.
     * @param key key of the object to upload.
     * @param inputStream data of the object that will be uploaded.
     * @param length length of the data that will be uploaded.
     * @throws StorageBackendException if there are errors during the upload.
     */
    void upload(ObjectKey key, InputStream inputStream, long length)
        throws StorageBackendException;

    /**
     * Fetch a range of an object from object storage.
     * <p>This result should be cached for later access.
     * @param key key of the object to fetch.
     * @param range range of bytes within the object.
     * @throws StorageBackendException if there are errors during the fetch.
     */
    InputStream fetch(ObjectKey key, ByteRange range)
        throws StorageBackendException;

    /**
     * Idempotently delete multiple objects.
     * <p>If an object doesn't exist, the operation should still succeed
     * to be idempotent.
     * @param keys keys of objects to delete.
     * @throws StorageBackendException if there are other errors which prevent deletion.
     */
    void delete(Set<ObjectKey> keys)
        throws StorageBackendException;
}
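For illustration, a caller would use this interface roughly as follows; the ObjectStorage implementation and the way an ObjectKey is obtained are assumed here, since both come from a third-party plugin.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Set;

// Hypothetical usage of the plugin interface with a plugin-provided storage and key.
public class ObjectStorageUsageExample {

    static void example(ObjectStorage storage, ObjectKey key)
            throws StorageBackendException, IOException {
        byte[] walSegment = new byte[8 * 1024];  // placeholder WAL segment content

        // Upload the whole WAL segment object; the declared length must match the stream.
        storage.upload(key, new ByteArrayInputStream(walSegment), walSegment.length);

        // Fetch only the byte range of a single batch within the object.
        try (InputStream in = storage.fetch(key, new ByteRange(0, 4096))) {
            byte[] firstBatch = in.readAllBytes();
        }

        // Idempotently delete the object once it no longer holds live batches.
        storage.delete(Set.of(key));
    }
}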
Apache Kafka will not provide a production-grade implementation of this interface, because this would require Kafka to depend on third-party storage drivers. The list of potential storage backends is long, and it’s not possible to provide implementations for all of them. Implementations will be provided by third-party developers in the Kafka ecosystem.
Broker Configurations
- diskless.system.enable: enables diskless support on a broker.
- diskless.storage.class.name: the object storage class name for diskless topics.
- diskless.storage.class.path: the object storage class path for diskless topics.
- diskless.batch.coordinator.class.name: the batch coordinator class name for diskless topics.
- diskless.batch.coordinator.class.path: the batch coordinator class path for diskless topics.
- diskless.append.commit.interval.ms: defines how long to wait before closing a shared log segment object to be uploaded/committed.
- diskless.append.buffer.max.bytes: defines the maximum size of a shared log segment object before closing it for further upload/commit.
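For illustration, a broker with diskless support enabled might be configured as follows; the plugin class names and paths are placeholders for whatever third-party implementations are installed, and the buffering values simply mirror the 250 ms / 8 MiB figures used in the produce-path latency discussion.

# Enable diskless support and point the broker at the plugins (placeholder names/paths).
diskless.system.enable=true
diskless.storage.class.name=com.example.diskless.ExampleObjectStorage
diskless.storage.class.path=/opt/kafka/plugins/diskless-storage/*
diskless.batch.coordinator.class.name=com.example.diskless.ExampleBatchCoordinator
diskless.batch.coordinator.class.path=/opt/kafka/plugins/diskless-coordinator/*

# Buffering limits used in the produce path (250 ms / 8 MiB, as discussed above).
diskless.append.commit.interval.ms=250
diskless.append.buffer.max.bytes=8388608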
Topic Configurations
diskless.enable: Sets a topic as a diskless topic. Diskless topics can only be defined at creation time (updating a classic topic to a diskless topic is out of scope).
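For example, a diskless topic could be created with the existing tooling like this (broker address, topic name, and partition count are placeholders):

bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic my-diskless-topic \
  --partitions 12 \
  --config diskless.enable=true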
Client Configurations
Producer:
client.rack: Similar to the consumer’s use of client.rack in the follower-fetching feature, it defines the producer’s rack so that it can be aligned with brokers in the same rack.
Monitoring
The following metrics may be useful for operators:
- Object upload:
- count and rates;
- object size average and percentiles;
- upload traffic;
- latency;
- errors.
- Object commit:
- count and rates;
- latency;
- errors.
- Read:
- count and rates;
- GET requests per Fetch request.
Command line tools
Existing tools to be adapted:
- kafka-topics.sh must support the new topic configuration on creation and as a filter to list diskless topics only.
- kafka-dump-log.sh must support Shared Log Segment (WAL Segment) files as input and parse their content correctly.
kafka-diskless-metadata.sh is a new tool, which does the following:
- Diskless topic overview:
- the offsets;
- the size on the object storage;
- the total size of objects the topic is part of;
- the total size of objects the topic was part of but whose batches have been deleted;
- Getting object metadata, including object key, total size, used size.
Compatibility, Deprecation, and Migration Plan
Existing users upgrading to a version of Kafka with support for diskless writes will not experience any change in behavior. All broker and topic configurations will have defaults which are consistent with the existing storage model. This will be a backwards-compatible upgrade. These users may downgrade without additional steps.
Users which configure diskless brokers but no diskless topics may experience failures related to those configurations if they are invalid (e.g. a plugin is not installed, or backing service is unavailable). If the configuration passes validation and the brokers are able to start, users will experience no change in behavior. These users may downgrade without additional steps.
Users which configure diskless brokers and diskless topics will be able to produce and consume data with the same semantics and consistency model as traditional topics (save for the explicitly outlined exceptions/limitations), but with higher latency than classic topics. These users will need to migrate away from and delete all diskless topics before downgrading.
Test Plan
We need to create new integration tests that would cover the following:
- Create a diskless topic, write and read from it → Check that data is written to object storage stub (MinIO / Localstack).
- Set different commit intervals and check that objects are committed within an error threshold of the configured interval.
- Set different buffer max bytes values and check that object sizes stay within an error threshold of the configured maximum.
We need to create new system tests that are end-to-end but use MinIO as an object storage stub:
- Have a cluster running with both types of topics.
- Have a cluster running with diskless topics, stop the cluster and restart (no data loss).
- Have a cluster with only diskless topics and monitor that disk is only used for metadata.
Various failure scenarios (e.g. the Batch Coordinator or Object Storage are inaccessible or partially accessible) should be tested extensively, with the focus primarily on correctness.
The majority of the existing produce and consume tests should be adapted to diskless topics by parameterizing the topic configuration. However, some of the current tests fall under the known limitations and exceptions outlined above.
Documentation Plan
These are the areas where we need to add documentation:
- Quick start needs to be enhanced to explain how to create diskless topics.
- New configuration settings need to be documented (in the corresponding Java config classes).
- Expand subsection 3.2 – Topic Configs to explain the new configuration option for topic creation.
- Add a new subsection under section 4 – Design. Something like 4.11 – Diskless topics architecture.
- Add a new subsection under section 6 – Operations. Something like 6.13 – Diskless topics operations. This would include:
- A brief explanation of what the feature entails.
- A detailed explanation of the new config options.
- Some deployment configurations and options (hybrid vs segregated brokers).
Rejected Alternatives
Use Tiered Storage as-is
Tiered storage only affects the behavior of inactive segments, while active segments continue to use block storage and replication. It is possible to avoid inter-zone replication by setting replication.factor=1, but this causes the topic to inherit the durability of block storage, which may experience correlated failures within a single rack.
Tiered Storage as-is forces a trade-off between durability and replication costs.
Using “Aggressive Tiering”, an operator may configure their cluster to roll active segments quickly, reducing the total active segment size. This has the effect of also reducing the window that data is stored non-durably, such that it might be possible to delay acks until a segment is uploaded to tiered storage. However, as tiered storage uploads individual segments, these requests can incur a large I/O cost. In many cases, this will negate the positive effects from shrinking or eliminating the block storage.
Tiered Storage as-is forces a trade-off between durability and excessive I/O overhead.
Tiered topics still have leaders, and under the Kafka protocol, producers must produce to the leader, even if the leader is in a different rack, incurring cross-rack transfer costs. To mitigate this, all producers and brokers can be placed in a single rack.
Tiered storage as-is forces a trade-off between availability and producer ingress costs.
In summary, inter-AZ traffic can be avoided, but at the cost of losing the availability of producers and brokers as well as the availability and durability of topic data. The local storage requirement can be minimized, with the penalties explained earlier, but not fully eliminated. The authors are not aware of any mechanism to improve tiered storage to eliminate these shortcomings.
Tiered Storage metadata being abstracted to segment granularity and segments being limited to a single partition prevents the overall cost optimization of the system, and forces operators to sacrifice very important guarantees in order to unlock cost savings. Diskless topics maintain these guarantees, while still unlocking cost savings.
File system driver
One may argue that we could keep the Kafka code unchanged, or minimally changed, and instead use a sophisticated file system driver. Brokers would use the local broker disk backed by the driver, writing segment files and other file types and reading from them normally. The driver would be responsible for transferring the I/O to object storage. Native Kafka replication could be prevented by setting replication.factor=1. This approach comes with a number of challenges that make us reject it.
Mapping block storage operations to object storage directly is inefficient, and requires some assumptions about access patterns. For example, object storages don’t widely support random writes or appends, so objects would need to be overwritten often. This would result in poor performance and high costs in hyperscalers which have a per-request cost.
There are log-structured file systems implemented on top of object storage, but these are either proprietary or immature. And because they operate on a lower-level abstraction than Diskless Shared Log Segments, they are not able to take advantage of application-specific access behaviors and optimizations.
A file system driver also does not cover the rack-awareness functionality, so modifying the Kafka code would still be required to implement a similar feature. Kafka would also need to be aware of a component as low-level as the file system driver in order to collaborate with it and let brokers access data they have not replicated. Kafka is not prepared to manage multiple writers to a single local file system.
This type of solution would be less accessible and portable between operating systems. Kernel-space or user-space drivers need to be implemented and supported for every operating system that Kafka supports. This also requires a specific skill set that may not be widely represented in the present Kafka community.