Note: Early proposal. To be finalized during implementation.

Code Block
languagescala
// Note: packages for the new types proposed by this KIP (RemoteLogIndexEntry, RemoteLogSegmentInfo) are TBD.
import java.io.IOException
import java.util

import kafka.log.LogSegment
import org.apache.kafka.common.{Configurable, TopicPartition}
import org.apache.kafka.common.record.Records

trait RemoteStorageManager extends Configurable with AutoCloseable {

  /**
   * Copies LogSegment provided by the [[RemoteLogManager]]
   * Returns the RDIs of the remote data
   * This method is used by the leader
   */
  @throws(classOf[IOException])
  def copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment): util.List[RemoteLogIndexEntry]

  /**
   * Cancels the unfinished LogSegment copying of this given topic-partition
   */
  def cancelCopyingLogSegment(topicPartition: TopicPartition): Unit

  /**
   * List the remote log segment files of the specified topicPartition
   * The RLM of a follower uses this method to find out the remote data
   *
   * @return List of remote segments, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition): util.List[RemoteLogSegmentInfo] = {
    listRemoteSegments(topicPartition, 0)
  }

  /**
   * List the remote log segment files of the specified topicPartition starting from the base offset minBaseOffset.
   * The RLM of a follower uses this method to find out the remote data
   *
   * @param minBaseOffset The minimum base offset for a segment to be returned.
   * @return List of remote segments starting from the base offset minBaseOffset, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition, minBaseOffset: Long): util.List[RemoteLogSegmentInfo]

  /**
   * Called by the RLM of a follower to retrieve the RemoteLogIndex entries of the specified remoteLogSegment.
   */
  @throws(classOf[IOException])
  def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry]

  /**
   * Deletes remote LogSegment file provided by the RLM
   */
  @throws(classOf[IOException])
  def deleteLogSegment(remoteLogSegment: RemoteLogSegmentInfo): Boolean

  /**
   * Delete all the log segments for the given topic partition. This can be done by renaming the existing locations
   * and deleting them later in an asynchronous manner.
   */
  @throws(classOf[IOException])
  def deleteTopicPartition(topicPartition: TopicPartition): Boolean

  /**
   * Remove the log segments which are older than the given cleanUpTillMs
   */
  @throws(classOf[IOException])
  def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long

  /**
   * Read up to maxBytes data from remote storage, starting from the 1st batch that
   * is greater than or equal to the startOffset.
   *
   * Will read at least one batch, if the 1st batch size is larger than maxBytes.
   *
   * @param remoteLogIndexEntry The first remoteLogIndexEntry that remoteLogIndexEntry.lastOffset >= startOffset
   * @param maxBytes            maximum bytes to fetch for the given entry
   * @param startOffset         initial offset to be read from the given rdi in remoteLogIndexEntry
   * @param minOneMessage       if true read at least one record even if the size is more than maxBytes
   * @return the records read from remote storage
   */
  @throws(classOf[IOException])
  def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int, startOffset: Long, minOneMessage: Boolean): Records

  /**
   * Stops all the threads and closes the instance.
   */
  def close(): Unit
}


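For readers implementing the plug-in, the skeleton below shows one way the trait above could be filled in. This is only an illustrative sketch: the class name S3RemoteStorageManager, the config key remote.log.storage.s3.bucket, and the stubbed bodies are placeholders and not part of this proposal.

Code Block
languagescala
import java.io.IOException
import java.util

import kafka.log.LogSegment
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.Records

// Illustrative skeleton only: class name, config key and method bodies are placeholders.
class S3RemoteStorageManager extends RemoteStorageManager {

  private var bucket: String = _

  // Called once with the broker's remote-storage configs before any other method.
  override def configure(configs: util.Map[String, _]): Unit = {
    bucket = String.valueOf(configs.get("remote.log.storage.s3.bucket")) // hypothetical key
  }

  // Leader path: upload the segment (log plus indexes) and return index entries
  // mapping offset ranges to RDIs in the remote store.
  @throws(classOf[IOException])
  override def copyLogSegment(topicPartition: TopicPartition,
                              logSegment: LogSegment): util.List[RemoteLogIndexEntry] = ???

  override def cancelCopyingLogSegment(topicPartition: TopicPartition): Unit = ???

  // Follower path: discover segments the leader has already shipped to remote storage.
  @throws(classOf[IOException])
  override def listRemoteSegments(topicPartition: TopicPartition,
                                  minBaseOffset: Long): util.List[RemoteLogSegmentInfo] = ???

  @throws(classOf[IOException])
  override def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry] = ???

  @throws(classOf[IOException])
  override def deleteLogSegment(remoteLogSegment: RemoteLogSegmentInfo): Boolean = ???

  @throws(classOf[IOException])
  override def deleteTopicPartition(topicPartition: TopicPartition): Boolean = ???

  @throws(classOf[IOException])
  override def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long = ???

  // Fetch path: read at most maxBytes starting at startOffset, using the RDI from the index entry.
  @throws(classOf[IOException])
  override def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int,
                    startOffset: Long, minOneMessage: Boolean): Records = ???

  override def close(): Unit = ???
}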


Replica Manager

If RLM is configured, ReplicaManager will call RLM to assign or remove topic-partitions, similar to how the replicaFetcherManager works today.
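For illustration only, that hand-off might look roughly like the sketch below. The RemoteLogManager trait slice and the method names (handleLeaderPartitions, handleFollowerPartitions, removePartitions, onLeadershipChange, onStopReplicas) are placeholders, since the RLM interface is not finalized; only the analogy to replicaFetcherManager.addFetcherForPartitions / removeFetcherForPartitions is intended.

Code Block
languagescala
import org.apache.kafka.common.TopicPartition

// Hypothetical slice of the RLM interface used in this sketch (actual API TBD).
trait RemoteLogManager {
  def handleLeaderPartitions(partitions: Set[TopicPartition]): Unit
  def handleFollowerPartitions(partitions: Set[TopicPartition]): Unit
  def removePartitions(partitions: Set[TopicPartition]): Unit
}

// Sketch of the ReplicaManager -> RLM hand-off, mirroring how
// replicaFetcherManager.addFetcherForPartitions / removeFetcherForPartitions
// are driven from the leader-and-ISR and stop-replica paths today.
class RlmPartitionAssignmentHook(rlmOpt: Option[RemoteLogManager]) {

  // Invoked when this broker becomes leader or follower for the given partitions.
  def onLeadershipChange(partitionsBecomeLeader: Set[TopicPartition],
                         partitionsBecomeFollower: Set[TopicPartition]): Unit = {
    rlmOpt.foreach { rlm =>
      rlm.handleLeaderPartitions(partitionsBecomeLeader)
      rlm.handleFollowerPartitions(partitionsBecomeFollower)
    }
  }

  // Invoked when replicas are stopped or deleted on this broker.
  def onStopReplicas(partitions: Set[TopicPartition]): Unit = {
    rlmOpt.foreach(_.removePartitions(partitions))
  }
}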
