Document the state by adding a label to the FIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Fluss supports tiering data to Apache Paimon, enabling cost-effective storage management for warm/cold data. However, the lack of native Iceberg tiering support limits flexibility and ecosystem integration for users who rely on Iceberg's open table format. This proposal aims to support tiering Fluss data to Iceberg and unlock more of the Iceberg ecosystem.

Public Interfaces

  • Add iceberg  to DataLakeFormat  
  • Introduce a fluss-lake-iceberg  module
  • Introduce IcebergBucketFunction to follow Iceberg's bucketing strategy. Once the Fluss cluster is configured with the iceberg datalake format, it should always use IcebergBucketFunction to bucket data.
  • Introduce IcebergLakeStorage  into  fluss-lake-iceberg module
  • Introduce a new boolean option `table.datalake.auto-maintenance` (true by default) to configure whether maintenance tasks (such as table compaction and snapshot expiration) are performed per table in the lake tiering service

Proposed Changes

The basic idea of the design is to map the Fluss bucket to Iceberg's bucket partition transform. That way, data in Iceberg is also clustered by Fluss bucket, enabling efficient seek/scan of the data belonging to a specific Fluss bucket.

IcebergBucketFunction

IcebergBucketFunction is used to bucket Fluss data into buckets following Iceberg's bucketing strategy. It makes it possible to enable the datalake dynamically without rewriting data. Imagine a table that has already been written with some data. If you later want to enable it as an Iceberg lake table, you still want the data distribution in Fluss to match Iceberg. The only way to achieve that without rewriting Fluss data is to bucket the data the Iceberg way from the beginning.
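As a minimal sketch, assuming Iceberg's bucket transform specification (bucket = (murmur3_x86_32(value) & Integer.MAX_VALUE) % N), the bucketing for a BIGINT bucket key could look like the following; the class name follows this FIP, but the method signature is illustrative rather than the final Fluss API:

import com.google.common.hash.Hashing;

public class IcebergBucketFunction {

    private final int numBuckets;

    public IcebergBucketFunction(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    // Bucket a BIGINT bucket-key value the same way Iceberg's bucket(N) transform does:
    // hash the value with Murmur3 (x86, 32-bit), mask off the sign bit, then mod N.
    public int bucket(long value) {
        int hash = Hashing.murmur3_32_fixed().hashLong(value).asInt();
        return (hash & Integer.MAX_VALUE) % numBuckets;
    }
}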

Some explanation about why we want to keep the data distribution in Fluss the same as in Iceberg:

  • For union read, assuming we have read the hot data of Fluss bucket1, we also want to read the cold data of Fluss bucket1 in Iceberg. If the data distribution is the same between Fluss and Iceberg, we can quickly seek/scan the data in Iceberg that belongs to Fluss bucket1. Otherwise, we need to scan all data.
  • For subscribing to Iceberg as Fluss cold data, assuming we want to scan from Fluss bucket1, offset 100, data distribution consistency also helps us seek/scan the data in Iceberg efficiently without scanning all data.

While Iceberg's V3 specification includes multi-argument partition transforms, no API or implementation currently exists in the codebase. So, I'd like to support only one bucket key to avoid potential future compatibility issues. Also, Iceberg only supports int, long, decimal, date, time, timestamp, timestamptz, timestamp_ns, timestamptz_ns, string, uuid, fixed, and binary as bucket data types.

The whole behavior will become:

  1. Table with one bucket key & a supported bucket datatype => bucket using Iceberg's bucket strategy. Enabling the datalake dynamically is supported.
  2. (Table with multiple bucket keys or an unsupported bucket datatype) + create table with datalake enabled => throws UnsupportedException
  3. (Table with multiple bucket keys or an unsupported bucket datatype) => use Fluss's default bucket strategy; enabling the datalake dynamically is not supported.

After Iceberg supports multiple bucket keys or more datatypes, we can then support case 2 and case 3.

IcebergLakeCatalog

When a Fluss table with iceberg as the datalake format is created, the corresponding lake table will be created in Iceberg via IcebergLakeCatalog.

General rules for creating the corresponding lake table in Iceberg

1: The properties of the Fluss table should also be forwarded to the Iceberg table. For properties with the iceberg. prefix, forward them to Iceberg with the iceberg. prefix removed. For other properties, forward them to Iceberg with a fluss. prefix added. Here's an example (a small sketch of the mapping follows the example):

Fluss table:

CREATE TABLE order_table (
   ...
 ) WITH (
     'table.datalake.enabled' = 'true',
     'table.datalake.freshness' = '30s',
     'iceberg.commit.retry.num-retries' = '5'
);

Iceberg table:

CREATE TABLE order_table (
     ...
 ) WITH (
     'fluss.table.datalake.enabled' = 'true',
     'fluss.table.datalake.freshness' = '30s',
     'commit.retry.num-retries' = '5'
);
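As a sketch of this forwarding rule (the class and method names here are illustrative, not the final API), the translation from Fluss table properties to Iceberg table properties could look like:

import java.util.HashMap;
import java.util.Map;

public class LakeTablePropertyMapper {

    // Forward Fluss table properties to the Iceberg table following the rule above.
    public static Map<String, String> toIcebergProperties(Map<String, String> flussProperties) {
        Map<String, String> icebergProperties = new HashMap<>();
        for (Map.Entry<String, String> entry : flussProperties.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith("iceberg.")) {
                // e.g. 'iceberg.commit.retry.num-retries' -> 'commit.retry.num-retries'
                icebergProperties.put(key.substring("iceberg.".length()), entry.getValue());
            } else {
                // e.g. 'table.datalake.enabled' -> 'fluss.table.datalake.enabled'
                icebergProperties.put("fluss." + key, entry.getValue());
            }
        }
        return icebergProperties;
    }
}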

2: For both log tables and primary key tables, creating the table in Iceberg requires adding three system columns at the end of the table: __bucket (int datatype), __offset (bigint datatype), and __timestamp (timestamp_ltz datatype). These system columns enable Fluss clients to subscribe to Iceberg data via the streaming API, specifically supporting the subscribe method in FlussLogScanner.

3: For partition keys of Fluss partitioned tables, directly map them to Iceberg's identity partition transform. First partition by Fluss's partition key, then partition by Fluss's bucket key.

4: All tables should be sorted ascending by the additional column __offset. The reason is that we want to continuously subscribe to Iceberg as Fluss historical data from a given Fluss log offset, record by record. To do that efficiently, we want records sorted by __offset, so that when reading the next log offset record we just need to read the next position in the same file. Otherwise, we would have to go through all data files to find the next log offset record. Sorting by __offset ascending comes naturally for the Fluss lake tiering service, since the records read from Fluss are already sorted by log offset. This sort spec is to make sure external compaction won't break the sort property of records in Iceberg.
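As a minimal sketch using Iceberg's Java API, assuming the Iceberg schema already contains the __offset system column, the sort spec could be declared like this:

import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;

public class LakeTableSortOrder {

    // Sort ascending on the __offset system column so that records inside a data file
    // stay ordered by Fluss log offset, even after compaction rewrites the files.
    public static SortOrder offsetSortOrder(Schema icebergSchema) {
        return SortOrder.builderFor(icebergSchema).asc("__offset").build();
    }
}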

Above are the general rules for both log tables and primary key tables. Now, let's divide Fluss tables into LogTable and PrimaryKeyTable to explain how to map a Fluss table to an Iceberg table:

LogTable

No Bucket Key

For Fluss log tables without a configured bucket key, when synchronized to Iceberg, the Iceberg table must use the __bucket column for the identity partitioning transform. This ensures data with the same __bucket value remains physically collocated, enabling efficient localization of a specific bucket's data within Iceberg.

Here's an example:

Fluss table:

CREATE TABLE log_table(
 order_id BIGINT,
 item_id BIGINT,
 amount INT,
 address STRING
) WITH ('bucket.num' = '3');

Iceberg table:

CREATE TABLE log_table (
 order_id BIGINT,
 item_id BIGINT,
 amount INT,
 address STRING,
 __bucket INT,
 __offset BIGINT,
 __timestamp TIMESTAMP_LTZ
)
PARTITIONED BY (IDENTITY(__bucket))

Here's another example for a Fluss partitioned table: we will first partition by Fluss's partition key, and then partition by Fluss's bucket in Iceberg. This rule generally applies to both log tables and primary key tables.

Fluss table:

CREATE TABLE log_table(
 order_id BIGINT,
 item_id BIGINT,
 amount INT,
 address STRING,
 event_date STRING
) PARTITIONED BY(event_date)  WITH ('bucket.num' = '3');

Iceberg table:

CREATE TABLE log_table (
 order_id BIGINT,
 item_id BIGINT,
 amount INT,
 address STRING,
 event_date STRING,
 __bucket INT,
 __offset BIGINT,
 __timestamp TIMESTAMP_LTZ
)
PARTITIONED BY (IDENTITY(event_date), IDENTITY(__bucket))

Only one Bucket Key

When a Fluss log table is configured with a single bucket key, the corresponding Iceberg table must use that bucket key for bucket partitioning.

Here's an example:

Fluss table:

CREATE TABLE log_table(
 order_id BIGINT,
 item_id BIGINT,
 amount INT,
 address STRING
) WITH ('bucket.num' = '3', 'bucket.key' = 'order_id');

Iceberg table:

CREATE TABLE log_table (
 order_id BIGINT,
 item_id BIGINT,
 amount INT,
 address STRING,
 __bucket INT,
 __offset BIGINT,
 __timestamp TIMESTAMP_LTZ
)
PARTITIONED BY (bucket(order_id, 3));
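Using Iceberg's Java API, this partition spec corresponds to a bucket transform on the bucket key; a sketch, assuming icebergSchema is the Iceberg schema of the table above:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;

public class LogTablePartitionSpec {

    // Mirrors the Fluss options 'bucket.key' = 'order_id' and 'bucket.num' = '3'.
    public static PartitionSpec bucketSpec(Schema icebergSchema) {
        return PartitionSpec.builderFor(icebergSchema)
                .bucket("order_id", 3)
                .build();
    }
}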

Multi Bucket key

When a Fluss log table is configured with multiple bucket keys, Iceberg currently lacks support for this scenario. Although Iceberg's V3 specification includes multi-argument partition transforms, no implementation currently exists in the codebase.

Current Iceberg Limitation:

Given three columns (c1, c2, c3), Iceberg can only:

  • First partition by c1
  • Then partition by c2 within each c1 partition
  • Finally, partition by c3 within each c2 partition

If using a bucket transform with 5 buckets for each level, this would create:

5 (c1) × 5 (c2) × 5 (c3) = 125 total buckets

So, currently, I prefer not to support this when multiple bucket keys are configured, to avoid potential future compatibility issues and implementation complexity. The cleanest approach would be to wait for Iceberg's support of multi-argument transforms, then implement the Fluss support.

PrimaryKeyTable

Primary key tables always have bucket keys, so only the single bucket key and multiple bucket keys cases need to be considered. We should use an Iceberg MOR (merge-on-read) table.
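As a sketch, the merge-on-read behavior can be requested with Iceberg's standard write properties when creating the table (the property keys are Iceberg's; the helper class is illustrative):

import java.util.Map;

public class PrimaryKeyTableProperties {

    // Standard Iceberg write properties that make row-level operations merge-on-read.
    public static Map<String, String> morProperties() {
        return Map.of(
                "write.delete.mode", "merge-on-read",
                "write.update.mode", "merge-on-read",
                "write.merge.mode", "merge-on-read");
    }
}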

Single Bucket key

Here's an example:

Fluss table:

CREATE TABLE pk_table
(
   shop_id      BIGINT,
   user_id      BIGINT,
   num_orders   INT,
   total_amount INT,
   PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
   'bucket.num' = '4'
);

Iceberg table:

CREATE TABLE pk_table
(
   shop_id      BIGINT,
   user_id      BIGINT,
   num_orders   INT,
   total_amount INT,
   __bucket INT,
   __offset BIGINT,
   __timestamp TIMESTAMP_LTZ,
  PRIMARY KEY (user_id) NOT ENFORCED
) PARTITIONED BY (bucket(user_id, 4)); 

Multiple Bucket keys

Not supported. For tables with multiple bucket keys (multiple primary key columns), I'd like to defer support until Iceberg implements it.

IcebergLakeTieringFactory

To implement LakeTieringFactory for Iceberg, we need to implement IcebergLakeWriter implementing the interface LakeWriter, and IcebergLakeCommitter implementing the interface LakeCommitter.

IcebergLakeWriter

IcebergLakeWriter is used to write to Iceberg, which is straightforward. We can refer to org.apache.iceberg.flink.sink.IcebergSinkWriter in the Iceberg repo. While writing, we should respect the write properties of the table.

While writing, if the table option table.datalake.auto-maintenance is true, we should compact the files as well. 

When creating an IcebergLakeWriter, the writer scans the manifests to know the files in this bucket. If compaction is applicable (see BaseRewriteDataFilesAction in the Iceberg repo for how to find files to compact), it schedules a background executor to rewrite those files into larger files. Meanwhile, the TieringService writes new data via the write method. After writing finishes, complete is called to finish the data. In the complete method, if a background executor is rewriting data files, wait for the rewriting to finish. So, the complete method produces the files added and deleted by compaction plus the files newly added by normal Fluss tiering. Note that it only compacts files within the same bucket.
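A rough skeleton of this flow is below; write/complete follow the FIP text, but all types are simplified (plain strings stand in for records and files) and the names are illustrative, not the final Fluss API:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class IcebergLakeWriterSketch {

    // Output of the background compaction (rewritten larger files), if one was scheduled.
    private final CompletableFuture<List<String>> pendingCompaction;
    // Files written by normal Fluss tiering in this round.
    private final List<String> newDataFiles = new ArrayList<>();

    public IcebergLakeWriterSketch(List<String> existingBucketFiles) {
        if (existingBucketFiles.size() > 10) {
            // Enough small files in this bucket: rewrite them asynchronously while
            // normal tiering keeps writing new data for the same bucket.
            this.pendingCompaction = CompletableFuture.supplyAsync(() -> rewrite(existingBucketFiles));
        } else {
            this.pendingCompaction = CompletableFuture.completedFuture(List.of());
        }
    }

    public void write(String record) {
        // Placeholder: append the record to the currently open Iceberg data file.
        newDataFiles.add("data-file-containing-" + record);
    }

    public List<String> complete() {
        // Wait for any in-flight compaction, then hand both the newly written files and
        // the compaction result to the committer.
        List<String> result = new ArrayList<>(newDataFiles);
        result.addAll(pendingCompaction.join());
        return result;
    }

    private static List<String> rewrite(List<String> smallFiles) {
        // Placeholder for rewriting small files in the same bucket into larger files.
        return List.of("compacted-large-file");
    }
}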


How do the written files support subscribing to them as Fluss historical data?

Although subscribing to Iceberg data as Fluss historical data is not included in this FIP, the writing should take future subscription into account:

If writing to an Iceberg primary key table, Iceberg maintains two types of delete files: equality-delete files and position-delete files.

So, when writing Fluss change logs (-D or -U records) to equality-delete files, we must include all column values. This enables Iceberg data to serve as Fluss change logs by reconstructing complete records from the equality-delete entries.

Here is an example of how to subscribe to Iceberg data as a Fluss change log for a primary key table:

To subscribe to the Fluss data where bucket = 1, offset >= 100 in Iceberg:

  1. Initial Locate
    1. Find the earliest snapshot in Iceberg's partition=1 where __offset >= 100 (e.g., snapshot 2)
  2. Incremental Processing
    1. For each subsequent snapshot (e.g., snapshot 3):
    2. Compare manifests between snapshots to identify:
      • New data files
      • Equality-delete files
      • Position-delete files
    3. Sort all changes by __offset  to maintain ordering
    4. Record Resolution:
      • Equality-deletes: Contain full record data
      • Position-deletes(deletion vector): Resolve by reading referenced files

However, for data in a delete file, whether it's an equality delete or a position delete, we can't figure out whether it's -D (delete) or -U (update_before). We can treat it as always -D (delete).

IcebergLakeCommitter

In the Iceberg lake committer, it should commit the data files that were written. One thing to note is that we will also add Fluss-related properties to Iceberg's snapshot summary (a sketch follows the list below).

  • Respect the table behavior properties for commit, commit retries, etc.
  • If table.datalake.auto-maintenance is true, also perform snapshot expiration
  • Add "commit-user" = "__fluss_lake_tiering" so that we can know which snapshots are generated by Fluss
  • Add "fluss-bucket-offset" = "${json_string_of_all_buckets_offset}", the Fluss bucket log end offsets that the snapshot has been tiered to; here's an example:
"fluss-bucket-offset": "[
{bucket1: offset1},
{bucket2: offset2},
{bucket3: offset3},
...
]"

Future Improvement Plan

  1. Support multiple bucket keys. There is already a PR https://github.com/apache/iceberg/pull/8259 in Apache Iceberg to support multiple bucket keys, but it has been closed since there was no activity for a while. We can push the Iceberg community to merge this PR, and then we can support multiple bucket keys. That's the best solution for supporting multiple bucket keys.
  2. For Iceberg primary key tables, we should implement an affinity strategy where each table is consistently handled by the same dedicated tiering service.

    The reason is that for Iceberg tables, it's better to produce position-delete (deletion vector) files rather than equality-delete files. If a table is always processed by the same tiering service, the service can maintain an in-memory/disk mapping like Map<StructLike, PathOffset> insertedRowMap, which tracks which file and offset each key is written to. By preserving this complete mapping, when a key needs to be deleted, the tiering service can directly locate its file + offset and record it in a position-delete file; otherwise, it must write an equality-delete file, since it's hard for Iceberg to locate the position if the mapping is not available. A tiny sketch of this idea follows below.

    This ensures efficient deletions while avoiding equality-delete files.
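A tiny sketch of the idea; PathOffset and the string key are illustrative helpers, not existing Fluss or Iceberg classes:

import java.util.HashMap;
import java.util.Map;

public class PositionDeleteIndexSketch {

    // Illustrative helper: which data file and row position a key was written to.
    record PathOffset(String filePath, long position) {}

    private final Map<String, PathOffset> insertedRowMap = new HashMap<>();

    public void onInsert(String key, String filePath, long position) {
        insertedRowMap.put(key, new PathOffset(filePath, position));
    }

    public void onDelete(String key) {
        PathOffset pathOffset = insertedRowMap.remove(key);
        if (pathOffset != null) {
            // The key's location is known: emit a position delete (deletion vector entry)
            // for (pathOffset.filePath(), pathOffset.position()).
        } else {
            // Location unknown: fall back to an equality delete carrying the full record.
        }
    }
}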

Compatibility, Deprecation, and Migration Plan

N/A

Test Plan

IT



Rejected Alternatives