Document the state by adding a label to the FIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: N/A (discussed and voted before joining ASF)
Vote thread: N/A
ISSUE: https://github.com/alibaba/fluss/issues/107
Release: 0.7

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


1. Motivation

In traditional data architectures, real-time data processing and offline data analysis are typically handled by separate systems. Real-time data is often stored in message queue systems (such as Kafka) with a short retention period (e.g., 7 days), while offline data is stored in data warehouses (such as Hive) or data lakes with a longer retention period (e.g., one year). While this architecture can meet basic business needs, it has clear limitations: First, maintaining two independent storage systems leads to high hardware and operational costs. Second, due to inconsistencies between storage and compute engines, real-time and offline data processing results may differ, creating challenges for data consistency. Finally, the complexity of developing and maintaining two separate systems significantly increases development costs and technical barriers.

To address these issues, the industry has gradually explored and adopted the "Lakehouse" architecture. The Lakehouse combines the low-cost storage advantages of data lakes with the high-efficiency analytical capabilities of data warehouses. By using a unified storage and compute engine, it enables seamless integration of real-time and offline data. The rise of the Lakehouse architecture has also driven the evolution of streaming storage systems (such as Kafka, Redpanda, and others). These systems are progressively supporting deeper integration with Lakehouse, leading to more efficient data flow and storage management. As the Lakehouse ecosystem continues to evolve, the boundaries between real-time and offline data will increasingly blur, making data processing efficiency and consistency a key trend in the development of data engineering.

2. Fluss Lakehouse Storage

2.1 Overall Design

To address these issues, we propose integrating Lakehouse Storage with Streaming Storage in Fluss, so that the Lakehouse serves as the cold data storage layer for Fluss. Fluss data is written directly to the Lakehouse through continuous synchronization, and the design accounts for union analysis by compute engines so that hot and cold data stay consistent. Fluss's Lakehouse Storage offers significant benefits: it reduces the retention needed in streaming storage from 7 days (as is typical with Kafka) to just 1 hour, ensures the consistency of calculation results, and adopts an open lakehouse format, all while simplifying the system architecture and reducing development and maintenance costs.

Taking Paimon as the cold storage of Fluss as an example, the Tiering Service writes Fluss data into Paimon (the Lakehouse Storage) to separate hot and cold data into tiers. Fluss only needs to retain hour-level real-time data, which greatly reduces the storage cost of hot data. Both Fluss and Paimon are deeply integrated with Flink. A Union Source is introduced in Flink to support Union Read: cold data is read from the lake and hot data is read from Fluss. Metadata and locations are managed by Fluss, and the Union Read feature ensures data consistency. In this way, the design ensures data consistency while improving data processing efficiency and reducing costs.

2.2 Fluss Metadata Federation

1. Metadata Federation design

A key aspect of Fluss Lakehouse Storage is ensuring the synchronization of both metadata and data between Fluss and the Lake. For metadata synchronization, Fluss implements Metadata Federation. The core idea is to deeply integrate the Lakehouse with Fluss, ensuring that the creation and modification of Fluss tables are reflected in the downstream Lakehouse as configured by the user. Additionally, the bucket strategies for Lake tables and Fluss tables will remain consistent once the user enables Fluss's Lakehouse Storage.

2. Overall process

Current Status:
Currently, Fluss metadata synchronization is handled through the Tiering Service (a Flink job), which synchronizes tables that need to be tiered to the Lakehouse. The synchronized table is identified by the attribute , indicating whether synchronization is required. The Flink job is initiated using the script . Before data synchronization begins, a table must be created on the Paimon side.

Proposal:

  1. At the Fluss cluster level, configure the corresponding Lakehouse storage in the  file to enable Lakehouse Storage:

lakehouse.storage: paimon
paimon.catalog.type: filesystem 
paimon.catalog.warehouse: /tmp/paimon_data_warehouse

  2. After the user enables the Lakehouse, a validation check is performed when the Fluss cluster starts to ensure that the Lakehouse storage is correctly configured and that network connectivity is functional.
  3. Once Lakehouse Storage is enabled in the Fluss cluster, the bucket assignment policy for Fluss tables will, by default, align with the bucket assignment policy in the Lakehouse. This ensures that the distribution of hot and cold data buckets remains consistent. The metadata of Fluss tables will include the attribute  (default value is ). This design offers two benefits:
  • It allows tiering of data into the lake to be enabled dynamically.
  • It enables finer granularity at the table level, supporting multi-tenant and multi-lake environments for clusters in the future.
  4. After enabling Lakehouse Storage in the Fluss cluster, all Fluss tables will have the attribute  by default. With this attribute set, creating a Fluss table also creates the corresponding downstream Lakehouse table, keeping metadata synchronized. When the schema of the Fluss table is modified, the changes will be propagated to the corresponding table in the downstream Lakehouse. This greatly simplifies the process while ensuring metadata consistency.

2.3 Fluss Tiering Service

1. Tiering Service design

For data synchronization, Fluss utilizes the Tiering Service to manage the synchronization process. The Tiering Service is designed as a stateless synchronization task, allowing for scalability. A single Fluss cluster can register multiple Tiering services, with each service being assigned a specific workload. The Fluss cluster is responsible for managing the task status of each step within the Tiering Service, ensuring smooth execution and progress tracking.

2. Overall process

Current Status:

Currently, data synchronization in Fluss is managed by the Tiering Service (a Flink job) to synchronize tables that need to be tiered to the Lakehouse. Synchronized tables are identified by the attribute , which specifies whether synchronization is required. The Flink job is initiated using the script .

Proposal:

  1. The Tiering Service executes a Flink run command to start the  job. This job represents the Tiering Service. Multiple jobs can be started by the Tiering Service. Once the service starts, it registers itself with the Fluss Cluster. The run command looks like this:

<FLINK_HOME>/bin/flink run /path/to/fluss-lake-tiering-flink-0.6.jar \
-c /path/to/fluss-config-dir \
-j fluss-lake-format-paimon-0.6.jar \
-j paimon-bundle-1.0.1.jar

  2. The Fluss Cluster assigns tasks to the registered Tiering Services. Each task corresponds to a Fluss table that needs to be synchronized; the smallest unit of work is a table, and a service may handle multiple tables. Tasks are distributed through an in-memory queue and are assigned based on the end offset of the last snapshot in the lake plus a 3-minute (configurable) polling interval, so that work is distributed evenly.
    1. Pros: Each synchronization task does not need to handle its own concurrency (e.g., how many Flink tasks run concurrently). If the synchronization capability is low, fewer tasks are allocated, and if the capability is high, more tasks are allocated. There is no need to manually manage load balancing for each synchronization task.
    2. Cons: All tables in the cluster share the same SLA. However, this can be mitigated by setting priorities and allocating different weights via a priority queue scheduling mechanism.
  3. After receiving a task (Fluss table information), the Tiering Service reads data from the Fluss table, writes it into Paimon, and, once the lake snapshot has been committed, reports the result (the last lake snapshot) back to the Fluss cluster.
  4. By default, the Lake Tiering Service synchronizes all tables. If some tables do not need synchronization, you can specify  to exclude them. If users later want to start synchronization for existing tables, they can dynamically modify the table's attribute to . This will trigger the creation of the corresponding downstream Lake table synchronously.
  5. The Tiering Service maintains a connection to the Fluss Cluster via a heartbeat mechanism. This mechanism is used to monitor the status of the Tiering Service, detect synchronization task timeouts, and kill tasks when necessary.
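The assignment and heartbeat steps above can be pictured with a small coordinator-side sketch. It is only an illustration under simplifying assumptions: the class `TieringCoordinatorSketch` and all of its methods are hypothetical names, tasks are handed out in plain FIFO order (the offset-based and polling-based ordering is left out), and tables are identified by strings.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;

// Hypothetical coordinator-side bookkeeping; none of these names come from Fluss.
class TieringCoordinatorSketch {
    private final Queue<String> pendingTables = new ArrayDeque<>();        // tables waiting to be tiered
    private final Map<String, String> runningByService = new HashMap<>();  // serviceId -> table in progress
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();     // serviceId -> last heartbeat time
    private final Map<String, Long> lakeSnapshotByTable = new HashMap<>(); // table -> last reported snapshot id
    private final long heartbeatTimeoutMs;

    TieringCoordinatorSketch(long heartbeatTimeoutMs) {
        this.heartbeatTimeoutMs = heartbeatTimeoutMs;
    }

    void addTable(String table) {
        pendingTables.add(table);
    }

    // A Tiering Service registers itself or renews its heartbeat.
    void heartbeat(String serviceId, long nowMs) {
        lastHeartbeatMs.put(serviceId, nowMs);
    }

    // An idle, registered service asks for the next table to tier.
    Optional<String> requestTask(String serviceId) {
        if (!lastHeartbeatMs.containsKey(serviceId) || runningByService.containsKey(serviceId)) {
            return Optional.empty();
        }
        String table = pendingTables.poll();
        if (table != null) {
            runningByService.put(serviceId, table);
        }
        return Optional.ofNullable(table);
    }

    // The service reports the committed lake snapshot; the table is re-queued for the next round.
    void reportCommitted(String serviceId, long lakeSnapshotId) {
        String table = runningByService.remove(serviceId);
        if (table != null) {
            lakeSnapshotByTable.put(table, lakeSnapshotId);
            pendingTables.add(table);
        }
    }

    // Services whose heartbeat timed out are dropped and their task goes back to the queue.
    void expireTimedOutServices(long nowMs) {
        lastHeartbeatMs.entrySet().removeIf(entry -> {
            if (nowMs - entry.getValue() <= heartbeatTimeoutMs) {
                return false;
            }
            String table = runningByService.remove(entry.getKey());
            if (table != null) {
                pendingTables.add(table);
            }
            return true;
        });
    }

    Long lastLakeSnapshot(String table) {
        return lakeSnapshotByTable.get(table);
    }
}
```

Because a service only receives a new table after it has reported the previous one, faster services naturally pull more tasks from the queue, which is the load-balancing property described in the pros above.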


The overall sequence diagram is shown below:

[Figure: overall sequence diagram of the Tiering Service]


2.4 Fluss Union Read

When computing engines (such as Flink and StarRocks) read data from Fluss and data lakes, they first retrieve metadata from the Fluss cluster. Based on this metadata, they acquire the latest snapshot from the lake and the  for each bucket under that snapshot. Then, the Union Read operation is performed by reading the corresponding snapshot from the lake and the relevant log data (from  to ) from Fluss.
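As a rough illustration of the read path described above, the following sketch merges one bucket's lake snapshot with the part of the Fluss log that the snapshot does not yet cover. All types and names (`UnionReadSketch`, `LakeBucketSnapshot`, `tieredEndOffset`) are hypothetical stand-ins, and the sketch assumes the snapshot records, per bucket, the first log offset it does not contain.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of one bucket; these types are not Fluss APIs.
class UnionReadSketch {

    // A log record in Fluss, identified by its offset within the bucket.
    static final class LogRecord {
        final long offset;
        final String value;

        LogRecord(long offset, String value) {
            this.offset = offset;
            this.value = value;
        }
    }

    // Cold part: rows already tiered into the lake snapshot, plus the (assumed)
    // log offset at which the snapshot ends for this bucket.
    static final class LakeBucketSnapshot {
        final List<String> rows;
        final long tieredEndOffset; // first log offset NOT contained in the snapshot

        LakeBucketSnapshot(List<String> rows, long tieredEndOffset) {
            this.rows = rows;
            this.tieredEndOffset = tieredEndOffset;
        }
    }

    // Reads the lake snapshot first, then only the log records the snapshot does not cover.
    static List<String> unionRead(LakeBucketSnapshot snapshot, List<LogRecord> flussLog) {
        List<String> result = new ArrayList<>(snapshot.rows);
        for (LogRecord record : flussLog) {
            if (record.offset >= snapshot.tieredEndOffset) {
                result.add(record.value);
            }
        }
        return result;
    }
}
```

Skipping log records below the snapshot's end offset is what keeps a row from being returned twice when it exists both in the lake and in the still-retained Fluss log.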

3. Interface Design

3.1 Modules

(1) Add a new module named fluss-lake, and then introduce relevant sub-modules under this module. The overall structure is as follows:

  • /fluss/fluss-lake
    • fluss-lake-common: Public classes and interfaces, organized into two directories: tiering (tiering-related) and format (related to how Fluss reads and writes lake formats).
    • fluss-lake-format-paimon: The implementation for tiering Fluss data to Paimon, mainly covering how Fluss reads and writes the Paimon lake format. The follow-up will be the ,  implementations.
    • fluss-lake-tiering-flink: A Tiering Service implementation based on Flink, consisting mainly of Fluss's Flink Source and Paimon's Flink Sink. The core of this module is to generate a Flink DataStream job that synchronizes a Fluss table to a Paimon table. It is packaged as fluss-lake-tiering-flink-0.6.jar; users can later also implement their own Tiering Service.


<FLINK_HOME>/bin/run.sh /path/to/fluss-lake-tiering-flink-0.6.jar \
-c /path/to/fluss-config \
-J fluss-lake-format-paimon-0.6.jar \
-J paimon-bundle-1.0.1.jar

(2) After the  module is completed and stabilized, the existing  module will be deprecated and removed entirely.

(3) In the  modules, we will remove the  dependencies and instead rely directly on them . This change will allow Fluss to read and write Paimon-related modules, enabling code reuse across the system.

(4) In the  module, we will add  dependencies (including , , and ) to handle the internal Paimon Catalog dependencies and its three implementations (filesystem, hive, jdbc). StarRocks also supports the Paimon catalog, allowing Fluss clusters to directly create downstream tables. The server module will load these dependencies through the module mechanism to prevent potential class conflicts.

3.2 Public Interfaces

(1) Introduce Interfaces

In the new interface design, there are three primary interfaces as follows:

(a) Synchronize Fluss data to the Data Lake interface
(b) Calculate the relevant bucket interface of the Lake table based on Fluss data
(c) Pluggable Lakehouse interfaces for integrating different lake formats

(a) Synchronize Fluss data

The Tiering Service is responsible for synchronizing Fluss data to the Data Lake. These interfaces are located under the  module and include:

LakeTieringFactory

public interface LakeTieringFactory<WriteResult, CommittableT> {

  // create a lake writer to write Fluss's rows to Paimon/Iceberg rows
  LakeWriter<WriteResult> createLakeWriter(WriterInitContext writerInitContext) throws IOException;

  // serializer for the write results produced by the lake writer
  SimpleVersionedSerializer<WriteResult> getWriteResultSerializer();

  // create a lake committer to commit to Paimon/Iceberg
  LakeCommitter<WriteResult, CommittableT> createLakeCommitter(CommitterInitContext committerInitContext) throws IOException;

  // serializer for the committables produced by the lake committer
  SimpleVersionedSerializer<CommittableT> getCommittableSerializer();

}


WriterInitContext

// a context to create LakeWriter
public interface WriterInitContext {
  TablePath tablePath();
  TableBucket tableBucket();
  // null when the table is not partitioned
  @Nullable String partition();
}


SimpleVersionedSerializer

public interface SimpleVersionedSerializer<E> {
 
  int getVersion();

  byte[] serialize(E obj) throws IOException;

  E deserialize(int version, byte[] serialized) throws IOException;
}
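To make the role of the version number concrete, here is a minimal sketch of one possible serializer. The `SimpleVersionedSerializer` interface is repeated from the proposal so that the block compiles on its own; `FileWriteResult` and `FileWriteResultSerializer` are hypothetical types invented for this illustration, and the byte layout is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Repeated from the proposal so that the sketch is self-contained.
interface SimpleVersionedSerializer<E> {
    int getVersion();

    byte[] serialize(E obj) throws IOException;

    E deserialize(int version, byte[] serialized) throws IOException;
}

// Hypothetical write result: one data file and the number of records in it.
final class FileWriteResult {
    final String path;
    final long recordCount;

    FileWriteResult(String path, long recordCount) {
        this.path = path;
        this.recordCount = recordCount;
    }
}

final class FileWriteResultSerializer implements SimpleVersionedSerializer<FileWriteResult> {
    private static final int CURRENT_VERSION = 1;

    @Override
    public int getVersion() {
        return CURRENT_VERSION;
    }

    @Override
    public byte[] serialize(FileWriteResult obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(obj.path);
            out.writeLong(obj.recordCount);
        }
        return bytes.toByteArray();
    }

    @Override
    public FileWriteResult deserialize(int version, byte[] serialized) throws IOException {
        // The caller passes the version the bytes were written with, so that an
        // older layout can be rejected (or, in a fuller implementation, migrated).
        if (version != CURRENT_VERSION) {
            throw new IOException("Unsupported serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String path = in.readUTF();
            long recordCount = in.readLong();
            return new FileWriteResult(path, recordCount);
        }
    }
}
```

Storing the version next to the serialized bytes lets a newer Tiering Service read write results or committables produced by an older one.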


LakeWriter

public interface LakeWriter<WriteResult> extends Closeable {

  void write(ScanRecord record) throws IOException;

  WriteResult complete() throws IOException;
}

Note: If the implementation of LakeWriter can support writing records in batches, for example, for Fluss ArrowBatch, directly convert this Batch into Parquet format, the following interface can also be implemented:

public interface SupportsRecordBatchWrite {
 
  void write(RecordBatch recordBatch) throws IOException;
}


RecordBatch is an abstraction of a batch of data records introduced here:

public interface RecordBatch {
}

The implementation corresponding to ArrowBatch in Fluss:

public class ArrowRecordBatch implements RecordBatch {
}


LakeCommitter

public interface LakeCommitter<WriteResult, CommittableT> extends AutoCloseable {
 
  CommittableT toCommittable(List<WriteResult> writeResults) throws IOException;

  void commit(CommittableT committable) throws IOException;
}
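The interplay between writers and the committer can be sketched with an in-memory stand-in. The interfaces below are simplified copies of the proposed ones (a plain `String` replaces Fluss's `ScanRecord` so the block is self-contained), and `BufferingWriter`, `InMemoryCommitter`, and `TieringRoundSketch` are hypothetical classes that only mimic the write-then-commit flow.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Simplified copies of the proposed interfaces; records are plain Strings here.
interface SketchLakeWriter<WriteResult> extends Closeable {
    void write(String record) throws IOException;

    WriteResult complete() throws IOException;
}

interface SketchLakeCommitter<WriteResult, CommittableT> extends AutoCloseable {
    CommittableT toCommittable(List<WriteResult> writeResults) throws IOException;

    void commit(CommittableT committable) throws IOException;
}

// Hypothetical writer: buffers the rows of one bucket and returns them as its write result.
final class BufferingWriter implements SketchLakeWriter<List<String>> {
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void write(String record) {
        buffer.add(record);
    }

    @Override
    public List<String> complete() {
        return new ArrayList<>(buffer);
    }

    @Override
    public void close() {
        buffer.clear();
    }
}

// Hypothetical committer: merges all write results and publishes them as one "snapshot".
final class InMemoryCommitter implements SketchLakeCommitter<List<String>, List<String>> {
    final List<List<String>> snapshots = new ArrayList<>();

    @Override
    public List<String> toCommittable(List<List<String>> writeResults) {
        List<String> merged = new ArrayList<>();
        for (List<String> result : writeResults) {
            merged.addAll(result);
        }
        return merged;
    }

    @Override
    public void commit(List<String> committable) {
        snapshots.add(committable);
    }

    @Override
    public void close() {
    }
}

final class TieringRoundSketch {
    // One tiering round: one writer per bucket, then a single commit over all results.
    static List<String> run(List<List<String>> recordsPerBucket, InMemoryCommitter committer) {
        List<List<String>> results = new ArrayList<>();
        for (List<String> bucketRecords : recordsPerBucket) {
            try (BufferingWriter writer = new BufferingWriter()) {
                for (String record : bucketRecords) {
                    writer.write(record);
                }
                results.add(writer.complete());
            }
        }
        List<String> committable = committer.toCommittable(results);
        committer.commit(committable);
        return committable;
    }
}
```

Separating the per-bucket write results from the single committable is what allows many writers to run in parallel while the lake still sees one atomic commit per round.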


(b) Calculate data bucket distribution

The interfaces that calculate the bucket of a Fluss table and of its Lake table from Fluss data are placed in the metadata module of fluss-common. The Tiering Service depends on fluss-client, and both fluss-client and fluss-server depend on fluss-common.

public interface BucketAssignerFactory {
    BucketAssigner createBucketAssigner(TableDescriptor tableDescriptor);
}


public interface BucketAssigner {
 
  int assignBucket(InternalRow row);
}

The Paimon table will be created by the Tiering Service itself, so its bucket key is the same as the bucket key of the Fluss table by default. Iceberg currently supports only a single bucket key, so if the Fluss table has multiple bucket keys, a new column, __bucket, of type binary is introduced: the Fluss bucket keys are combined into this column, and the column is used as the bucket key. The TableDescriptor already has the table.datalake.format = 'none/paimon/iceberg' attribute, so a BucketAssigner can be created based on this information.
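The idea of folding several bucket keys into one binary value can be sketched as follows. This is an illustration only: the class `CombinedBucketKeySketch`, the length-prefixed encoding, and the use of `Arrays.hashCode` are all assumptions, rows are modeled as `Object[]` instead of Fluss's `InternalRow`, and a real implementation would have to use the same hash function as the target lake format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper; the encoding and hash below are assumptions for illustration.
final class CombinedBucketKeySketch {

    // Encodes the values of all bucket-key columns into one byte array, i.e. the
    // value that would be stored in the binary __bucket column. Each value is
    // length-prefixed so that ("ab", "c") and ("a", "bc") encode differently.
    static byte[] encodeBucketKey(Object[] row, int[] bucketKeyIndexes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int index : bucketKeyIndexes) {
            byte[] value = String.valueOf(row[index]).getBytes(StandardCharsets.UTF_8);
            // 4-byte big-endian length prefix, followed by the value bytes
            out.write(value.length >>> 24);
            out.write(value.length >>> 16);
            out.write(value.length >>> 8);
            out.write(value.length);
            out.write(value, 0, value.length);
        }
        return out.toByteArray();
    }

    // Maps the combined key to a bucket in [0, numBuckets).
    static int assignBucket(Object[] row, int[] bucketKeyIndexes, int numBuckets) {
        byte[] key = encodeBucketKey(row, bucketKeyIndexes);
        return Math.floorMod(Arrays.hashCode(key), numBuckets);
    }
}
```

As long as Fluss and the lake table derive the bucket from the same combined value with the same function, a row lands in the same bucket on both sides, which is the consistency property the bucket assigner is meant to guarantee.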

(c) Pluggable Lakehouse interfaces

The aforementioned interfaces need to be loaded via the SPI (Service Provider Interface) mechanism. To facilitate this, an interface is introduced. Implementations such as Paimon and Iceberg will implement this interface. This interface will be placed under the  module.

public interface LakeStoragePlugin extends Plugin {

  // the name of lake storage, such as paimon/iceberg
  String identifier();

  LakeStorage createLakeStorage(final Configuration configuration);
}


public interface LakeStorage {

  LakeTieringFactory createLakeTieringFactory();

}
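A plugin lookup by identifier might look like the sketch below. It uses the JDK's standard `java.util.ServiceLoader` for discovery, which is an assumption: the proposal's `Plugin` base type suggests Fluss has its own plugin loading, whose details are not shown here. The `Sketch*` interfaces are simplified stand-ins (a `Map` replaces `Configuration`), and `FakePaimonPlugin` is a hypothetical implementation used only to exercise the lookup.

```java
import java.util.Map;
import java.util.ServiceLoader;

// Simplified stand-ins for the proposed interfaces.
interface SketchLakeStorage {
    String describe();
}

interface SketchLakeStoragePlugin {
    // the name of the lake storage, such as paimon/iceberg
    String identifier();

    SketchLakeStorage createLakeStorage(Map<String, String> configuration);
}

final class LakeStoragePluginLookup {

    // Returns the first candidate whose identifier matches the configured lake format.
    static SketchLakeStoragePlugin find(Iterable<SketchLakeStoragePlugin> candidates, String identifier) {
        for (SketchLakeStoragePlugin plugin : candidates) {
            if (plugin.identifier().equalsIgnoreCase(identifier)) {
                return plugin;
            }
        }
        throw new IllegalArgumentException("No lake storage plugin found for: " + identifier);
    }

    // Discovery through the JDK's SPI mechanism; implementations must be registered
    // in META-INF/services for ServiceLoader to find them.
    static SketchLakeStoragePlugin findViaSpi(String identifier) {
        return find(ServiceLoader.load(SketchLakeStoragePlugin.class), identifier);
    }
}

// Hypothetical plugin used only to demonstrate the lookup.
final class FakePaimonPlugin implements SketchLakeStoragePlugin {
    @Override
    public String identifier() {
        return "paimon";
    }

    @Override
    public SketchLakeStorage createLakeStorage(Map<String, String> configuration) {
        String warehouse = configuration.get("paimon.catalog.warehouse");
        return () -> "paimon lake storage at " + warehouse;
    }
}
```

Keying the lookup on `identifier()` means supporting another lake format only requires adding a new plugin implementation to the classpath, without changing the code that selects it.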


(2) Deprecate Interfaces

The relevant APIs in the fluss-lakehouse module are marked as deprecated and will be deleted in the next version.

3.3 Configurations

Add a configuration to the metadata of the Fluss table: table.datalake.format = 'none/paimon/iceberg/hudi', with the default value set to none.


4. Future Work

4.1 Deletion Vector Support

A deletion vector will be introduced for Fluss data to enable fast filtering and speed up the reading of real-time data.

4.2 Table Assignment Priority Support

Fluss will support higher scheduling weights for high-priority tables, ensuring that these tables receive stronger SLA guarantees. This is particularly useful for business-critical tables within the Fluss cluster that require stricter SLA adherence.

4.3 Schema Evolution Support

When the user modifies the Fluss table schema, Fluss will directly synchronize the schema changes between Fluss and Paimon. The Tiering Service will support automatic synchronization of data after schema changes, reducing maintenance complexity.

5. Rejected Proposal

5.1 Tiering Service Built-in to Fluss Server

In terms of design, the Tiering Service is not tightly bound to Flink and can run in various environments, such as Flink jobs, containers, or within the Fluss server itself. Currently, we implement the Tiering Service using Flink jobs. The primary reason for not running the Tiering Service within the Fluss server is that it could negatively impact the stability of the server. Since the Tiering Service is a high-CPU operation, running it within the server would consume significant resources, potentially affecting online business traffic. While slower data lake synchronization is acceptable, it's more stable and scalable to separate storage and computing workloads. This design supports more elastic scaling and ensures that services remain stable.

Alternatively, the Tiering Service could be developed from scratch as a standalone data synchronization service and deployed in containers. However, Flink is an excellent distributed stream computing engine that is well-suited for data synchronization tasks. By leveraging Flink for the Tiering Service, we can maximize the reuse of its existing capabilities, significantly reducing development costs compared to building a custom solution from the ground up.