
Authors: Sriharsha Chintalapani, Suresh Srinivas


Status

Current State: Draft

Discussion Thread:

JIRA: KAFKA-7739


Motivation

Kafka is an important part of data infrastructure and is seeing significant adoption and growth. As Kafka cluster sizes grow and more data is stored in Kafka for longer durations, several issues related to scalability, efficiency, and operations become important to address.

Kafka stores the messages in append-only log segments on local disks on Kafka brokers. The retention period for the log is based on `log.retention`, which can be set system-wide or per topic. Retention guarantees consumers that even if their application fails or is down for maintenance, they can come back within the retention period and read from where they left off without losing any data.

The total storage required on a cluster is proportional to the number of topics/partitions, the rate of messages, and, most importantly, the retention period. A Kafka broker typically has a large number of disks with a total storage capacity of tens of TBs. The amount of data stored locally on a Kafka broker presents many operational challenges. When a broker fails, the failed node is replaced by a new node. The new node must copy all the data that was on the failed broker from other replicas. Similarly, when a new Kafka node is added to scale the cluster storage, cluster rebalancing assigns partitions to the new node, which also requires copying a lot of data. The time for recovery and rebalancing is proportional to the amount of data stored locally on a Kafka broker. In setups that run many Kafka clusters with hundreds to thousands of brokers, node failures are a common occurrence (roughly one per day), with a lot of time spent in recovery, making operations difficult and time-consuming.

Reducing the amount of data stored on each broker can reduce the recovery/rebalancing time. But it would also necessitate reducing the log retention period, which impacts the time available for application maintenance and failure recovery.


Solution - Tiered storage for Kafka

Kafka data is mostly consumed in a streaming fashion using tail reads. Tail reads leverage the OS's page cache to serve the data instead of disk reads. Older data is typically read from disk for backfill or failure recovery purposes, and such reads are infrequent.

In the tiered storage approach, the Kafka cluster is configured with two tiers of storage - local and remote. The local tier is the same as current Kafka: it uses the local disks on the Kafka brokers to store the log segments. The new remote tier uses systems such as HDFS or S3 to store the completed log segments. Two separate retention periods are defined, one for each tier. With the remote tier enabled, the retention period for the local tier can be significantly reduced from days to a few hours. The retention period for the remote tier can be much longer: days or even months. When a log segment is rolled on the local tier, it is copied to the remote tier along with the corresponding offset index. Latency-sensitive applications perform tail reads and are served from the local tier, leveraging the existing Kafka mechanism of efficiently using the page cache to serve the data. Backfill applications and applications recovering from a failure that need data older than what is in the local tier are served from the remote tier.

This reduces the amount of data stored locally on Kafka brokers and hence the amount of data that needs to be copied during recovery and rebalancing. Log segments that are available in the remote tier need not be restored on the broker, or can be restored lazily, and are served from the remote tier. With this, increasing the retention period no longer requires scaling the Kafka cluster storage or adding new nodes. At the same time, the overall data retention can still be much longer, eliminating the need for separate data pipelines to copy the data from Kafka to external stores, as is done currently in many deployments.


Goal

Extend Kafka's storage beyond the local storage available on the Kafka cluster by retaining older data in an external store, such as HDFS or S3, with minimal impact on the internals of Kafka. Kafka behavior and operational complexity must not change for existing users that do not have the tiered storage feature configured.


High-level design



Remote Log Manager (RLM) is a new component that copies the completed LogSegments and the corresponding OffsetIndex files to the remote tier.


* An interface will be introduced through which different RLM implementations, integrating with different types of remote storage, can be made available (a sketch of such an interface follows this list). RLM implementation code can also be kept outside Kafka core if the community chooses so.
* RLM has two modes:
  * RLM Leader - In this mode, the RLM that is the leader for a topic-partition checks for rolled-over LogSegments and copies them, along with the corresponding OffsetIndex, to the remote tier. RLM creates an index file, called RemoteLogSegmentIndex, per topic-partition to track remote LogSegments. Additionally, the RLM leader serves read requests for older data from the remote tier.
  * RLM Follower - In this mode, RLM keeps track of the segments and index files on the remote tier and updates its RemoteLogSegmentIndex file per topic-partition. The RLM follower does not serve reads of old data from the remote tier.
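
A minimal sketch of what such a pluggable interface could look like. Apart from addTopicPartitions (which is named later in this document) and read (whose parameters mirror the ReplicaManager snippet below), the method names are illustrative assumptions; PartitionData, ReplicaQuota, and LogReadResult refer to existing broker-internal types.

import org.apache.kafka.common.TopicPartition

trait RemoteLogManager {
  // Invoked by ReplicaManager when this broker becomes leader or follower for partitions.
  def addTopicPartitions(topicPartitions: Set[TopicPartition]): Unit
  // Hypothetical counterpart for partitions that move off this broker.
  def removeTopicPartitions(topicPartitions: Set[TopicPartition]): Unit

  // Leader mode only: serve reads for data older than what the local tier retains.
  def read(fetchMaxBytes: Int,
           hardMaxBytesLimit: Boolean,
           readPartitionInfo: Seq[(TopicPartition, PartitionData)],
           quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)]

  // Stop background copying and release resources on broker shutdown.
  def shutdown(): Unit
}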


Core Kafka changes

To satisfy the goal of keeping changes to core Kafka minimal, broker behavior remains unchanged for existing users when RLM is not configured.


* Core Kafka starts the RLM service if tiered storage is configured.
* When an offset index is not found and RLM is configured, the read request is delegated to RLM to serve the data from the remote tier.


Serving Data from Remote Storage

 Approach 1

RLM will ship all the LogSegments and the corresponding OffsetIndex files to RemoteStorage. A new index file, `RemoteLogSegmentIndex`, is maintained locally on the Kafka broker per topic-partition, just like the existing index files, as shown below:

 
{log.dirs}/{topic-partition}/00000001000121.remoteindex



The remote index maintains the starting offsets of completed LogSegments as 4-byte numbers relative to the base offset of the remote index file (encoded in its file name), as shown below:

FileName : 00000001000121.remoteindex 

Contents:
SegmentStartOffset
1000121 
1500024 
2000011 
…
2999999

FileName : 00000003000000.remoteindex 
Contents:
SegmentStartOffset
3000000 
3500024 
4000011 
…



RLM maintains these RemoteLogSegmentIndexes per topic-partition in local files on the Kafka broker. These files are rolled on a periodic basis, with the starting offset of the first LogSegment in the file name. Note that the RemoteLogSegmentIndex can be reconstructed by listing all the log segments stored on the remote storage; maintaining a local file is an optimization to avoid such listing operations, which may be slow and expensive depending on the external store. RemoteLogSegmentIndex files are memory-mapped and follow a binary search mechanism, similar to OffsetIndex files, to find the LogSegment that should serve a read operation.
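
A minimal sketch of that lookup, assuming each index entry is a single 4-byte offset stored relative to the base offset encoded in the file name; the helper name and file handling are illustrative.

import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

// Binary search over a memory-mapped .remoteindex file. Returns the start offset of the
// last remote LogSegment whose start offset is <= targetOffset.
def lookupSegmentStartOffset(indexPath: String, baseOffset: Long, targetOffset: Long): Long = {
  val channel = FileChannel.open(Paths.get(indexPath), StandardOpenOption.READ)
  try {
    val mmap    = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())
    val entries = (channel.size() / 4).toInt      // one 4-byte relative offset per entry
    require(entries > 0, s"$indexPath is empty")
    def entry(i: Int): Long = baseOffset + mmap.getInt(i * 4)

    var lo = 0
    var hi = entries - 1
    var result = entry(0)
    while (lo <= hi) {
      val mid = (lo + hi) >>> 1
      if (entry(mid) <= targetOffset) { result = entry(mid); lo = mid + 1 }
      else hi = mid - 1
    }
    result
  } finally channel.close()
}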

On an `OffsetOutOfRangeException`, ReplicaManager delegates the read request to RLM, which does the following (steps 2 and 3 are sketched after the list):


1. RLM performs a binary search on the memory-mapped `RemoteLogSegmentIndex` file to find the starting offset of the LogSegment that contains the requested offset.
2. RLM uses that starting offset to build the file names (or object names) of the LogSegment and OffsetIndex in remote storage.
3. RLM fetches the segment and index files on demand, seeks into the LogSegment to the requested offset, and serves the data to the client.
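
A minimal sketch of steps 2 and 3, reusing the lookup above. The RemoteStorage abstraction, the object-naming scheme, and the assumption that the remote OffsetIndex uses the same 8-byte entry layout as local OffsetIndex files (4-byte relative offset, 4-byte byte position) are all illustrative.

import java.nio.ByteBuffer
import java.nio.file.Path
import org.apache.kafka.common.TopicPartition

// A minimal remote-storage abstraction assumed throughout these sketches.
trait RemoteStorage {
  def put(localFile: Path): Unit                                                  // upload a file
  def fetch(objectName: String): Array[Byte]                                      // download a whole object
  def fetchRange(objectName: String, startPosition: Int, maxBytes: Int): Array[Byte]
}

// Step 2: build object names from the segment start offset, mirroring local segment naming.
def segmentObjectName(tp: TopicPartition, startOffset: Long): String =
  f"${tp.topic}-${tp.partition}/$startOffset%020d.log"
def offsetIndexObjectName(tp: TopicPartition, startOffset: Long): String =
  f"${tp.topic}-${tp.partition}/$startOffset%020d.index"

// Step 3: fetch the OffsetIndex, locate the byte position of the requested offset,
// then range-fetch the LogSegment and serve the returned bytes to the client.
def readFromRemote(storage: RemoteStorage, tp: TopicPartition, segmentStartOffset: Long,
                   requestedOffset: Long, maxBytes: Int): Array[Byte] = {
  val index = ByteBuffer.wrap(storage.fetch(offsetIndexObjectName(tp, segmentStartOffset)))
  val relativeTarget = (requestedOffset - segmentStartOffset).toInt
  var position = 0
  while (index.remaining() >= 8 && index.getInt(index.position()) <= relativeTarget) {
    index.getInt()              // relative offset of this index entry
    position = index.getInt()   // byte position of that offset within the segment
  }
  storage.fetchRange(segmentObjectName(tp, segmentStartOffset), position, maxBytes)
}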


Approach 2

As in Approach 1, RLM stores the LogSegment and OffsetIndex in remote storage. In addition, it keeps a copy of the OffsetIndex files in local storage to avoid reading the offsets from remote storage. This allows the broker to seek to the right position in a remote LogSegment ahead of time instead of fetching the entire LogSegment file from remote storage.
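
A small sketch of the difference from Approach 1: the OffsetIndex is read from the local copy, so only the LogSegment byte range has to be fetched remotely. The directory layout assumed here is illustrative.

import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}
import org.apache.kafka.common.TopicPartition

// Approach 2: serve the OffsetIndex lookup from the local copy kept next to the
// other per-partition index files, instead of fetching it from remote storage.
def localOffsetIndexCopy(logDir: String, tp: TopicPartition, segmentStartOffset: Long): ByteBuffer = {
  val path = Paths.get(logDir, s"${tp.topic}-${tp.partition}", f"$segmentStartOffset%020d.index")
  ByteBuffer.wrap(Files.readAllBytes(path))
}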


Public Interfaces

Compacted topics will not have remote storage support. 


Configs

System-Wide

Per Topic Configuration
  • remote.retention.period
  • remote.retention.bytes


RemoteLogManager (RLM)

RemoteLogManager is a new interface added to the broker. It is responsible for copying the completed log segments to remote storage and updating the RemoteLogSegmentIndex file. The default implementation of the interface supports HDFS as remote storage. Additional remote storage support, such as S3, can be added later by providing other implementations using the configuration `remote.log.manager.class`.

Only the broker that is the leader for a topic-partition is allowed to copy its data to remote storage.
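
A minimal sketch of how the broker could instantiate the implementation named by `remote.log.manager.class`, assuming the implementation class has a no-arg constructor; the helper name and the use of broker Properties are illustrative.

import java.util.Properties

// Load the configured RemoteLogManager implementation reflectively, analogous to how
// other pluggable classes are loaded from broker configuration.
def createRemoteLogManager(brokerProps: Properties): Option[RemoteLogManager] =
  Option(brokerProps.getProperty("remote.log.manager.class")).map { className =>
    Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[RemoteLogManager]
  }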



Replica Manager

If RLM is configured, ReplicaManager will call RLM to assign or remove topic-partitions, similar to how the ReplicaFetcherManager works today, as sketched below.
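
A minimal sketch of that hand-off, using the hypothetical addTopicPartitions/removeTopicPartitions methods from the interface sketch above; the method and parameter names on the ReplicaManager side are illustrative.

import org.apache.kafka.common.TopicPartition

// Called from ReplicaManager when partition assignments or leadership change.
def onPartitionAssignmentChange(remoteLogManagerOpt: Option[RemoteLogManager],
                                assignedPartitions: Set[TopicPartition],
                                stoppedPartitions: Set[TopicPartition]): Unit = {
  remoteLogManagerOpt.foreach { rlm =>
    rlm.addTopicPartitions(assignedPartitions)    // leaders and followers both track remote segments
    rlm.removeTopicPartitions(stoppedPartitions)  // stop tracking partitions moved off this broker
  }
}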

If the broker changes its state from Leader to Follower for a topic-partition while RLM is in the process of copying a segment, it will finish the copy before relinquishing the copy responsibility for that topic-partition. This might leave duplicated messages in remote storage.

ReplicaManager.readFromLocalLog works as it does today. Only when it raises an OffsetOutOfRangeException and RLM is configured is the read request delegated to RLM, which returns a LogReadResult:

def readFromLocalLog(...): Seq[(TopicPartition, LogReadResult)] = {
  ...
  catch {
    case e: OffsetOutOfRangeException =>
      // Delegate the read to RLM, which serves the data from the remote tier.
      remoteLogManager.read(fetchMaxBytes, hardMaxBytesLimit, readPartitionInfo, quota)
  }
}



Proposed Changes

When an RLM class is configured and all the required configs are present, the broker will send the list of topic-partitions and invoke RemoteLogManager.addTopicPartitions. This function's responsibility is to monitor the log.dirs for the given topic-partitions and copy rolled-over LogSegments to the configured remote storage. Once a LogSegment is copied over, it is marked as done, as sketched below.
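
A minimal sketch of that copy path, with the "copy-done" marker represented as a sibling file next to the segment. The marker naming, the RemoteStorage abstraction reused from the sketches above, and the scheduling are illustrative assumptions.

import java.nio.file.{Files, Path, Paths}

// Copy every rolled-over (non-active) segment that does not yet have a copy-done marker,
// then create the marker so that log retention knows the local copy is safe to delete.
def copyRolledSegments(partitionDir: Path, storage: RemoteStorage, activeSegmentBaseOffset: Long): Unit = {
  val segments = Files.newDirectoryStream(partitionDir, "*.log")
  try {
    segments.forEach { segment =>
      val baseOffset = segment.getFileName.toString.stripSuffix(".log").toLong
      val marker     = Paths.get(segment.toString + ".copy-done")
      if (baseOffset < activeSegmentBaseOffset && !Files.exists(marker)) {
        storage.put(segment)                                                       // LogSegment
        storage.put(Paths.get(segment.toString.stripSuffix(".log") + ".index"))    // OffsetIndex
        Files.createFile(marker)                                                   // completion marker
      }
    }
  } finally segments.close()
}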


Log Retention

Log retention will continue to work as it does today, except for one case: if a LogSegment is in the process of being copied over and does not yet have the associated "copy-done" marker, LogCleaner skips these LogSegments until the marker denotes that they have been copied to remote storage and are safe to delete.
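
A small sketch of the guard, assuming the same sibling-file marker convention as in the copy sketch above.

import java.nio.file.{Files, Path, Paths}

// Retention must not delete a rolled LogSegment until its copy-done marker exists.
def safeToDelete(segmentFile: Path): Boolean =
  Files.exists(Paths.get(segmentFile.toString + ".copy-done"))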


Fetch Requests

For any fetch request, ReplicaManager proceeds by calling readFromLocalLog; if this call raises an OffsetOutOfRangeException, it delegates the read to RemoteLogManager.readFromRemoteLog and returns the resulting LogReadResult.


Alternatives considered

The following alternatives were considered:


  1. Replace all local storage with remote storage - Instead of using local storage on Kafka brokers, only remote storage is used for storing log segments and offset index files. While this reduces local storage, it does not leverage the page cache for efficient tail reads, as Kafka does today.

  2. Implement the Kafka API on another store - This is an approach taken by some vendors, where the Kafka API is implemented on top of a different distributed, scalable storage system (for example, HDFS). Such an option does not leverage Kafka beyond API compliance and requires the much riskier step of replacing an entire Kafka cluster with another system.

  3. Clients directly read remote log segments from the remote storage - The log segments in remote storage could be read directly by the client instead of being served by the Kafka broker. This reduces Kafka broker changes and removes an extra hop. However, it completely bypasses Kafka security and increases Kafka client library complexity and footprint, and hence is not considered.