Status

Current state: "Under Discussion"

Discussion thread: https://lists.apache.org/thread/k9ydcdyxdyfgoy73cgsgs8w46yhpxrw2

JIRA: 

Authors: xinyu <xinyuzhou@automq.com>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the cloud era, shared storage architecture is increasingly popular. Almost all cloud providers offer three types of shared storage services: block storage, file storage, and object storage. These services have many advantages over traditional local disks:

  • Cloud storage services provide an SLA, meaning they are services, not hardware, and can be used in production without extra effort.

  • Cloud storage services offer built-in durability, using multi-replicas or erasure coding technologies by default, so storage software built on cloud storage doesn't need to replicate data anymore.

  • Cloud storage services have a large shared resource pool, eliminating capacity management concerns and supporting a pay-as-you-go resource provision model.

Even in on-premise environments, shared storage software like MinIO, HDFS, Ceph, and CubeFS offers benefits similar to cloud storage services. Decoupling Apache Kafka's storage to these shared solutions can make Kafka stateless and easily scalable.

But running Kafka on shared storage services is challenging because its classic ISR-based storage engine is designed for local disks. For example, although Kafka can be deployed on file storage services like NetApp Files, it still replicates data across brokers and therefore can't benefit from the built-in durability and elasticity of shared file storage.

This KIP aims to make Kafka stateless, allowing it to run seamlessly on shared storage systems and transforming its architecture from shared nothing to shared storage. It introduces a unified log layer for deploying Kafka on various shared storage mediums, offering the following benefits:

  • A unified log layer that supports both current replication-based local file storage and shared storage simultaneously.

  • Greater flexibility for major users or vendors to deploy Kafka on their own shared storage, as many large companies have their own DFS or HDFS infrastructure.

  • Reduced community fragmentation, as users or vendors can extend the storage layer without complex modifications to Kafka, fostering greater community collaboration.

Public Interfaces


This KIP aims to provide storage-related interfaces to support implementing a shared storage engine without affecting the current classic storage engine. No public interfaces will be modified.

Proposed Changes


Essentially, we can use shared storage like S3 or NFS to enhance Kafka's scalability and eliminate cross-AZ traffic. Luckily, the community has made significant efforts to reduce cross-AZ traffic for producers (KIP-1123: Rack-aware partitioning for Kafka Producer) and consumers (KIP-392: Allow consumers to fetch from closest replica, KIP-881: Rack-aware Partition Assignment for Kafka Consumers). With Kafka on shared storage, data replication is unnecessary, eliminating cross-AZ replication traffic. Therefore, this KIP will focus on creating an abstract log layer to facilitate building stateless Kafka on shared storage, enhancing scalability.

To achieve this goal, we have two major steps:

  1. Refactor the log implementations by abstracting the log and log segment, allowing all managers like LogManager and LogCleaner to depend on the abstract log class instead of the specific UnifiedLog implementation.

  2. Define a new abstraction to mask the differences between various shared storage services, called Stream. We will use Stream to implement a SharedLog that extends the AbstractLog. Just as the File API bridges the gap between LocalLog and the local disk, Stream will bridge the gap between SharedLog and shared storage services like S3.

Abstract Log and LogSegment


Currently, Kafka's storage engine relies heavily on UnifiedLog, which includes LocalLog and RemoteLog, and some logic even depends directly on local files. This makes the current storage engine unsuitable for shared storage support.



This KIP proposes a new inheritance system for log classes:

  1. AbstractLog and AbstractLogSegment are basic abstract classes containing the common interfaces and logic shared by both local file logs and shared logs.

  2. All managers relying on UnifiedLog should be refactored to depend on AbstractLog.

  3. UnifiedLog and LogSegment will implement AbstractLog and AbstractLogSegment. This is the core of the current classic storage engine.

  4. A new log implementation, SharedLog and SharedLogSegment, will also depend on AbstractLog and AbstractLogSegment, as sketched below. We recommend using Stream APIs for the shared log implementation, which can then be applied to S3 or other storage services. However, Stream is optional.
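
For illustration, a minimal sketch of the intended hierarchy follows. The method signatures are hypothetical placeholders, not the final API; the real contract will be extracted from UnifiedLog during the refactor:

import java.nio.ByteBuffer;

// Sketch of the proposed hierarchy; signatures are illustrative only.
abstract class AbstractLog {
    abstract long logStartOffset();                      // first retained offset
    abstract long logEndOffset();                        // next offset to be assigned
    abstract void append(ByteBuffer recordBatch);        // append one record batch
    abstract ByteBuffer read(long offset, int maxBytes); // read starting at a given offset
}

// The classic ISR-based engine: today's UnifiedLog, with its behavior unchanged.
abstract class UnifiedLog extends AbstractLog { }

// The new engine, mapping log operations onto shared storage (optionally via Stream).
abstract class SharedLog extends AbstractLog { }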

Optional Stream layer


We now have an abstract log layer that enables Kafka logs on any shared storage service. However, local files can't simply be copied to shared storage, as object storage lacks a file API. To reduce SharedLog's complexity, this KIP introduces a suite of Stream APIs.

A stream is a truly append-only data abstraction with an extremely simple API. Similar to how a log is divided into LogSegments, a stream consists of multiple StreamSlices, each being a short section of the stream.


import java.util.concurrent.CompletableFuture;

public interface Stream {
    /** Globally unique ID of this stream. */
    long streamId();

    /** Epoch of the stream, used to fence stale owners; bumped on each open. */
    long streamEpoch();

    /** First offset still retained in the stream (advanced by trim). */
    long startOffset();

    /** Highest offset that has been durably persisted and confirmed. */
    long confirmOffset();

    /** Offset that will be assigned to the next appended record batch. */
    long nextOffset();

    /** Appends a record batch; the future completes once the batch is persisted. */
    CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch);

    /** Reads batches in [startOffset, endOffset), up to roughly maxBytesHint bytes. */
    CompletableFuture<FetchResult> fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint);

    /** Deletes all data before newStartOffset (used for retention). */
    CompletableFuture<Void> trim(long newStartOffset);

    /** Closes the stream, releasing ownership without deleting data. */
    CompletableFuture<Void> close();

    /** Closes the stream and permanently deletes all of its data. */
    CompletableFuture<Void> destroy();
}
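
As a usage illustration, a SharedLogSegment might persist a batch and read it back as follows. This is only a sketch: AppendContext.DEFAULT, FetchContext.DEFAULT, result.baseOffset(), fetchResult.records(), and handle() are hypothetical names, since this KIP leaves the companion types undefined.

// Sketch: append a batch to a data stream, then read the written range back.
CompletableFuture<AppendResult> appendCf =
        dataStream.append(AppendContext.DEFAULT, recordBatch);

appendCf.thenCompose(result ->
        dataStream.fetch(FetchContext.DEFAULT,
                result.baseOffset(),      // start of the batch just written (hypothetical accessor)
                dataStream.nextOffset(),  // end of the written range
                1024 * 1024))             // maxBytesHint: about 1 MiB per fetch
        .thenAccept(fetchResult -> handle(fetchResult.records()));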

All of a log's information should be mapped onto Streams. Typically, each log requires several Streams (see the sketch after this list):

  1. Data Stream, which stores the log's data, similar to a data file.

  2. Time Stream, which stores the log's time index, like a time index file.

  3. Txn Stream, which stores the log's transactional information, similar to a txn index file.

  4. Meta Stream, similar to a KV store, will store all meta information, including the stream list of a shared log and the partition metadata, such as producer snapshots. When a partition is reassigned to another broker, the metadata should be reloaded.
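
To make this mapping concrete, a SharedLog could simply hold one Stream handle per role. A sketch, with illustrative field names:

// Sketch: a SharedLog composed of one stream per role; field names are illustrative.
public class SharedLog {
    private final Stream dataStream; // record batches, replacing the .log file
    private final Stream timeStream; // time index entries, replacing the .timeindex file
    private final Stream txnStream;  // aborted-transaction index, replacing the .txnindex file
    private final Stream metaStream; // KV-style metadata: stream list, producer snapshots, ...

    public SharedLog(Stream data, Stream time, Stream txn, Stream meta) {
        this.dataStream = data;
        this.timeStream = time;
        this.txnStream = txn;
        this.metaStream = meta;
    }
}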


Each partition now has a shared log backed by multiple streams, so we need to store the mapping that identifies the meta stream's Stream ID for a given partition. This KIP recommends using KRaft to store this mapping, which requires adding three KRaft controller APIs and a KV client implemented on top of KRaft.


import java.util.concurrent.CompletableFuture;

public interface KVClient {
    /** Puts the key-value pair only if the key is absent; returns the value now stored. */
    CompletableFuture<Value> putKVIfAbsent(KeyValue keyValue);

    /** Puts the key-value pair unconditionally. */
    CompletableFuture<Value> putKV(KeyValue keyValue);

    /** Gets the value associated with the key, if any. */
    CompletableFuture<Value> getKV(Key key);

    /** Deletes the key, returning the previous value, if any. */
    CompletableFuture<Value> delKV(Key key);
}

// The three new controller API keys (snippet, following the existing enum convention):
GET_KVS(ApiMessageType.GET_KVS, false, true),
PUT_KVS(ApiMessageType.PUT_KVS, false, true),
DELETE_KVS(ApiMessageType.DELETE_KVS, false, true),
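
For instance, a broker opening a partition could resolve (or register) its meta stream roughly as follows. The key format and the Key, KeyValue, and Value helper methods are hypothetical:

// Sketch: look up the meta stream ID for a partition in the KRaft-backed KV store,
// creating and registering a new meta stream on the first open.
Key key = Key.of("META_STREAM/" + topicId + "/" + partitionId);
CompletableFuture<Long> metaStreamId = kvClient.getKV(key).thenCompose(value -> {
    if (value != null) {
        return CompletableFuture.completedFuture(value.asLong());
    }
    return streamClient.createAndOpenStream(CreateStreamOptions.DEFAULT)
            .thenCompose(stream -> kvClient
                    .putKVIfAbsent(KeyValue.of(key, Value.of(stream.streamId())))
                    .thenApply(Value::asLong)); // putKVIfAbsent resolves races between brokers
});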

Finally, we need a StreamClient to manage streams—creating, opening, closing, and destroying them—while following a partition's lifecycle. This will allow us to implement SharedLog and SharedLogSegment using this suite of Stream APIs.


import java.util.concurrent.CompletableFuture;

public interface StreamClient {
    /** Creates a new stream and opens it in a single step. */
    CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options);

    /** Opens an existing stream, typically bumping its epoch to fence the previous owner. */
    CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions options);

    /** Closes an open stream without deleting its data. */
    CompletableFuture<Void> closeStream(long streamId, CloseStreamOptions options);

    /** Permanently deletes a stream and all of its data. */
    CompletableFuture<Void> deleteStream(long streamId, DeleteStreamOptions options);
}
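
Tying the pieces together, a partition's streams would follow its lifecycle, roughly as in this sketch (the *Options.DEFAULT values are illustrative):

// Sketch: stream lifecycle following the partition lifecycle.
Stream data = streamClient.createAndOpenStream(CreateStreamOptions.DEFAULT).join(); // partition created
// ... SharedLog serves appends and fetches through the stream ...
streamClient.closeStream(data.streamId(), CloseStreamOptions.DEFAULT).join();       // partition reassigned away
// The new owner reopens the stream; a higher epoch fences the previous owner.
Stream reopened = streamClient.openStream(data.streamId(), OpenStreamOptions.DEFAULT).join();
// When the topic is deleted, the stream and all of its data are destroyed.
streamClient.deleteStream(reopened.streamId(), DeleteStreamOptions.DEFAULT).join();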

Once the Kafka community has this Stream layer, building Kafka on different shared storage mediums through a unified approach becomes possible. For any type of shared storage (such as S3-compatible storage, HDFS, NFS, etc.), we only need to implement the Stream API. This is still challenging, but it lets us focus on improving append and fetch efficiency for each specific storage type. For example, to implement Stream on S3, we need to batch append requests, aggregate them across streams, and commit them to S3. We also need to accelerate reads with mechanisms like read-ahead and data caching, and manage object metadata efficiently.
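
As a rough illustration of the batching idea for an S3-backed Stream, the sketch below buffers appends from many streams and uploads them as a single object, amortizing S3 request latency and cost. All names and thresholds are illustrative, not part of this KIP:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch: aggregate appends from many streams into one S3 object per upload.
class S3WalUploader {
    private static final int UPLOAD_THRESHOLD_BYTES = 8 * 1024 * 1024; // e.g. 8 MiB per object

    record PendingAppend(long streamId, ByteBuffer batch, CompletableFuture<Void> done) { }

    private final List<PendingAppend> buffer = new ArrayList<>();
    private int bufferedBytes = 0;

    synchronized CompletableFuture<Void> enqueue(long streamId, ByteBuffer batch) {
        PendingAppend pending = new PendingAppend(streamId, batch, new CompletableFuture<>());
        buffer.add(pending);
        bufferedBytes += batch.remaining();
        if (bufferedBytes >= UPLOAD_THRESHOLD_BYTES) {
            flush(); // one PUT covers every buffered stream's appends
        }
        return pending.done();
    }

    private synchronized void flush() {
        // 1. Concatenate the buffered batches into a single object, recording a
        //    (streamId, offset, position) index entry per batch for later fetches.
        // 2. PUT the object to S3; on success, complete each pending future so
        //    the streams' confirmOffset() can advance.
        // 3. Clear the buffer and reset bufferedBytes.
    }
}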

In conclusion, this KIP is deliberately high-level. We'll start with the core idea, and if the community provides positive feedback, we'll further refine how to abstract this API layer. Developers can also find more implementation details in the AutoMQ open-source repository (https://github.com/AutoMQ/automq).

Compatibility, Deprecation, and Migration Plan


This design is fully backward compatible: once the abstract layer is in place, it affects neither existing users nor the normal evolution of the classic ISR-based storage engine.

Test Plan


TBD

Rejected/Related Alternatives


There are no rejected alternatives, but this KIP is inspired by KIP-1150 and KIP-1176, which aim to introduce cloud storage for Kafka to reduce replication and cross-AZ traffic costs. However, implementing Kafka on shared storage is challenging, with many details needing community discussion.

KIP-1176 offers an innovative solution to eliminate cross-AZ traffic via S3EOZ. It reworks the replication path of the current storage engine, introducing an 'S3 channel' to replace the existing 'direct channel' replication mechanism. KIP-1176 doesn't aim to address the broader issues of the shared-nothing architecture, so it is only loosely related to this KIP.

KIP-1150 arose from the fact that industry vendors like WarpStream, AutoMQ, and Confluent have built 'S3 Kafka' implementations and want to introduce similar concepts to the Apache Kafka community, which is a positive move. However, KIP-1150's solution, centered on a centralized coordinator, conflicts significantly with the community's current partitioned architecture. Many Kafka features depend on data locality, so implementing KIP-1150 requires reimplementing several responsibilities in the coordinator, such as ordering, deduplication, and transactions. Additionally, once KIP-1150 is introduced, many future features may need to be implemented twice, once for each architecture.

This KIP does not reject previous KIPs. We recognize the complexity of implementing a stateless Kafka with shared storage and the challenges in reaching a consensus on implementation. Before diving into detailed implementations, we should refactor Kafka's storage layer and define abstract interfaces for various log implementations. This will provide a unified approach to building Kafka on different shared storage mediums like S3-compatible storage, HDFS, NFS, and even customized DFS systems.
