Document the state by adding a label to the FIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink's ability to union read data in the datalake and Fluss is tightly coupled to Paimon's implementation, which limits its flexibility and extensibility. Paimon-related classes are hard-coded in the fluss-flink module. This makes it difficult for Flink to support union read over other datalakes, and for other compute engines such as Spark and Trino to integrate with the union read capability. The tight coupling also obscures the core logic of union read, making the code harder to maintain and evolve.

This proposal seeks to decouple union read from Paimon by introducing well-defined interfaces and extension points that Paimon should implement. With these in place, Flink can support a wider range of datalakes, and the standardized interfaces allow other compute engines to integrate with Fluss's union read capability.

Public Interfaces

Introduce a method createLakeSource in LakeStorage:

@PublicEvolving
public interface LakeStorage {

    // --- the following method is newly added

    /**
     * Creates a lake source instance for reading data from the specified table path. The lake
     * source provides capabilities for split planning and record reading, enabling efficient
     * distributed processing of lakehouse data.
     *
     * @param tablePath the logical path identifying the table in the lakehouse storage
     * @return a configured lake source instance for the specified table
     * @throws UnsupportedOperationException if the data lake storage doesn't support reading
     *     operations
     */
    LakeSource<?> createLakeSource(TablePath tablePath);
}
/**
 * Represents a logical partition or segment of data in data-lake.
 */
public interface LakeSplit {

    /**
     * Returns the bucket id for this data split. Every data split in the lake must belong to a
     * Fluss bucket. The bucket id is used to assign the splits of the same Fluss bucket to the
     * same reader for bucket-aware tables, such as primary key tables and log tables with
     * pre-defined bucket keys. For tables that are not bucket-aware, it is also feasible to
     * return -1 for all data splits.
     *
     * @return the bucket id
     */
    int bucket();

    /**
     * Returns the hierarchical partition values for this split, or an empty list if the table is
     * non-partitioned and the split therefore doesn't belong to a specific partition.
     *
     * <p>The returned list represents the complete partition path, with each element corresponding
     * to one level of the partitioning hierarchy in order. For example, in a table partitioned by
     * {@code dt=20230101/hr=12}, this method would return {@code ["20230101", "12"]}.
     *
     * <p>The list size should match the table's partition column count, and each element's position
     * corresponds to the declared partition column order. Values should be in their
     * string-represented form as they would appear in the filesystem path.
     *
     * @return the resolved partition values, or an empty list if the table is non-partitioned
     *     and the split therefore doesn't belong to a specific partition
     */
    List<String> partition();
}
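
The bucket and partition contract above can be illustrated with a small sketch of how an engine might group splits so that all splits of one bucket reach the same reader. This is only an illustration under assumptions: the `LakeSplit` interface is redeclared locally so the snippet compiles on its own, and `FileLakeSplit`, `BucketGrouping` and the `partition#bucket` key format are hypothetical names that are not part of this proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Local stand-in for the proposed LakeSplit interface, so the sketch is self-contained. */
interface LakeSplit {
    int bucket();

    List<String> partition();
}

/** Hypothetical split implementation that points at a single data file. */
final class FileLakeSplit implements LakeSplit {
    private final int bucket;
    private final List<String> partition;
    private final String file;

    FileLakeSplit(int bucket, List<String> partition, String file) {
        this.bucket = bucket;
        this.partition = partition;
        this.file = file;
    }

    @Override
    public int bucket() {
        return bucket;
    }

    @Override
    public List<String> partition() {
        return partition;
    }

    String file() {
        return file;
    }
}

final class BucketGrouping {

    /**
     * Groups splits by partition values and bucket id. The key format "dt/hr#bucket" is an
     * assumption made for this sketch; a non-partitioned table yields keys such as "#0".
     */
    static <S extends LakeSplit> Map<String, List<S>> groupByBucket(List<S> splits) {
        Map<String, List<S>> grouped = new TreeMap<>();
        for (S split : splits) {
            String key = String.join("/", split.partition()) + "#" + split.bucket();
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(split);
        }
        return grouped;
    }
}
```

Each resulting group corresponds to one table bucket, which is the unit that the per-bucket splits described under Proposed Changes operate on.
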
/**
 * A generic interface for lake data sources that defines how to plan splits and read data. Any data
 * lake format that supports reading the data tiered to the lake as Fluss records should implement
 * this interface.
 *
 * <p>This interface provides methods for projection, filter and limit push-down, so that a query
 * engine can push these operations to the lake source. Implementations must ensure that split
 * planning and record reading properly account for the pushed-down operations during execution.
 *
 * @param <Split> The type of data split, which must extend {@link LakeSplit}
 */
public interface LakeSource<Split extends LakeSplit> extends Serializable {

    /**
     * Applies column projection to the data source. The argument provides the field index paths
     * that should be used for the projection. The indices are 0-based and support fields within
     * (possibly nested) structures.
     *
     * <p>For nested fields, given the following SQL: CREATE TABLE t (i INT, r ROW<d DOUBLE, b
     * BOOLEAN>, s STRING); SELECT s, r.d FROM t; the projection will be [[2], [1, 0]].
     */
    void withProject(int[][] project);

    /** Applies a row limit to the data source. */
    void withLimit(int limit);

    /**
     * Pushes down filters to the data source for potential optimization.
     *
     * @param predicates The list of predicates to be pushed down
     * @return A {@link FilterPushDownResult} containing the predicates that were accepted by the
     *     source and those that remain to be evaluated
     */
    FilterPushDownResult withFilters(List<Predicate> predicates);

    /**
     * Creates a planner that plans the splits to be read.
     *
     * @param context The planning context providing necessary planning information
     * @return A planner instance for this data source
     * @throws IOException if an error occurs during planner creation
     */
    Planner<Split> createPlanner(PlannerContext context) throws IOException;

    /**
     * Creates a record reader that reads the data-lake data of the specified split.
     *
     * @param context The reader context containing the split to be read
     * @return A record reader instance for the given split
     * @throws IOException if an error occurs during reader creation
     */
    RecordReader createRecordReader(ReaderContext<Split> context) throws IOException;

    /**
     * Returns the serializer for the data split, used to transfer split information in distributed
     * environment.
     *
     * @return The serializer for the split
     */
    SimpleVersionedSerializer<Split> getSplitSerializer();

    /**
     * Context interface for planners, providing the snapshot id of the table in data-lake to plan
     * splits.
     */
    interface PlannerContext extends Serializable {
        long snapshotId();
    }

    /**
     * Context interface for record readers, providing access to the lake split being read.
     *
     * @param <Split> The type of lake split
     */
    interface ReaderContext<Split extends LakeSplit> extends Serializable {
        Split lakeSplit();
    }

    /**
     * Represents the result of a filter push down operation to lake source, indicating which
     * predicates were accepted by the source and which remain to be evaluated.
     */
    @PublicEvolving
    final class FilterPushDownResult {
        private final List<Predicate> acceptedPredicates;
        private final List<Predicate> remainingPredicates;

        private FilterPushDownResult(
                List<Predicate> acceptedPredicates, List<Predicate> remainingPredicates) {
            this.acceptedPredicates = acceptedPredicates;
            this.remainingPredicates = remainingPredicates;
        }

        /**
         * Creates a new FilterPushDownResult instance.
         *
         * @param acceptedPredicates The accepted predicates
         * @param remainingPredicates The remaining predicates
         * @return A new FilterPushDownResult instance
         */
        public static FilterPushDownResult of(
                List<Predicate> acceptedPredicates, List<Predicate> remainingPredicates) {
            return new FilterPushDownResult(acceptedPredicates, remainingPredicates);
        }

        /**
         * Returns the predicates that were accepted by the source.
         *
         * @return The list of accepted predicates
         */
        public List<Predicate> acceptedPredicates() {
            return acceptedPredicates;
        }

        /**
         * Returns the predicates that remain to be evaluated.
         *
         * @return The list of remaining predicates
         */
        public List<Predicate> remainingPredicates() {
            return remainingPredicates;
        }
    }
}

Note: This FIP does not define what Predicate looks like. The follow-up FIP for supporting log filter will define Predicate in detail, so here Predicate is only referenced as a placeholder type.
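
The nested projection format documented for withProject can be made concrete with a small sketch that resolves index paths against a schema. The `Field` type and the `resolve` helper are hypothetical and exist only for this illustration; a real LakeSource implementation would translate the paths into its own format's projection instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ProjectionPaths {

    /** Hypothetical minimal schema node: a named field with optional nested children. */
    static final class Field {
        final String name;
        final List<Field> children;

        Field(String name, Field... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    /** Resolves every 0-based index path in {@code project} to a dotted field name. */
    static List<String> resolve(List<Field> topLevel, int[][] project) {
        List<String> names = new ArrayList<>();
        for (int[] path : project) {
            List<Field> level = topLevel;
            StringBuilder name = new StringBuilder();
            for (int index : path) {
                Field field = level.get(index);
                if (name.length() > 0) {
                    name.append('.');
                }
                name.append(field.name);
                // Descend into the nested structure for the next index of the path.
                level = field.children;
            }
            names.add(name.toString());
        }
        return names;
    }

    public static void main(String[] args) {
        // CREATE TABLE t (i INT, r ROW<d DOUBLE, b BOOLEAN>, s STRING)
        List<Field> schema =
                Arrays.asList(
                        new Field("i"),
                        new Field("r", new Field("d"), new Field("b")),
                        new Field("s"));
        // SELECT s, r.d FROM t
        System.out.println(resolve(schema, new int[][] {{2}, {1, 0}})); // prints [s, r.d]
    }
}
```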

/**
 * A planner interface for generating readable splits for lake data sources.
 *
 * <p>Implementations of this interface are responsible for determining how to divide the data into
 * manageable splits that can be read in parallel. The planning should consider the pushed-down
 * optimizations (filters, limits, etc) from {@link LakeSource}.
 *
 * @param <Split> the type of data split this planner generates, must extend {@link LakeSplit}
 */
public interface Planner<Split extends LakeSplit> {

    /** Plans and generates the list of data splits, which can then be read in parallel. */
    List<Split> plan() throws IOException;
}
/**
 * An interface for reading records from {@link LakeSplit}.
 *
 * <p>Implementations of this interface provide an iterator-style access to records, allowing
 * efficient sequential reading of potentially large datasets without loading all data into memory
 * at once. The reading should consider the pushed-down optimizations (project, filters, limits,
 * etc) from {@link LakeSource}.
 */
public interface RecordReader {

    /** Reads a {@link LakeSplit} into a closeable iterator. */
    CloseableIterator<LogRecord> read() throws IOException;
}
/**
 * A specialized {@link RecordReader} that produces records in a defined sorted order.
 *
 * <p>Extends the basic record reading capability with sorting semantics, ensuring that records are
 * returned according to a specified ordering.
 *
 * <p>Implementations must guarantee that the {@link #read()} method returns records in the order
 * defined by the comparator from {@link #order()}.
 *
 * <p>Note: This is mainly used for union read of primary key tables, since the records in the lake
 * and in Fluss are sort-merged. Record readers for primary key tables in the lake should implement
 * this interface so that union read performs better.
 */
public interface SortedRecordReader extends RecordReader {

    /**
     * Returns the comparator that defines the sort order of the records.
     *
     * @return a non-null comparator defining the sort order of the records
     */
    Comparator<InternalRow> order();
}

Note: For Paimon primary key tables, the record reader returned by Paimon should implement SortedRecordReader, since union read of a primary key table is done with a sort merge. Currently, if Flink finds that the lake record reader doesn't implement this interface in a batch union read of a primary key table, it throws an exception directly. In the future, deletion vectors could be used to support union read of primary key tables, which would not require the records to be sorted.

Proposed Changes

I already have a POC branch for the decoupling, and it works with the interfaces mentioned above. The changes needed for the decoupling are described below. They still follow the existing code, but rely on the interfaces instead of Paimon classes.

1. First, we introduce two kinds of lake-specific splits:

/** A lake snapshot split of a table bucket. */
public class LakeSnapshotSplit extends SourceSplitBase {

    public static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1;

    private final LakeSplit lakeSplit;

    private final long recordsToSplit;
}
/** A split mixing Lake snapshot and Fluss log of a table bucket. */
public class LakeSnapshotAndFlussLogSplit extends SourceSplitBase {
    public static final byte LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND = -2;

    // the splits for lake snapshot, null when no snapshot data for the bucket
    @Nullable private final List<LakeSplit> lakeSnapshotSplits;

    // the fluss log start offset and stop offset
    private final long startingOffset;
    private final long stoppingOffset;
}

2. In the Flink table source, call LakeSource#withProject and LakeSource#withFilters to push down operations, then pass the lakeSource to FlussSource, which creates the FlussSourceEnumerator and the FlussSourceReader.

  • In the Flink source enumerator, if the job runs in batch mode and lake is enabled, call LakeSource#createPlanner and Planner#plan to generate the lake splits.
    • If the table is a log table, wrap each lake split into a LakeSnapshotSplit.
    • If the table is a primary key table, wrap the lake splits into a LakeSnapshotAndFlussLogSplit, which includes the lake splits and the remaining Fluss log of the same table bucket, so that they can be merge-read.
  • Implement the split serializer for LakeSnapshotSplit and LakeSnapshotAndFlussLogSplit. It calls LakeSource#getSplitSerializer to get the serializer for the LakeSplit, and serializes the other fields of LakeSnapshotSplit and LakeSnapshotAndFlussLogSplit itself.
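
The delegation to the lake split serializer can be sketched as follows. This is a simplified illustration, not the POC code: `SimpleVersionedSerializer` is redeclared locally (modeled on the Flink interface of the same name), `SnapshotAndLogSplit` is a hypothetical stand-in for LakeSnapshotAndFlussLogSplit that omits the table bucket and split id and assumes a non-null split list, and the byte layout is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Local stand-in, modeled on Flink's SimpleVersionedSerializer. */
interface SimpleVersionedSerializer<E> {
    int getVersion();

    byte[] serialize(E obj) throws IOException;

    E deserialize(int version, byte[] serialized) throws IOException;
}

/** Hypothetical, simplified stand-in for LakeSnapshotAndFlussLogSplit. */
final class SnapshotAndLogSplit<S> {
    final List<S> lakeSplits;
    final long startingOffset;
    final long stoppingOffset;

    SnapshotAndLogSplit(List<S> lakeSplits, long startingOffset, long stoppingOffset) {
        this.lakeSplits = lakeSplits;
        this.startingOffset = startingOffset;
        this.stoppingOffset = stoppingOffset;
    }
}

final class SnapshotAndLogSplitSerializer<S>
        implements SimpleVersionedSerializer<SnapshotAndLogSplit<S>> {

    private final SimpleVersionedSerializer<S> lakeSplitSerializer;

    SnapshotAndLogSplitSerializer(SimpleVersionedSerializer<S> lakeSplitSerializer) {
        this.lakeSplitSerializer = lakeSplitSerializer;
    }

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(SnapshotAndLogSplit<S> split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        // Store the lake serializer's version so the lake payload can be decoded later.
        out.writeInt(lakeSplitSerializer.getVersion());
        out.writeInt(split.lakeSplits.size());
        for (S lakeSplit : split.lakeSplits) {
            // The lake split is opaque here; only the lake's own serializer understands it.
            byte[] payload = lakeSplitSerializer.serialize(lakeSplit);
            out.writeInt(payload.length);
            out.write(payload);
        }
        out.writeLong(split.startingOffset);
        out.writeLong(split.stoppingOffset);
        out.flush();
        return bytes.toByteArray();
    }

    @Override
    public SnapshotAndLogSplit<S> deserialize(int version, byte[] serialized) throws IOException {
        if (version != 1) {
            throw new IOException("Unknown version: " + version);
        }
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
        int lakeVersion = in.readInt();
        int count = in.readInt();
        List<S> lakeSplits = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            lakeSplits.add(lakeSplitSerializer.deserialize(lakeVersion, payload));
        }
        long startingOffset = in.readLong();
        long stoppingOffset = in.readLong();
        return new SnapshotAndLogSplit<>(lakeSplits, startingOffset, stoppingOffset);
    }
}
```

Writing the lake serializer's version next to the payload keeps the wrapper format independent of how each lake format evolves its own split encoding.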

3. In FlinkSourceSplitReader:

  • For a LakeSnapshotSplit, only the lake records need to be read: call LakeSource#createRecordReader and RecordReader#read to get a record iterator, and emit the records to the downstream operator.
  • For a LakeSnapshotAndFlussLogSplit, the lake records of its lake splits must be merged with the Fluss log records. The lake records are still read via LakeSource#createRecordReader and RecordReader#read, and the reader must implement SortedRecordReader (otherwise an unsupported-operation exception is thrown directly), which returns the sort comparator. The returned comparator is used to sort the rows read from the Fluss log, and the two sorted inputs are then sort-merged.
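
The sort merge for primary key tables can be sketched roughly as below. This is a minimal illustration under assumptions, not the POC implementation: rows are a generic type `T` rather than InternalRow, the comparator (in practice obtained from SortedRecordReader#order) is assumed to compare primary keys only, rows are assumed non-null, and delete records in the Fluss log are not handled. `SortMergeSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

final class SortMergeSketch {

    /**
     * Merges key-sorted lake rows with Fluss log rows. For rows with equal keys, the log row
     * wins over the lake row, and among log rows the latest one wins.
     */
    static <T> List<T> merge(Iterator<T> sortedLakeRows, List<T> logRows, Comparator<T> order) {
        // List.sort is stable, so log rows with the same key keep their original log order.
        List<T> sortedLog = new ArrayList<>(logRows);
        sortedLog.sort(order);

        // Keep only the last log row for every key.
        List<T> latestLog = new ArrayList<>();
        for (T row : sortedLog) {
            int last = latestLog.size() - 1;
            if (last >= 0 && order.compare(latestLog.get(last), row) == 0) {
                latestLog.set(last, row);
            } else {
                latestLog.add(row);
            }
        }

        // Classic two-way merge over the two sorted inputs.
        List<T> result = new ArrayList<>();
        int logIndex = 0;
        T lake = nextOrNull(sortedLakeRows);
        while (lake != null || logIndex < latestLog.size()) {
            if (lake == null) {
                result.add(latestLog.get(logIndex++));
            } else if (logIndex >= latestLog.size()) {
                result.add(lake);
                lake = nextOrNull(sortedLakeRows);
            } else {
                int cmp = order.compare(lake, latestLog.get(logIndex));
                if (cmp < 0) {
                    result.add(lake);
                    lake = nextOrNull(sortedLakeRows);
                } else if (cmp > 0) {
                    result.add(latestLog.get(logIndex++));
                } else {
                    // Same key: the log row is newer and replaces the lake row.
                    result.add(latestLog.get(logIndex++));
                    lake = nextOrNull(sortedLakeRows);
                }
            }
        }
        return result;
    }

    private static <T> T nextOrNull(Iterator<T> rows) {
        return rows.hasNext() ? rows.next() : null;
    }
}
```

For example, with lake rows keyed 1, 2, 4 and log rows keyed 2, 3, 2 (in log order), the result contains keys 1, 2, 3, 4, where the row for key 2 is the second log row. A production reader would stream the result instead of materializing lists.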

Changes on Paimon

1. Implement createLakeSource method in PaimonLakeStorage 

2. Introduce PaimonLakeSource in fluss-lake-paimon module

Please see the poc branch for more detail.

Compatibility, Deprecation, and Migration Plan

First, introduce the new implementation while keeping the old code.

After migrating the existing test code to the new implementation, remove the old code.

Test Plan

Existing IT cases.

Rejected Alternatives

N/A