...

        RemoteStorageManager is an interface that allows plugging in different remote storage implementations for copying log segments. The default implementation of the interface supports HDFS as remote storage. Support for additional remote storage, such as S3, can be added later by providing other implementations through the remote.storage.manager.class configuration.
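
For example, a broker pointing at an S3-backed implementation might be configured as follows. Only remote.storage.manager.class is defined by this proposal; the implementation class and the bucket property are hypothetical.

Code Block
languagetext
# remote.storage.manager.class is the only property defined by this proposal;
# the class name and the S3 bucket property below are illustrative only.
remote.storage.manager.class=com.example.tiered.S3RemoteStorageManager
remote.storage.s3.bucket=my-kafka-tier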

Note: Early proposal. To be finalized during implementation.

Code Block
languagescala
import java.io.IOException
import java.util

import kafka.log.LogSegment
import org.apache.kafka.common.{Configurable, TopicPartition}
import org.apache.kafka.common.record.Records

trait RemoteStorageManager extends Configurable with AutoCloseable {

  // configure(configs) is inherited from Configurable.

  /**
   * Copies the LogSegment provided by the [[RemoteLogManager]].
   * Returns the RDIs of the remote data.
   * This method is used by the leader.
   */
  @throws(classOf[IOException])
  def copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment): util.List[RemoteLogIndexEntry]

  /**
   * Cancels the unfinished LogSegment copying of the given topic-partition.
   */
  def cancelCopyingLogSegment(topicPartition: TopicPartition): Unit

  /**
   * List the remote log segment files of the specified topicPartition.
   * The RLM of a follower uses this method to find out the remote data.
   *
   * @return List of remote segments, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition): util.List[RemoteLogSegmentInfo] = {
    listRemoteSegments(topicPartition, 0)
  }

  /**
   * List the remote log segment files of the specified topicPartition starting from the base offset minBaseOffset.
   * The RLM of a follower uses this method to find out the remote data
   *
   * @param minBaseOffset The minimum base offset for a segment to be returned.
   * @return List of remote segments starting from the base offset minBaseOffset, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition, minBaseOffset: Long): util.List[RemoteLogSegmentInfo]

  /**
   * Called by the RLM to retrieve the RemoteLogIndex entries of the specified remoteLogSegment.
   */
  @throws(classOf[IOException])
  def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry]

  /**
   * Deletes the remote LogSegment file provided by the RLM
   */
  @throws(classOf[IOException])
  def deleteLogSegment(remoteLogSegment: RemoteLogSegmentInfo): Boolean

  /**
   * Delete all the log segments for the given topic partition. This can be done by renaming the existing locations
   * and deleting them later in an asynchronous manner.
   */
  @throws(classOf[IOException])
  def deleteTopicPartition(topicPartition: TopicPartition): Boolean

  /**
   * Remove the log segments which are older than the given cleanUpTillMs
   */
  @throws(classOf[IOException])
  def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long

  /**
   * Read up to maxBytes of data from remote storage, starting from the first batch
   * whose offset is greater than or equal to startOffset.
   *
   * Will read at least one batch if the first batch is larger than maxBytes and
   * minOneMessage is true.
   *
   * @param remoteLogIndexEntry The first remoteLogIndexEntry such that remoteLogIndexEntry.lastOffset >= startOffset
   * @param maxBytes            maximum bytes to fetch for the given entry
   * @param startOffset         initial offset to be read from the given rdi in remoteLogIndexEntry
   * @param minOneMessage       if true, read at least one record even if its size exceeds maxBytes
   * @return the records read from remote storage
   */
  @throws(classOf[IOException])
  def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int, startOffset: Long, minOneMessage: Boolean): Records

  /**
   * Stops all the threads and closes the instance.
   */
  def close(): Unit
}


Replica Manager

If RLM is configured, ReplicaManager will call RLM to assign or remove topic-partitions, similar to how replicaFetcherManager works today.
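
The exact RLM assignment API is not finalized; the sketch below only illustrates the intended delegation, with addPartitions / removePartition as hypothetical method names mirroring how replicaFetcherManager is driven today.

Code Block
languagescala
import org.apache.kafka.common.TopicPartition

// Hypothetical RLM assignment API assumed by this sketch.
trait RemoteLogManager {
  def addPartitions(partitions: Set[TopicPartition], leader: Boolean): Unit
  def removePartition(topicPartition: TopicPartition): Unit
}

class ReplicaManager(remoteLogManager: Option[RemoteLogManager]) {

  def makeLeaders(partitions: Set[TopicPartition]): Unit = {
    // ... existing leader transition logic ...
    remoteLogManager.foreach(_.addPartitions(partitions, leader = true))
  }

  def makeFollowers(partitions: Set[TopicPartition]): Unit = {
    // ... existing follower transition logic ...
    remoteLogManager.foreach(_.addPartitions(partitions, leader = false))
  }

  def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Unit = {
    // On partition removal, also remove it from the RLM.
    if (deletePartition)
      remoteLogManager.foreach(_.removePartition(topicPartition))
  }
}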

...

For any fetch request, ReplicaManager first calls readFromLocalLog; if that call results in an OffsetOutOfRange exception, it delegates the read to RemoteLogManager.readFromRemoteLog and returns the resulting LogReadResult. If the fetch request is from a consumer, RLM reads the data from remote storage and returns the result to the consumer.
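
A minimal sketch of that fallback, assuming a RemoteLogManager.readFromRemoteLog method with the simplified signature below (the real signatures are to be finalized):

Code Block
languagescala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.OffsetOutOfRangeException

// Sketch only: readFromLocalLog / readFromRemoteLog signatures are simplified,
// and error handling in the real read path is richer than a try/catch.
def read(topicPartition: TopicPartition, fetchOffset: Long, maxBytes: Int): LogReadResult = {
  try {
    readFromLocalLog(topicPartition, fetchOffset, maxBytes)
  } catch {
    case _: OffsetOutOfRangeException =>
      // The requested offset may have been tiered off local disk; read it remotely.
      remoteLogManager.readFromRemoteLog(topicPartition, fetchOffset, maxBytes)
  }
}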

Follower Requests/Replication

For follower fetches, the leader only returns data that is still in the leader's local storage. If a LogSegment has been copied to remote storage by the leader broker, a follower does not need to copy a segment that is already present in remote storage. Instead, the follower retrieves the segment's information from remote storage. If a replica becomes the leader, it can still locate and serve data from remote storage.
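
In terms of the interface above, a follower's catch-up could look like the following sketch (the lastIndexedOffset bookkeeping and the appendToLocalRemoteLogIndex helper are hypothetical):

Code Block
languagescala
import org.apache.kafka.common.TopicPartition

// Sketch: fetch metadata of segments already tiered by the leader instead of
// re-copying their data.
def syncRemoteIndex(rsm: RemoteStorageManager, topicPartition: TopicPartition,
                    lastIndexedOffset: Long): Unit = {
  val segments = rsm.listRemoteSegments(topicPartition, lastIndexedOffset)
  segments.forEach { segment =>
    val entries = rsm.getRemoteLogIndexEntries(segment)
    appendToLocalRemoteLogIndex(topicPartition, entries) // hypothetical helper
  }
}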

Thread Pools

Remote storage (e.g. S3 or HDFS) is likely to have higher I/O latency and lower availability than local storage.

...

To achieve this, we have to handle remote storage operations in dedicated thread pools, instead of in Kafka I/O threads and fetcher threads.
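
A minimal sketch of how these pools might be created (pool sizes are placeholders that would come from broker configuration):

Code Block
languagescala
import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorService}

// Copies/deletes segments and maintains remote log indexes on a schedule.
val rlmThreadPool: ScheduledExecutorService = Executors.newScheduledThreadPool(4)

// Serves consumer fetches that have to hit remote storage.
val remoteFetcherThreadPool: ExecutorService = Executors.newFixedThreadPool(8)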


1. Remote Log Manager (RLM) Thread Pool


RLM maintains a list of the topic-partitions it manages. The list is updated in Kafka I/O threads, when topic-partitions are added to or removed from RLM. Each topic-partition in the list is assigned a scheduled processing time. The RLM thread pool processes the topic-partitions whose "scheduled processing time" is less than or equal to the current time.
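
A sketch of that bookkeeping, assuming a simple map from topic-partition to its scheduled processing time (the data structure is an illustration, not the finalized design):

Code Block
languagescala
import java.util.concurrent.ConcurrentHashMap
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

// topic-partition -> scheduled processing time (ms); updated from I/O threads.
val scheduledTimes = new ConcurrentHashMap[TopicPartition, Long]()

def addPartition(topicPartition: TopicPartition): Unit =
  scheduledTimes.put(topicPartition, 0L) // eligible for processing immediately

def removePartition(topicPartition: TopicPartition): Unit =
  scheduledTimes.remove(topicPartition)

// Called by RLM pool threads to pick up due work.
def duePartitions(nowMs: Long): Iterable[TopicPartition] =
  scheduledTimes.asScala.collect { case (tp, scheduled) if scheduled <= nowMs => tp }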

...

Update the "earliest remote offset" of the topic-partition to the base offset of the earliest remote segment, when old remote segments are removed from the remote storage. Delete the old local remoteLogIndex file, if the "earliest remote offset" is larger than the last offset of the file.


2. Remote Storage Fetcher Thread Pool


When handling a consumer fetch request, if the required offset is in remote storage, the request is added to the "RemoteFetchPurgatory" to handle the timeout. RemoteFetchPurgatory is an instance of kafka.server.DelayedOperationPurgatory and is similar to the existing produce/fetch purgatories. At the same time, the request is put into the task queue of the "remote storage fetcher thread pool".

Each thread in the thread pool processes one remote fetch request at a time. The remote storage fetch thread will (see the sketch after this list):

1) find out the corresponding RDI from the remote log index

2) try to retrieve the data from remote storage

2.1) on success, RemoteFetchPurgatory will be notified to return the data to the client

2.2) if the remote segment file is already deleted, RemoteFetchPurgatory will be notified to return an error to the client.

2.3) if the remote storage operation fails (e.g. remote storage is temporarily unavailable), the operation will be retried with exponential back-off, until the original consumer fetch request times out.
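
A hedged sketch of that loop; the purgatory API, RemoteFetchTask, the index lookup, and the exception type for deleted segments are all hypothetical stand-ins, and the back-off bounds are illustrative:

Code Block
languagescala
import java.io.IOException

// Sketch of one remote fetch handled by a fetcher-pool thread. On expiry the
// loop simply exits; the purgatory's timeout path answers the client.
def handleRemoteFetch(task: RemoteFetchTask): Unit = {
  // 1) find the corresponding RDI from the local remote log index
  val indexEntry = lookupRemoteLogIndex(task.topicPartition, task.startOffset)
  var backoffMs = 50L
  while (!task.isExpired) {
    try {
      // 2) try to retrieve the data from remote storage
      val records = rsm.read(indexEntry, task.maxBytes, task.startOffset, minOneMessage = true)
      remoteFetchPurgatory.complete(task, records)        // 2.1) return data to the client
      return
    } catch {
      case e: RemoteSegmentDeletedException =>
        remoteFetchPurgatory.completeWithError(task, e)   // 2.2) segment already deleted
        return
      case _: IOException =>
        // 2.3) transient failure: exponential back-off until the request expires
        Thread.sleep(backoffMs)
        backoffMs = math.min(backoffMs * 2, 1000L)
    }
  }
}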

Alternatives considered

The following alternatives were considered:

...