KIP-405: Kafka Tiered Storage
...

Tiered storage does not replace ETL pipelines and jobs. Existing ETL pipelines continue to consume data from Kafka as is, albeit with data in Kafka having a much longer retention period.

...


Remote Log Manager (RLM) is a new component that copies the completed LogSegments and the corresponding OffsetIndex to the remote tier.

  • RLM component will keep track of topic-partitions and their segments. It will delegate the copy and read of these segments to a pluggable storage manager implementation.
  • RLM has two modes (see the sketch after this list):
    • RLM Leader - In this mode, the RLM that is the leader for a topic-partition checks for rolled-over LogSegments and copies them along with the OffsetIndex to the remote tier. RLM creates an index file, called RemoteLogSegmentIndex, per topic-partition to track remote LogSegments. Additionally, the RLM leader also serves the read requests for older data from the remote tier.
    • RLM Follower - In this mode, RLM keeps track of the segments and index files on the remote tier and updates its RemoteLogSegmentIndex file per topic-partition. The RLM follower does not serve reads of old data from the remote tier.
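
Here is a minimal sketch of how the two modes could dispatch work; RLMTask and the helper names are illustrative, not part of the proposal:

Code Block
languagescala
// Per topic-partition task inside RLM; the mode follows the broker's
// leader/follower role for that partition.
class RLMTask(tp: TopicPartition, rsm: RemoteStorageManager) {

  def run(isLeader: Boolean): Unit =
    if (isLeader)
      copyRolledOverSegments(tp)    // leader: push rolled-over segments + OffsetIndex to the remote tier
    else
      syncRemoteLogSegmentIndex(tp) // follower: refresh the local RemoteLogSegmentIndex from the remote tier

  // Hypothetical helpers, to be defined during implementation.
  private def copyRolledOverSegments(tp: TopicPartition): Unit = ???
  private def syncRemoteLogSegmentIndex(tp: TopicPartition): Unit = ???
}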

...

After successfully copying a segment file to remote storage, RLM will append a set of index entries to 3 local index files: remoteLogIndex, remoteOffsetIndex, and remoteTimeIndex. These index files are rotated by RLM at a configurable time interval (or at a configurable size).

(active segment)

{log.dirs}/{topic-partition}/0000002400013.index

{log.dirs}/{topic-partition}/0000002400013.timeindex

{log.dirs}/{topic-partition}/0000002400013.log


(inactive segments)

{log.dirs}/{topic-partition}/0000002000238.index

{log.dirs}/{topic-partition}/0000002000238.timeindex

{log.dirs}/{topic-partition}/0000002000238.log

{log.dirs}/{topic-partition}/0000001600100.index

{log.dirs}/{topic-partition}/0000001600100.timeindex

{log.dirs}/{topic-partition}/0000001600100.log


(active remote segment)

{log.dirs}/{topic-partition}/0000001000121.remoteOffsetIndex

{log.dirs}/{topic-partition}/0000001000121.remoteTimeIndex

{log.dirs}/{topic-partition}/0000001000121.remoteLogIndex


(inactive remote segments)

{log.dirs}/{topic-partition}/0000000512002.remoteOffsetIndex

{log.dirs}/{topic-partition}/0000000512002.remoteTimeIndex

{log.dirs}/{topic-partition}/0000000512002.remoteLogIndex



Each index entry of the remoteLogIndex file contains the information of a sequence of records in the remote log segment file. The format of a remoteLogIndex entry:

magic: int16 (current magic value is 0)

length: int16 (length of this entry)

crc: int32 (checksum from firstOffset to the end of this entry)

firstOffset: int64 (the Kafka offset of the 1st record)

lastOffset: int64 (the Kafka offset of the last record)

firstTimestamp: int64

lastTimestamp: int64

dataLength: int32 (length of the remote data)

rdiLength: int16

rdi: byte[] (Remote data identifier)
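
For illustration only, the layout above could be modeled as follows; the class name matches the RemoteLogIndexEntry used by the APIs below, but the exact length and crc semantics are assumptions to be finalized:

Code Block
languagescala
import java.nio.ByteBuffer

// Mirrors the remoteLogIndex entry layout described above.
case class RemoteLogIndexEntry(magic: Short,          // current magic value is 0
                               crc: Int,              // checksum from firstOffset to the end of the entry
                               firstOffset: Long,     // Kafka offset of the 1st record
                               lastOffset: Long,      // Kafka offset of the last record
                               firstTimestamp: Long,
                               lastTimestamp: Long,
                               dataLength: Int,       // length of the remote data
                               rdi: Array[Byte]) {    // remote data identifier

  def rdiLength: Short = rdi.length.toShort

  // Assumes "length" counts the bytes after the length field (crc .. rdi).
  def entryLength: Short = (42 + rdi.length).toShort

  def writeTo(buf: ByteBuffer): Unit = {
    buf.putShort(magic)
    buf.putShort(entryLength)
    buf.putInt(crc)
    buf.putLong(firstOffset)
    buf.putLong(lastOffset)
    buf.putLong(firstTimestamp)
    buf.putLong(lastTimestamp)
    buf.putInt(dataLength)
    buf.putShort(rdiLength)
    buf.put(rdi)
  }
}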

...

Depending on the implementation, RLM may append 1 or more entries to the remoteLogIndex file for each remote segment file. More entries provide finer-grained indexing of the remote data at the cost of local disk space.
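
For example, each entry in the format above occupies 46 bytes of fixed fields plus the RDI; assuming, say, a 50-byte RDI, writing one entry per 1 MB of remote data costs on the order of 100 KB of local remoteLogIndex space per GB of tiered data.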

...

The remoteOffsetIndex file and remoteTimeIndex file are similar to the existing .index file (offset index) and .timeindex file (timestamp index). The only difference is that they point to the entries in the corresponding remoteLogIndex file instead of a log segment file.
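
To make the indirection concrete, resolving a fetch offset could proceed roughly as follows; OffsetIndex.lookup is the existing offset-index primitive, while RemoteLogIndex and its entryAt method are hypothetical names for the remoteLogIndex reader:

Code Block
languagescala
// Resolve a fetch offset: remoteOffsetIndex maps the offset to a position in
// the remoteLogIndex file; the entry at that position carries the RDI used to
// read the data from remote storage.
def findRemoteEntry(targetOffset: Long,
                    remoteOffsetIndex: OffsetIndex,
                    remoteLogIndex: RemoteLogIndex): Option[RemoteLogIndexEntry] = {
  val position = remoteOffsetIndex.lookup(targetOffset).position
  remoteLogIndex.entryAt(position).filter(_.lastOffset >= targetOffset)
}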

...

System-Wide
  • remote.log.storage.enable = false (to support backward compatibility)
  • remote.log.storage.manager.class.name = org.apache.kafka.rsm.hdfs.HDFSRemoteStorageManager

(These configs are dependent on the remote storage manager implementation)

  • remote.log.storage.manager.config*
Per Topic Configuration
  • remote.log.retention.minutes

  • remote.log.retention.bytes
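
For illustration, a broker enabling tiered storage with the HDFS implementation might carry settings like the following (the values, and the implementation-specific key, are illustrative only):

Code Block
# broker configuration
remote.log.storage.enable=true
remote.log.storage.manager.class.name=org.apache.kafka.rsm.hdfs.HDFSRemoteStorageManager

# settings matching the remote.log.storage.manager.config* prefix are passed
# through to the storage manager implementation (key below is hypothetical)
remote.log.storage.manager.config.hdfs.uri=hdfs://namenode:8020

# per-topic retention for remote data
remote.log.retention.minutes=20160
remote.log.retention.bytes=1099511627776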

...

Remote Log Manager:

        RemoteLogManager (RLM) keeps track of the topic-partitions assigned to the broker and delegates the copy and read of their completed log segments to the configured RemoteStorageManager.

Only the broker that is the leader for a topic-partition is allowed to copy to the remote storage.

Note: Early proposal. To be finalized during implementation.

Code Block
languagescala
class RemoteLogManager extends Configurable {

     // Pluggable storage manager used to copy and read remote segments.
     val remoteStorageManager: RemoteStorageManager

     // Configure this instance with the given key-value pairs.
     def configure(configs: util.Map[String, _]): Unit

     // Ask RLM to monitor the given TopicPartitions, and copy inactive Log
     // Segments to remote storage. RLM updates local index files once a
     // Log Segment in a TopicPartition is copied to Remote Storage.
     def addPartitions(topicPartitions: Set[TopicPartition]): Boolean

     // Stops copy of LogSegment if TopicPartition ownership is moved from a broker.
     def removePartitions(topicPartitions: Set[TopicPartition]): Boolean

     // Read topic partition data from remote storage.
     def read(fetchMaxBytes: Int, hardMaxBytesLimit: Boolean, readPartitionInfo: Seq[(TopicPartition, PartitionData)]): LogReadResult

     // Stops all the threads and closes the instance.
     def shutdown(): Unit
}

Remote Storage Manager:

        RemoteStorageManager is an interface that allows plugging in different remote storage implementations to copy the log segments. The default implementation of the interface supports HDFS as remote storage. Additional remote storage support, such as S3, can be added later by providing other implementations using the configuration remote.log.storage.manager.class.name.

Note: Early proposal. To be finalized during implementation.

Code Block
languagescala
/**
 * All these APIs are still experimental.
 */
trait RemoteStorageManager extends Configurable with AutoCloseable {

  /**
   * Earliest log offset if exists for the given topic partition in the remote storage. Return -1 if there are no
   * segments in the remote storage.
   *
   * @param tp
   * @return
   */
  @throws(classOf[IOException])
  def earliestLogOffset(tp: TopicPartition): Long

  /**
   * Copies LogSegment provided by [[RemoteLogManager]] for the given topic partition with the given leader epoch.
   * Returns the RDIs of the remote data. This method is invoked by the leader of topic partition.
   *
   * //todo LogSegment is not public, this will be changed with an interface which provides base and end offset of the
   * segment, log and offset/time indexes.
   *
   * @param topicPartition
   * @param logSegment
   * @return
   */
  @throws(classOf[IOException])
  def copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment, leaderEpoch: Int): util.List[RemoteLogIndexEntry]

  /**
   * Cancels the unfinished LogSegment copying of the given topic-partition.
   *
   * @param topicPartition
   */
  def cancelCopyingLogSegment(topicPartition: TopicPartition): Unit

  /**
   * List the remote log segment files of the given topicPartition.
   * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
   *
   * @return List of remote segments, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition): util.List[RemoteLogSegmentInfo] = {
    listRemoteSegments(topicPartition, 0)
  }

  /**
   * List the remote log segment files of the specified topicPartition starting from the base offset minBaseOffset.
   * The RLM of a follower uses this method to find out the remote data.
   *
   * @param minBaseOffset The minimum base offset for a segment to be returned.
   * @return List of remote segments starting from the base offset minBaseOffset, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition, minBaseOffset: Long): util.List[RemoteLogSegmentInfo]

  /**
   * Returns a List of RemoteLogIndexEntry for the given RemoteLogSegmentInfo. This is used by the follower to store
   * remote log indexes locally.
   *
   * @param remoteLogSegment
   * @return
   */
  @throws(classOf[IOException])
  def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry]

  /**
   * Deletes remote LogSegment file and indexes for the given remoteLogSegmentInfo
   *
   * @param remoteLogSegmentInfo
   * @return
   */
  @throws(classOf[IOException])
  def deleteLogSegment(remoteLogSegmentInfo: RemoteLogSegmentInfo): Boolean

  /**
   * Delete all the log segments for the given topic partition. This can be done by renaming the existing locations
   * and deleting them later in an asynchronous manner.
   *
   * @param topicPartition
   * @return
   */
  @throws(classOf[IOException])
  def deleteTopicPartition(topicPartition: TopicPartition): Boolean

  /**
   * Remove the log segments which are older than the given cleanUpTillMs. Return the log start offset of the
   * earliest remote log segment if one exists, or -1 if there are no log segments in the remote storage.
   *
   * @param topicPartition
   * @param cleanUpTillMs
   * @return
   */
  @throws(classOf[IOException])
  def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long

  /**
   * Read up to maxBytes data from remote storage, starting from the 1st batch that is greater than or equal to the
   * startOffset. It will read at least one batch, if the 1st batch size is larger than maxBytes.
   *
   * @param remoteLogIndexEntry The first remoteLogIndexEntry that remoteLogIndexEntry.lastOffset >= startOffset
   * @param maxBytes            maximum bytes to fetch for the given entry
   * @param startOffset         initial offset to be read from the given rdi in remoteLogIndexEntry
   * @param minOneMessage       if true read at least one record even if the size is more than maxBytes
   * @return
   */
  @throws(classOf[IOException])
  def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int, startOffset: Long, minOneMessage: Boolean): Records

  /**
   * Releases any system resources used by this instance.
   */
  def close(): Unit
}
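
To illustrate how the two RLM modes would drive this interface, here is a rough usage sketch; appendToLocalRemoteIndexes is a hypothetical helper, and error handling and retries are omitted:

Code Block
languagescala
// Leader path: ship a rolled-over segment and record its index entries locally.
def onSegmentRolled(rsm: RemoteStorageManager, tp: TopicPartition,
                    segment: LogSegment, leaderEpoch: Int): Unit = {
  val entries = rsm.copyLogSegment(tp, segment, leaderEpoch)
  appendToLocalRemoteIndexes(tp, entries)
}

// Follower path: discover segments the leader has copied and mirror their indexes.
def syncFromRemote(rsm: RemoteStorageManager, tp: TopicPartition,
                   lastSyncedBaseOffset: Long): Unit = {
  val segments = rsm.listRemoteSegments(tp, lastSyncedBaseOffset)
  segments.forEach { segment =>
    appendToLocalRemoteIndexes(tp, rsm.getRemoteLogIndexEntries(segment))
  }
}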


Proposed Changes

When an RLM class is configured and all the required configs are present, ReplicaManager hands the broker's topic-partitions to RLM by invoking RemoteLogManager.addPartitions(). RLM will monitor the log.dirs for these topic-partitions and copy the rolled-over LogSegments to the configured remote storage. Once a LogSegment is copied over, RLM marks it as done.

Replica Manager

If RLM is configured, ReplicaManager will call RLM to assign or remove topic-partitions, similar to how the replicaFetcherManager works today (see the sketch below).
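
A sketch of that delegation point; the hook names are illustrative, and the real entry points would be the leadership-change and stop-replica paths:

Code Block
languagescala
// ReplicaManager hands partitions to RLM on leadership changes, mirroring how
// replicaFetcherManager.addFetcherForPartitions / removeFetcherForPartitions
// are driven today. remoteLogManagerOpt is empty when RLM is not configured.
def onBecomeLeaderOrFollower(partitions: Set[TopicPartition]): Unit =
  remoteLogManagerOpt.foreach(_.addPartitions(partitions))

def onStopReplica(partitions: Set[TopicPartition]): Unit =
  remoteLogManagerOpt.foreach(_.removePartitions(partitions))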

...

Code Block
languagescala
def readFromLocalLog(): Seq[(TopicPartition, LogReadResult)] = {
  // ... existing local read path ...
  catch {
    case e @ (_: OffsetOutOfRangeException) =>
      // The requested offset is no longer in the local log: delegate to RLM.
      RemoteLogManager.read(fetchMaxBytes: Int,
                            hardMaxBytesLimit: Boolean,
                            tp: TopicPartition,
                            fetchInfo: PartitionData,
                            quota: ReplicaQuota)
  }
}


Log Retention

Log retention will continue to work as it does today, except for one case: if a LogSegment is in the process of being copied over and doesn't yet have the associated "copy-done" file, the LogCleaner will skip that LogSegment until it has the marker to denote that it has been copied over to remote storage and is safe to delete locally. A sketch of this check follows.
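
A minimal sketch of the extra deletability check; the marker file name is hypothetical, since the proposal only requires some "copy-done" marker:

Code Block
languagescala
import java.io.File

// A local segment is eligible for deletion by retention only once the marker
// denoting a completed copy to remote storage exists.
def isSafeToDelete(segmentFile: File): Boolean = {
  val copyDoneMarker = new File(segmentFile.getParent,
                                segmentFile.getName + ".copy-done") // hypothetical name
  copyDoneMarker.exists()
}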

...

For follower fetch, the leader only returns the data that is still in the leader's local storage. If a LogSegment was copied into remote storage by the leader broker, the follower doesn't need to copy this segment, as it is already present in remote storage. Instead, a follower retrieves the information about the segment from remote storage. If a replica becomes a leader, it can still locate and serve data from remote storage.

...

Remote storage (e.g. S3 / HDFS) is likely to have higher I/O latency and lower availability than local storage.

When the remote storage becomes temporarily unavailable (up to several hours) or has high latency (up to minutes), Kafka should still be able to operate normally. All the Kafka operations (produce, consume local data, create/expand topics, etc.) that do not rely on remote storage should not be impacted. Consumers that try to consume remote data should get reasonable errors when remote storage is unavailable or remote storage requests time out.

...

When handling a consumer fetch request, if the required offset is in remote storage, the request is added into "RemoteFetchPurgatory" to handle the timeout. RemoteFetchPurgatory is an instance of kafka.server.DelayedOperationPurgatory, and is similar to the existing produce/fetch purgatories. At the same time, the request is put into the task queue of the "remote storage fetcher thread pool".
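
A sketch of such a delayed operation; DelayedOperation is the existing purgatory base class and Records is Kafka's record container, while DelayedRemoteFetch and the callback shape are illustrative:

Code Block
languagescala
import java.util.concurrent.Future

// Completes when the remote-storage fetcher thread finishes the read, or
// expires and returns a timeout error to the consumer.
class DelayedRemoteFetch(delayMs: Long,
                         remoteFetchTask: Future[_],
                         respond: Either[Throwable, Records] => Unit)
  extends DelayedOperation(delayMs) {

  // Invoked by the purgatory; completes once the remote fetch task is done.
  override def tryComplete(): Boolean =
    if (remoteFetchTask.isDone) forceComplete() else false

  override def onComplete(): Unit = {
    // Deliver the fetched records (or the fetch error) to the client response.
  }

  override def onExpiration(): Unit = {
    // Cancel the pending remote read; the client gets a timeout error.
    remoteFetchTask.cancel(true)
  }
}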

...

  1. Replace all local storage with remote storage - Instead of using local storage on Kafka brokers, only remote storage is used for storing log segments and offset index files. While this has benefits related to reducing local storage, it has the problem of not leveraging the local disk for efficient reads of the latest data as Kafka does today.

  2. Implement Kafka API on another store - This is an approach taken by some vendors, where the Kafka API is implemented on a different distributed, scalable storage system (for example, HDFS). Such an option does not leverage Kafka other than for API compliance, and requires the much riskier step of replacing the entire Kafka cluster with another system.

  3. Client directly reads remote log segments from the remote storage - The log segments on the remote storage could be read directly by the client instead of being served from the Kafka broker. This reduces Kafka broker changes and has the benefit of removing an extra hop. However, it bypasses Kafka security completely and increases Kafka client library complexity and footprint, and hence is not considered.

...