
Motivation

The datalake is considered another storage tier of Fluss. Saying that the datalake is a storage tier of Fluss means a Fluss client can read data from the datalake as Fluss data. Currently, however, we only support tiering Fluss data into the datalake; the reverse data flow is missing. This FIP enables FlussLogScanner to fetch log data from the datalake tiered storage.

Public Interfaces

Overall, there are two key points to enable fetching from the data lake:

  • listing offsets by timestamp should take the data in the data lake into account
  • when the data to fetch is not local but is in the data lake, the LogScanner should read it from the corresponding datalake

To address the above key points, the public interface/protocol changes are:

RPC protocol changes

1: Add an extra field optional int64 log_max_timestamp to the RPC messages PbLakeTableOffsetForBucket and PbNotifyLakeTableOffsetReqForBucket

message PbLakeTableOffsetForBucket {
  optional int64 partition_id = 1;
  required int32 bucket_id = 2;
  optional int64 log_start_offset = 3;
  optional int64 log_end_offset = 4;
  // the following field is newly added
  optional int64 log_max_timestamp = 5;
}

message PbNotifyLakeTableOffsetReqForBucket {
  required int64 table_id = 1;
  optional int64 partition_id = 2;
  required int32 bucket_id = 3;
  required int64 snapshot_id = 4;
  optional int64 log_start_offset = 5;
  optional int64 log_end_offset = 6;
   // the following field is newly added  
  optional int64 log_max_timestamp = 7;
}

2: Add an extra field optional int64 lake_snapshot_id to the RPC message PbListOffsetsRespForBucket

message PbListOffsetsRespForBucket {
  required int32 bucket_id = 1;
  optional int32 error_code = 2;
  optional string error_message = 3;
  optional int64 offset = 4;
  // the following field is newly added   
  optional int64 lake_snapshot_id = 5;
}

3: Add a new message PbLakeLogFetchInfo and an extra field optional PbLakeLogFetchInfo lake_log_fetch_info to the RPC message PbFetchLogRespForBucket

message PbFetchLogRespForBucket {
  optional int64 partition_id = 1;
  required int32 bucket_id = 2;
  optional int32 error_code = 3;
  optional string error_message = 4;
  optional int64 high_watermark = 5;
  optional int64 log_start_offset = 6; 
  optional PbRemoteLogFetchInfo remote_log_fetch_info = 7;
  optional bytes records = 8;
  // the following field is newly added    
  optional PbLakeLogFetchInfo lake_log_fetch_info = 9;
}

// added message
message PbLakeLogFetchInfo {
  required int64 snapshot_id = 1;
  required int64 log_end_offset = 2;
}

4: Add an extra field optional bool enable_lake to the RPC messages ListOffsetsRequest and FetchLogRequest for backward compatibility. Only new clients should set enable_lake = true (a compatibility sketch follows the message definitions below)

message ListOffsetsRequest {
  required int32 follower_server_id = 1;  // value -1 indicate the request from client.
  required int32 offset_type = 2; // value can be 0,1,2 (see ListOffsetsParam for more details)
  required int64 table_id = 3;
  optional int64 partition_id = 4;
  repeated int32 bucket_id = 5 [packed = true]; // it is recommended to use packed for repeated numerics to get more efficient encoding
  optional int64 startTimestamp = 6;

  // the following field is newly added    
  optional bool enable_lake = 7;
}

message FetchLogRequest {
  required int32 follower_server_id = 1;  // value -1 indicate the request from client.
  required int32 max_bytes = 2;
  repeated PbFetchLogReqForTable tables_req = 3;
  optional int32 max_wait_ms = 4;
  optional int32 min_bytes = 5;

  // the following field is newly added     
  optional bool enable_lake = 6;
}
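
To make the backward-compatibility behavior concrete, below is a minimal server-side sketch. The accessor and helper names (hasEnableLake, isEnableLake, handleWithoutLake) are assumptions for illustration, not part of this proposal.

// Sketch only: old clients never set enable_lake, so the server keeps the
// pre-FIP behavior for them and never returns the new lake fields.
boolean lakeEnabledByClient = request.hasEnableLake() && request.isEnableLake();
if (!lakeEnabledByClient) {
    // Existing local/remote-only path, identical to today's behavior.
    return handleWithoutLake(request);
}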

Interface changes

/**
 * The interface to support creating a {@link FlussLogSource}; any {@link LakeStorage} that
 * supports reading its data in the Fluss streaming pattern should implement this interface.
 *
 * @since 0.8
 */
@PublicEvolving
public interface SupportsCreateFlussLogSource {

    /**
     * Creates a log source for accessing log data of Fluss (data of a Fluss log table, or the
     * change log of a Fluss primary key table).
     *
     * @param tablePath table path for the target table
     * @return Created FlussLogSource instance
     */
    FlussLogSource createFlussLogSource(TablePath tablePath);
}
/**
 * Provides access to log-based data (data of a Fluss log table, or the change log of a Fluss
 * primary key table) in lake storage. Supports time-based log offset lookup and log record
 * fetching operations.
 *
 * @since 0.8
 */
@PublicEvolving
public interface FlussLogSource {

    /**
     * Finds the log offset corresponding to the specified timestamp for the Fluss data of the
     * given lookup context.
     *
     * @param lookupContext the context to look up the offset by timestamp
     * @return the first offset whose timestamp is greater than or equal to the given {@code timestamp}
     * @throws InvalidTimestampException if the {@code timestamp} is larger than the max timestamp
     */
    long lookupLogOffsetByTimeStamp(LookupContext lookupContext)
            throws InvalidTimestampException;

    /**
     * Fetches log records for the Fluss data in datalake.
     *
     * @param fetchContext the context to fetch in datalake
     * @return An iterator of LogRecords
     */
    CloseableIterator<LogRecord> fetchLogRecords(FetchContext fetchContext) throws IOException;
}
public class LookupContext {
    @Nullable private final String partitionName;
    private final int bucket;
    private final long timestamp;
    private final long lakeSnapshotId;
}
public class FetchContext {
    private final long fetchStartOffset;
    private final long logEndOffsetOfSnapshot;
    @Nullable private final String partitionName;
    private final int bucket;

    private final long lakeSnapshotId;
    @Nullable private final Projection projection;
}
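
For illustration, here is a minimal sketch of how a lake storage plugin could implement these interfaces. Everything besides the interfaces themselves (the MyLakeStorage and MyLakeLogSource names and their internals) is a hypothetical placeholder, not part of this proposal.

// Sketch only: a hypothetical lake storage plugin wiring up the new interfaces.
// A real implementation would also implement the existing LakeStorage interface.
public class MyLakeStorage implements SupportsCreateFlussLogSource {

    @Override
    public FlussLogSource createFlussLogSource(TablePath tablePath) {
        return new MyLakeLogSource(tablePath);
    }

    private static class MyLakeLogSource implements FlussLogSource {

        private final TablePath tablePath;

        MyLakeLogSource(TablePath tablePath) {
            this.tablePath = tablePath;
        }

        @Override
        public long lookupLogOffsetByTimeStamp(LookupContext lookupContext)
                throws InvalidTimestampException {
            // Scan the bucket in the lake snapshot given by the context and return
            // the first offset whose timestamp is >= the requested timestamp.
            throw new UnsupportedOperationException("format-specific offset lookup");
        }

        @Override
        public CloseableIterator<LogRecord> fetchLogRecords(FetchContext fetchContext)
                throws IOException {
            // Read [fetchStartOffset, logEndOffsetOfSnapshot) of the bucket from the
            // snapshot, applying the projection if present.
            throw new UnsupportedOperationException("format-specific record fetch");
        }
    }
}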

Proposed Changes

List offset by timestamp

Changes in LakeTieringServices

1: TableBucketWriteResult should now contain the log max timestamp for the bucket that has been tiered

2: When committing a lake table snapshot to Fluss, the RPC request should also include the lake log max timestamp

Changes in FlussServer

1: When the coordinator receives a CommitLakeTableSnapshotRequest, send the lake tiered log max timestamp to the tablet servers to notify them which logs have been tiered to the lake

2: When receiving a list-offsets-by-timestamp request, if the lake log max timestamp >= the requested timestamp, return the corresponding lake snapshot id which includes the lake log max timestamp, so that the client can find the log offset in the lake snapshot (see the sketch after this list)

3: On restore, the tablet server restores the lake snapshot information (log offsets, snapshot id, max timestamps) from the lake snapshot node in ZooKeeper
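
A minimal sketch of step 2 on the tablet server; the field and helper names (bucketId, lakeLogMaxTimestamp, lakeSnapshotId, lookupOffsetLocally, and the generated setters) are assumptions:

// Sketch only: list-offsets-by-timestamp once the lake log max timestamp is tracked.
PbListOffsetsRespForBucket listOffsetsByTimestamp(long requestedTimestamp, boolean enableLake) {
    PbListOffsetsRespForBucket resp = new PbListOffsetsRespForBucket();
    resp.setBucketId(bucketId);
    if (enableLake && lakeLogMaxTimestamp >= requestedTimestamp) {
        // The target offset may live in the lake: return the lake snapshot id so the
        // client can resolve it via FlussLogSource#lookupLogOffsetByTimeStamp.
        resp.setLakeSnapshotId(lakeSnapshotId);
    } else {
        // Existing behavior: resolve the offset from local (and remote) log data.
        resp.setOffset(lookupOffsetLocally(requestedTimestamp));
    }
    return resp;
}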

Changes in FlussClient

1: Fall back to the previous logic if

  • the table is not a lake-enabled table, or
  • the table is lake-enabled, but loading the LakeStoragePlugin or creating the FlussLogSource fails (for example, the lake plugin is not available)

2: In FlussClient#listOffsets, when building the ListOffsetsRequest, set enable_lake = true

3: If the PbListOffsetsRespForBucket in the list offsets response contains lake_snapshot_id, the client will call FlussLogSource#lookupLogOffsetByTimeStamp to seek the offset by timestamp from the lake, as sketched below
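
A minimal sketch of that client-side resolution; the LookupContext constructor order and the response getters are assumptions based on the definitions above:

// Sketch only: per-bucket offset resolution on the client.
long resolveOffset(PbListOffsetsRespForBucket resp, long timestamp)
        throws InvalidTimestampException {
    if (resp.hasLakeSnapshotId() && flussLogSource != null) {
        // The offset for this timestamp lives in the lake: delegate to the lake source.
        LookupContext lookupContext = new LookupContext(
                partitionName, resp.getBucketId(), timestamp, resp.getLakeSnapshotId());
        return flussLogSource.lookupLogOffsetByTimeStamp(lookupContext);
    }
    // Otherwise the server already resolved the offset from local/remote data.
    return resp.getOffset();
}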

Read log records from the corresponding datalake

Changes in FlussServer

1: When the tablet server receives a fetch log request and the data to fetch is local, return the fetched log records from local storage directly to the client, which aligns with the current behavior

2: If the data is in remote storage, return the remote log segment information so that the client can read from the remote segments

3: If the data is neither local nor in remote storage, check whether it is in the datalake by comparing lake_log_end_offset with fetch_start_offset. If lake_log_end_offset >= fetch_start_offset, the log data is in the lake; return the lake fetch info (snapshot_id, log_end_offset) in the FetchLogResponse so that the client knows which snapshot to fetch from the data lake. If the data is not in the lake either, return an offset out-of-range exception to the client

Overall, in this data reading process, local data has the highest priority, remote data comes second, and lake data has the lowest priority.
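
For clarity, a minimal sketch of this decision order on the tablet server; the helper and field names are assumptions:

// Sketch only: per-bucket fetch handling following the local > remote > lake priority.
PbFetchLogRespForBucket fetchLog(long fetchStartOffset, boolean enableLake) {
    if (isInLocalLog(fetchStartOffset)) {
        return respWithLocalRecords(fetchStartOffset);        // 1. local records
    }
    if (isInRemoteLog(fetchStartOffset)) {
        return respWithRemoteLogFetchInfo(fetchStartOffset);  // 2. remote segment info
    }
    if (enableLake && lakeLogEndOffset >= fetchStartOffset) {
        // 3. the data has been tiered to the lake: tell the client which snapshot to read.
        PbLakeLogFetchInfo lakeInfo = new PbLakeLogFetchInfo();
        lakeInfo.setSnapshotId(lakeSnapshotId);
        lakeInfo.setLogEndOffset(lakeLogEndOffset);
        PbFetchLogRespForBucket resp = new PbFetchLogRespForBucket();
        resp.setBucketId(bucketId);
        resp.setLakeLogFetchInfo(lakeInfo);
        return resp;
    }
    // 4. not found anywhere: offset out of range.
    return respWithOffsetOutOfRangeError(fetchStartOffset);
}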


Note: Theoretically, reading from the lake is more efficient than reading from remote storage, since the lake supports column pruning well (currently, fetching from remote loses the ability to prune columns because we download the whole segment) and needs less IO thanks to the lake's higher compression ratio. Nevertheless, remote data is still ranked above lake data here. The reason is to stay cautious in the first-version implementation: although the lake is theoretically more efficient, we need more benchmarks and may well need elaborate code optimization. Once the implementation matures and we have more production benchmarks, we can rank lake data above remote data.

Changes in FlussClient

1: Fall back to the previous logic if

  • the table is not a lake-enabled table, or
  • the table is lake-enabled, but the LakeStoragePlugin cannot be loaded or the FlussLogSource cannot be created

2: When sending the FetchLogRequest, set enable_lake = true

3: If the FetchLogResponse contains the lake fetch info, the Fluss client will try to fetch the log from the lake by calling FlussLogSource#fetchLogRecords, as sketched below
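
A minimal sketch of that client-side fetch path; the FetchContext constructor order, the response getters, and the collect helper are assumptions based on the definitions above:

// Sketch only: handle a fetch response that carries lake fetch info.
void onFetchLogResponse(PbFetchLogRespForBucket resp, long fetchStartOffset) throws IOException {
    if (resp.hasLakeLogFetchInfo() && flussLogSource != null) {
        PbLakeLogFetchInfo lakeInfo = resp.getLakeLogFetchInfo();
        FetchContext fetchContext = new FetchContext(
                fetchStartOffset,
                lakeInfo.getLogEndOffset(),
                partitionName,
                resp.getBucketId(),
                lakeInfo.getSnapshotId(),
                projection);
        try (CloseableIterator<LogRecord> records = flussLogSource.fetchLogRecords(fetchContext)) {
            // Hand the lake records to the scanner just like local/remote records.
            records.forEachRemaining(this::collect);
        }
    }
    // else: handle local records / remote log fetch info as today.
}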

Compatibility, Deprecation, and Migration Plan

N/A

Test Plan

Integration tests (IT)

Rejected Alternatives

N/A