The Kafka community is currently seeing an unprecedented situation, with three KIPs (KIP-1150, KIP-1176, KIP-1183) simultaneously addressing the same challenge of high replication costs when running Kafka across multiple cloud availability zones. Each KIP offers a different solution to this issue. While a diversity of innovative ideas is a key strength of open-source projects, it creates a burden for reviewers and users who must compare and comment on multiple proposals at once. Furthermore, discussion around the three KIPs has stalled for over two months now, possibly because the authors are hesitant to proceed while alternative, potentially conflicting, solutions exist. Addressing replication cost is a key concern of Kafka's userbase, and we should try to move the conversation forward if we can.
From what I understand, these three KIPs are not mutually exclusive, but adopting all three in the community might not be the outcome we want. Thus, I would like to start a discussion on how we could move the conversation forward.
To save time for KIP readers and reviewers, I have created this document to summarize each of the KIPs and describe their current status. I hope to get suggestions and feedback from the community.
| KIP | Pros | Cons |
|---|---|---|
| KIP-1150 | Has the most benefits to users: the most complete saving of cross-zone network cost (enabled by the leaderless design), better durability (by leveraging block storage), and the best scalability (by separating data from metadata). Clean architecture, with no unnatural, intrusive changes to the existing code base. | Large effort, but arguably this is what is needed to build a truly cloud-native architecture. |
| KIP-1176 | Limited benefits to users: saving of cross-zone network cost (limited saving on the producer side). Small effort. | The current availability story is weak. It is not clear whether the effort stays small once details on correctness, cost, and cleanliness are figured out. |
| KIP-1183 | Moderate benefits to users: saving of cross-zone network cost (limited saving on the producer and consumer sides), better durability (by leveraging block storage), and improved scalability. | Weaker availability (no hot standby). Scalability not as good as KIP-1150. The effort to build the plugin is too large. |
KIP-1150: Diskless Topics
- Data in diskless topics is durably stored solely in object storage, and not in segments on broker disks.
- Kafka delegates replication of diskless topics to object storage, and does not perform replication itself.
- All brokers are capable of interacting with all diskless topics, and no broker is uniquely considered the leader of a partition.
This KIP accepts certain non-fundamental functional limitations of diskless topics compared to classic topics:
- Diskless topics will not immediately support compaction.
- Diskless topics will not immediately support transactional writes.
Data is produced to diskless topics in the following flow:
- Producers send Produce requests to any broker.
- The broker accumulates Produce requests in a buffer until a size or time limit is exceeded.
- When enough data accumulates or the timeout elapses, the broker creates a shared log segment and batch coordinates for all of the buffered batches.
- The shared log segment is uploaded to object storage and is written durably.
- The broker commits the batch coordinates with the Batch Coordinator (described in detail in KIP-1164).
- The Batch Coordinator assigns offsets to the written batches, persists the batch coordinates, and responds to the broker.
- The broker sends responses to all Produce requests that are associated with the committed object.
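A minimal, hypothetical Java sketch of this buffering-and-commit path; `ObjectStorage`, `BatchCoordinator`, and all other names here are illustrative placeholders, not APIs from KIP-1150 or Apache Kafka.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the diskless produce path described above.
public class DisklessProduceBuffer {

    interface ObjectStorage {
        /** Uploads a shared log segment and returns its object key. */
        String upload(ByteBuffer segment);
    }

    interface BatchCoordinator {
        /** Persists batch coordinates and returns the assigned base offsets. */
        List<Long> commit(String objectKey, List<Integer> batchSizes);
    }

    private final ObjectStorage storage;
    private final BatchCoordinator coordinator;
    private final List<ByteBuffer> buffered = new ArrayList<>();
    private final int maxBufferBytes;   // e.g. 8 MiB
    private final long maxBufferMs;     // e.g. 250 ms
    private long firstAppendTimeMs = -1;
    private int bufferedBytes = 0;

    DisklessProduceBuffer(ObjectStorage storage, BatchCoordinator coordinator,
                          int maxBufferBytes, long maxBufferMs) {
        this.storage = storage;
        this.coordinator = coordinator;
        this.maxBufferBytes = maxBufferBytes;
        this.maxBufferMs = maxBufferMs;
    }

    /** Accumulates a produce batch; flushes when the size or time limit is hit. */
    synchronized void append(ByteBuffer batch, long nowMs) {
        if (buffered.isEmpty()) {
            firstAppendTimeMs = nowMs;
        }
        buffered.add(batch);
        bufferedBytes += batch.remaining();
        if (bufferedBytes >= maxBufferBytes || nowMs - firstAppendTimeMs >= maxBufferMs) {
            flush();
        }
    }

    /** Uploads the shared segment, commits coordinates, then the broker would ack producers. */
    private void flush() {
        ByteBuffer segment = concatenate(buffered);
        String objectKey = storage.upload(segment);                    // durable write to object storage
        List<Integer> sizes = buffered.stream().map(ByteBuffer::remaining).toList();
        List<Long> baseOffsets = coordinator.commit(objectKey, sizes); // offsets assigned here
        // The broker would now complete the pending Produce requests using baseOffsets.
        buffered.clear();
        bufferedBytes = 0;
    }

    private static ByteBuffer concatenate(List<ByteBuffer> batches) {
        int total = batches.stream().mapToInt(ByteBuffer::remaining).sum();
        ByteBuffer out = ByteBuffer.allocate(total);
        batches.forEach(b -> out.put(b.duplicate()));
        out.flip();
        return out;
    }
}
```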
Append latency is broadly composed of:
- Buffering: up to 250ms or 8MiB (both configurable)
- Upload to Remote Storage: P99 ~200-400ms, P50 ~100ms
- Batch Coordinates Commit: P99 ~20-50ms, P50 ~10ms (depending on the number of batches)
We are aiming for a Produce request latency of P50 ~500 ms and P99 ~1-2 s.
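As a rough, back-of-envelope illustration using the P50 figures above (and assuming a flush triggered by the full 250 ms buffering timer): 250 ms buffering + ~100 ms upload + ~10 ms coordinate commit ≈ 360 ms, which leaves headroom for request handling and queueing within the ~500 ms P50 target.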
Data is consumed from diskless topics in the following way:
- The consumer sends a Fetch request to the broker.
- The broker queries the Batch Coordinator for the relevant batch coordinates.
- The broker gets the data from object storage, from its cache, or both.
- The broker injects the computed offsets and timestamps into the batches.
- The broker constructs and sends the Fetch response to the Consumer.
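A minimal, hypothetical Java sketch of this fetch path; the coordinator, cache, and coordinate types are illustrative placeholders, not APIs from KIP-1150.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Illustrative sketch of the diskless fetch path described above.
public class DisklessFetcher {

    record BatchCoordinates(String objectKey, int byteOffset, int byteLength, long baseOffset) {}

    interface BatchCoordinator {
        List<BatchCoordinates> lookup(String topic, int partition, long fetchOffset, int maxBytes);
    }

    interface ObjectCache {
        /** Returns the batch bytes, reading from the cache first and object storage on a miss. */
        ByteBuffer getOrLoad(BatchCoordinates coordinates);
    }

    private final BatchCoordinator coordinator;
    private final ObjectCache cache;

    DisklessFetcher(BatchCoordinator coordinator, ObjectCache cache) {
        this.coordinator = coordinator;
        this.cache = cache;
    }

    /** Resolves coordinates, reads the bytes, and stamps the assigned offsets. */
    List<ByteBuffer> fetch(String topic, int partition, long fetchOffset, int maxBytes) {
        List<BatchCoordinates> coords = coordinator.lookup(topic, partition, fetchOffset, maxBytes);
        return coords.stream()
                .map(c -> stampOffsets(cache.getOrLoad(c), c.baseOffset()))
                .toList();
    }

    private ByteBuffer stampOffsets(ByteBuffer batch, long baseOffset) {
        // A real broker would rewrite the base offset and timestamps in the
        // record-batch header; returning the raw bytes keeps the sketch short.
        return batch;
    }
}
```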
The Batch Coordinator is the source of truth about objects, partitions, and batches in diskless topics. It does the following:
- Chooses the total ordering for writes, assigning offsets without gaps or duplicates.
- Serves requests for log offsets.
- Serves requests for batch coordinates.
- Serves requests for atomic operations (creating partitions, deleting topics, records, etc.)
- Manages data expiry and soft deletion.
- Manages physical deletion of objects (the deletion itself is performed by brokers).
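A minimal, hypothetical Java interface that mirrors the responsibilities listed above; the actual interface is being defined in KIP-1164, and the names and signatures here are illustrative only.

```java
import java.util.List;

// Illustrative shape of the Batch Coordinator responsibilities listed above.
public interface BatchCoordinatorSketch {

    /** Commits a shared object, assigning gapless offsets to its batches (total ordering of writes). */
    List<Long> commitObject(String objectKey, List<BatchDescriptor> batches);

    /** Serves log offsets for a diskless partition. */
    long logEndOffset(String topic, int partition);

    /** Serves batch coordinates for a fetch starting at the given offset. */
    List<BatchCoordinates> batchCoordinates(String topic, int partition, long fetchOffset);

    /** Atomic operations such as creating partitions or deleting topics and records. */
    void createPartitions(String topic, int count);
    void deleteTopic(String topic);

    /** Marks expired data as soft-deleted and lists objects whose physical deletion brokers should perform. */
    List<String> expireAndListDeletableObjects(long nowMs);

    record BatchDescriptor(String topic, int partition, int byteOffset, int byteLength) {}
    record BatchCoordinates(String objectKey, int byteOffset, int byteLength, long baseOffset) {}
}
```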
A reference implementation will be shipped with Kafka. It uses an internal, non-compacted topic called __diskless-metadata for storing diskless metadata.
For details on the planned implementation, please see the integral follow-up KIPs:
- KIP-1163: Diskless Core
- KIP-1165: Object Compaction for Diskless
- KIP-1164: Topic Based Batch Coordinator
- KIP-1181: Metadata Rack Awareness for Diskless Topics
- KIP-F: Cache Strategy
- KIP-O: Garbage collection for Diskless objects
Current Status (Aug. 5)
- The last comment in the discussion thread was in May.
- Concern: no design for supporting APIs like transactions and queues.
KIP-1176: Tiered Storage for Active Log Segment
In this KIP, the follower broker no longer reads data directly from the leader broker during the FetchRequest/Response flow. Instead, the data flows from the leader broker to the "fast object storage" and then to the follower broker, without paying the cross-AZ transfer cost. The recommended fast object storage options are S3E1Z or EBS.
Figure 1: The data replication is done via the fast cloud storage to avoid the cross-AZ network cost.
The produce flow with acks=-1 (all) is:
- The producer writes a batch to the leader broker.
- The leader broker stores it and uploads the batch to the fast cloud storage after a configured time interval.
- The follower sends a fetch request to the leader.
- The leader responds with empty MemoryRecords plus metadata such as offsets, high watermarks, etc.
- After receiving the fetch response, the follower fetches the remote log metadata via the internal __remote_wal_log_metadata topic and then fetches the data from the fast cloud storage.
- After receiving the batches from the fast cloud storage, the follower sends a fetch request to the leader with the updated fetch offset.
- Once the leader has received fetch requests from all ISR nodes with the updated fetch offsets and knows that replication is complete, it acknowledges the producer.
Figure 2: The data replication flow
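A minimal, hypothetical Java sketch of the leader-side acknowledgement step in the flow above: the leader only acks the producer once every ISR follower has reported a fetch offset past the end of the batch, i.e. after pulling the data from the fast cloud storage. All names are illustrative, not from KIP-1176.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative leader-side tracking of ISR fetch offsets for acks=-1 completion.
public class LeaderAckTracker {

    private final Map<Integer, Long> isrFetchOffsets = new ConcurrentHashMap<>();

    /** Called when an ISR follower sends a fetch request with its updated fetch offset. */
    void onFollowerFetch(int brokerId, long fetchOffset) {
        isrFetchOffsets.merge(brokerId, fetchOffset, Math::max);
    }

    /** acks=-1 completes once every ISR member has advanced past the batch's last offset. */
    boolean readyToAck(long batchEndOffset) {
        return !isrFetchOffsets.isEmpty()
                && isrFetchOffsets.values().stream().allMatch(o -> o > batchEndOffset);
    }
}
```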
About cost and latency:
S3 (including S3E1Z) does not charge for cross-AZ traffic (there is an extra charge only for cross-region traffic), but latency is higher when data travels across AZs. S3E1Z charges for PUT (upload) and GET (download) requests, and a PUT is usually about 10x more expensive than a GET. So while there is no cross-AZ traffic cost, there are PUT and GET request costs, which makes choosing the right batch size and upload frequency important to avoid overrunning the PUT budget.
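To make the trade-off concrete, here is a back-of-envelope Java sketch; the upload interval and the per-request price are placeholder assumptions, not actual S3 or S3E1Z prices.

```java
// Back-of-envelope sketch of the PUT-cost trade-off described above.
// The price constant is a PLACEHOLDER, not an actual S3 / S3E1Z price;
// plug in current prices for your region before drawing conclusions.
public class PutCostEstimate {
    public static void main(String[] args) {
        double uploadIntervalMs = 10.0;   // how often the leader uploads a WAL object (assumed)
        double putPricePer1000 = 0.005;   // PLACEHOLDER price per 1,000 PUT requests
        double putsPerSecond = 1000.0 / uploadIntervalMs;
        double putsPerMonth = putsPerSecond * 86_400 * 30;
        double monthlyPutCost = putsPerMonth / 1000.0 * putPricePer1000;
        System.out.printf("%.0f PUTs/s -> %.0f PUTs/month -> $%.2f/month per uploading broker%n",
                putsPerSecond, putsPerMonth, monthlyPutCost);
    }
}
```

With these assumed numbers, uploading every 10 ms yields roughly 259 million PUTs per month, which is why batching more data per upload (a longer interval or larger batch size) directly reduces the PUT bill.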
Current Status (Aug. 5)
- The last comment in the discussion thread was on June 4.
- No support for compacted topics, same as the existing tiered storage feature.
- No concerns so far
KIP-1183: Unified Shared Storage
This KIP aims to provide storage-related interfaces to support implementing a shared storage engine without affecting the current classic storage engine. It introduces a unified log layer for deploying Kafka on various shared storage mediums, offering the following benefits:
- A unified log layer that supports both current replication-based local file storage and shared storage simultaneously.
- Greater flexibility for major users or vendors to deploy Kafka on their own shared storage, as many large companies have their own DFS or HDFS infrastructure.
- Prevention of community fragmentation: users or vendors can extend the storage layer without complex modifications to Kafka, fostering greater community collaboration.
In addition to the log abstraction layer, this KIP also introduces a new interface to support different kinds of storage.
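Since the KIP does not yet spell out this interface (see the concern below), the following is a purely illustrative Java sketch of what a pluggable shared-storage stream could look like; every name here is an assumption, not part of KIP-1183.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch of a pluggable shared-storage stream abstraction.
public interface SharedLogStream {

    /** Appends a record batch and completes when the write is durable on the shared medium. */
    CompletableFuture<Long> append(ByteBuffer recordBatch);

    /** Reads up to maxBytes starting at the given logical offset. */
    CompletableFuture<ByteBuffer> read(long startOffset, int maxBytes);

    /** Trims the stream up to the given offset (retention / delete-records). */
    CompletableFuture<Void> trim(long beforeOffset);

    /** The next offset that would be assigned by append. */
    long nextOffset();

    void close();
}
```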
Current Status (Aug. 5)
- The last comment in the discussion thread was in May.
- Concern: No clear design of the Stream Interface.
- From the author:
- In summary, KIP-1183 aims to discuss how the community views the impact of shared storage on the current architecture. Should we embrace it, and when?
- So, I think we should at least reach consensus on these two points:
- We should consider how to support shared storage, but the community needs to support both local disk and shared storage long-term.
- Which path should we take: the leaderless architecture of KIP-1150 or the approach described in KIP-1183?


