Motivation

In a data streaming pipeline, data errors and other issues can occur, and the data in the flow then needs to be corrected. This situation is common and important to handle. However, the correction should not disturb existing data processing, so that users are not affected. Instead, we create a new streaming pipeline, wait for it to catch up with the data, and then replace the original pipeline with it. The main operations can be divided into the following steps:

  1. Create replica tables based on the specified tag/snapshot of the upstream and downstream Paimon tables.

  2. Resubmit all streaming jobs, performing incremental or full recovery starting from the specified offset.

We need to support branching in Paimon for the data correction process above. With branches, replica tables can be created without copying all data from the specified tables, which avoids the extra storage space.
Branching in Paimon can also be used to enhance tags. Where tags are used to simulate traditional Hive partition tables, branches provide data correction capabilities on top of a tag, which can be used to backfill data and achieve precise segmentation.
In summary, the branch we would like to introduce in Paimon has the following abilities:

  1. Each table has only one main branch, and other branches can only be created from a specified tag of the main branch.

  2. Users can create or delete a branch for tables in Paimon, and create tags for a specified branch.

  3. Users can update the schema of a branch, such as altering the table to add/drop columns.

  4. Jobs can read and write data in a branch in both streaming and batch mode.

  5. A branch can be merged into main or can replace main. After main is replaced with a given branch, the previous main branch is deleted.

Architecture

Data storage in Paimon is divided into five components: schema, data files, manifest files and lists, snapshots, and tags.

To support the above branch capabilities, we would like to introduce branches for snapshot, tag and schema as follows.

There is a main branch file in the base directory of the table, and it contains the name of the main branch. Besides that, there are multiple branch directories, and each branch has its own snapshots, tags and schemas in its directory.

NOTICE: By default, the snapshots, schemas and tags of the main branch stay in the base directory of the table, as before. The main branch is used for reads and writes when no branch is specified or when the table has no main branch file.
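
The layout described above can be sketched as a small path helper. This is only an illustration under stated assumptions: the class name BranchPathSketch and the directory names "snapshot", "tag" and "schema" are hypothetical, and an empty branch name stands for the default main branch that stays in the base directory.

```java
import java.nio.file.Path;

/** Hypothetical helper: resolves per-branch directories under a table's base directory. */
final class BranchPathSketch {

    // Assumed directory names, used for illustration only.
    static final String SNAPSHOT_DIR = "snapshot";
    static final String TAG_DIR = "tag";
    static final String SCHEMA_DIR = "schema";

    /** An empty branch name stands for the default main branch, kept in the base directory. */
    static Path branchRoot(Path tableRoot, String branch) {
        if (branch == null || branch.isEmpty()) {
            return tableRoot;
        }
        return tableRoot.resolve(branch);
    }

    static Path snapshotDir(Path tableRoot, String branch) {
        return branchRoot(tableRoot, branch).resolve(SNAPSHOT_DIR);
    }

    static Path tagDir(Path tableRoot, String branch) {
        return branchRoot(tableRoot, branch).resolve(TAG_DIR);
    }

    static Path schemaDir(Path tableRoot, String branch) {
        return branchRoot(tableRoot, branch).resolve(SCHEMA_DIR);
    }
}
```

With this helper, a table without branches resolves to exactly the same directories as before, while a named branch resolves to a subdirectory of the table.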

Create Branch

The main branch of a Paimon table holds a series of snapshots, tags and schemas. We can create a new branch with a given name from a tag of the table. Paimon creates a new directory with the given branch name and copies the specified tag, together with its snapshot and schema, from the main branch to the new branch.

For example, when Branch-1 is created from tag-1, Paimon copies the relevant snapshot-4 and schema-1 to Branch-1. Branch-2 and Branch-3 are created the same way from tag-7 and tag-11.
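
A minimal sketch of the copy step, assuming a local file system and hypothetical file names of the form tag/tag-<name>, snapshot/snapshot-<id> and schema/schema-<id>. The class CreateBranchSketch is illustrative only, and the snapshot and schema ids are passed in directly, whereas a real implementation would presumably read them from the tag.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch: create a branch by copying one tag, snapshot and schema file. */
final class CreateBranchSketch {

    static void createBranch(
            Path tableRoot, String branchName, String tagName, long snapshotId, long schemaId)
            throws IOException {
        Path branchRoot = tableRoot.resolve(branchName);
        if (Files.exists(branchRoot)) {
            throw new IllegalArgumentException("Branch already exists: " + branchName);
        }
        // Only these three metadata files are copied; no data files are duplicated.
        copyInto(tableRoot, branchRoot, "tag", "tag-" + tagName);
        copyInto(tableRoot, branchRoot, "snapshot", "snapshot-" + snapshotId);
        copyInto(tableRoot, branchRoot, "schema", "schema-" + schemaId);
    }

    private static void copyInto(Path fromRoot, Path toRoot, String dir, String file)
            throws IOException {
        Path targetDir = toRoot.resolve(dir);
        Files.createDirectories(targetDir);
        Files.copy(fromRoot.resolve(dir).resolve(file), targetDir.resolve(file));
    }
}
```

For the example above, the call would be createBranch(tableRoot, "Branch-1", "tag-1", 4, 1).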

Operations In Branch

After a branch is created, streaming and batch jobs can read and write data in it. As with a regular table, streaming and batch jobs can also read data from a branch through time travel. Writing data to the branch creates new snapshots and tags. Users can also perform DDL on table branches, such as adding, dropping or altering columns. For example, performing these operations in Branch-1 creates new schemas, snapshots and tags there.

Delete Branch

Deleting a branch is simple: delete the directory of the specified branch.
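
As a rough illustration on a local file system, deleting a branch amounts to a recursive directory delete. The class DeleteBranchSketch is hypothetical, and refusing an empty name (so that the table's base directory is never removed) is an added assumption, not something the proposal states.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch: delete a branch by removing its directory recursively. */
final class DeleteBranchSketch {

    static void deleteBranch(Path tableRoot, String branchName) throws IOException {
        if (branchName == null || branchName.isEmpty()) {
            // Assumption: an empty name would resolve to the table's base directory.
            throw new IllegalArgumentException("A branch name is required");
        }
        Path branchRoot = tableRoot.resolve(branchName);
        if (!Files.isDirectory(branchRoot)) {
            throw new IllegalArgumentException("Branch does not exist: " + branchName);
        }
        List<Path> toDelete;
        try (Stream<Path> paths = Files.walk(branchRoot)) {
            // Reverse order lists children before their parent directories.
            toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : toDelete) {
            Files.delete(path);
        }
    }
}
```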

Merge Branch To Main

Merging a branch into the main branch can be divided into two steps:

  1. Delete all the snapshots, tags and schemas in the main branch that were created after the tag from which the branch was created.

  2. Copy snapshots, tags and schemas from the branch to the main branch.

The merged branch can still be read and written by jobs, and its data remains independent of the main branch.
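
The two merge steps can be sketched on an in-memory model. The sketch below covers snapshots only, keyed by snapshot id, with strings standing in for snapshot files; tags and schemas would presumably be handled the same way. The class MergeBranchSketch and the parameter baseSnapshotId (the snapshot of the tag the branch was created from) are hypothetical names.

```java
import java.util.NavigableMap;

/** Hypothetical in-memory sketch of merging a branch into main (snapshots only). */
final class MergeBranchSketch {

    /**
     * @param mainSnapshots snapshots of the main branch, keyed by snapshot id (modified in place)
     * @param branchSnapshots snapshots of the branch, keyed by snapshot id (left unchanged)
     * @param baseSnapshotId snapshot id of the tag the branch was created from
     */
    static void mergeIntoMain(
            NavigableMap<Long, String> mainSnapshots,
            NavigableMap<Long, String> branchSnapshots,
            long baseSnapshotId) {
        // Step 1: drop everything main created after the branch point.
        mainSnapshots.tailMap(baseSnapshotId, false).clear();
        // Step 2: copy the branch's snapshots into main.
        mainSnapshots.putAll(branchSnapshots);
    }
}
```

Because the branch's own map is only read, the branch stays usable and independent after the merge, as described above.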

Replace Main With Branch

We need to support replacing the main branch with a branch without affecting streaming and batch reads and writes on that branch. This takes the following steps:

  1. Calculate which snapshots, tags and schemas need to be carried over, and copy them from the main branch to the target branch.

  2. Update the main branch file to point to the target branch.

  3. Drop the previous main branch, including its snapshots, tags and schemas.

After these steps, the target branch has replaced the main branch, and existing jobs can still read and write data in the branch.
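
The three steps can be illustrated with an in-memory model. This is a sketch under stated assumptions: the class ReplaceMainSketch and its fields are hypothetical, only snapshots are modeled, and the snapshots to carry over are assumed to be the main-branch snapshots older than the branch point, which the proposal leaves unspecified.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory sketch of replacing main with a branch (snapshots only). */
final class ReplaceMainSketch {

    /** Snapshots per branch name, keyed by snapshot id. */
    final Map<String, TreeMap<Long, String>> snapshotsByBranch = new HashMap<>();

    /** Stands in for the content of the main branch file; "" is the default main. */
    String mainBranch = "";

    void replaceMain(String target, long baseSnapshotId) {
        TreeMap<Long, String> targetSnapshots = snapshotsByBranch.get(target);
        if (targetSnapshots == null) {
            throw new IllegalArgumentException("Unknown branch: " + target);
        }
        if (target.equals(mainBranch)) {
            return;
        }
        // Step 1 (assumption): carry over main's snapshots older than the branch point.
        TreeMap<Long, String> previousMain = snapshotsByBranch.get(mainBranch);
        if (previousMain != null) {
            previousMain.headMap(baseSnapshotId, false).forEach(targetSnapshots::putIfAbsent);
        }
        // Step 2: point the main branch file at the target branch.
        String previous = mainBranch;
        mainBranch = target;
        // Step 3: drop the previous main branch.
        snapshotsByBranch.remove(previous);
    }
}
```

The target branch's map is extended in place and never recreated, which mirrors the requirement that jobs already reading or writing the branch keep working.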

Proposed Changes

Branch Directory

To manage branches better, we would like to create a directory for each branch, such as /branch-1, /branch-2, and /branch-n. The snapshot, tag and schema directories are placed in the branch directory. We introduce a BranchManager for each table to manage its branches.

public interface BranchManager {
    /** Get the main branch name for the table. */
    String getMainBranch();

    /** Create a branch with the given branch name from the specified tag. */
    void createBranch(String tagName, String branchName);

    /** Delete the branch with the given branch name. */
    void deleteBranch(String branchName);

    /** Merge the given branch into main; the branch still exists afterwards. */
    void mergeBranch(String branchName);

    /**
     * Replace the main branch with the specified branch; the
     * previous main branch will be deleted.
     */
    void replaceMain(String branchName);
}

Query Branch

Users can set a branch name in a job to read and write data in streaming or batch mode.

Spark

-- Query data from the specified branch and tag name.
SELECT * FROM t VERSION AS OF '<branch-name>.<tag-name>';

-- Query data from the specified branch and snapshot id.
SELECT * FROM t VERSION AS OF '<branch-name>.<snapshot-id>';

Users can specify the branch name in their jobs. When the version contains no branch name, the query reads the data in the main branch.
NOTICE: A branch name cannot contain '.'; this is checked when a branch is created.
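
A small sketch of how such a version string might be split, and why the '.' restriction matters. The class BranchVersionSketch and both method names are hypothetical; the empty string is used here to mean "no branch given, use main".

```java
/** Hypothetical sketch: split "branch.ref" version strings and validate branch names. */
final class BranchVersionSketch {

    /** Rejects names that would make "branch.ref" ambiguous. */
    static void validateBranchName(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Branch name must not be empty");
        }
        if (name.contains(".")) {
            throw new IllegalArgumentException("Branch name must not contain '.': " + name);
        }
    }

    /**
     * Returns a two-element array {branch, ref}, where ref is a tag name or snapshot id.
     * When the version has no '.', branch is "" and the main branch is meant.
     */
    static String[] parse(String version) {
        int dot = version.indexOf('.');
        if (dot < 0) {
            return new String[] {"", version};
        }
        // Branch names cannot contain '.', so the first '.' ends the branch name.
        return new String[] {version.substring(0, dot), version.substring(dot + 1)};
    }
}
```

One limitation of this sketch: a version without a branch name whose tag name itself contains '.' would be misread as "branch.ref".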


Flink

SELECT * FROM t /*+ OPTIONS('scan.branch'='<branch name>') */

We will introduce a new option, scan.branch, for Flink to specify the branch name in a job.

Flink Branch Actions

We propose to provide four Flink actions for users to control the creating, deleting, merging and replacing of branches.

| action | argument | note |
| --- | --- | --- |
| create-branch | --name <branch-name>: specify the name of the branch. --tag <tag-name>: specify the name of a tag. | create a branch based on the given tag. |
| delete-branch | --name <branch-name>: specify which branch will be deleted. | delete a branch. |
| merge-branch | --name <branch-name>: merge specified branch to main. | merge specified branch to main. |
| replace-main-branch | --name <branch-name>: replace main branch with specified branch. | replace the main branch with a specified branch. |

Branch System Table

We propose introducing a system table $branches. The schema is:

| Field Name | Field Type | Comment |
| --- | --- | --- |
| name | string | The branch name |
| tag_name | string | The created tag for the branch |
| tagged_snapshot_id | bigint | The snapshot id for the tag. |

Expiring Snapshot

There is already a mechanism to find deletion candidates when expiring snapshots. Once a Paimon table has branches, expiration needs to traverse all branches to check whether a snapshot can be deleted.
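
The cross-branch check can be sketched as a set difference. This is an illustration only: the class ExpireAcrossBranchesSketch is hypothetical, and representing each branch by the set of file names it still references is an assumption about how "in use" would be determined.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: keep any deletion candidate that some branch still references. */
final class ExpireAcrossBranchesSketch {

    /**
     * @param candidates files the existing expiration logic would like to delete
     * @param filesInUsePerBranch for every branch, the files its snapshots and tags reference
     * @return the candidates that no branch references
     */
    static Set<String> safeToDelete(
            Set<String> candidates, Collection<Set<String>> filesInUsePerBranch) {
        Set<String> result = new HashSet<>(candidates);
        for (Set<String> inUse : filesInUsePerBranch) {
            result.removeAll(inUse);
        }
        return result;
    }
}
```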

Compatibility, Deprecation, and Migration Plan

Adding branches changes the locations of snapshots, tags and schemas. To stay compatible with previous versions of Paimon, we can use '' (empty string) as the name of the main branch when there is no main-branch-file, which keeps the original snapshot, tag and schema locations unchanged. This compatibility path needs to be tested, along with tests for branching itself.
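
The fallback described above could look roughly like the following on a local file system. The class MainBranchFileSketch and the file name "main-branch" are hypothetical; the proposal does not fix the file's name or format, so a single line of UTF-8 text is assumed here.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch: read the main branch name, defaulting to "" for older tables. */
final class MainBranchFileSketch {

    // Assumed file name, used for illustration only.
    static final String MAIN_BRANCH_FILE = "main-branch";

    static String mainBranch(Path tableRoot) throws IOException {
        Path file = tableRoot.resolve(MAIN_BRANCH_FILE);
        if (!Files.exists(file)) {
            // Tables written by previous versions have no such file.
            return "";
        }
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
    }
}
```

A table created by an earlier version has no such file, so its main branch name resolves to the empty string and its files are still found in the base directory.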


